DEV Community

Cover image for ThreatWire: A Python Library for Real-Time Network Threat Detection.
Abhishake Reddy Onteddu
Abhishake Reddy Onteddu

Posted on

ThreatWire: A Python Library for Real-Time Network Threat Detection.

The Problem Every Security Engineer Knows
Building a network-level threat detector in Python means stitching together scapy for capture, writing custom protocol parsers, and building a bespoke signature engine — from scratch — every single time. Every team reinvents the same infrastructure, project after project.
threatwire changes that.
It's a streaming packet analysis pipeline that takes you from raw network packets to structured, MITRE ATT&CK-tagged threat alerts in a single, composable library.

What Is ThreatWire?
threatwire is an open-source Python library for real-time network packet inspection and threat signature matching — purpose-built for IDS/IPS pipelines.
It gives you three composable building blocks:

PacketStreamer — live capture or PCAP ingestion with BPF filter support and TCP stream reassembly
SignatureEngine — 1,200+ built-in threat rules using Aho-Corasick multi-pattern matching, with Suricata rule import support
ThreatEventBus — pub/sub alert routing with deduplication, async handler support, and Elastic Common Schema (ECS) output

Getting Started in Minutes
Install the core library:
bashpip install threatwire

With live capture support

pip install threatwire[capture]

With fast Aho-Corasick pattern matching

pip install threatwire[fast]

Everything

pip install threatwire[all]
A minimal live capture pipeline looks like this:
pythonfrom threatwire import ThreatPipeline

pipeline = ThreatPipeline(
interface="eth0",
bpf_filter="tcp or udp",
enable_builtin_rules=True,
)

@pipeline.on_alert(severity="high")
def handle_threat(alert):
print(f"[{alert.severity.value.upper()}] {alert.rule_name}")
print(f" {alert.src_ip} → {alert.dst_ip}")
print(f" Technique: {alert.technique_id}")
print(f" Confidence: {alert.confidence:.0%}")

pipeline.run()
That's it. You're capturing, analyzing, and alerting on live traffic in under 15 lines of Python.

The Three Core Modules

  1. PacketStreamer — Smarter Than Per-Packet Analysis Most detectors look at packets in isolation. ThreatWire's PacketStreamer reconstructs full TCP state machines via StreamReassembler, catching attacks that abuse low-rate patterns to evade threshold-based detectors. Real-world scenario: A slow SYN scan at 1 packet/second combined with DNS C2 tunneling. A naive per-packet detector sees nothing. ThreatWire flags the SYN-without-ACK pattern across the reconstructed stream. pythonstreamer = PacketStreamer( interface="eth0", bpf_filter="tcp or udp", reconstruct_streams=True, flow_timeout=120.0, )

for packet in streamer.stream():
if streamer.is_slow_syn_scan(packet.src_ip):
print(f"Slow SYN scan from {packet.src_ip}")

  1. SignatureEngine — 1,200+ Rules, Zero Boilerplate The SignatureEngine matches packet payloads and flow metadata against a curated ruleset using Aho-Corasick multi-pattern matching. It also imports Suricata rules, so your existing rule investments aren't lost. Built-in rules cover:

DNS C2 tunneling (dnscat2, iodine, dns2tcp)
HTTP C2 beaconing (Emotet, Cobalt Strike, Meterpreter)
SMB exploits (EternalBlue, brute force)
Credential theft (DCSync, Kerberoasting)
Ransomware IOCs, exploit kit patterns, TLS anomalies

Real-world scenario: Emotet beaconing via HTTP POST with randomized User-Agent strings, but a predictable 300-second interval and fixed URI structure. ThreatWire matches on both simultaneously — something a pure payload or pure frequency detector misses independently.
pythonengine = SignatureEngine(
enable_builtin=True,
rule_path="/etc/threatwire/rules",
suricata_rules="/etc/suricata/emerging.rules",
min_severity=AlertSeverity.MEDIUM,
)

alert = engine.match(packet)
if alert:
print(alert.severity, alert.technique_id, alert.confidence)

  1. ThreatEventBus — Route Alerts Without Drowning in Noise A DDoS amplification attack can generate 50,000 UDP alerts per second. Without smart deduplication, critical lateral movement alerts get buried in volumetric noise. The ThreatEventBus collapses repetitive alerts into rolling summaries while ensuring critical alerts route immediately. pythonbus = ThreatEventBus( dedup_window=30.0, volume_threshold=100, max_queue_size=10_000, )

@bus.subscribe(severity="critical")
async def on_critical(alert):
await pagerduty.trigger(alert.rule_name, alert.src_ip)

@bus.subscribe(severity="medium", rule_ids=["TW-SMB-001"])
def on_eternalblue(alert):
isolate_host(alert.src_ip)

Custom Rules — Python or JSON
You don't have to rely solely on built-in rules. Define custom detection logic as Python dataclasses:
pythonfrom threatwire.core.signature_engine import Rule
from threatwire.core.models import AlertSeverity

rule = Rule(
rule_id="ORG-001",
name="Plaintext password POST",
severity=AlertSeverity.CRITICAL,
technique_id="T1552",
tactic_id="TA0006",
tactic_name="Credential Access",
payload_patterns=[b"password=", b"passwd="],
regex_patterns=[r"password=[^&\s]{6,}"],
protocols=["tcp", "http"],
dst_ports=[80, 8080],
base_confidence=0.9,
)
engine.add_rule(rule)
Or drop JSON files into your rule_path directory for team-shared rule sets.

SIEM-Ready Alert Output
Every ThreatAlert serializes to Elastic Common Schema (ECS 8.x), so ingestion into Elasticsearch, Splunk, or any ECS-compatible SIEM is zero-friction:
json{
"@timestamp": 1714000000.0,
"event": { "kind": "alert", "severity": 5, "risk_score": 95 },
"rule": { "id": "TW-DNS-002", "name": "DNS tunneling — dnscat2 signature" },
"threat": {
"technique": { "id": "T1071.004" },
"tactic": { "id": "TA0011", "name": "Command and Control" },
"framework": "MITRE ATT&CK"
},
"source": { "ip": "192.168.1.100", "port": 54321 },
"destination": { "ip": "185.10.10.1", "port": 53 }
}
Ready-made handlers ship with the library for Slack, Elasticsearch, JSONL file logging, and Python's standard logging module.

MITRE ATT&CK Coverage
TacticIDTechniques CoveredReconnaissanceTA0043T1046 (Network Scan)Initial AccessTA0001T1189 (Exploit Kit)ExecutionTA0002T1059 (Scripting)Credential AccessTA0006T1003.006 (DCSync), T1110 (Brute Force), T1558.003 (Kerberoasting)Lateral MovementTA0008T1210 (EternalBlue)Command & ControlTA0011T1071.001 (HTTP), T1071.004 (DNS), T1090.003 (Tor)ImpactTA0040T1486 (Ransomware)

Who Is This For?
threatwire is for security engineers and developers who:

Build custom IDS/IPS pipelines and are tired of writing the same packet capture scaffolding
Want MITRE ATT&CK–mapped alerts out of the box, without building a correlation engine
Need to analyze PCAP files programmatically (incident response, forensics, red team validation)
Are integrating network telemetry into a SIEM and need structured ECS output without middleware

Get Started
bashpip install threatwire[all]

GitHub: github.com/ontedduabhishakereddy/threatwire
License: MIT

If you're building detection pipelines in Python, threatwire eliminates the boilerplate so you can focus on what actually matters — the detections.

Tags: python security networking ids ips threat-detection mitre-attack open-source cybersecurity infosec

Top comments (0)