
Mee Mee Alainmar
A Simple Queue Can Save Your Pipeline: DuckDB + Python

If you’ve ever tried to shove millions of rows into a database, you know the pain: slow inserts, blocked threads, and that sinking feeling when your pipeline just can’t keep up.

I recently worked on a minimal pipeline design that tackles this problem head-on, and I think it’s worth sharing, especially if you’re building a bronze layer for a data lake.

For small to medium scale workloads (think tens of millions of rows), this pipeline is enough. It’s simple, easy to maintain, and doesn’t require spinning up distributed infrastructure. DuckDB is surprisingly capable here.

You can access the repo here: https://github.com/meemeealm/Multithreaded-Ingestion-Pipeline.git


The Idea
The pipeline is built around a producer-consumer model. Instead of one big monolithic process, we split responsibilities:

  • Producer (Shredder Thread): Reads Parquet rows, batches them, and pushes them into a queue.

  • Queue (Thread-safe buffer): Acts as the middleman. It smooths out the flow and prevents the producer from overwhelming the consumers.

  • Consumers (Workers): Multiple threads pull from the queue, cast types, batch inserts, and push data into DuckDB.

  • DuckDB: Landing zone. Everything fans in here, into a raw_data table.

The result is a nice fan-out → fan-in pattern: one producer, many workers (three in the example), one database.
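The fan-out → fan-in wiring can be sketched with Python’s standard `queue` and `threading` modules. This is a minimal stand-in, not the repo’s code: the worker just doubles numbers where the real pipeline would cast rows and insert into DuckDB.

```python
import queue
import threading

NUM_WORKERS = 3  # matches the example's three workers

def producer(job_queue, rows, num_workers):
    # Fan-out: push every row, then one poison pill per worker
    for row in rows:
        job_queue.put(row)
    for _ in range(num_workers):
        job_queue.put(None)

def worker(job_queue, sink):
    # Fan-in: every worker writes to the same sink (DuckDB in the real pipeline)
    while True:
        row = job_queue.get()
        try:
            if row is None:  # stop signal
                break
            sink.append(row * 2)  # stand-in for the cast + insert work
        finally:
            job_queue.task_done()

q = queue.Queue(maxsize=100)  # bounded: producer blocks if workers fall behind
results = []
threads = [threading.Thread(target=worker, args=(q, results)) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()
producer(q, range(10), NUM_WORKERS)
for t in threads:
    t.join()
print(sorted(results))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

Note the `maxsize` on the queue: that bound is what gives you backpressure, since `put()` blocks once the buffer is full.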

Parallel Ingestion Pipeline


Queue Pattern

The critical part of this pipeline is the queue. It also provides backpressure control, meaning that if the workers are slower, the queue absorbs the imbalance.

The producer fills the queue with batches of rows, but it’s the workers that show the queue pattern in action — they keep pulling jobs out, processing them, and marking them done. This is the classic producer–consumer setup.

def db_ingestor(worker_id, job_queue, shared_con, batch_size):
    con = None
    try:
        # Each worker gets its own cursor from the shared connection
        con = shared_con.cursor()
        print(f"[Worker - {worker_id}] Ready and waiting for data...")

        batch = []
        while True:
            row = job_queue.get()  # Pull next job from the queue
            try:
                if row is None:  # Poison pill → stop signal
                    if batch:  # Flush whatever is left before exiting
                        con.executemany("INSERT INTO raw_data VALUES (?, ?, ?)", batch)
                    break

                # Cast row values into proper types
                clean_row = (
                    int(row['id']),
                    float(row['value']),
                    str(row['category'])
                )
                batch.append(clean_row)

                # Insert batch when it reaches the limit
                if len(batch) >= batch_size:
                    con.executemany("INSERT INTO raw_data VALUES (?, ?, ?)", batch)
                    batch = []

            finally:
                # Mark the job as done so the queue knows it’s processed
                job_queue.task_done()
    finally:
        # Release the worker's cursor on exit
        if con is not None:
            con.close()

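The post only shows the consumer side. For the full picture, here is one way the producer and the shutdown handshake could look. This sketch substitutes `sqlite3` for DuckDB so it runs with the standard library alone; the `executemany` calls and the poison-pill shutdown are the same pattern (the explicit `lock` is a sqlite3 necessity, whereas DuckDB cursors handle concurrent writers internally).

```python
import queue
import sqlite3
import threading

NUM_WORKERS = 3
BATCH_SIZE = 2

con = sqlite3.connect(":memory:", check_same_thread=False)
con.execute("CREATE TABLE raw_data (id INTEGER, value REAL, category TEXT)")
lock = threading.Lock()  # serialize writes on the shared sqlite3 connection

def worker(job_queue):
    batch = []
    while True:
        row = job_queue.get()
        try:
            if row is None:  # poison pill: flush the remainder and stop
                if batch:
                    with lock:
                        con.executemany("INSERT INTO raw_data VALUES (?, ?, ?)", batch)
                        con.commit()
                break
            batch.append((int(row["id"]), float(row["value"]), str(row["category"])))
            if len(batch) >= BATCH_SIZE:
                with lock:
                    con.executemany("INSERT INTO raw_data VALUES (?, ?, ?)", batch)
                    con.commit()
                batch = []
        finally:
            job_queue.task_done()

q = queue.Queue(maxsize=50)
threads = [threading.Thread(target=worker, args=(q,)) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()

# Producer: push rows, then one poison pill per worker
for i in range(7):
    q.put({"id": i, "value": i * 1.5, "category": "a"})
for _ in range(NUM_WORKERS):
    q.put(None)

for t in threads:
    t.join()
print(con.execute("SELECT COUNT(*) FROM raw_data").fetchone()[0])  # 7
```

One poison pill per worker matters: each worker consumes exactly one `None` and exits, so no pill gets stranded and no worker hangs on `get()`.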

Why This Works for Bronze Layer

The bronze layer is all about raw ingestion. Fancy transformations aren’t required yet; the most critical job is simply getting the data in reliably.

This design is good for:

  • Parallelism: Multiple workers keep inserts flowing without bottlenecks.

  • Decoupling: Producer and consumers don’t block each other. If one side slows down, the queue absorbs the shock.

  • Batching: Both reads and writes are batched, which makes inserts way faster.
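The read-side batching can be as simple as slicing an iterator into fixed-size chunks. An illustrative helper (not from the repo):

```python
from itertools import islice

def batched(iterable, batch_size):
    """Yield lists of up to batch_size items from iterable."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, batch_size))
        if not chunk:
            return
        yield chunk

print(list(batched(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

Each chunk then becomes one queue item and one `executemany` call, instead of seven round trips to the database.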


Limitations

Python GIL: If your workers are CPU-heavy, threading won’t scale in the long run. Multiprocessing or native extensions might be needed.

Single-node DB: DuckDB is designed to run on a single machine, so if your workload grows beyond what one node can handle, you’ll eventually need to switch to a distributed system.

Error handling: The bronze layer usually doesn’t demand perfection, but you’ll still want retries and logging (neither of which is in this pipeline).
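If you do add retries, a minimal wrapper is enough to start. This is an illustrative sketch, not part of the repo; `do_insert` stands in for whatever call performs the batch insert.

```python
import logging
import time

logging.basicConfig(level=logging.WARNING)
log = logging.getLogger("ingest")

def insert_with_retry(do_insert, batch, retries=3, delay=0.1):
    """Try do_insert(batch) up to `retries` times, logging each failure."""
    for attempt in range(1, retries + 1):
        try:
            return do_insert(batch)
        except Exception as exc:
            log.warning("insert failed (attempt %d/%d): %s", attempt, retries, exc)
            if attempt == retries:
                raise  # give up after the last attempt
            time.sleep(delay)

# Usage: a flaky insert that succeeds on the third try
calls = []
def flaky(batch):
    calls.append(batch)
    if len(calls) < 3:
        raise RuntimeError("transient error")
    return len(batch)

print(insert_with_retry(flaky, [1, 2, 3]))  # 3
```

In production you’d likely add exponential backoff and route permanently failing batches to a dead-letter table rather than raising.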
