If you’ve ever tried to shove millions of rows into a database, you know the pain: slow inserts, blocked threads, and that sinking feeling when your pipeline just can’t keep up.
I recently worked on a minimal pipeline design that tackles this problem head-on, and I think it’s worth sharing, especially if you’re building a bronze layer for a data lake.
For small- to medium-scale workloads (think tens of millions of rows), this pipeline is enough. It’s simple, easy to maintain, and doesn’t require spinning up distributed infrastructure. DuckDB is surprisingly capable here.
You can access the repo here: https://github.com/meemeealm/Multithreaded-Ingestion-Pipeline.git
The Idea
The pipeline is built around a producer-consumer model. Instead of one big monolithic process, we split responsibilities:
Producer (Shredder Thread): Reads Parquet rows, batches them, and pushes them into a queue.
Queue (Thread-safe buffer): Acts as the middleman. It smooths out the flow and prevents the producer from overwhelming the consumers.
Consumers (Workers): Multiple threads pull from the queue, cast types, batch inserts, and push data into DuckDB.
DuckDB: Landing zone. Everything fans in here, into a raw_data table.
The result is a clean fan-out → fan-in pattern: one producer, many workers (three in the example), one database.
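The fan-out → fan-in wiring can be sketched with nothing but the standard library. The names here (`produce`, `worker`, `NUM_WORKERS`) are illustrative rather than from the repo, and the "database" is just a shared list so the sketch stays self-contained:

```python
import threading
import queue

NUM_WORKERS = 3
job_queue = queue.Queue()
results = []
results_lock = threading.Lock()

def produce(rows, batch_size=2):
    """Producer: push fixed-size batches, then one poison pill per worker."""
    for i in range(0, len(rows), batch_size):
        job_queue.put(rows[i:i + batch_size])
    for _ in range(NUM_WORKERS):
        job_queue.put(None)  # stop signal

def worker():
    """Consumer: pull batches until the poison pill arrives."""
    while True:
        batch = job_queue.get()
        try:
            if batch is None:
                break
            with results_lock:
                results.extend(batch)  # stand-in for the DB insert
        finally:
            job_queue.task_done()

threads = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()
produce(list(range(10)))
for t in threads:
    t.join()
print(sorted(results))  # all rows arrive; arrival order isn't guaranteed
```

Sorting before printing matters: with three workers pulling concurrently, only the *set* of processed rows is deterministic, not their order.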
Queue Pattern
The critical part of this pipeline is the queue. It also provides backpressure control: if the workers fall behind, the queue absorbs the imbalance.
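With a bounded queue, the backpressure becomes explicit: once the buffer is full, `put()` blocks (or times out) until a worker frees a slot, which throttles the producer automatically. The `MAX_PENDING` name and the tiny cap below are illustrative, not from the repo:

```python
import queue

MAX_PENDING = 2  # deliberately tiny to demonstrate the effect
job_queue = queue.Queue(maxsize=MAX_PENDING)

job_queue.put("batch-1")
job_queue.put("batch-2")
try:
    job_queue.put("batch-3", timeout=0.1)  # queue is full: raises queue.Full
except queue.Full:
    print("producer throttled until a worker calls get()")
```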
The producer fills the queue with batches of rows, but it’s the workers that show the queue pattern in action — they keep pulling jobs out, processing them, and marking them done. This is the classic producer–consumer setup.
def db_ingestor(worker_id, job_queue, shared_con, batch_size):
    con = None
    try:
        # Each worker gets its own cursor from the shared connection
        con = shared_con.cursor()
        print(f"[Worker - {worker_id}] Ready and waiting for data...")
        batch = []
        while True:
            row = job_queue.get()  # Pull next job from the queue
            try:
                if row is None:  # Poison pill → stop signal
                    if batch:  # Flush whatever is left before exiting
                        con.executemany("INSERT INTO raw_data VALUES (?, ?, ?)", batch)
                    break
                # Cast row values into proper types
                clean_row = (
                    int(row['id']),
                    float(row['value']),
                    str(row['category'])
                )
                batch.append(clean_row)
                # Insert batch when it reaches the limit
                if len(batch) >= batch_size:
                    con.executemany("INSERT INTO raw_data VALUES (?, ?, ?)", batch)
                    batch = []
            finally:
                # Mark the job as done so the queue knows it's processed
                job_queue.task_done()
    finally:
        # Release this worker's cursor even if an insert fails
        if con is not None:
            con.close()
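The producer side of this setup might look like the sketch below. The repo's shredder reads Parquet, but that code isn't shown in the article, so here `row_source` is any iterable of dicts and `shredder` is a hypothetical name. The key detail is the shutdown protocol: each worker consumes exactly one poison pill, so the producer must enqueue one `None` per worker:

```python
import queue

def shredder(row_source, job_queue, num_workers):
    """Hypothetical producer counterpart to db_ingestor."""
    for row in row_source:
        job_queue.put(row)       # one dict per job, matching db_ingestor
    for _ in range(num_workers):
        job_queue.put(None)      # one poison pill per worker

rows = [{"id": 1, "value": 2.5, "category": "a"},
        {"id": 2, "value": 3.5, "category": "b"}]
q = queue.Queue()
shredder(rows, q, num_workers=3)
print(q.qsize())  # 2 rows + 3 poison pills = 5
```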
Why This Works for Bronze Layer
The bronze layer is all about raw ingestion. Fancy transformations aren’t required yet; the most critical job is simply to get the data in reliably.
This design is good for:
Parallelism: Multiple workers keep inserts flowing without bottlenecks.
Decoupling: Producer and consumers don’t block each other. If one side slows down, the queue absorbs the shock.
Batching: Both reads and writes are batched, which makes inserts way faster.
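To see the batching point concretely, here's a quick stand-in comparison using sqlite3 from the standard library (DuckDB's Python API exposes the same `executemany` shape); the table and row counts are illustrative:

```python
import sqlite3
import time

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE raw_data (id INTEGER, value REAL, category TEXT)")
rows = [(i, float(i), "c") for i in range(10_000)]

# One executemany call for the whole batch
t0 = time.perf_counter()
con.executemany("INSERT INTO raw_data VALUES (?, ?, ?)", rows)
batched = time.perf_counter() - t0

# One execute call per row
t0 = time.perf_counter()
for r in rows:
    con.execute("INSERT INTO raw_data VALUES (?, ?, ?)", r)
per_row = time.perf_counter() - t0

print(f"batched: {batched:.4f}s  per-row: {per_row:.4f}s")
```

The exact numbers depend on the machine, but the batched path avoids per-statement overhead on every row, which is where the speedup comes from.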
Limitations
Python GIL: If your workers are CPU-heavy, threading won’t scale in the long run. Multiprocessing or native extensions might be needed.
Single-node DB: DuckDB is designed to run on a single machine, so if your workload grows beyond what one node can handle, you’ll eventually need to switch to a distributed system.
Error handling: The bronze layer usually doesn’t demand perfection, but you’ll still want retries and logging, which this pipeline doesn’t include.
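As a starting point, a minimal retry wrapper around the batch insert might look like this; `insert_with_retry` is a hypothetical helper, not part of the repo:

```python
import logging
import time

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("ingest")

def insert_with_retry(cursor, sql, batch, retries=3, backoff=0.5):
    """Retry a batch insert a few times before giving up."""
    for attempt in range(1, retries + 1):
        try:
            cursor.executemany(sql, batch)
            return
        except Exception as exc:
            log.warning("insert failed (attempt %d/%d): %s", attempt, retries, exc)
            if attempt == retries:
                raise  # give up and let the worker decide what to do
            time.sleep(backoff * attempt)  # back off between attempts
```

A real pipeline would also decide what happens to the batch after the final failure, e.g. writing it to a dead-letter file instead of dropping it.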
