DEV Community

Cover image for Validating 1 million rows in 0.41 MB of RAM (how I built a streaming data validator in Python)
Ameer Adeigbe
Ameer Adeigbe

Posted on

Validating 1 million rows in 0.41 MB of RAM (how I built a streaming data validator in Python)

Every Python developer working with large files has hit this wall at least once.

You have a CSV with a few million rows. You want to validate every row against a schema before writing it to a database. You reach for Pydantic, which is excellent and write something like this:

rows = Path("orders.csv").read_text().splitlines()

for line in rows:
    Order.model_validate_json(line)
Enter fullscreen mode Exit fullscreen mode

It works on your 10 MB test file. Then it runs on the 2 GB production file, and your pod gets OOM-killed at 3 AM.

The problem is not Pydantic. Pydantic is a validator, not a file reader. It validates objects you hand it; it doesn't know anything about files, streams, or memory. Loading the whole file first is on you. And for large files, “loading the whole file first” is the bug.

The existing tools don't quite fit

Pandera is the obvious next stop. It's a schema library specifically designed for tabular data. But Pandera validates DataFrames, which means you still have to load your data into a DataFrame first. For a 2 GB Parquet file on a constrained pipeline worker, that's still a problem.

Great Expectations is powerful, but it's a full data quality platform. When you want “validate this CSV before inserting it”, you don't want to configure a data context, a datasource, and an expectation suite.

I couldn't find anything that did the thing I actually wanted:

define a Pydantic schema, point it at a file, get results row by row, never hold more than a small buffer in memory.

So I built it.

Introducing streamval

import streamval as sv
from typing import Literal

class Order(sv.Schema):
    id: int
    customer_email: str
    amount: float
    status: Literal["pending", "shipped", "cancelled"]

for result in sv.stream_csv("orders.csv", schema=Order):
    if result.valid:
        insert_to_db(result.data)
    else:
        log_errors(result.row_index, result.errors)

print(validator.stats)
Enter fullscreen mode Exit fullscreen mode

That's it. No DataFrame. No loading the file first. Rows come out one at a time as ValidationResult objects.

The memory contract

The core design constraint is an AsyncIterator[dict]; every format adapter emits rows as an async generator. The validator never holds more than batch_size rows at once (default: 1000).

Here's what that looks like in practice on a 1,000,000-row CSV:

Streamval Benchmark

0.47 MB peak on 1 million rows. The memory usage is flat from row 1 to row 1,000,000.

How the streaming works

The key abstraction is treating every file format as an async generator. Here's the shape of every adapter:

async def csv_adapter(path, config) -> AsyncIterator[dict]:
    async with aiofiles.open(path) as f:
        buffer = []

        async for line in f:
            buffer.append(line)

            if len(buffer) >= config.chunk_size:
                for row in csv.DictReader(buffer):
                    yield row

                buffer.clear()
Enter fullscreen mode Exit fullscreen mode

The BatchBuffer wraps this generator and groups rows into batches. The CompiledValidationPlan validates a full batch in one call to Pydantic's TypeAdapter, one Python → Rust boundary crossing per batch instead of one per row.

Error handling strategies

There are three built-in modes:

  • fail_fast

Raises StreamValidationError on the first invalid row. Good for CI assertions on fixture files.

  • collect (default)

Emits every row, accumulates errors, and surfaces them via stats after the run. Good for data pipeline reports.

  • skip

Only yields valid rows, logs invalid ones at the WARNING level. Good for “clean the stream and process” pipelines.

validator = sv.StreamValidator(Order, on_error="skip")

for result in validator.stream_parquet("warehouse.parquet"):
    process(result.data)

print(f"Processed {validator.stats.rows_valid} rows")
print(f"Skipped {validator.stats.rows_invalid} rows")

print(
    f"Most common error: "
    f"{max(validator.stats.errors_by_field, key=validator.stats.errors_by_field.get)}"
)
Enter fullscreen mode Exit fullscreen mode

Supported formats

  • CSV (via aiofiles, or polars for the Arrow fast path)
  • JSONL / NDJSON
  • Parquet (via pyarrow row-group streaming)
  • Arrow / Feather IPC
  • HTTP NDJSON streams including SSE and LLM provider output (v0.2)

Install

pip install streamval

# Fast path
pip install streamval[fast]

# HTTP / NDJSON streaming
pip install streamval[http]
The honest performance numbers

Enter fullscreen mode Exit fullscreen mode

Throughput on a 100k-row CSV (Linux, Python 3.12):

Streamval Benchmark

The naive Pydantic loop is 8× faster, but it loads the entire file into RAM.

For a 100 MB file, that's fine.

For a 10 GB file, it isn't.

streamval's value is the memory contract, not raw speed.

What's next

  • Arrow fast path for CSV (targeting 50k+ rps)
  • HTTP NDJSON adapter with SSE support
  • LLM streaming validation helpers for OpenAI / Anthropic formats

Links

If you've ever hit the OOM wall on a large file validation job, I'd love to hear if this helps. Issues and PRs are very welcome.

Top comments (2)

Collapse
 
blinknbuild profile image
BlinkNBuild

The canvas.width = 0 trick for releasing GPU memory is the browser-side equivalent of what you're doing here — both are about forcing the runtime to release backing resources it would otherwise hold. The CompiledValidationPlan batching one Python→Rust boundary crossing per batch instead of per row is a really clean insight. Curious: on the fail_fast mode, does it still release the async generator cleanly on early exit, or does the file handle stay open until GC? Asking because that edge case has burned me with aiofiles before.

Collapse
 
abatan_eniola_c7a251cc52a profile image
Abatan EniolaNimi

Niceone....
Keeping memory completely flat at 0.41 MB for a million rows is wild. I’m really curious about that Polars fast path on your roadmap....