DEV Community

Rizwan Saleem


Building a resilient data ingestion pipeline with streaming backpressure in Python


In modern data-driven applications, ingesting data from multiple sources while keeping throughput high and latency low is a common challenge. A robust ingestion pipeline must gracefully handle backpressure, retries, and out-of-order data without losing events. This tutorial walks you through designing, implementing, and operating a streaming data ingestion pipeline in Python that respects backpressure, uses asynchronous I/O, and provides observability hooks for operators.

What you’ll learn

  • How to architect a simple yet robust streaming ingestion pipeline with backpressure handling.
  • How to implement producers, a bounded queue, and consumers that process data with backpressure awareness.
  • How to apply idempotent writes and at-least-once delivery guarantees where appropriate.
  • How to add observability: metrics, logging, and structured tracing.
  • How to test the pipeline with synthetic workloads and failure scenarios.

Overview of the system

  • Data sources (producers): lightweight producers generating events in real time or from batch sources.
  • Ingestion layer (broker/queue): a bounded, asynchronous queue that provides backpressure by blocking producers when full.
  • Processing layer (consumers): workers that validate, transform, and route events to the sink.
  • Sinks: destinations like a database, data lake, or message bus.
  • Observability: metrics (throughput, latency, backpressure events), logs, and traces.

Prerequisites

  • Python 3.9+ (asyncio-friendly)
  • Basic familiarity with asyncio, queues, and exception handling
  • A local terminal to run the example
  • Optional: docker if you want to run a mock sink in a separate container

Code structure

  • pipeline.py: main pipeline components (Producer, Ingestor, Consumer, Sink)
  • sink_mock.py: a simple sink to simulate write latency and occasional failures
  • tests/test_pipeline.py: simple tests to exercise backpressure and retry logic
  • requirements.txt: dependencies (standard library only; add the OpenTelemetry packages if you want tracing)

Section 1: Design decisions

Backpressure

  • Use a bounded asyncio.Queue to cap in-flight events. Producers await upon enqueue when the queue is full, naturally pausing production and preventing unbounded memory growth.
  • Consumers pull from the queue and process items at their own pace. If processing lags, queue fills up and producers slow down.
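To see the blocking behavior in isolation, here is a minimal sketch that is separate from the pipeline code. The function name demo_backpressure and the tiny queue size are assumptions chosen for illustration; it records the order of puts and gets so you can check that the producer never runs more than maxsize items ahead.

```python
import asyncio


async def demo_backpressure(maxsize: int = 2, total: int = 6) -> list[str]:
    """Record the order of puts and gets to show the producer being paused."""
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=maxsize)
    trace: list[str] = []

    async def producer() -> None:
        for i in range(total):
            await queue.put(i)  # suspends here while the queue is full
            trace.append(f"put {i}")

    async def consumer() -> None:
        for _ in range(total):
            await asyncio.sleep(0.01)  # deliberately slower than the producer
            item = await queue.get()
            trace.append(f"get {item}")
            queue.task_done()

    await asyncio.gather(producer(), consumer())
    return trace


if __name__ == "__main__":
    print(asyncio.run(demo_backpressure()))
```

With maxsize=2 the producer completes two puts, then has to wait for a get before each further put, so the number of unconsumed items in the trace never exceeds two.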

Idempotency and delivery guarantees

  • At-least-once processing is acceptable for many ingestion pipelines if the sink can deduplicate. We’ll demonstrate idempotent writes on the sink and include an id field per event to aid deduplication.
  • Exactly-once delivery requires more complex coordination; this example focuses on reliability and simplicity with at-least-once semantics.
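As a rough illustration of deduplication by id, the sketch below keeps written ids in memory and ignores repeats. Record and IdempotentSink are hypothetical names, not part of the pipeline code in this post; a real sink would more likely rely on a unique key or an upsert in the target store.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Record:  # hypothetical stand-in for the tutorial's Event
    id: str
    payload: Any


class IdempotentSink:
    """In-memory sink that ignores records whose id was already written."""

    def __init__(self) -> None:
        self._rows: dict[str, Any] = {}
        self.duplicates = 0

    async def write(self, record: Record) -> bool:
        # Returns True for a new write, False for a duplicate that was skipped.
        if record.id in self._rows:
            self.duplicates += 1
            return False
        self._rows[record.id] = record.payload
        return True

    def __len__(self) -> int:
        return len(self._rows)


async def demo_dedupe() -> IdempotentSink:
    sink = IdempotentSink()
    # "a-1" is delivered twice, as can happen with at-least-once retries.
    for rec in [Record("a-1", 1), Record("a-2", 2), Record("a-1", 1)]:
        await sink.write(rec)
    return sink
```

Because a retried write with the same id is a no-op, the consumer can retry freely without creating duplicate rows.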

Observability

  • Metrics: events enqueued, events processed, processing latency, queue size.
  • Logging: structured logs with event IDs and timestamps.
  • Tracing: optional; you can hook in with OpenTelemetry if you want.
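For the logging bullet, one possible approach with only the standard library is a logging.Formatter that emits one JSON object per line. JsonFormatter, make_logger, and the field names event_id, source, and latency_s are assumptions for this sketch, not something the pipeline code below defines.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON line."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "ts": record.created,
            "level": record.levelname,
            "message": record.getMessage(),
        }
        # Fields passed via `extra=` become attributes on the record.
        for key in ("event_id", "source", "latency_s"):
            if hasattr(record, key):
                entry[key] = getattr(record, key)
        return json.dumps(entry)


def make_logger(name: str = "pipeline") -> logging.Logger:
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid stacking handlers on repeated calls
        handler = logging.StreamHandler()
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger
```

Usage would look like make_logger().info("event written", extra={"event_id": "sensor-A-1", "latency_s": 0.12}).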

Section 2: Implementing the pipeline

  • Define the event structure
  • Implement Producer: generates events with a unique id and payload
  • Implement Ingestor: a bounded queue that enqueues events; blocks when full
  • Implement Consumer: processes events with a simulated transform and writes to sink
  • Implement Sink: mock sink that can succeed or fail to simulate transient errors
  • Implement retries with exponential backoff for transient sink failures
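The retry step in the list above can be sketched as a small reusable helper. This is an illustration only: the names backoff_delays and retry, the delay cap, and the optional jitter are assumptions that go slightly beyond what the Consumer in this post does (it doubles the delay with no cap and no jitter).

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


def backoff_delays(base: float, retries: int, cap: float = 5.0) -> list[float]:
    """Delays before each retry: base, 2*base, 4*base, ... limited to cap."""
    return [min(cap, base * (2 ** i)) for i in range(retries)]


async def retry(
    op: Callable[[], Awaitable[T]],
    *,
    retries: int = 3,
    base: float = 0.1,
    jitter: float = 0.0,
    retry_on: type = Exception,
) -> T:
    """Run op once, then retry up to `retries` times on `retry_on` errors."""
    delays = backoff_delays(base, retries)
    for attempt in range(retries + 1):
        try:
            return await op()
        except retry_on:
            if attempt == retries:
                raise  # retry budget exhausted: let the caller decide
            # Optional jitter spreads out retries from concurrent workers.
            await asyncio.sleep(delays[attempt] + random.uniform(0, jitter))
    raise AssertionError("unreachable")
```

Re-raising after the last attempt, rather than dropping silently, leaves the drop-or-escalate decision to the caller.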

Code: pipeline.py

  • Note: this is a compact, runnable example focusing on backpressure and reliability.

from __future__ import annotations

import asyncio
import random
import time
from dataclasses import dataclass
from typing import Any, Optional

@dataclass(frozen=True)
class Event:
    id: str
    source: str
    payload: Any
    timestamp: float

class SinkError(Exception):
    """Raised by the sink for transient, retryable write failures."""

class Sink:
    def __init__(self, failure_rate: float = 0.1, min_latency: float = 0.05, max_latency: float = 0.2):
        self.failure_rate = failure_rate
        self.min_latency = min_latency
        self.max_latency = max_latency

    async def write(self, event: Event) -> None:
        # Simulate I/O latency
        await asyncio.sleep(random.uniform(self.min_latency, self.max_latency))

        # Random transient failure
        if random.random() < self.failure_rate:
            raise SinkError(f"Transient failure writing event {event.id}")

        # For demonstration, a successful write is a no-op.
        # In real life, write to DB, data lake, or messaging system
        # print(f"Sink written: {event.id}")

class Producer:
    def __init__(self, source_name: str, burst: int = 5, interval: float = 0.01):
        self.source = source_name
        self.burst = burst
        self.interval = interval
        self._counter = 0
        self.metrics: Optional[Metrics] = None  # optional; set by the runner to count enqueues

    def _new_event(self) -> Event:
        self._counter += 1
        eid = f"{self.source}-{int(time.time()*1000)}-{self._counter}"
        payload = {"value": random.randint(0, 1000)}
        return Event(id=eid, source=self.source, payload=payload, timestamp=time.time())

    async def generate(self, out_queue: asyncio.Queue[Event], total: int, *, stop_after: Optional[int] = None) -> None:
        # stop_after lets a test end production early
        limit = total if stop_after is None else min(total, stop_after)
        produced = 0
        while produced < limit:
            # Produce a small burst, never overshooting the limit mid-burst
            for _ in range(min(self.burst, limit - produced)):
                event = self._new_event()
                await out_queue.put(event)  # backpressure-aware: blocks if queue full
                produced += 1
                if self.metrics is not None:
                    self.metrics.record_enqueue()
            await asyncio.sleep(self.interval)

class Ingestor:
    def __init__(self, queue_size: int = 1000):
        # Bounded queue: put() suspends the caller once queue_size items are waiting
        self.queue: asyncio.Queue[Event] = asyncio.Queue(maxsize=queue_size)

    def get_queue(self) -> asyncio.Queue[Event]:
        return self.queue

class Consumer:
    def __init__(self, sink: Sink, max_concurrency: int = 4, max_retries: int = 3, base_backoff: float = 0.1):
        self.sink = sink
        self.semaphore = asyncio.Semaphore(max_concurrency)
        self.max_retries = max_retries
        self.base_backoff = base_backoff
        self.metrics: Optional[Metrics] = None  # optional; set by the runner to record latency

    async def _process(self, event: Event) -> None:
        # Simulate a transform step
        transformed = self._transform(event)

        # Attempt sink write with retries
        attempt = 0
        backoff = self.base_backoff
        while True:
            attempt += 1
            try:
                await self.sink.write(transformed)
            except SinkError as e:
                if attempt > self.max_retries:
                    # drop or escalate; for this demo, we log and drop
                    print(f"[WARN] Dropping event {event.id} after {attempt} attempts: {e}")
                    return
                await asyncio.sleep(backoff)
                backoff *= 2  # exponential backoff
            else:
                # success: record creation-to-write latency
                if self.metrics is not None:
                    self.metrics.record_process(time.time() - event.timestamp)
                return

    def _transform(self, event: Event) -> Event:
        # Simple example: add a derived field
        new_payload = dict(event.payload)
        new_payload["processed_at"] = time.time()
        return Event(id=event.id, source=event.source, payload=new_payload, timestamp=event.timestamp)

    async def run(self, queue: asyncio.Queue[Event]) -> None:
        in_flight: set[asyncio.Task[None]] = set()  # keep references so tasks are not garbage-collected
        try:
            while True:
                # Take a concurrency slot before pulling, so unprocessed events
                # stay in the bounded queue and backpressure reaches producers.
                await self.semaphore.acquire()
                try:
                    event = await queue.get()
                except BaseException:
                    self.semaphore.release()
                    raise
                task = asyncio.create_task(self._handle(event, queue))
                in_flight.add(task)
                task.add_done_callback(in_flight.discard)
        finally:
            for task in in_flight:
                task.cancel()

    async def _handle(self, event: Event, queue: asyncio.Queue[Event]) -> None:
        try:
            await self._process(event)
        finally:
            self.semaphore.release()
            queue.task_done()  # only now does queue.join() count the event as done

Section 3: Observability helpers

  • Simple counters and timers
  • Structured logging via print + timestamp (for simplicity in this example)
  • You can replace with a proper logging framework and metrics exporter

# Add a small observability layer
class Metrics:
    def __init__(self):
        self.enqueued = 0
        self.processed = 0
        self.latencies: list[float] = []
        self.queue_sizes: list[int] = []

    def record_enqueue(self):
        self.enqueued += 1

    def record_process(self, latency: float):
        self.processed += 1
        self.latencies.append(latency)

    def record_queue_size(self, size: int):
        self.queue_sizes.append(size)

async def monitor(queue: asyncio.Queue[Event], metrics: Metrics, interval: float = 1.0) -> None:
    # Sample the queue depth periodically until the task is cancelled
    while True:
        metrics.record_queue_size(queue.qsize())
        await asyncio.sleep(interval)
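An average alone can hide slow outliers, so it may help to summarize the recorded latencies with percentiles as well. The helper below is a sketch; summarize_latencies is a hypothetical name, and it takes a plain list of floats such as the latencies list kept by Metrics.

```python
import statistics


def summarize_latencies(latencies: list[float]) -> dict[str, float]:
    """Return count, mean, max and (with 2+ samples) p50/p95 of the latencies."""
    if not latencies:
        return {}
    ordered = sorted(latencies)
    summary = {
        "count": float(len(ordered)),
        "mean": statistics.fmean(ordered),
        "max": ordered[-1],
    }
    if len(ordered) >= 2:
        # n=100 yields 99 cut points; index 49 is the median, index 94 is p95.
        cuts = statistics.quantiles(ordered, n=100, method="inclusive")
        summary["p50"] = cuts[49]
        summary["p95"] = cuts[94]
    return summary
```

You could call summarize_latencies(metrics.latencies) at the end of a run and print the result next to the enqueued and processed counts.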

Section 4: Putting it all together

  • Create pipeline components
  • Start producers, ingestion, consumers
  • Run for a while, then shut down gracefully

Code continuation (in same file, end-to-end runner)

async def run_pipeline():
    ingestor = Ingestor(queue_size=200)
    sink = Sink(failure_rate=0.15, min_latency=0.05, max_latency=0.15)
    consumer = Consumer(sink, max_concurrency=4, max_retries=5, base_backoff=0.05)
    producer = Producer(source_name="sensor-A", burst=8, interval=0.01)

    queue = ingestor.get_queue()
    metrics = Metrics()
    # Attach the shared metrics object so enqueues and writes get counted
    producer.metrics = metrics
    consumer.metrics = metrics

    # Start background monitor
    monitor_task = asyncio.create_task(monitor(queue, metrics, interval=0.5))

    # Start consumer loop
    consumer_task = asyncio.create_task(consumer.run(queue))

    # Start producer
    total_events = 500
    producer_task = asyncio.create_task(producer.generate(queue, total=total_events))

    # Wait for producer to finish and queue to drain
    await producer_task
    await queue.join()  # wait until all items are processed

    # Cancel long-running tasks and wait for them to finish
    consumer_task.cancel()
    monitor_task.cancel()
    await asyncio.gather(consumer_task, monitor_task, return_exceptions=True)

    print(f"Enqueued: {metrics.enqueued}, Processed: {metrics.processed}")
    if metrics.latencies:
        avg_latency = sum(metrics.latencies) / len(metrics.latencies)
        print(f"Average processing latency: {avg_latency:.3f}s")

if __name__ == "__main__":
    asyncio.run(run_pipeline())

Notes on the code

  • The bounded queue enforces backpressure automatically. When the queue is full, producers await on put, effectively throttling production.
  • The Consumer uses a semaphore to bound concurrency, preventing too many parallel sink writes.
  • The Sink simulates transient failures; the Consumer retries with exponential backoff up to a limit.
  • All event IDs are preserved across transforms, enabling deduplication at the sink if you implement dedupe logic there.

Section 5: Testing strategies

  • Backpressure correctness

    • Increase producer burst and reduce queue size to observe producer blocking behavior.
    • Verify that queue.qsize() never exceeds maxsize and that the system stabilizes under high load.
  • Failure handling

    • Increase sink.failure_rate and verify that events are retried and eventually dropped after max_retries.
  • Latency and throughput

    • Collect throughput = processed events / wall time.
    • Track per-event latency from enqueue to successful sink write completion.
  • Idempotency checks

    • If the sink supports deduplication, feed duplicate events and ensure only one write occurs. If it does not, add deduplication keyed on the event id first, then run the same test.
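A random failure_rate makes the failure-handling tests nondeterministic. One option is a scripted stub that fails a fixed number of times per event, so the retry and drop paths can be asserted exactly. ScriptedSink, ScriptedSinkError, and write_with_retries are hypothetical stand-ins written for this sketch; they mirror the retry policy described above (one attempt plus up to max_retries retries) without importing pipeline.py.

```python
import asyncio


class ScriptedSinkError(Exception):
    """Hypothetical stand-in for the tutorial's SinkError."""


class ScriptedSink:
    """Fails the first `failures` writes for each event id, then succeeds."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.attempts: dict[str, int] = {}
        self.written: list[str] = []

    async def write(self, event_id: str) -> None:
        n = self.attempts.get(event_id, 0) + 1
        self.attempts[event_id] = n
        if n <= self.failures:
            raise ScriptedSinkError(f"scripted failure {n} for {event_id}")
        self.written.append(event_id)


async def write_with_retries(sink: ScriptedSink, event_id: str, max_retries: int) -> bool:
    """One attempt plus up to max_retries retries; False means the event was dropped."""
    for attempt in range(1, max_retries + 2):
        try:
            await sink.write(event_id)
            return True
        except ScriptedSinkError:
            if attempt > max_retries:
                return False  # retry budget exhausted
    return False


def test_recovers_when_failures_fit_in_retry_budget() -> None:
    sink = ScriptedSink(failures=2)
    assert asyncio.run(write_with_retries(sink, "e-1", max_retries=3)) is True
    assert sink.attempts["e-1"] == 3
    assert sink.written == ["e-1"]


def test_drops_after_budget_is_exhausted() -> None:
    sink = ScriptedSink(failures=10)
    assert asyncio.run(write_with_retries(sink, "e-2", max_retries=3)) is False
    assert sink.attempts["e-2"] == 4
    assert sink.written == []
```

The backoff sleeps are left out on purpose so the tests run instantly; the same stub could be handed to the real Consumer if its write method accepted an Event instead of an id.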

Section 6: Practical considerations and extensions

  • Exactly-once delivery
    • Achieving exactly-once requires idempotent sinks or a transactional boundary. Consider storing a per-event attempt record and using a two-phase commit with the sink.
  • Real-world sinks
    • Replace Sink with actual targets: PostgreSQL, Apache Kafka, or a data warehouse. Use asynchronous clients or libraries that support backpressure (e.g., aiokafka for Kafka, asyncpg for PostgreSQL).
  • Observability stack
    • Swap print statements for a logging library (structlog or standard logging with JSON formatting).
    • Integrate OpenTelemetry for tracing across producers, ingestors, and consumers.
  • Scaling
    • Deploy multiple producer instances, a central queue with partitioning, and multiple consumer workers per partition.
    • Consider a distributed queue (e.g., Redis streams, Kafka topics) to maintain ordering guarantees per partition.
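To make the partitioning idea concrete, here is a small in-process sketch that routes events to one of several queues by hashing the source name, so all events from one source stay in order within a single partition. partition_for and route are hypothetical names, and plain asyncio queues stand in for what would be Kafka partitions or Redis streams in a distributed setup.

```python
import asyncio
import zlib


def partition_for(key: str, partitions: int) -> int:
    """Stable partition index. zlib.crc32 gives the same value in every
    process, unlike the built-in hash(), which is randomized for strings."""
    return zlib.crc32(key.encode("utf-8")) % partitions


async def route(events: list[tuple[str, int]], partitions: int = 3) -> list[list[tuple[str, int]]]:
    """Send (source, sequence) pairs to per-partition queues, then drain them."""
    queues = [asyncio.Queue(maxsize=100) for _ in range(partitions)]
    for source, seq in events:
        await queues[partition_for(source, partitions)].put((source, seq))

    drained = []
    for q in queues:
        items = []
        while not q.empty():
            items.append(q.get_nowait())
        drained.append(items)
    return drained
```

Each partition can then be served by its own consumer worker; ordering holds per source, not across sources.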

Running the example

  • Before you run, ensure you have Python 3.9+ installed.
  • Save the pipeline.py file with the code above.
  • Run: python pipeline.py
  • You should see enqueued and processed counts, plus a warning for any event that is dropped after exhausting its retries (rare at the default failure rate).

Further reading and resources

  • Understanding backpressure patterns in asynchronous systems
  • Designing idempotent sinks for event streams
  • Practical OpenTelemetry integration guides
  • Asyncio primitives: queues, semaphores, and task management


Rizwan Saleem | https://rizwansaleem.co
