
You've done this before. You need to batch-process a large dataset. You pick a chunk size — maybe 1000, maybe 10000 — run a quick test, it looks fine, and you ship it.
Three weeks later, your pipeline is crawling at 15% CPU while you're paying for 8 cores. Or it's randomly OOM-crashing on Tuesday nights when the dataset is slightly wider than usual.
This is the static batch size problem, and it's more expensive than most teams realize.
## What's actually happening
When you hard-code a batch size, you're making a bet:
"This number will be optimal on every run, on every machine, under every memory condition, forever."
That's never true. The optimal chunk size is a function of:
- Current available memory
- How heavy the transformation is for this batch
- How many other jobs are competing for resources
- Row width variation in the dataset
No static number wins across all these dimensions. You need continuous adaptation.
## Enter PID control
PID (Proportional-Integral-Derivative) control is a feedback algorithm used in virtually every control system on the planet — thermostats, drone stabilizers, industrial robots, cruise control.
The idea: measure the current output, compare it to the target, and adjust the control variable to close the gap. Crucially, the adjustment accounts for:
- P (Proportional): how far off you are right now
- I (Integral): how long you've been off (accumulated error)
- D (Derivative): whether you're getting closer or further away
Applied to ETL chunking:
| Control Theory | ETL Pipeline |
|---|---|
| Control variable | chunk size |
| Measured variable | processing latency per chunk |
| Setpoint | target latency (e.g., 500ms) |
| PID output | adjustment to chunk size |
If chunks are processing in 200ms and your target is 500ms → the system can handle larger chunks → PID increases chunk size. If chunks are taking 900ms → too slow → PID decreases chunk size.
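To make the feedback loop concrete, here is a minimal textbook discrete PID controller in plain Python. The class and method names are mine — this is an illustration of the algorithm, not StreamChunk's internals:

```python
class PIDController:
    """Generic discrete PID controller (illustrative sketch)."""

    def __init__(self, kp: float, ki: float, kd: float, setpoint: float):
        self.kp, self.ki, self.kd = kp, ki, kd
        self.setpoint = setpoint      # the target value (e.g. 500 ms latency)
        self.integral = 0.0           # accumulated error (I term)
        self.prev_error = 0.0         # last error, for the derivative (D term)

    def update(self, measured: float) -> float:
        error = self.setpoint - measured          # P: how far off right now
        self.integral += error                    # I: how long we've been off
        derivative = error - self.prev_error      # D: closer or further away?
        self.prev_error = error
        return (self.kp * error
                + self.ki * self.integral
                + self.kd * derivative)

pid = PIDController(kp=0.3, ki=0.05, kd=0.1, setpoint=500)
print(pid.update(200))  # positive output: chunks finish too fast, grow them
print(pid.update(900))  # negative output: chunks too slow, shrink them
```

The sign of the output is the whole trick: one function maps "how wrong is the latency" to "which direction, and how hard, to push the chunk size".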
## StreamChunk: PID control for Python data pipelines

```bash
# https://pypi.org/project/streamchunk/
pip install streamchunk
```
### Basic usage

```python
from streamchunk import StreamChunker, FileSource

source = FileSource("events.csv")

chunker = StreamChunker(
    source,
    target_latency_ms=500,
    max_memory_pct=80,
    min_chunk_size=100,
    max_chunk_size=50_000,
)

for chunk, meta in chunker:
    result = transform(chunk)
    load(result)
    chunker.report_latency(meta.chunk_id, meta.elapsed_ms)
```
That's the full loop. No tuning. `report_latency()` feeds the actual processing time back into the PID controller, which computes the next chunk size. Convergence happens in 5–10 iterations — typically the first few seconds of a run.
## The math (briefly)

```
error(t)   = target_latency_ms - actual_latency_ms
integral  += error(t)
derivative = error(t) - error(t-1)

adjustment = kp*error(t) + ki*integral + kd*derivative
new_size   = clamp(current_size + adjustment, min_size, max_size)
```
Default gains: `kp=0.3`, `ki=0.05`, `kd=0.1`. These are conservative by design: quick convergence without overshoot on bursty workloads.
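The pseudocode above transcribed into a runnable helper — the function name and the `(integral, prev_error)` state tuple are my own framing, not the library's API:

```python
def next_chunk_size(current_size, target_ms, actual_ms, state,
                    kp=0.3, ki=0.05, kd=0.1,
                    min_size=100, max_size=50_000):
    """One PID step: returns (new_chunk_size, updated_state)."""
    integral, prev_error = state
    error = target_ms - actual_ms           # positive = we have headroom
    integral += error                       # accumulated error
    derivative = error - prev_error         # trend since the last chunk
    adjustment = kp * error + ki * integral + kd * derivative
    new_size = max(min_size, min(max_size, round(current_size + adjustment)))
    return new_size, (integral, error)

size, state = 1_000, (0.0, 0.0)
size, state = next_chunk_size(size, 500, 200, state)  # fast chunk: size grows
size, state = next_chunk_size(size, 500, 900, state)  # slow chunk: size shrinks
```

Keeping the state explicit (rather than in a class) makes each step trivial to unit-test in isolation.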
## Memory ceiling: the OOM killer

The PID loop is smooth and gradual. But what if memory spikes suddenly? StreamChunk adds a hard ceiling that overrides the PID output entirely:

```python
mem = psutil.virtual_memory().percent
if mem >= max_memory_pct:
    return max(min_chunk_size, current_size // 2)  # immediate halving
```

This fires on every call until memory drops back below the threshold. Set `max_memory_pct=80` and OOM crashes become essentially impossible.
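The override logic is easy to test in isolation as a pure function (the function and parameter names here are mine, not the library's):

```python
def effective_chunk_size(pid_size: int, current_size: int, mem_pct: float,
                         max_memory_pct: float = 80.0,
                         min_chunk_size: int = 100) -> int:
    """Hard memory ceiling: under pressure, halve immediately and ignore the PID."""
    if mem_pct >= max_memory_pct:
        return max(min_chunk_size, current_size // 2)
    return pid_size  # no pressure: the PID's suggestion stands

print(effective_chunk_size(12_000, 10_000, mem_pct=91.0))  # 5000: halved
print(effective_chunk_size(12_000, 10_000, mem_pct=60.0))  # 12000: PID wins
```

Because halving is geometric, sustained pressure collapses even the maximum chunk size quickly: 50,000 rows reaches the 100-row floor in nine consecutive pressured calls.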
## Going parallel

`ParallelStreamChunker` distributes work across a worker pool — each worker gets its own dataset partition and its own independent PID controller:

```python
from streamchunk import ParallelStreamChunker

parallel = ParallelStreamChunker(
    data=dataset,
    processor=my_transform_func,
    mode="thread",        # or "process"
    n_workers=16,
    target_latency_ms=500,
    max_memory_pct=80,
)

results = parallel.run()
summary = parallel.summary()

print(f"Total rows: {summary['total_rows']:,}")
print(f"p95 latency: {summary['p95_latency_ms']:.1f}ms")
print(f"Throughput: {summary['rows_per_sec']:,} rows/sec")
```
## Thread vs Process: which one?

StreamChunk auto-detects your CPU topology:

```python
from streamchunk import detect_cpu_threads

print(detect_cpu_threads())
# {'logical_threads': 16, 'physical_cores': 8,
#  'recommended_io': 16, 'recommended_cpu': 8}
```

The `recommended_io` vs `recommended_cpu` distinction matters:

- `mode="thread"` + I/O-bound work (Kafka, DB, API): Python's GIL is released during I/O waits, so 16 threads give you 16 concurrent I/O operations. Expect a 12–14× speedup on an 8-core/16-thread machine.
- `mode="process"` + CPU-bound work (transforms, ML inference): the GIL is bypassed entirely. Uses the physical core count only, to avoid hyperthreading overhead. Expect a 6–7× speedup.
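The I/O claim is easy to verify with the standard library alone, no StreamChunk involved. Sixteen blocking calls (simulated here with `time.sleep`) complete in roughly the time of one when threads overlap their waits:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_io(_) -> int:
    time.sleep(0.05)          # stands in for a blocking DB/API/Kafka call
    return 1

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=16) as pool:
    rows = sum(pool.map(fake_io, range(16)))
elapsed = time.perf_counter() - start

print(f"{rows} calls in {elapsed:.2f}s")  # ~0.05s wall time, not 16 × 0.05s
```

Swap `fake_io` for a CPU-bound function and the speedup vanishes — the GIL serializes the work — which is exactly why `mode="process"` exists.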
## Data sources

StreamChunk ships with five production sources and a clean extension API:

```python
# Any iterable
from streamchunk.sources import GeneratorSource
source = GeneratorSource(range(10_000_000))

# CSV / JSONL (memory-efficient, never loads the full file)
from streamchunk.sources import FileSource
source = FileSource("data.jsonl", format="jsonl")

# Apache Kafka (unbounded stream)
from streamchunk.sources import KafkaSource
source = KafkaSource("topic", "broker:9092",
                     security_protocol="SSL",
                     ssl_cafile="/certs/ca.pem")

# Any SQLAlchemy DB (parameterized, safe)
from streamchunk.sources import DatabaseSource
source = DatabaseSource(
    "postgresql://user:pass@host/db",
    "SELECT * FROM logs WHERE ts > :since",
    params={"since": cutoff},
)

# Paginated REST API
from streamchunk.sources import APISource
source = APISource("https://api.example.com/records",
                   headers={"Authorization": f"Bearer {token}"},
                   page_param="cursor")
```
### Building a custom source

Only two methods are required:

```python
from typing import Any, List

from streamchunk.sources import BaseSource

class RedisStreamSource(BaseSource):
    def pull(self, n: int) -> List[Any]:
        return self.redis.xread({"stream": self._cursor},
                                count=n, block=100)

    def is_exhausted(self) -> bool:
        return self._done
```
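The Redis example assumes a live server. Here is the same contract against an in-memory list — the `BaseSource` ABC below is a minimal stand-in I wrote so the sketch runs standalone, not the library's actual class:

```python
from abc import ABC, abstractmethod
from typing import Any, List

class BaseSource(ABC):
    """Stand-in for streamchunk.sources.BaseSource (same two-method contract)."""
    @abstractmethod
    def pull(self, n: int) -> List[Any]: ...
    @abstractmethod
    def is_exhausted(self) -> bool: ...

class ListSource(BaseSource):
    def __init__(self, items: List[Any]):
        self._items, self._pos = items, 0

    def pull(self, n: int) -> List[Any]:
        batch = self._items[self._pos:self._pos + n]   # at most n items
        self._pos += len(batch)
        return batch

    def is_exhausted(self) -> bool:
        return self._pos >= len(self._items)

src = ListSource(list(range(10)))
print(src.pull(4))         # [0, 1, 2, 3]
print(src.is_exhausted())  # False
src.pull(6)
print(src.is_exhausted())  # True
```

The key property the chunker relies on: `pull(n)` may return fewer than `n` items near the end, and `is_exhausted()` flips only once everything has been handed out.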
## Async support

Drop-in async iteration for asyncio-based pipelines:

```python
async def run_async_pipeline():
    chunker = StreamChunker(source, target_latency_ms=300)
    async for chunk, meta in chunker.aiter():
        await async_transform(chunk)
        chunker.report_latency(meta.chunk_id, meta.elapsed_ms)
```
## Prometheus + Grafana integration

```python
from streamchunk import start_metrics_server

start_metrics_server(port=8000)  # starts an HTTP /metrics endpoint

# Metrics exposed:
#   streamchunk_rows_total          (Counter)
#   streamchunk_chunks_total        (Counter)
#   streamchunk_chunk_size_current  (Gauge)
#   streamchunk_memory_pct          (Gauge)
#   streamchunk_latency_ms          (Histogram → p50/p95/p99 in Grafana)
```
## YAML config for deployment

```yaml
# streamchunk.yaml
target_latency_ms: 500
max_memory_pct: 80
min_chunk_size: 100
max_chunk_size: 50000
initial_chunk_size: 1000
```

```python
chunker = StreamChunker.from_config("streamchunk.yaml", source)
```
Useful when deploying across multiple environments (dev/staging/prod) with different hardware profiles.
## Benchmark results

Tested on an 8-core / 16-thread cloud instance, across 10 representative pipelines:
| Configuration | Throughput |
|---|---|
| Single-threaded, no PID | Baseline |
| Single-threaded + PID | ~99% of baseline (overhead negligible) |
| Thread mode, 16 workers | 12–14× baseline |
| Process mode, 8 workers | 6–7× baseline |
Pipeline-level impact:
| Metric | Before | After |
|---|---|---|
| Runtime | 4 hours | 17–25 min |
| CPU utilization | 12–20% | 85–95% |
| OOM crash rate | ~24% | ~0% |
| Manual tuning | Required | Eliminated |
PID overhead per chunk: ~0.1–0.5ms (dominated by the `psutil.virtual_memory()` call, not the PID math itself).
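A quick, machine-dependent way to sanity-check that split: time the bare update rule in a loop, with no memory probe at all. Numbers will vary, but the PID arithmetic alone should land well under a microsecond per update:

```python
import time

kp, ki, kd = 0.3, 0.05, 0.1
integral = prev_error = 0.0

n = 100_000
start = time.perf_counter()
for i in range(n):
    error = 500.0 - (400.0 + i % 100)   # synthetic latency readings
    integral += error
    derivative = error - prev_error
    prev_error = error
    adjustment = kp * error + ki * integral + kd * derivative
per_update_us = (time.perf_counter() - start) / n * 1e6

print(f"PID math alone: ~{per_update_us:.2f} µs per update")
```

Whatever your machine prints, it will be orders of magnitude below the ~100–500 µs total, which is consistent with the system call dominating.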
## Design patterns used

This library leans on classical OOP patterns:

| Pattern | Where |
|---|---|
| Iterator Protocol | `StreamChunker.__iter__()` is a standard Python iterator |
| Observer | `report_latency()` feeds user observations back to the controller |
| Strategy | `mode="thread"`/`"process"` swaps the executor at runtime |
| Template Method | `BaseSource` ABC — subclasses implement `pull()` and `is_exhausted()` |
| Factory Method | `StreamChunker.from_config()` — alternative YAML constructor |
| Facade | `__init__.py` exports — clean public API over complex internals |
| DTO | `ChunkMetadata` dataclass — immutable per-chunk telemetry bundle |
## Quick reference

```bash
# Install — https://pypi.org/project/streamchunk/
pip install streamchunk
pip install "streamchunk[kafka,database,prometheus,pandas,async]"
```

```python
# Single worker
StreamChunker(source, target_latency_ms=500, max_memory_pct=80)

# Parallel
ParallelStreamChunker(data, processor, mode="thread", n_workers=16)

# From config
StreamChunker.from_config("config.yaml", source)

# Async
async for chunk, meta in chunker.aiter(): ...

# Metrics server
start_metrics_server(port=8000)

# CPU info
detect_cpu_threads()  # → {logical, physical, recommended_io, recommended_cpu}

# Stats
chunker.stats()  # → {total_rows, total_chunks, avg_latency_ms, p95_latency_ms, ...}
```
## The takeaway
Manual batch size tuning is a solved problem — it just hasn't been widely recognized as a control theory problem yet. PID control gives you:
- Automatic adaptation to any machine, any dataset, any workload
- Hard memory ceiling that prevents OOM crashes
- 12–14× throughput on I/O-bound work with zero extra configuration
- Prometheus metrics and async support out of the box
StreamChunk v2.0.1 · MIT License · Python 3.8–3.13 · 📦 PyPI
If you've ever woken up at 2 AM because a batch size was wrong, this one's for you.
Drop a comment if you're doing something interesting with ETL pipelines — always curious how others are handling this.