When I took over a transaction ingestion system running in production, one of the first things I noticed was the time it took to process CSV uploads.
Users regularly uploaded files averaging around 50,000 rows, with some reaching over 600,000. The upload API handled everything synchronously: parsing the file, inserting rows into Cassandra, fetching historical prices from an external service per transaction, performing reconciliation, calculating balances and tax metrics, and only then returning a response.
In practice, this meant requests could take anywhere from five to ten minutes to complete. Frontend and Nginx timeouts had been extended to accommodate this behavior. The system technically worked, but only by allowing long blocking requests.
It became clear that this wasn’t just a performance issue. The architecture itself was tightly coupled to the request lifecycle. Heavy compute and IO operations were happening directly inside the API path.
Synchronous designs often work well at small scale. But as data size increases, the bottlenecks become harder to ignore.
What the Original Flow Looked Like
At a high level, the upload endpoint was responsible for doing everything in a single request lifecycle.
Once a user uploaded a CSV file, the API would:
- Parse the file row by row
- Insert each transaction into Cassandra
- Fetch historical pricing data for every transaction
- Perform reconciliation
- Calculate balances and tax metrics
- Return a response only after all processing was complete
Conceptually, it looked like this:
```python
@app.post("/upload")
def upload_csv(file):
    rows = parse_csv(file)
    for row in rows:
        insert_into_cassandra(row)
        price = fetch_historical_price(row)
        reconciled = reconcile_transaction(row, price)
        processed = calculate_metrics(reconciled)
        update_transaction(processed)
    return {"status": "completed"}
```
Every upload request had to wait for:
- Large file parsing
- Tens of thousands of database writes
- External API calls per transaction
- Reconciliation and tax calculations
There was also no automated retry mechanism. If processing failed midway, it required manual intervention.
At the same time, another API responsible for returning transaction data to the frontend fetched raw records from Cassandra and performed calculations inside the request path. That endpoint routinely took 30–40 seconds to complete.
The ingestion flow looked like this:
User
│
▼
Upload API
│
▼
Parse CSV
│
▼
Insert into Cassandra
│
▼
Fetch Historical Price (External API)
│
▼
Reconciliation
│
▼
Tax / Metrics Calculation
│
▼
HTTP Response
Each stage blocked the next. The system accumulated CPU-bound work (parsing, calculations) and IO-bound work (database writes, external API calls) inside a single HTTP request.
Rethinking the Request Lifecycle
The first architectural decision was simple:
Move heavy work out of the request path.
Instead of completing ingestion synchronously, the upload API was redesigned to become status-based:
```python
@app.post("/upload")
def upload_csv(file):
    job_id = create_job_record(status="queued")
    queue.publish({"job_id": job_id})
    return {"job_id": job_id, "status": "processing"}
```
The API now initiated a pipeline rather than executing it.
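To make the status-based design concrete, here is a hypothetical companion endpoint the frontend could poll. The in-memory `JOBS` store and function names are illustrative stand-ins, not the production job table:

```python
import uuid

# In-memory stand-in for the real job table (illustrative only).
JOBS = {}

def create_job_record(status):
    """Create a job record and return its identifier."""
    job_id = str(uuid.uuid4())
    JOBS[job_id] = {"status": status}
    return job_id

def get_job_status(job_id):
    """The frontend polls this until the job reaches a terminal state."""
    job = JOBS.get(job_id)
    if job is None:
        return {"error": "unknown job"}
    return {"job_id": job_id, "status": job["status"]}
```

Consumers downstream would update the record as the job moves through the pipeline (queued → processing → completed or failed).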
Building a Staged Async Pipeline
Processing was decomposed into multiple independent consumers:
Upload API
│
▼
Queue A → Insert Consumer
│
▼
Queue B → Historical Price Workers
│
▼
Queue C → Reconciliation Consumer
│
▼
Queue D → Tax / Metrics Consumer
│
▼
Read-Optimized Table
Each stage became independently scalable and observable.
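Each consumer follows the same basic shape: pull a message, run its stage's work, forward the result to the next queue. A minimal sketch, using Python's in-process `queue.Queue` as a stand-in for the real broker (names and the shutdown sentinel are illustrative):

```python
import queue

def run_stage(in_queue, out_queue, handle):
    """Generic stage consumer: pull a message, process it, forward it.

    `handle` is the stage-specific work (insert, price fetch, reconcile, ...).
    In production a failed message would be retried or dead-lettered.
    """
    while True:
        msg = in_queue.get()
        if msg is None:          # sentinel: shut the stage down
            break
        result = handle(msg)
        if out_queue is not None:  # the final stage has no downstream queue
            out_queue.put(result)
```

Because every stage has this shape, scaling a slow stage means running more instances of its consumer against the same queue, without touching the others.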
Handling Large CSV Files
Files were processed in dynamic chunks (typically 500–2000 rows depending on structure). Chunk-level parallelism was introduced using threads, configurable via environment variables (defaulted to 8).
This reduced memory spikes and improved CPU utilization during parsing and metric computation.
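A rough sketch of the chunking and thread-pool setup. The chunk size, the `CSV_WORKERS` environment variable, and the function names are illustrative, not the production configuration:

```python
import os
from concurrent.futures import ThreadPoolExecutor

CHUNK_SIZE = 1000  # within the 500–2000 range used in practice
MAX_WORKERS = int(os.getenv("CSV_WORKERS", "8"))  # env-configurable, defaults to 8

def chunked(rows, size=CHUNK_SIZE):
    """Yield fixed-size chunks so memory stays bounded for large files."""
    for i in range(0, len(rows), size):
        yield rows[i:i + size]

def process_file(rows, process_chunk):
    """Chunk-level parallelism: each chunk is handled by a worker thread."""
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        return list(pool.map(process_chunk, chunked(rows)))
```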
Optimizing External API Calls with a Master–Worker Pattern
Originally, historical pricing APIs were called sequentially per transaction.
This was redesigned using a master–worker model:
- The master grouped transactions into batches
- Workers processed batches in parallel
- Workers wrote results directly to Cassandra
- The master coordinated the batches using `asyncio.gather`
```python
async def master(batch_groups):
    tasks = [worker.process(batch) for batch in batch_groups]
    results = await asyncio.gather(*tasks)
    return results
```
This allowed controlled concurrency, better rate-limit handling, and parallel processing instead of sequential IO.
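One way the rate-limit handling could be sketched is with an `asyncio.Semaphore` capping in-flight calls. The cap value and function names here are assumptions for illustration:

```python
import asyncio

MAX_CONCURRENT_CALLS = 5  # illustrative cap to stay under the provider's rate limit

async def bounded_worker(batch, fetch_prices, sem):
    """Process one batch; the semaphore bounds concurrent external calls."""
    async with sem:
        return await fetch_prices(batch)

async def master(batch_groups, fetch_prices):
    """Fan batches out to workers and gather results in order."""
    sem = asyncio.Semaphore(MAX_CONCURRENT_CALLS)
    tasks = [bounded_worker(batch, fetch_prices, sem) for batch in batch_groups]
    return await asyncio.gather(*tasks)
```

`asyncio.gather` preserves input order, so results line up with their batches even though the calls complete out of order.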
Optimizing the Read Path
The read API was slow because it was calculating metrics at request time.
Instead of using a Materialized View, a new Cassandra table was introduced:
- Contained only frontend-required fields
- Included computed metrics
- Excluded ingestion-only data
A new consumer transformed processed records into this optimized table.
The read API now simply applied filters and limits on precomputed data.
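Conceptually, the read path reduced to filtering and limiting precomputed rows. A simplified sketch over an in-memory stand-in for the Cassandra table (field names are illustrative):

```python
def read_transactions(table, user_id, limit=100):
    """Read path after the redesign: filter and limit only, no computation."""
    rows = [row for row in table if row["user_id"] == user_id]
    return rows[:limit]
```

With metrics computed at write time by the pipeline, request-time work is just a lookup, which is what made sub-second responses possible.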
Measured Impact
- Upload API: 5–10 minutes → ~0.5 seconds
- End-to-end processing: 5–20 minutes → 2–4 minutes
- Read API: 30–40 seconds → ~0.5–1 second
- CPU utilization: ~5–8% → ~70–80%
The improvement was not just about latency reduction. It was about reshaping how work flowed through the system.
Engineering Takeaways
- Heavy work does not belong in the request lifecycle.
- Separate ingestion from presentation.
- Parallelism should be intentional and controlled.
- External rate limits are architectural constraints.
- Pipelines scale better than monolithic request handlers.
This redesign changed how I approach ingestion systems.
When processing grows, the architecture must evolve with it.
If you’ve worked on ingestion pipelines or faced similar architectural bottlenecks, I’d be interested to hear how you approached it.
Happy to discuss trade-offs or alternative designs.