
Muhammad Sufiyan Baig


Stop Hand-Tuning ETL Batch Sizes. Use PID Control Instead.


You've done this before. You need to batch-process a large dataset. You pick a chunk size — maybe 1000, maybe 10000 — run a quick test, it looks fine, and you ship it.

Three weeks later, your pipeline is crawling at 15% CPU while you're paying for 8 cores. Or it's randomly OOM-crashing on Tuesday nights when the dataset is slightly wider than usual.

This is the static batch size problem, and it's more expensive than most teams realize.


What's actually happening

When you hard-code a batch size, you're making a bet:

"This number will be optimal on every run, on every machine, under every memory condition, forever."

That's never true. The optimal chunk size is a function of:

  • Current available memory
  • How heavy the transformation is for this batch
  • How many other jobs are competing for resources
  • Row width variation in the dataset

No static number wins across all these dimensions. You need continuous adaptation.


Enter PID control

PID (Proportional-Integral-Derivative) control is a feedback algorithm used in virtually every control system on the planet — thermostats, drone stabilizers, industrial robots, cruise control.

The idea: measure the current output, compare it to the target, and adjust the control variable to close the gap. Crucially, the adjustment accounts for:

  • P (Proportional): how far off you are right now
  • I (Integral): how long you've been off (accumulated error)
  • D (Derivative): whether you're getting closer or further away

Applied to ETL chunking:

Control Theory       ETL Pipeline
Control variable     chunk size
Measured variable    processing latency per chunk
Setpoint             target latency (e.g., 500ms)
PID output           adjustment to chunk size

If chunks are processing in 200ms and your target is 500ms → the system can handle larger chunks → PID increases chunk size. If chunks are taking 900ms → too slow → PID decreases chunk size.


StreamChunk: PID control for Python data pipelines

# https://pypi.org/project/streamchunk/
pip install streamchunk

Basic usage

from streamchunk import StreamChunker, FileSource

source = FileSource("events.csv")

chunker = StreamChunker(
    source,
    target_latency_ms=500,
    max_memory_pct=80,
    min_chunk_size=100,
    max_chunk_size=50_000,
)

for chunk, meta in chunker:
    result = transform(chunk)
    load(result)
    chunker.report_latency(meta.chunk_id, meta.elapsed_ms)

That's the full loop. No tuning. report_latency() feeds the actual processing time back into the PID controller, which computes the next chunk size. Convergence happens in 5–10 iterations — typically the first few seconds of a run.
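
meta.elapsed_ms is the library's own timing; if you'd rather feed the controller a span you measure yourself, that works too. A minimal sketch, assuming report_latency() accepts any millisecond value:

import time

for chunk, meta in chunker:
    start = time.perf_counter()
    load(transform(chunk))
    elapsed_ms = (time.perf_counter() - start) * 1000  # ms, as the API expects
    chunker.report_latency(meta.chunk_id, elapsed_ms)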

The math (briefly)

error(t)  = target_latency_ms - actual_latency_ms
integral += error(t)
derivative = error(t) - error(t-1)

adjustment = kp*error(t) + ki*integral + kd*derivative
new_size   = clamp(current_size + adjustment, min_size, max_size)

Default gains: kp=0.3, ki=0.05, kd=0.1. These are conservative by design — fast convergence without overshoot in bursty workloads.
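
For intuition, here's the same update rule as a minimal standalone Python class with the default gains. This is an illustration of the math above, not the library's internals:

class PIDChunkController:
    """Minimal discrete PID loop: latency error in, chunk-size adjustment out."""

    def __init__(self, target_ms, min_size, max_size, kp=0.3, ki=0.05, kd=0.1):
        self.target_ms = target_ms
        self.min_size, self.max_size = min_size, max_size
        self.kp, self.ki, self.kd = kp, ki, kd
        self.integral = 0.0     # accumulated error (I)
        self.prev_error = 0.0   # previous error, for the trend (D)

    def next_size(self, current_size, actual_ms):
        error = self.target_ms - actual_ms   # positive means headroom to grow
        self.integral += error
        derivative = error - self.prev_error
        self.prev_error = error
        adjustment = self.kp * error + self.ki * self.integral + self.kd * derivative
        # clamp to the configured bounds
        return int(min(self.max_size, max(self.min_size, current_size + adjustment)))

With a 500ms target, PIDChunkController(500, 100, 50_000).next_size(1000, 200.0) returns 1135: chunks are running fast, so the controller grows them, exactly the behavior described earlier.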

Memory ceiling: the OOM killer

The PID loop is smooth and gradual. But what if memory spikes suddenly? StreamChunk adds a hard ceiling that overrides PID output entirely:

import psutil

mem = psutil.virtual_memory().percent
if mem >= max_memory_pct:
    # emergency override: halve immediately instead of waiting on the PID loop
    return max(min_chunk_size, current_size // 2)

This fires on every call until memory drops back below the threshold. Set max_memory_pct=80 and OOM crashes become essentially impossible.
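
Putting the two together, the size decision each iteration looks roughly like this. A sketch of the described behavior, reusing the PIDChunkController sketch above, not StreamChunk's actual internals:

import psutil

def effective_chunk_size(pid, current_size, actual_ms,
                         max_memory_pct=80, min_chunk_size=100):
    # Hard ceiling first: under memory pressure, ignore the PID entirely
    if psutil.virtual_memory().percent >= max_memory_pct:
        return max(min_chunk_size, current_size // 2)  # emergency halving
    # Otherwise the smooth PID loop decides
    return pid.next_size(current_size, actual_ms)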


Going parallel

ParallelStreamChunker distributes work across a worker pool — each worker gets its own dataset partition and its own independent PID controller:

from streamchunk import ParallelStreamChunker

parallel = ParallelStreamChunker(
    data=dataset,
    processor=my_transform_func,
    mode="thread",   # or "process"
    n_workers=16,
    target_latency_ms=500,
    max_memory_pct=80,
)

results = parallel.run()
summary = parallel.summary()

print(f"Total rows: {summary['total_rows']:,}")
print(f"p95 latency: {summary['p95_latency_ms']:.1f}ms")
print(f"Throughput: {summary['rows_per_sec']:,} rows/sec")

Thread vs Process: which one?

StreamChunk auto-detects your CPU topology:

from streamchunk import detect_cpu_threads
print(detect_cpu_threads())
# {'logical_threads': 16, 'physical_cores': 8,
#  'recommended_io': 16, 'recommended_cpu': 8}

The recommended_io vs recommended_cpu distinction matters:

  • mode="thread" + I/O-bound work (Kafka, DB, API): Python's GIL releases during I/O waits, so 16 threads = 16 concurrent I/O operations. 12–14× speedup on an 8-core/16-thread machine.
  • mode="process" + CPU-bound work (transforms, ML inference): GIL bypassed entirely. Uses physical core count only to avoid hyperthreading overhead. 6–7× speedup.
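
Wiring the detection output into the parallel chunker is then mechanical. A sketch reusing dataset and my_transform_func from the example above; the io_bound flag is a hypothetical toggle you'd set per pipeline:

from streamchunk import ParallelStreamChunker, detect_cpu_threads

info = detect_cpu_threads()
io_bound = True  # hypothetical flag: True for Kafka/DB/API-heavy pipelines

parallel = ParallelStreamChunker(
    data=dataset,
    processor=my_transform_func,
    mode="thread" if io_bound else "process",
    n_workers=info["recommended_io"] if io_bound else info["recommended_cpu"],
    target_latency_ms=500,
)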

Data sources

StreamChunk ships with five production sources and a clean extension API:

# Any iterable
from streamchunk.sources import GeneratorSource
source = GeneratorSource(range(10_000_000))

# CSV / JSONL (memory-efficient, never loads full file)
from streamchunk.sources import FileSource
source = FileSource("data.jsonl", format="jsonl")

# Apache Kafka (unbounded stream)
from streamchunk.sources import KafkaSource
source = KafkaSource("topic", "broker:9092",
    security_protocol="SSL",
    ssl_cafile="/certs/ca.pem")

# Any SQLAlchemy DB (parameterized, safe)
from streamchunk.sources import DatabaseSource
source = DatabaseSource(
    "postgresql://user:pass@host/db",
    "SELECT * FROM logs WHERE ts > :since",
    params={"since": cutoff}
)

# Paginated REST API
from streamchunk.sources import APISource
source = APISource("https://api.example.com/records",
    headers={"Authorization": f"Bearer {token}"},
    page_param="cursor")

Building a custom source

Only two methods are required: pull() and is_exhausted().

from streamchunk.sources import BaseSource
from typing import Any, List
import redis

class RedisStreamSource(BaseSource):
    def __init__(self, client: redis.Redis, stream: str):
        self.redis, self.stream = client, stream
        self._cursor, self._done = "0-0", False  # resume point, end flag

    def pull(self, n: int) -> List[Any]:
        # XREAD returns [(stream, [(id, fields), ...])]; block up to 100ms
        resp = self.redis.xread({self.stream: self._cursor}, count=n, block=100)
        if not resp:
            self._done = True  # nothing new within the timeout
            return []
        _, messages = resp[0]
        self._cursor = messages[-1][0]  # advance past the last ID read
        return messages

    def is_exhausted(self) -> bool:
        return self._done
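
Wiring the custom source into the same loop as the basic example; the local Redis instance and the "events" stream name are assumptions for illustration:

from streamchunk import StreamChunker
import redis

source = RedisStreamSource(redis.Redis(host="localhost"), "events")
chunker = StreamChunker(source, target_latency_ms=500, max_memory_pct=80)

for chunk, meta in chunker:
    load(transform(chunk))  # same transform/load step as the basic example
    chunker.report_latency(meta.chunk_id, meta.elapsed_ms)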

Async support

Drop-in async iteration for asyncio-based pipelines:

import asyncio

async def run_async_pipeline():
    chunker = StreamChunker(source, target_latency_ms=300)
    async for chunk, meta in chunker.aiter():
        await async_transform(chunk)
        chunker.report_latency(meta.chunk_id, meta.elapsed_ms)

asyncio.run(run_async_pipeline())

Prometheus + Grafana integration

from streamchunk import start_metrics_server

start_metrics_server(port=8000)  # starts HTTP /metrics endpoint

# Metrics exposed:
# streamchunk_rows_total (Counter)
# streamchunk_chunks_total (Counter)
# streamchunk_chunk_size_current (Gauge)
# streamchunk_memory_pct (Gauge)
# streamchunk_latency_ms (Histogram → p50/p95/p99 in Grafana)
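
On the Prometheus side, a standard scrape job pointed at that endpoint is all you need; nothing here is StreamChunk-specific beyond the port chosen above:

# prometheus.yml (excerpt)
scrape_configs:
  - job_name: "streamchunk"
    static_configs:
      - targets: ["localhost:8000"]

In Grafana, the p95 panel is then the usual histogram_quantile(0.95, rate(streamchunk_latency_ms_bucket[5m])) query, assuming the standard client-library bucket naming.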

YAML config for deployment

# streamchunk.yaml
target_latency_ms: 500
max_memory_pct: 80
min_chunk_size: 100
max_chunk_size: 50000
initial_chunk_size: 1000

chunker = StreamChunker.from_config("streamchunk.yaml", source)

Useful when deploying across multiple environments (dev/staging/prod) with different hardware profiles.
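
One way to use that across environments; the APP_ENV variable and per-environment file names are assumptions, not library conventions:

import os
from streamchunk import StreamChunker

env = os.environ.get("APP_ENV", "dev")  # dev / staging / prod
chunker = StreamChunker.from_config(f"streamchunk.{env}.yaml", source)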


Benchmark results

Tested on an 8-core/16-thread cloud instance across 10 representative pipelines:

Configuration               Throughput
Single-threaded, no PID     Baseline
Single-threaded + PID       ~99% of baseline (overhead negligible)
Thread mode, 16 workers     12–14× baseline
Process mode, 8 workers     6–7× baseline

Pipeline-level impact:

Metric             Before      After
Runtime            4 hours     17–25 min
CPU utilization    12–20%      85–95%
OOM crash rate     ~24%        ~0%
Manual tuning      Required    Eliminated

PID overhead per chunk: ~0.1–0.5ms (dominated by psutil.virtual_memory() call, not the PID math itself).


Design patterns used

This library leans on classical OOP patterns:

Pattern            Where
Iterator Protocol  StreamChunker.__iter__() is a standard Python iterator
Observer           report_latency() feeds user observations back to the controller
Strategy           mode="thread"/"process" swaps the executor at runtime
Template Method    BaseSource ABC — subclasses implement pull() and is_exhausted()
Factory Method     StreamChunker.from_config() — alternative YAML constructor
Facade             __init__.py exports — clean public API over complex internals
DTO                ChunkMetadata dataclass — immutable per-chunk telemetry bundle

Quick reference

# Install — https://pypi.org/project/streamchunk/
pip install streamchunk
pip install "streamchunk[kafka,database,prometheus,pandas,async]"

# Single worker
StreamChunker(source, target_latency_ms=500, max_memory_pct=80)

# Parallel
ParallelStreamChunker(data, processor, mode="thread", n_workers=16)

# From config
StreamChunker.from_config("config.yaml", source)

# Async
async for chunk, meta in chunker.aiter(): ...

# Metrics server
start_metrics_server(port=8000)

# CPU info
detect_cpu_threads()  # → {logical_threads, physical_cores, recommended_io, recommended_cpu}

# Stats
chunker.stats()  # → {total_rows, total_chunks, avg_latency_ms, p95_latency_ms, ...}

The takeaway

Manual batch size tuning is a solved problem — it just hasn't been widely recognized as a control theory problem yet. PID control gives you:

  • Automatic adaptation to any machine, any dataset, any workload
  • Hard memory ceiling that prevents OOM crashes
  • 12–14× throughput on I/O-bound work with zero extra configuration
  • Prometheus metrics and async support out of the box

StreamChunk v2.0.1 · MIT License · Python 3.8–3.13 · 📦 PyPI

If you've ever woken up at 2 AM because a batch size was wrong, this one's for you.


Drop a comment if you're doing something interesting with ETL pipelines — always curious how others are handling this.
