DEV Community

Aaron Rose

The Queue: Producer-Consumer Patterns and Async Communication

Timothy stared at his screen, frustrated once again. He'd successfully made his book scanning system run multiple tasks concurrently, but now he had a different problem: chaos.

"Margaret, look at this," Timothy said, pulling up his code. "I have ten scanner tasks running concurrently, but they're all trying to read from the same list of books. They're fighting over which book to scan next, some books get scanned twice, and sometimes the program just crashes."

import asyncio
import random

books_to_scan = ["Book_1", "Book_2", "Book_3", "Book_4", "Book_5"]
scanned_books = []

async def scanner(worker_id):
    """Scanner task that processes books"""
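    while books_to_scan:
        # PROBLEM: Multiple tasks accessing shared state!
        book = books_to_scan[0]  # Peek at the next book without claiming it
        print(f"  Worker {worker_id} scanning {book}...")
        # While this task is suspended, other workers see the same first book
        await asyncio.sleep(random.uniform(0.5, 1.5))
        # Race condition! The list changed during the await, so this may
        # remove a different book or raise IndexError on an empty list
        books_to_scan.pop(0)
        scanned_books.append(book)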
        print(f"  Worker {worker_id} finished {book}")

async def broken_system():
    """Multiple workers fighting over shared list"""
    print("Starting 5 concurrent scanners...")
    await asyncio.gather(*[scanner(i) for i in range(5)])
    print(f"Scanned {len(scanned_books)} books")

# This will crash or produce wrong results!

Margaret looked at the code and nodded. "You're trying to share state between concurrent tasks. That's a recipe for race conditions: every await is a point where another task can run and change the list underneath you. Tasks shouldn't share state. They should send messages to each other. That's what Queues are for."

Understanding the Queue Pattern

"Think about it like the library's book return system," Margaret explained, sketching on her whiteboard. "When patrons return books, they don't directly modify the catalog. They put books in the return bin. Then, different staff members take books from the bin and process them. The bin coordinates the work."

She typed:

import asyncio
import random

async def scanner(worker_id, queue):
    """Scanner task that gets books from queue"""
    while True:
        book = await queue.get()

        if book is None:  # Sentinel value to stop
            queue.task_done()
            break

        print(f"  Worker {worker_id} scanning {book}...")
        await asyncio.sleep(random.uniform(0.5, 1.5))
        print(f"  Worker {worker_id} finished {book}")
        queue.task_done()

async def basic_queue_system():
    """Using a queue to coordinate work"""
    print("Starting queue-based system...")

    # Create the queue
    queue = asyncio.Queue()

    # Put books in the queue
    books = ["Book_1", "Book_2", "Book_3", "Book_4", "Book_5"]
    for book in books:
        await queue.put(book)

    # Start 3 worker tasks
    workers = [asyncio.create_task(scanner(i, queue)) for i in range(3)]

    # Wait for all books to be processed
    await queue.join()

    # Stop workers
    for _ in range(3):
        await queue.put(None)

    await asyncio.gather(*workers)
    print("All books scanned!")

asyncio.run(basic_queue_system())

Output:

Starting queue-based system...
  Worker 0 scanning Book_1...
  Worker 1 scanning Book_2...
  Worker 2 scanning Book_3...
  Worker 1 finished Book_2
  Worker 1 scanning Book_4...
  Worker 0 finished Book_1
  Worker 0 scanning Book_5...
  Worker 2 finished Book_3
  Worker 1 finished Book_4
  Worker 0 finished Book_5
All books scanned!

"See? No race conditions," Margaret said. "The queue handles coordination. Workers take books one at a time, and the queue ensures each book is processed exactly once."

The Queue API

"Let me show you the key methods," Margaret continued.

import asyncio

async def demo_queue_api():
    """Demonstrate basic Queue operations"""
    queue = asyncio.Queue()

    # Put items in the queue
    await queue.put("item_1")
    await queue.put("item_2")
    await queue.put("item_3")

    print(f"Queue size: {queue.qsize()}")
    print(f"Empty? {queue.empty()}")

    # Get items from the queue
    item = await queue.get()
    print(f"Got: {item}")
    queue.task_done()  # Mark as processed

    item = await queue.get()
    print(f"Got: {item}")
    queue.task_done()

    print(f"Queue size after getting 2 items: {queue.qsize()}")

asyncio.run(demo_queue_api())

Output:

Queue size: 3
Empty? False
Got: item_1
Got: item_2
Queue size after getting 2 items: 1

Margaret explained each method:

Key Queue Methods:

await queue.put(item) - Add an item to the queue (suspends the calling task while the queue is full; other tasks keep running)

item = await queue.get() - Remove and return an item (suspends the calling task while the queue is empty)

queue.task_done() - Mark an item as processed (call it exactly once for each item returned by get())

await queue.join() - Wait until task_done() has been called for every item put into the queue

queue.qsize() - Return the number of items in the queue

queue.empty() - Return True if the queue is empty

queue.full() - Return True if the queue is full (for bounded queues)
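
The list above covers the awaiting methods. asyncio.Queue also has non-waiting variants, `put_nowait()` and `get_nowait()`, which raise `asyncio.QueueFull` and `asyncio.QueueEmpty` instead of suspending. They are not used elsewhere in this article; the sketch below (with a hypothetical `nowait_demo` function) shows them alongside `full()` on a bounded queue.

```python
import asyncio

async def nowait_demo():
    queue = asyncio.Queue(maxsize=2)  # Bounded, so full() can become True

    queue.put_nowait("item_1")
    queue.put_nowait("item_2")
    print(f"Full? {queue.full()}")

    try:
        queue.put_nowait("item_3")  # No room, and put_nowait never waits
    except asyncio.QueueFull:
        print("Queue is full, item_3 rejected")

    print(f"Got: {queue.get_nowait()}")
    print(f"Got: {queue.get_nowait()}")

    try:
        queue.get_nowait()  # Nothing left to take
    except asyncio.QueueEmpty:
        print("Queue is empty")

    return queue.qsize()

asyncio.run(nowait_demo())
```

The non-waiting calls suit code that would rather drop or defer work than pause, for example a producer that must never stall.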

Producer-Consumer Pattern

"The real power comes from separating producers and consumers," Margaret said. "Some tasks create work, others process it."

import asyncio
import random

async def book_finder(queue, num_books):
    """Producer: Finds books that need scanning"""
    print("Book finder starting...")
    for i in range(num_books):
        book_id = f"Book_{i:03d}"
        print(f"  Found {book_id}")
        await queue.put(book_id)
        await asyncio.sleep(random.uniform(0.1, 0.3))  # Simulated finding time
    print("Book finder finished!")

async def book_scanner(worker_id, queue):
    """Consumer: Scans books from the queue"""
    while True:
        book_id = await queue.get()

        if book_id is None:
            queue.task_done()
            break

        print(f"  [Worker {worker_id}] Scanning {book_id}...")
        await asyncio.sleep(random.uniform(0.5, 1.0))  # Simulated scanning
        print(f"  [Worker {worker_id}] Finished {book_id}")
        queue.task_done()

async def producer_consumer_demo():
    """Demonstrate producer-consumer pattern"""
    queue = asyncio.Queue()

    # Start producer
    producer = asyncio.create_task(book_finder(queue, 10))

    # Start 3 consumer workers
    num_workers = 3
    consumers = [
        asyncio.create_task(book_scanner(i, queue)) 
        for i in range(num_workers)
    ]

    # Wait for producer to finish
    await producer

    # Wait for all items to be processed
    await queue.join()

    # Stop consumers
    for _ in range(num_workers):
        await queue.put(None)

    await asyncio.gather(*consumers)
    print("\nAll books processed!")

asyncio.run(producer_consumer_demo())

Output:

Book finder starting...
  Found Book_000
  [Worker 0] Scanning Book_000...
  Found Book_001
  [Worker 1] Scanning Book_001...
  Found Book_002
  [Worker 2] Scanning Book_002...
  [Worker 0] Finished Book_000
  Found Book_003
  [Worker 0] Scanning Book_003...
  Found Book_004
  [Worker 1] Finished Book_001
  [Worker 1] Scanning Book_004...
...
Book finder finished!
...
All books processed!

Timothy watched the output. "The finder keeps producing books while the workers consume them. They're completely decoupled!"

"Exactly," Margaret said. "The producer doesn't need to know how many consumers there are, and consumers don't need to know where the work comes from. The queue is the contract between them."

Multiple Producers and Consumers

"You can scale both sides independently," Margaret continued.

import asyncio
import random

async def shelf_scanner(scanner_id, queue, shelf_name, num_books):
    """Producer: Each scanner handles one shelf"""
    print(f"Scanner {scanner_id} starting on {shelf_name}...")
    for i in range(num_books):
        book_id = f"{shelf_name}_Book_{i}"
        await queue.put(book_id)
        await asyncio.sleep(random.uniform(0.1, 0.2))
    print(f"Scanner {scanner_id} finished {shelf_name}")

async def catalog_updater(worker_id, queue, processed_count):
    """Consumer: Updates the catalog database"""
    while True:
        book_id = await queue.get()

        if book_id is None:
            queue.task_done()
            break

        print(f"  [Cataloger {worker_id}] Processing {book_id}")
        await asyncio.sleep(random.uniform(0.3, 0.6))
        processed_count[worker_id] += 1
        queue.task_done()

async def multi_producer_consumer():
    """Multiple producers and consumers"""
    queue = asyncio.Queue()
    processed_count = {i: 0 for i in range(5)}

    # Start 3 producers (scanning different shelves)
    producers = [
        asyncio.create_task(shelf_scanner(0, queue, "Shelf_A", 5)),
        asyncio.create_task(shelf_scanner(1, queue, "Shelf_B", 5)),
        asyncio.create_task(shelf_scanner(2, queue, "Shelf_C", 5)),
    ]

    # Start 5 consumers (cataloging books)
    num_consumers = 5
    consumers = [
        asyncio.create_task(catalog_updater(i, queue, processed_count))
        for i in range(num_consumers)
    ]

    # Wait for all producers to finish
    await asyncio.gather(*producers)

    # Wait until every queued book has been processed
    await queue.join()

    # Stop consumers
    for _ in range(num_consumers):
        await queue.put(None)

    await asyncio.gather(*consumers)

    print("\n--- Processing Summary ---")
    for worker_id, count in processed_count.items():
        print(f"Cataloger {worker_id}: {count} books")
    print(f"Total: {sum(processed_count.values())} books")

asyncio.run(multi_producer_consumer())

Output:

Scanner 0 starting on Shelf_A...
Scanner 1 starting on Shelf_B...
Scanner 2 starting on Shelf_C...
  [Cataloger 0] Processing Shelf_A_Book_0
  [Cataloger 1] Processing Shelf_B_Book_0
  [Cataloger 2] Processing Shelf_C_Book_0
...
Scanner 0 finished Shelf_A
Scanner 1 finished Shelf_B
Scanner 2 finished Shelf_C

--- Processing Summary ---
Cataloger 0: 3 books
Cataloger 1: 4 books
Cataloger 2: 2 books
Cataloger 3: 3 books
Cataloger 4: 3 books
Total: 15 books

"Work gets distributed automatically," Margaret noted. "Whichever consumer is available grabs the next item."

Bounded Queues and Backpressure

"What if producers are much faster than consumers?" Timothy asked.

"Great question," Margaret said. "That's where bounded queues come in."

import asyncio
import random

async def fast_producer(queue):
    """Producer that generates work quickly"""
    print("Fast producer starting...")
    for i in range(20):
        book = f"Book_{i}"
        print(f"  Producing {book}... (queue size: {queue.qsize()})")
        await queue.put(book)  # This will block when queue is full!
        await asyncio.sleep(0.1)  # Fast!
    print("Producer finished")

async def slow_consumer(worker_id, queue):
    """Consumer that processes work slowly"""
    while True:
        book = await queue.get()

        if book is None:
            queue.task_done()
            break

        print(f"  [Worker {worker_id}] Processing {book}...")
        await asyncio.sleep(1.0)  # Slow!
        queue.task_done()

async def bounded_queue_demo():
    """Demonstrate backpressure with bounded queue"""
    # Bounded queue - only holds 5 items
    queue = asyncio.Queue(maxsize=5)

    # Start slow consumers
    consumers = [
        asyncio.create_task(slow_consumer(i, queue))
        for i in range(2)
    ]

    # Start fast producer (will be throttled by queue)
    producer = asyncio.create_task(fast_producer(queue))

    await producer
    await queue.join()

    for _ in range(2):
        await queue.put(None)

    await asyncio.gather(*consumers)
    print("All done!")

asyncio.run(bounded_queue_demo())

Output:

Fast producer starting...
  Producing Book_0... (queue size: 0)
  [Worker 0] Processing Book_0...
  Producing Book_1... (queue size: 0)
  [Worker 1] Processing Book_1...
  Producing Book_2... (queue size: 0)
  Producing Book_3... (queue size: 1)
  Producing Book_4... (queue size: 2)
  Producing Book_5... (queue size: 3)
  Producing Book_6... (queue size: 4)
  Producing Book_7... (queue size: 5)
  [blocks here until workers make room]
...

"See how the queue fills up to 5 items, then the producer blocks?" Margaret explained. "This is backpressure—the system naturally throttles itself. The producer can't overwhelm the consumers."

Priority Queues

"What if some books are more urgent?" Timothy asked.

import asyncio
import sys
from dataclasses import dataclass, field

@dataclass(order=True)
class PrioritizedBook:
    priority: int
    book_id: str = field(compare=False)

# Sentinel that sorts after every real book. A bare None would not work here:
# PriorityQueue compares items to order them, and None does not support "<".
STOP = PrioritizedBook(priority=sys.maxsize, book_id="")

async def prioritized_scanner(queue):
    """Add books with different priorities"""
    books = [
        (1, "Urgent_Return"),
        (3, "Regular_Book_A"),
        (1, "Urgent_Reserve"),
        (2, "Damaged_Book"),
        (3, "Regular_Book_B"),
    ]

    for priority, book_id in books:
        item = PrioritizedBook(priority, book_id)
        await queue.put(item)
        print(f"  Added {book_id} (priority {priority})")
        await asyncio.sleep(0.1)

async def priority_worker(worker_id, queue):
    """Process books by priority"""
    while True:
        item = await queue.get()

        if item is STOP:
            queue.task_done()
            break

        print(f"  [Worker {worker_id}] Processing {item.book_id} (priority {item.priority})")
        await asyncio.sleep(0.5)
        queue.task_done()

async def priority_queue_demo():
    """Demonstrate priority queue"""
    queue = asyncio.PriorityQueue()

    # Add all books first, so the workers choose from the whole backlog
    await prioritized_scanner(queue)

    # Start workers
    workers = [
        asyncio.create_task(priority_worker(i, queue))
        for i in range(2)
    ]

    # Wait for processing
    await queue.join()

    # Stop workers
    for _ in range(2):
        await queue.put(STOP)

    await asyncio.gather(*workers)

asyncio.run(priority_queue_demo())

Output:

  Added Urgent_Return (priority 1)
  Added Regular_Book_A (priority 3)
  Added Urgent_Reserve (priority 1)
  Added Damaged_Book (priority 2)
  Added Regular_Book_B (priority 3)
  [Worker 0] Processing Urgent_Return (priority 1)
  [Worker 1] Processing Urgent_Reserve (priority 1)
  [Worker 0] Processing Damaged_Book (priority 2)
  [Worker 1] Processing Regular_Book_A (priority 3)
  [Worker 0] Processing Regular_Book_B (priority 3)

"Lower numbers have higher priority," Margaret explained. "Urgent books get processed first."

Pipeline Pattern: Chaining Queues

"Sometimes you need multiple stages," Margaret said. "Like an assembly line."

import asyncio
import random

async def scanner(input_queue, output_queue):
    """Stage 1: Scan books"""
    while True:
        book_id = await input_queue.get()

        if book_id is None:
            input_queue.task_done()
            await output_queue.put(None)  # Signal next stage
            break

        print(f"  [Scanner] Scanning {book_id}...")
        await asyncio.sleep(random.uniform(0.3, 0.6))

        # Pass to next stage with scan data
        scanned_data = {"book_id": book_id, "pages": random.randint(100, 500)}
        await output_queue.put(scanned_data)
        input_queue.task_done()

async def indexer(input_queue, output_queue):
    """Stage 2: Index scanned books"""
    while True:
        data = await input_queue.get()

        if data is None:
            input_queue.task_done()
            await output_queue.put(None)  # Signal next stage
            break

        print(f"  [Indexer] Indexing {data['book_id']} ({data['pages']} pages)...")
        await asyncio.sleep(random.uniform(0.2, 0.4))

        # Pass to next stage with index data
        indexed_data = {**data, "indexed": True}
        await output_queue.put(indexed_data)
        input_queue.task_done()

async def cataloger(input_queue):
    """Stage 3: Update catalog"""
    while True:
        data = await input_queue.get()

        if data is None:
            input_queue.task_done()
            break

        print(f"  [Cataloger] Cataloging {data['book_id']}...")
        await asyncio.sleep(random.uniform(0.1, 0.3))
        input_queue.task_done()

async def pipeline_demo():
    """Demonstrate multi-stage pipeline"""
    scan_queue = asyncio.Queue()
    index_queue = asyncio.Queue()
    catalog_queue = asyncio.Queue()

    # Start pipeline stages
    scanner_task = asyncio.create_task(scanner(scan_queue, index_queue))
    indexer_task = asyncio.create_task(indexer(index_queue, catalog_queue))
    cataloger_task = asyncio.create_task(cataloger(catalog_queue))

    # Feed books into pipeline
    books = [f"Book_{i}" for i in range(5)]
    for book in books:
        await scan_queue.put(book)

    await scan_queue.put(None)  # Signal end of input

    # Wait for pipeline to complete
    await asyncio.gather(scanner_task, indexer_task, cataloger_task)
    print("\nPipeline complete!")

asyncio.run(pipeline_demo())

Output:

  [Scanner] Scanning Book_0...
  [Scanner] Scanning Book_1...
  [Indexer] Indexing Book_0 (234 pages)...
  [Scanner] Scanning Book_2...
  [Indexer] Indexing Book_1 (456 pages)...
  [Cataloger] Cataloging Book_0...
  [Scanner] Scanning Book_3...
  [Indexer] Indexing Book_2 (189 pages)...
  [Cataloger] Cataloging Book_1...
...
Pipeline complete!

"Each stage processes items as they arrive from the previous stage," Margaret said. "Books flow through the pipeline continuously."

Error Handling and Graceful Shutdown

"What if a worker encounters an error?" Timothy asked.

import asyncio
import random

async def unreliable_processor(worker_id, queue, results):
    """Worker that might fail"""
    while True:
        # Get the item outside the try block so that task_done() in the
        # finally clause runs exactly once for each successful get()
        book_id = await queue.get()
        try:
            if book_id is None:
                break  # The finally clause still marks the sentinel as done

            # Simulate occasional failures
            if random.random() < 0.2:  # 20% failure rate
                raise ValueError(f"Failed to process {book_id}")

            print(f"  [Worker {worker_id}] Successfully processed {book_id}")
            results["success"].append(book_id)
            await asyncio.sleep(0.5)

        except ValueError as e:
            print(f"  [Worker {worker_id}] ERROR: {e}")
            results["failed"].append(book_id)

        finally:
            queue.task_done()

async def error_handling_demo():
    """Demonstrate error handling in queue processing"""
    queue = asyncio.Queue()
    results = {"success": [], "failed": []}

    # Start workers
    workers = [
        asyncio.create_task(unreliable_processor(i, queue, results))
        for i in range(3)
    ]

    # Add work
    for i in range(10):
        await queue.put(f"Book_{i}")

    # Wait for all work to complete
    await queue.join()

    # Stop workers
    for _ in range(3):
        await queue.put(None)

    await asyncio.gather(*workers)

    print("\n--- Results ---")
    print(f"Successful: {len(results['success'])}")
    print(f"Failed: {len(results['failed'])}")
    if results['failed']:
        print(f"Failed books: {results['failed']}")

asyncio.run(error_handling_demo())

Output:

  [Worker 0] Successfully processed Book_0
  [Worker 1] ERROR: Failed to process Book_1
  [Worker 2] Successfully processed Book_2
  [Worker 0] Successfully processed Book_3
...

--- Results ---
Successful: 8
Failed: 2
Failed books: ['Book_1', 'Book_5']

"Notice the finally block ensures task_done() is always called," Margaret explained. "Even if processing fails, we mark the item as complete so join() doesn't hang."

Queue Patterns Summary

Margaret drew a summary on the whiteboard:

Common Queue Patterns:

  1. Basic Producer-Consumer

    • One or more producers feed queue
    • One or more consumers process items
    • Decouples producers from consumers
  2. Bounded Queue (Backpressure)

    • Queue(maxsize=N) limits queue size
    • Producers block when queue is full
    • Keeps memory use from growing without limit
  3. Priority Queue

    • PriorityQueue() processes items by priority
    • Lower numbers = higher priority
    • Use dataclass with order=True
  4. Pipeline (Multi-Stage)

    • Chain queues together
    • Output of one stage → input of next
    • Each stage processes independently
  5. Graceful Shutdown

    • Send sentinel value (None) to stop workers
    • Use try/finally to ensure task_done()
    • Always call queue.join() before shutdown
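
Sentinels are not the only way to stop workers. Once join() has returned, every worker is idle inside get(), so the workers can simply be cancelled. A minimal sketch of that alternative follows; `cancel_shutdown_demo` is a hypothetical name and the sleep stands in for real work.

```python
import asyncio

async def worker(queue, processed):
    """Loop forever; shutdown arrives as a cancellation rather than a sentinel."""
    while True:
        book = await queue.get()
        try:
            await asyncio.sleep(0.01)  # Simulated work
            processed.append(book)
        finally:
            queue.task_done()

async def cancel_shutdown_demo(num_books=6, num_workers=3):
    queue = asyncio.Queue()
    processed = []
    workers = [
        asyncio.create_task(worker(queue, processed))
        for _ in range(num_workers)
    ]

    for i in range(num_books):
        await queue.put(f"Book_{i}")

    await queue.join()  # All work is finished; workers are idle in get()

    for task in workers:
        task.cancel()
    # return_exceptions=True collects each CancelledError instead of raising it
    await asyncio.gather(*workers, return_exceptions=True)
    return processed

print(sorted(asyncio.run(cancel_shutdown_demo())))
```

Cancellation avoids reserving a special value such as None, which helps when None could be a legitimate item. Sentinels remain the simpler choice when the shutdown signal has to travel through several pipeline stages.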

The Takeaway

Timothy closed his laptop, his book scanning system now properly coordinated.

Key insights:

Don't share state between tasks—use queues to pass messages

asyncio.Queue() provides safe coordination between tasks on a single event loop (it is not thread-safe)

await queue.put(item) adds items to the queue

item = await queue.get() retrieves items from the queue

Call queue.task_done() after processing each item

await queue.join() blocks until all items are processed

Producer-consumer pattern decouples work generation from work processing

Bounded queues (maxsize=N) provide automatic backpressure

PriorityQueue processes items by priority (lower number = higher priority)

Pipeline pattern chains queues for multi-stage processing

Always use try/finally to ensure task_done() is called

Send sentinel values (like None) to signal worker shutdown

Multiple producers and consumers can share the same queue

The queue handles synchronization between tasks, so no explicit locks are needed

Choose queue type based on needs: Queue for FIFO, PriorityQueue for priority, LifoQueue for LIFO

Note: asyncio.Queue is for coordination within a single event loop. For multi-threading use queue.Queue, for multi-processing use multiprocessing.Queue
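
When a regular thread does need to feed an asyncio.Queue, one option is to schedule the put on the event loop with `loop.call_soon_threadsafe()` instead of touching the queue directly. The sketch below assumes a single producer thread and an unbounded queue; `blocking_producer` and `consume_from_thread` are hypothetical names.

```python
import asyncio
import threading

def blocking_producer(loop, queue, num_items):
    """Runs in a worker thread and hands items to the event loop's queue."""
    for i in range(num_items):
        # asyncio.Queue is not thread-safe, so the put is scheduled
        # to run on the event loop's own thread
        loop.call_soon_threadsafe(queue.put_nowait, f"Book_{i}")
    loop.call_soon_threadsafe(queue.put_nowait, None)  # Sentinel

async def consume_from_thread(num_items=5):
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    thread = threading.Thread(
        target=blocking_producer, args=(loop, queue, num_items)
    )
    thread.start()

    received = []
    while True:
        item = await queue.get()
        if item is None:
            break
        received.append(item)
    thread.join()  # The thread has already sent everything by this point
    return received

print(asyncio.run(consume_from_thread()))
```

With a bounded queue, put_nowait() could raise asyncio.QueueFull inside the scheduled callback, so that case needs a different approach, such as `asyncio.run_coroutine_threadsafe(queue.put(item), loop)`.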

Margaret and Timothy had transformed his chaotic concurrent system into an organized pipeline. The queue pattern elegantly solved the coordination problem: tasks no longer fought over shared state, they communicated through messages. Workers could be added or removed without changing producer code, and the system naturally throttled itself when consumers couldn't keep up with producers.

As Timothy reviewed the code, he realized that queues weren't just about preventing race conditions—they were about designing systems where components could scale independently, fail gracefully, and communicate clearly. The queue was the contract that made concurrent systems reliable.


Aaron Rose is a software engineer and technology writer at tech-reader.blog and the author of Think Like a Genius.
