
Charles Givre

Posted on • Originally published at gtkcyber.com

Building a Threat Hunting Pipeline with Python and Jupyter

Most threat hunting guides describe the process abstractly: form a hypothesis, search for evidence, iterate. That framing is accurate but stops short of the part that actually takes time: getting data into a shape you can interrogate, writing code that tests a specific hypothesis, and building something repeatable instead of a one-off notebook you can't read six weeks later.

This is what a working threat hunting pipeline looks like in Python and Jupyter.

Setting Up the Data Layer

Jupyter notebooks work well for hunt investigations because they combine code, output, and narrative in a single file. The risk is notebooks becoming unreadable ad-hoc sessions. Use consistent data loading patterns from the start.

Zeek logs include a #fields header. Parse it instead of hardcoding column names:

import pandas as pd

def load_zeek_log(path):
    """Parse a TSV Zeek log, taking column names from the #fields header."""
    cols = None
    with open(path) as f:
        for line in f:
            if line.startswith('#fields'):
                cols = line.strip().split('\t')[1:]
                break
    if cols is None:
        raise ValueError(f'no #fields header found in {path}')
    return pd.read_csv(path, sep='\t', comment='#', names=cols,
                       na_values=['-', '(empty)'])

df_conn = load_zeek_log('conn.log')
df_conn['ts'] = pd.to_datetime(df_conn['ts'], unit='s')

for col in ['orig_bytes', 'resp_bytes', 'duration']:
    df_conn[col] = pd.to_numeric(df_conn[col], errors='coerce')

For Windows Event Log (.evtx) files, the evtx package (Rust-backed bindings exposing PyEvtxParser) parses records to JSON:

import json
from evtx import PyEvtxParser

def load_evtx(path):
    """Flatten EVTX records into a DataFrame; json_normalize produces
    dotted column names like 'Event.System.EventID'."""
    parser = PyEvtxParser(path)
    return pd.json_normalize(
        [json.loads(r['data']) for r in parser.records_json()]
    )

df_security = load_evtx('Security.evtx')

For environments pulling from Sentinel, Splunk, or QRadar, MSTICpy (Microsoft Threat Intelligence Python Security Tools) provides a query interface that works across sources with consistent output DataFrames. The setup cost is real, but it pays off when a hunt hypothesis spans endpoint and network data from different platforms.

Hypothesis: Beaconing Detection

C2 beaconing (MITRE ATT&CK T1071.001) produces regular-interval outbound connections. The statistical signature is low variance in inter-arrival time (IAT) across many connections from a single source host to the same destination IP.

The coefficient of variation (standard deviation divided by mean) captures this: a CV below 0.25 indicates connection intervals that are more regular than noise. A beacon firing every 60 seconds with minor jitter will cluster tightly. Legitimate traffic to the same host rarely does.
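A quick sanity check of the threshold on synthetic intervals (hypothetical data, standard library only): a 60-second beacon with up to five seconds of jitter versus intervals drawn uniformly at random.

```python
import random
import statistics

random.seed(7)

# Hypothetical synthetic intervals: a 60s beacon with +/-5s jitter,
# versus bursty traffic with intervals anywhere from 1s to 300s.
beacon_iats = [60 + random.uniform(-5, 5) for _ in range(50)]
noisy_iats = [random.uniform(1, 300) for _ in range(50)]

def cv(iats):
    """Coefficient of variation: standard deviation divided by mean."""
    return statistics.stdev(iats) / statistics.mean(iats)

print(f"beacon CV: {cv(beacon_iats):.3f}")  # well under 0.25
print(f"noisy  CV: {cv(noisy_iats):.3f}")   # well above 0.25
```

The jittered beacon lands an order of magnitude below the 0.25 cutoff, while the random traffic sits far above it, which is what makes the CV a usable single-number discriminator.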

def compute_beacon_score(group):
    # Too few connections to estimate interval regularity.
    if len(group) < 15:
        return None
    group = group.sort_values('ts')
    iats = group['ts'].diff().dt.total_seconds().dropna()
    iat_mean = iats.mean()
    if iat_mean == 0:
        return None
    return pd.Series({
        'conn_count': len(group),
        'iat_mean_s': round(iat_mean, 1),
        'iat_cv': round(iats.std() / iat_mean, 3),
        'total_bytes': group['orig_bytes'].sum()
    })

beacon_candidates = (
    df_conn[df_conn['proto'] == 'tcp']
    # Beaconing is per-source behavior: score each (source, destination) pair
    # rather than pooling all sources talking to one destination.
    .groupby(['id.orig_h', 'id.resp_h'], group_keys=False)
    .apply(compute_beacon_score)
    .dropna()
    .query('iat_cv < 0.25')
    .sort_values('iat_cv')
)

The total_bytes column narrows the list. Real C2 beacons tend to be small: keepalives averaging a few hundred bytes. A host showing a CV of 0.10 across 50 connections but totaling 20GB is probably a backup job, not a beacon. A host showing a CV of 0.08 across 200 connections totaling 400KB is worth a follow-up.

Known false positives: NTP clients, telemetry agents, and heartbeat services produce low-CV behavior by design. Filter known-good destinations by ASN or hostname before presenting results to analysts.
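A minimal sketch of that filter, assuming the allowlist has already been resolved down to destination IPs (the IPs and the `known_good` set here are hypothetical placeholders):

```python
import pandas as pd

# Hypothetical allowlist: destinations already vetted by ASN/hostname
# lookup (NTP pools, telemetry endpoints, monitoring services).
known_good = {'203.0.113.10', '198.51.100.25'}

# Stand-in for beacon_candidates, indexed by destination IP.
candidates = pd.DataFrame(
    {'iat_cv': [0.08, 0.11, 0.19]},
    index=pd.Index(['203.0.113.10', '192.0.2.44', '192.0.2.77'],
                   name='id.resp_h'),
)

# Drop allowlisted destinations before handing results to triage.
filtered = candidates[~candidates.index.isin(known_good)]
print(filtered)
```

In practice the allowlist belongs in a version-controlled file next to the notebook, so analysts can see why a destination was suppressed.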

Hypothesis: Lateral Movement via SMB

Lateral movement over SMB (MITRE ATT&CK T1021.002) produces Windows Security Event ID 4624 (successful logon) with LogonType 3 (network logon) from an account hitting multiple distinct destinations. Administrators doing their job will appear here. Regular user accounts and service accounts should not.

# Event ID 4624 = successful logon; LogonType 3 = network.
# Field types depend on the parser output: EventID arrives numeric here,
# LogonType as a string.
df_4624 = df_security[
    (df_security['Event.System.EventID'] == 4624) &
    (df_security['Event.EventData.LogonType'] == '3')
].copy()

# Aggregate per logon account over the full observation window.
# TargetUserName is the account that logged on; SubjectUserName is
# typically empty or SYSTEM for network logons.
lateral_candidates = (
    df_4624
    .groupby('Event.EventData.TargetUserName')
    .agg(
        # Event.System.Computer = the host that recorded the logon,
        # i.e. the destination (assumes logs aggregated across hosts).
        distinct_hosts=('Event.System.Computer', 'nunique'),
        source_ips=('Event.EventData.IpAddress', 'nunique'),
        logon_count=('Event.System.EventRecordID', 'count')
    )
    .query('distinct_hosts > 5 and logon_count > 20')
    .sort_values('distinct_hosts', ascending=False)
)

Adjust the distinct_hosts threshold based on your environment's baseline. In a flat network with permissive SMB policies, the threshold may need to be higher. In an environment with strict segmentation, two or three unexpected hosts may be enough to investigate.
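One way to make that threshold data-driven rather than guessed: derive it from a benign baseline, for example the 95th percentile of distinct hosts touched per account over a prior window. The baseline numbers below are hypothetical.

```python
import statistics

# Hypothetical baseline: distinct hosts touched per account over the
# prior 30 days, drawn from known-benign activity.
baseline_distinct_hosts = [1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 5, 6, 8, 12]

# Set the alert threshold at the 95th percentile of the baseline,
# so it tracks the environment instead of a hardcoded constant.
# quantiles(n=100) returns 99 cut points; index 94 is the 95th percentile.
p95 = statistics.quantiles(baseline_distinct_hosts, n=100)[94]
print(f"distinct_hosts threshold: {p95:.1f}")
```

Recompute the baseline periodically; admin tooling changes will shift it, and a stale threshold drifts toward either noise or blindness.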

Structuring for Reuse

A hunt that runs once and disappears is a missed opportunity. A few patterns that help:

Keep data loading functions in a shared utility module and import them at the top of each notebook. This keeps notebooks focused on hypothesis testing, not boilerplate.

Use a timestamp in the notebook filename: hunt_beaconing_2026-04-27.ipynb. In three months, you want to know when the hunt ran and against which data window.

When a hunt produces findings, export the notebook as an HTML report for sharing:

jupyter nbconvert --to html hunt_beaconing_2026-04-27.ipynb --output-dir=./reports/

For recurring hunts that run against fresh data on a schedule, papermill executes notebooks programmatically with injected parameters. Define the data window as a parameter, and you can run the same hunt notebook daily without opening a browser.
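A sketch of what a scheduled run might look like, assuming the notebook has a cell tagged `parameters` defining `start_time` and `end_time` (both parameter names are hypothetical, not from the post above):

```shell
# Hypothetical cron-driven run: papermill injects the data window into
# the notebook's "parameters" cell and writes a dated output notebook.
papermill hunt_beaconing.ipynb \
  reports/hunt_beaconing_$(date +%F).ipynb \
  -p start_time "2026-04-26T00:00" \
  -p end_time   "2026-04-27T00:00"
```

The executed output notebook doubles as the run's audit record: parameters, code, and results in one file.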

What Jupyter Doesn't Replace

Notebooks are for exploration and documentation. When a hunt hypothesis proves reliable, translate the logic into a production detection. Sigma is the right destination for detection logic that needs to run continuously, that others need to maintain, or that needs to deploy across different SIEM platforms. The notebook is where you prove the hypothesis works; Sigma or your SIEM's detection language is where it runs in production.
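As a sketch, the SMB hunt above might translate into Sigma roughly like this. Note that a plain Sigma rule matches one event at a time, so the per-account distinct-host threshold belongs in the SIEM's correlation or aggregation layer, not in the rule body:

```yaml
# Hypothetical Sigma translation of the network-logon hunt logic.
title: Network Logon Activity For Lateral Movement Baselining
status: experimental
description: Matches successful network logons; per-account host-count
  thresholding is applied downstream in the SIEM.
logsource:
  product: windows
  service: security
detection:
  selection:
    EventID: 4624
    LogonType: 3
  condition: selection
falsepositives:
  - Legitimate administrative activity
level: medium
```

The notebook remains the place where the threshold was justified; the rule carries only the event-level logic that can run continuously.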

GTK Cyber's applied data science training covers building, calibrating, and operationalizing threat hunting pipelines with hands-on labs against realistic network and endpoint datasets, including exercises in the exact feature engineering and hypothesis-testing patterns described here.
