Alireza Razinejad

Real-Time Network Telemetry for AI: Building an Asynchronous NetFlow/sFlow Ingestion Pipeline in Python

As AI architectures transition from static data processing to real-time agentic monitoring, the data engineering pipelines feeding them must adapt. If you are building an AI-driven anomaly detection model, a security agent, or an automated network observability tool, you can't rely on historical batch logs. You need live, structural telemetry straight from the network edge.

When monitoring network traffic at scale, two industry standards dominate: NetFlow (stateful conversation aggregation) and sFlow (stateless packet-level sampling).
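
To make the contrast concrete, here is a minimal sketch of the two ideas in plain Python. The function names and the toy traffic are assumptions made for illustration, and the sampler is deterministic (every Nth packet), whereas real sFlow agents sample randomly with a configured mean rate.

```python
from collections import defaultdict


def aggregate_flows(packets):
    """NetFlow-style: keep per-conversation state (packet and byte totals)."""
    flows = defaultdict(lambda: {"packets": 0, "bytes": 0})
    for key, size in packets:
        flows[key]["packets"] += 1
        flows[key]["bytes"] += size
    return dict(flows)


def sample_packets(packets, rate):
    """sFlow-style: keep no state, export every `rate`-th packet unchanged."""
    return [pkt for i, pkt in enumerate(packets) if i % rate == 0]


# Hypothetical traffic as (flow_key, packet_size_bytes) pairs
packets = [("A->B", 100), ("A->B", 200), ("C->D", 50), ("A->B", 300)]
flows = aggregate_flows(packets)
samples = sample_packets(packets, rate=2)
```

The aggregator's memory grows with the number of distinct conversations, while the sampler's output grows with traffic volume divided by the sampling rate.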

In this post, we will build a production-grade, asynchronous network ingestion engine using Python and Scapy that captures live traffic from a wireless interface, extracts 5-tuple flow metrics, and streams structured JSON events ready to be consumed by a message broker (like Kafka) or an AI vector database.


The Architectural Challenge: Avoid the Bottleneck

The biggest pitfall when writing a packet sniffer in Python is blocking the execution loop. If your script captures a packet, processes it, formats a JSON string, and logs it sequentially, your script will choke and drop thousands of packets during a sudden burst of network traffic.

To feed an AI infrastructure safely, we must decouple the Capture Phase from the Processing Phase using a Producer-Consumer multi-threaded pattern.

[ Wireless Adapter ] 
       │
       ▼ (Raw Packets)
┌────────────────────────────────────────────────────────┐
│ PHASE 1: Capture (Scapy AsyncSniffer Thread)          │
│ - Sniffs raw frames in a non-blocking background loop   │
└────────────────────────────────────────────────────────┘
       │
       ▼ (Fast Handoff: .put_nowait)
┌────────────────────────────────────────────────────────┐
│ PHASE 2: Buffer (Thread-Safe bounded Queue)           │
│ - Acts as a shock absorber for network traffic bursts  │
└────────────────────────────────────────────────────────┘
       │
       ▼ (Distributed to Worker Pool)
┌────────────────────────────────────────────────────────┐
│ PHASE 3: Process & Emit (Concurrent Thread Workers)    │
│ - Worker 1  │  Worker 2  │  Worker 3  │  Worker 4      │
│ - Decodes Layers (IP, TCP, UDP)                        │
│ - Updates Stateful NetFlow Cache (Aggregates)          │
│ - Outputs Structured JSON Streaming Event              │
└────────────────────────────────────────────────────────┘
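
Before bringing Scapy into the picture, the three phases can be sketched with the standard library alone. This is an illustrative sketch, not the article's pipeline: run_pipeline, the sentinel-based shutdown, and the doubling step that stands in for real packet processing are all assumptions.

```python
import queue
import threading


def run_pipeline(items, num_workers=2, max_queue_size=100):
    """Push items through a bounded queue to a pool of worker threads."""
    buffer = queue.Queue(maxsize=max_queue_size)
    results = []
    results_lock = threading.Lock()
    stop = object()  # sentinel that tells a worker to exit

    def worker():
        while True:
            item = buffer.get()
            if item is stop:
                break
            processed = item * 2  # stand-in for decode + aggregate + emit
            with results_lock:  # shared state is only touched under the lock
                results.append(processed)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()

    dropped = 0
    for item in items:  # producer: hand off and move on, never block
        try:
            buffer.put_nowait(item)
        except queue.Full:
            dropped += 1

    for _ in threads:
        buffer.put(stop)  # blocking put so every worker receives a sentinel
    for t in threads:
        t.join()
    return results, dropped
```

Calling run_pipeline(range(50)) returns the processed items in whatever order the workers finished them, plus the number of items the producer had to drop.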

Production-Grade Pipeline Implementation

Below is the complete, modern Python implementation utilizing Scapy's explicit layer APIs, asynchronous execution, and thread-safe buffering.

Prerequisites

Ensure you have Python 3.10 or newer (the type hints below use the str | int union syntax) and the native packet capture library installed on your host system (libpcap for Linux/macOS, Npcap for Windows), then install Scapy:

pip install scapy

The Ingestion Code

import os
import sys
import queue
import threading
import time
import json
from dataclasses import dataclass, asdict
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Tuple, Optional

# Import only the names we use; the IP/TCP/UDP layer classes live in scapy.layers.inet
from scapy.all import AsyncSniffer, Packet, conf
from scapy.layers.inet import IP, TCP, UDP

# --- CONFIGURATION ---
MAX_QUEUE_SIZE = 10000
NUM_WORKERS = 4
# Replace with your active network interface: an integer index, a string name,
# or None to let Scapy pick its default interface
INTERFACE: Optional[str | int] = 24


# --- DATA MODELS ---
@dataclass(frozen=True)
class FlowKey:
    src_ip: str
    dst_ip: str
    src_port: int
    dst_port: int
    protocol: int


@dataclass
class FlowMetrics:
    packet_count: int = 0
    byte_count: int = 0
    first_seen: float = 0.0
    last_seen: float = 0.0


# --- INGESTION PIPELINE ENGINE ---
class AIStreamIngestionPipeline:
    def __init__(self, interface: Optional[str | int] = None):
        self.interface = interface
        self.packet_queue: queue.Queue[Packet] = queue.Queue(maxsize=MAX_QUEUE_SIZE)
        self.flow_cache: Dict[FlowKey, FlowMetrics] = {}
        # Guards flow_cache, which every worker thread reads and updates
        self.cache_lock = threading.Lock()
        self.executor = ThreadPoolExecutor(max_workers=NUM_WORKERS)
        self.is_running = False
        self.sniffer: Optional[AsyncSniffer] = None

    def _extract_packet_meta(self, packet: Packet) -> Optional[Tuple[FlowKey, int]]:
        """Returns the 5-tuple flow key and frame length, or None for non-IPv4 packets."""
        if IP not in packet:
            return None

        ip_layer = packet[IP]
        # Ports stay 0 for protocols without them (e.g. ICMP)
        src_port, dst_port = 0, 0

        # Safe inspection of Layer 4 using direct class indexing
        if TCP in packet:
            src_port = packet[TCP].sport
            dst_port = packet[TCP].dport
        elif UDP in packet:
            src_port = packet[UDP].sport
            dst_port = packet[UDP].dport

        key = FlowKey(
            src_ip=ip_layer.src,
            dst_ip=ip_layer.dst,
            src_port=src_port,
            dst_port=dst_port,
            protocol=ip_layer.proto
        )
        return key, len(packet)

    def _worker_process_queue(self) -> None:
        """Consumer loop run by each pool thread: extracts flow features and emits events."""
        while self.is_running or not self.packet_queue.empty():
            try:
                packet = self.packet_queue.get(timeout=0.5)
            except queue.Empty:
                continue

            meta = self._extract_packet_meta(packet)
            if not meta:
                self.packet_queue.task_done()
                continue

            key, packet_len = meta
            # Processing time, not capture time: queueing delay is included
            current_time = time.time()

            # 1. Stateful Aggregation (NetFlow Telemetry Layer)
            # The read-modify-write on shared metrics must happen under the lock,
            # otherwise concurrent workers can lose updates to the same flow.
            with self.cache_lock:
                metrics = self.flow_cache.get(key)
                if metrics is None:
                    metrics = FlowMetrics(first_seen=current_time)
                    self.flow_cache[key] = metrics
                metrics.packet_count += 1
                metrics.byte_count += packet_len
                metrics.last_seen = current_time
                # Copy while still locked so the emitted totals are consistent
                metrics_snapshot = asdict(metrics)

            # 2. Structured Streaming Payload (sFlow Event Layer)
            ingest_payload = {
                "event_type": "network_flow_sample",
                "timestamp": current_time,
                "flow_key": asdict(key),
                "packet_size_bytes": packet_len,
                "aggregate_metrics": metrics_snapshot
            }

            # Emit clean JSON to stdout (pipe into a stream wrapper, Kafka, or vector db)
            print(json.dumps(ingest_payload))
            self.packet_queue.task_done()

    def _enqueue_packet(self, packet: Packet) -> None:
        """High-speed producer callback. Hands off raw socket frames immediately."""
        try:
            self.packet_queue.put_nowait(packet)
        except queue.Full:
            # Backpressure handling: Drops edge frames gracefully if queue overflows
            pass

    def start(self) -> None:
        """Spawns workers and executes the non-blocking packet sniffer thread."""
        print(f"[*] Initializing pipeline on interface: {self.interface or 'Default Active OS Interface'}",
              file=sys.stderr)
        self.is_running = True

        for _ in range(NUM_WORKERS):
            self.executor.submit(self._worker_process_queue)

        self.sniffer = AsyncSniffer(
            iface=self.interface,
            prn=self._enqueue_packet,
            store=False  # Crucial: Drops raw historical packets out of memory instantly
        )
        self.sniffer.start()
        print("[*] Ingestion pipeline is live and streaming...", file=sys.stderr)

    def stop(self) -> None:
        """Gracefully signs off threads and closes open sockets."""
        print("\n[*] Shutting down ingestion pipeline gracefully...", file=sys.stderr)
        if self.sniffer:
            self.sniffer.stop()

        self.is_running = False
        self.executor.shutdown(wait=True)
        print("[*] Pipeline stopped cleanly.", file=sys.stderr)


if __name__ == "__main__":
    # Administrative privilege guard for raw socket access
    if os.name != 'nt' and os.getuid() != 0:
        print("[-] Critical Error: Root privileges required for raw network socket access.", file=sys.stderr)
        sys.exit(1)

    # Resolve an integer interface index to a Scapy interface object
    # (conf.ifaces.dev_from_index is available in recent Scapy releases)
    target_interface = INTERFACE
    if isinstance(INTERFACE, int):
        try:
            target_interface = conf.ifaces.dev_from_index(INTERFACE)
            print(f"[*] Resolved index {INTERFACE} to device: {target_interface.description}", file=sys.stderr)
        except Exception as e:
            print(f"[-] Warning: Could not resolve interface index {INTERFACE}: {e}. "
                  "Falling back to the default interface.", file=sys.stderr)
            # Without this reset the unresolved integer would be passed to the sniffer
            target_interface = None

    pipeline = AIStreamIngestionPipeline(interface=target_interface)
    try:
        pipeline.start()
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pipeline.stop()

Under the Hood: Why This Architecture Works for AI

1. Modern Container Layer Membership

This script tests for a layer with IP in packet rather than packet.haslayer(IP). The two are equivalent: Scapy's Packet class implements __contains__ by calling haslayer(), so the in form is a readability choice, not a performance optimization. Note that the check matches IPv4 only, so IPv6 packets are skipped.
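
Python routes an expression like x in obj to obj.__contains__(x) at runtime. The toy classes below are hypothetical stand-ins (they are not Scapy's implementation) and only illustrate how a membership hook can delegate to a haslayer-style method.

```python
class ToyIP:
    """Hypothetical layer type used only for this illustration."""


class ToyTCP:
    """Hypothetical layer type used only for this illustration."""


class ToyPacket:
    """Stand-in for a layered packet; not Scapy's real Packet class."""

    def __init__(self, *layers):
        self.layers = layers

    def haslayer(self, layer_type):
        # True if any stored layer is an instance of the requested type
        return any(isinstance(layer, layer_type) for layer in self.layers)

    def __contains__(self, layer_type):
        # `layer_type in packet` lands here and simply delegates
        return self.haslayer(layer_type)


pkt = ToyPacket(ToyIP())
has_ip = ToyIP in pkt
has_tcp = ToyTCP in pkt
```

Both spellings run the same lookup, so the choice between them is about style.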

2. Bounded Sniffer Memory (store=False)

By default, Scapy's sniffer keeps every captured packet in an in-memory list. Left running for an hour on a busy enterprise link, that list grows without bound until the process runs out of memory. Setting store=False on our AsyncSniffer means Scapy passes each packet to our callback without retaining it, so the packet can be garbage-collected as soon as a worker has finished with it. The flow cache is a separate matter: it still gains one entry per distinct 5-tuple, so a long-running deployment also needs to expire idle flows.
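
One way to keep a flow cache from growing indefinitely is to evict flows that have gone quiet, similar in spirit to the inactive timeout used by NetFlow exporters. The sketch below is an assumption layered on top of the article: expire_idle_flows, the plain-dict cache, and the 15-second default are illustrative, not part of the script above.

```python
def expire_idle_flows(flow_cache, now, idle_timeout=15.0):
    """Remove and return flows whose last packet is older than idle_timeout seconds."""
    expired = {
        key: metrics
        for key, metrics in flow_cache.items()
        if now - metrics["last_seen"] > idle_timeout
    }
    for key in expired:
        del flow_cache[key]
    return expired


# Hypothetical cache keyed by flow name, with last_seen as a UNIX timestamp
cache = {
    "flow-a": {"last_seen": 100.0, "packet_count": 7},
    "flow-b": {"last_seen": 119.0, "packet_count": 3},
}
expired = expire_idle_flows(cache, now=120.0)
```

In a multi-threaded pipeline this function would have to run while holding whatever lock protects the cache, and the returned flows could be emitted as final flow records before being discarded.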

3. Backpressure Protection via Bounded Queues

Our packet queue is capped at maxsize=10000, and the sniffer callback enqueues with the non-blocking put_nowait(). If the workers or the downstream data processor (e.g., your LLM evaluator or vector engine) fall behind and the queue fills up, the pipeline intentionally drops incoming packets at the edge rather than letting the memory buffer balloon out of control. These drops are silent in the code above, so flow totals will undercount during sustained overload.
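
Because dropped packets distort the aggregates, it can help to count them. The wrapper below is a hedged sketch: DropCountingBuffer, offer, and drop_ratio are hypothetical names, and the counters are assumed to be updated from a single producer thread (the sniffer callback), which is why they are not locked.

```python
import queue


class DropCountingBuffer:
    """Bounded queue wrapper that records how many items were rejected."""

    def __init__(self, maxsize):
        self._queue = queue.Queue(maxsize=maxsize)
        self.accepted = 0
        self.dropped = 0

    def offer(self, item):
        """Try to enqueue without blocking; return False if the item was dropped."""
        try:
            self._queue.put_nowait(item)
        except queue.Full:
            self.dropped += 1
            return False
        self.accepted += 1
        return True

    def drop_ratio(self):
        """Fraction of offered items that were dropped (0.0 if nothing offered)."""
        total = self.accepted + self.dropped
        return self.dropped / total if total else 0.0


buf = DropCountingBuffer(maxsize=3)
outcomes = [buf.offer(i) for i in range(5)]  # nothing consumes, so the last two overflow
```

Exposing drop_ratio alongside the flow events lets a downstream model discount or flag periods in which the telemetry was incomplete.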

4. Machine-Readable Feature Matrix Output

The pipeline keys each flow by its 5-tuple and keeps running totals for it, while emitting one JSON event per processed packet. Each event carries both the size of that packet and the flow's aggregate metrics so far, which gives your downstream AI immediate access to critical parameters:

{
  "event_type": "network_flow_sample", 
  "timestamp": 1719409375.42, 
  "flow_key": {
    "src_ip": "10.17.245.72", 
    "dst_ip": "142.250.180.142", 
    "src_port": 54211, 
    "dst_port": 443, 
    "protocol": 6
  }, 
  "packet_size_bytes": 1420, 
  "aggregate_metrics": {
    "packet_count": 42, 
    "byte_count": 59640, 
    "first_seen": 1719409371.11, 
    "last_seen": 1719409375.42
  }
}

This flow-level metadata gives your AI system a compact, macro-level view of network behavior, so agents can correlate anomalies across flows without filling your feature store with costly raw packet captures.
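
As a rough illustration of the consumer side, the sketch below turns one emitted event into a few numeric features. The function name flow_features and the chosen features are assumptions; the field names match the JSON event shown above.

```python
import json


def flow_features(event_json):
    """Derive simple per-flow features from one JSON event string."""
    event = json.loads(event_json)
    agg = event["aggregate_metrics"]
    duration = agg["last_seen"] - agg["first_seen"]
    return {
        "dst_port": event["flow_key"]["dst_port"],
        "protocol": event["flow_key"]["protocol"],
        "mean_packet_bytes": agg["byte_count"] / agg["packet_count"],
        "duration_s": duration,
        # Guard against single-packet flows, where duration is zero
        "bytes_per_second": agg["byte_count"] / duration if duration > 0 else 0.0,
    }


sample_event = json.dumps({
    "event_type": "network_flow_sample",
    "timestamp": 1719409375.42,
    "flow_key": {"src_ip": "10.17.245.72", "dst_ip": "142.250.180.142",
                 "src_port": 54211, "dst_port": 443, "protocol": 6},
    "packet_size_bytes": 1420,
    "aggregate_metrics": {"packet_count": 42, "byte_count": 59640,
                          "first_seen": 1719409371.11, "last_seen": 1719409375.42},
})
features = flow_features(sample_event)
```

A consumer could apply a function like this to each line read from the pipeline's stdout before handing the result to a model or a feature store.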
