Timothy stared at his screen, frustrated once again. He'd successfully made his book scanning system run multiple tasks concurrently, but now he had a different problem: chaos.
"Margaret, look at this," Timothy said, pulling up his code. "I have ten scanner tasks running concurrently, but they're all trying to read from the same list of books. They're fighting over which book to scan next, some books get scanned twice, and sometimes the program just crashes."
import asyncio
import random

books_to_scan = ["Book_1", "Book_2", "Book_3", "Book_4", "Book_5"]
scanned_books = []

async def scanner(worker_id):
    """Scanner task that processes books"""
    while books_to_scan:
        # PROBLEM: Multiple tasks accessing shared state!
        # Safe only while no await sits between the check above and this pop
        book = books_to_scan.pop(0)  # Race condition waiting to happen!
        print(f" Worker {worker_id} scanning {book}...")
        await asyncio.sleep(random.uniform(0.5, 1.5))
        scanned_books.append(book)
        print(f" Worker {worker_id} finished {book}")

async def broken_system():
    """Multiple workers fighting over shared list"""
    print("Starting 5 concurrent scanners...")
    await asyncio.gather(*[scanner(i) for i in range(5)])
    print(f"Scanned {len(scanned_books)} books")

# Fragile: crashes or gives wrong results once an await separates the check from the pop!
Margaret looked at the code and nodded. "You're trying to share state between concurrent tasks. That's a recipe for race conditions. Tasks shouldn't share state—they should send messages to each other. That's what Queues are for."
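In asyncio a task can only be interrupted at an await, so this kind of race bites when an await lands between checking the shared list and popping from it. The following is a minimal sketch of that failure mode, not Timothy's actual code: `racy_scanner` and `race_demo` are hypothetical names, and the `asyncio.sleep(0)` stands in for any real await (a network call, a file read) that might sit in that gap.

```python
import asyncio

async def racy_scanner(worker_id, books, scanned, errors):
    """Hypothetical worker with an await between the check and the pop."""
    while books:                      # check: the list looks non-empty...
        await asyncio.sleep(0)        # ...but yielding here lets other tasks run
        try:
            book = books.pop(0)       # act: the list may have been emptied meanwhile
        except IndexError:
            errors.append(worker_id)  # record the crash instead of propagating it
            return
        scanned.append(book)

async def race_demo():
    books = ["Book_1", "Book_2"]      # fewer books than workers
    scanned, errors = [], []
    await asyncio.gather(*[racy_scanner(i, books, scanned, errors) for i in range(5)])
    return scanned, errors

scanned, errors = asyncio.run(race_demo())
print(f"Scanned {len(scanned)} books, {len(errors)} workers hit an empty list")
```

Every worker passes the `while books` check before any of them pops, so some of them reach `pop(0)` after the list is already empty.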
Understanding the Queue Pattern
"Think about it like the library's book return system," Margaret explained, sketching on her whiteboard. "When patrons return books, they don't directly modify the catalog. They put books in the return bin. Then, different staff members take books from the bin and process them. The bin coordinates the work."
She typed:
import asyncio
import random

async def scanner(worker_id, queue):
    """Scanner task that gets books from queue"""
    while True:
        book = await queue.get()
        if book is None:  # Sentinel value to stop
            queue.task_done()
            break
        print(f" Worker {worker_id} scanning {book}...")
        await asyncio.sleep(random.uniform(0.5, 1.5))
        print(f" Worker {worker_id} finished {book}")
        queue.task_done()

async def basic_queue_system():
    """Using a queue to coordinate work"""
    print("Starting queue-based system...")
    # Create the queue
    queue = asyncio.Queue()
    # Put books in the queue
    books = ["Book_1", "Book_2", "Book_3", "Book_4", "Book_5"]
    for book in books:
        await queue.put(book)
    # Start 3 worker tasks
    workers = [asyncio.create_task(scanner(i, queue)) for i in range(3)]
    # Wait for all books to be processed
    await queue.join()
    # Stop workers
    for _ in range(3):
        await queue.put(None)
    await asyncio.gather(*workers)
    print("All books scanned!")

asyncio.run(basic_queue_system())
Output:
Starting queue-based system...
Worker 0 scanning Book_1...
Worker 1 scanning Book_2...
Worker 2 scanning Book_3...
Worker 1 finished Book_2
Worker 1 scanning Book_4...
Worker 0 finished Book_1
Worker 0 scanning Book_5...
Worker 2 finished Book_3
Worker 1 finished Book_4
Worker 0 finished Book_5
All books scanned!
"See? No race conditions," Margaret said. "The queue handles coordination. Workers take books one at a time, and the queue ensures each book is processed exactly once."
The Queue API
"Let me show you the key methods," Margaret continued.
import asyncio

async def demo_queue_api():
    """Demonstrate basic Queue operations"""
    queue = asyncio.Queue()
    # Put items in the queue
    await queue.put("item_1")
    await queue.put("item_2")
    await queue.put("item_3")
    print(f"Queue size: {queue.qsize()}")
    print(f"Empty? {queue.empty()}")
    # Get items from the queue
    item = await queue.get()
    print(f"Got: {item}")
    queue.task_done()  # Mark as processed
    item = await queue.get()
    print(f"Got: {item}")
    queue.task_done()
    print(f"Queue size after getting 2 items: {queue.qsize()}")

asyncio.run(demo_queue_api())
Output:
Queue size: 3
Empty? False
Got: item_1
Got: item_2
Queue size after getting 2 items: 1
Margaret explained each method:
Key Queue Methods:
await queue.put(item) - Add an item to the queue (waits, without blocking the event loop, if the queue is full)
item = await queue.get() - Remove and return an item (waits if the queue is empty)
queue.task_done() - Mark one retrieved item as processed (call it exactly once for each get())
await queue.join() - Wait until every item that was put on the queue has been marked done
queue.qsize() - Return the number of items currently in the queue
queue.empty() - Return True if the queue is empty
queue.full() - Return True if the queue holds maxsize items (always False for an unbounded queue)
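The methods above can be exercised together in a few lines. This is a small sketch rather than part of the scanning system: `api_sketch` is a hypothetical name, and it also uses `put_nowait()`, `get_nowait()`, and `asyncio.QueueFull`, which are the non-waiting counterparts in `asyncio.Queue` that the list above does not cover.

```python
import asyncio

async def api_sketch():
    queue = asyncio.Queue(maxsize=2)   # bounded so that full() can become True
    await queue.put("item_1")
    await queue.put("item_2")
    observations = {"full": queue.full(), "size": queue.qsize()}

    # put_nowait() raises instead of waiting when there is no free slot
    try:
        queue.put_nowait("item_3")
    except asyncio.QueueFull:
        observations["rejected"] = "item_3"

    # Each retrieved item is paired with exactly one task_done()
    while not queue.empty():
        queue.get_nowait()
        queue.task_done()

    # join() returns at once: every item that was put has been marked done
    await queue.join()
    observations["empty"] = queue.empty()
    return observations

print(asyncio.run(api_sketch()))
```

If `task_done()` were skipped for either item, the `join()` call would wait forever, which is why the pairing matters.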
Producer-Consumer Pattern
"The real power comes from separating producers and consumers," Margaret said. "Some tasks create work, others process it."
import asyncio
import random

async def book_finder(queue, num_books):
    """Producer: Finds books that need scanning"""
    print("Book finder starting...")
    for i in range(num_books):
        book_id = f"Book_{i:03d}"
        print(f" Found {book_id}")
        await queue.put(book_id)
        await asyncio.sleep(random.uniform(0.1, 0.3))  # Simulated finding time
    print("Book finder finished!")

async def book_scanner(worker_id, queue):
    """Consumer: Scans books from the queue"""
    while True:
        book_id = await queue.get()
        if book_id is None:
            queue.task_done()
            break
        print(f" [Worker {worker_id}] Scanning {book_id}...")
        await asyncio.sleep(random.uniform(0.5, 1.0))  # Simulated scanning
        print(f" [Worker {worker_id}] Finished {book_id}")
        queue.task_done()

async def producer_consumer_demo():
    """Demonstrate producer-consumer pattern"""
    queue = asyncio.Queue()
    # Start producer
    producer = asyncio.create_task(book_finder(queue, 10))
    # Start 3 consumer workers
    num_workers = 3
    consumers = [
        asyncio.create_task(book_scanner(i, queue))
        for i in range(num_workers)
    ]
    # Wait for producer to finish
    await producer
    # Wait for all items to be processed
    await queue.join()
    # Stop consumers
    for _ in range(num_workers):
        await queue.put(None)
    await asyncio.gather(*consumers)
    print("\nAll books processed!")

asyncio.run(producer_consumer_demo())
Output:
Book finder starting...
Found Book_000
[Worker 0] Scanning Book_000...
Found Book_001
[Worker 1] Scanning Book_001...
Found Book_002
[Worker 2] Scanning Book_002...
[Worker 0] Finished Book_000
Found Book_003
[Worker 0] Scanning Book_003...
Found Book_004
[Worker 1] Finished Book_001
[Worker 1] Scanning Book_004...
...
Book finder finished!
...
All books processed!
Timothy watched the output. "The finder keeps producing books while the workers consume them. They're completely decoupled!"
"Exactly," Margaret said. "The producer doesn't need to know how many consumers there are, and consumers don't need to know where the work comes from. The queue is the contract between them."
Multiple Producers and Consumers
"You can scale both sides independently," Margaret continued.
import asyncio
import random

async def shelf_scanner(scanner_id, queue, shelf_name, num_books):
    """Producer: Each scanner handles one shelf"""
    print(f"Scanner {scanner_id} starting on {shelf_name}...")
    for i in range(num_books):
        book_id = f"{shelf_name}_Book_{i}"
        await queue.put(book_id)
        await asyncio.sleep(random.uniform(0.1, 0.2))
    print(f"Scanner {scanner_id} finished {shelf_name}")

async def catalog_updater(worker_id, queue, processed_count):
    """Consumer: Updates the catalog database"""
    while True:
        book_id = await queue.get()
        if book_id is None:
            queue.task_done()
            break
        print(f" [Cataloger {worker_id}] Processing {book_id}")
        await asyncio.sleep(random.uniform(0.3, 0.6))
        processed_count[worker_id] += 1
        queue.task_done()

async def multi_producer_consumer():
    """Multiple producers and consumers"""
    queue = asyncio.Queue()
    processed_count = {i: 0 for i in range(5)}
    # Start 3 producers (scanning different shelves)
    producers = [
        asyncio.create_task(shelf_scanner(0, queue, "Shelf_A", 5)),
        asyncio.create_task(shelf_scanner(1, queue, "Shelf_B", 5)),
        asyncio.create_task(shelf_scanner(2, queue, "Shelf_C", 5)),
    ]
    # Start 5 consumers (cataloging books)
    num_consumers = 5
    consumers = [
        asyncio.create_task(catalog_updater(i, queue, processed_count))
        for i in range(num_consumers)
    ]
    # Wait for all producers to finish
    await asyncio.gather(*producers)
    # Wait for queue to be empty
    await queue.join()
    # Stop consumers
    for _ in range(num_consumers):
        await queue.put(None)
    await asyncio.gather(*consumers)
    print("\n--- Processing Summary ---")
    for worker_id, count in processed_count.items():
        print(f"Cataloger {worker_id}: {count} books")
    print(f"Total: {sum(processed_count.values())} books")

asyncio.run(multi_producer_consumer())
Output:
Scanner 0 starting on Shelf_A...
Scanner 1 starting on Shelf_B...
Scanner 2 starting on Shelf_C...
[Cataloger 0] Processing Shelf_A_Book_0
[Cataloger 1] Processing Shelf_B_Book_0
[Cataloger 2] Processing Shelf_C_Book_0
...
Scanner 0 finished Shelf_A
Scanner 1 finished Shelf_B
Scanner 2 finished Shelf_C
--- Processing Summary ---
Cataloger 0: 3 books
Cataloger 1: 4 books
Cataloger 2: 2 books
Cataloger 3: 3 books
Cataloger 4: 3 books
Total: 15 books
"Work gets distributed automatically," Margaret noted. "Whichever consumer is available grabs the next item."
Bounded Queues and Backpressure
"What if producers are much faster than consumers?" Timothy asked.
"Great question," Margaret said. "That's where bounded queues come in."
import asyncio
import random

async def fast_producer(queue):
    """Producer that generates work quickly"""
    print("Fast producer starting...")
    for i in range(20):
        book = f"Book_{i}"
        print(f" Producing {book}... (queue size: {queue.qsize()})")
        await queue.put(book)  # This will block when queue is full!
        await asyncio.sleep(0.1)  # Fast!
    print("Producer finished")

async def slow_consumer(worker_id, queue):
    """Consumer that processes work slowly"""
    while True:
        book = await queue.get()
        if book is None:
            queue.task_done()
            break
        print(f" [Worker {worker_id}] Processing {book}...")
        await asyncio.sleep(1.0)  # Slow!
        queue.task_done()

async def bounded_queue_demo():
    """Demonstrate backpressure with bounded queue"""
    # Bounded queue - only holds 5 items
    queue = asyncio.Queue(maxsize=5)
    # Start slow consumers
    consumers = [
        asyncio.create_task(slow_consumer(i, queue))
        for i in range(2)
    ]
    # Start fast producer (will be throttled by queue)
    producer = asyncio.create_task(fast_producer(queue))
    await producer
    await queue.join()
    for _ in range(2):
        await queue.put(None)
    await asyncio.gather(*consumers)
    print("All done!")

asyncio.run(bounded_queue_demo())
Output:
Fast producer starting...
Producing Book_0... (queue size: 0)
[Worker 0] Processing Book_0...
Producing Book_1... (queue size: 0)
[Worker 1] Processing Book_1...
Producing Book_2... (queue size: 0)
Producing Book_3... (queue size: 1)
Producing Book_4... (queue size: 2)
Producing Book_5... (queue size: 3)
Producing Book_6... (queue size: 4)
Producing Book_7... (queue size: 5)
[blocks here until workers make room]
...
"See how the queue fills up to 5 items, then the producer blocks?" Margaret explained. "This is backpressure—the system naturally throttles itself. The producer can't overwhelm the consumers."
Priority Queues
"What if some books are more urgent?" Timothy asked.
import asyncio
import sys
from dataclasses import dataclass, field

@dataclass(order=True)
class PrioritizedBook:
    priority: int
    book_id: str = field(compare=False)

# Sentinel that sorts after every real book. A bare None cannot be used here:
# PriorityQueue must compare its items, and None does not support "<".
STOP = PrioritizedBook(priority=sys.maxsize, book_id="")

async def prioritized_scanner(queue):
    """Add books with different priorities"""
    books = [
        (1, "Urgent_Return"),
        (3, "Regular_Book_A"),
        (1, "Urgent_Reserve"),
        (2, "Damaged_Book"),
        (3, "Regular_Book_B"),
    ]
    for priority, book_id in books:
        item = PrioritizedBook(priority, book_id)
        await queue.put(item)
        print(f" Added {book_id} (priority {priority})")
        await asyncio.sleep(0.1)

async def priority_worker(worker_id, queue):
    """Process books by priority"""
    while True:
        item = await queue.get()
        if item is STOP:
            queue.task_done()
            break
        print(f" [Worker {worker_id}] Processing {item.book_id} (priority {item.priority})")
        await asyncio.sleep(0.5)
        queue.task_done()

async def priority_queue_demo():
    """Demonstrate priority queue"""
    queue = asyncio.PriorityQueue()
    # Add all books first, so the workers choose among the full set of priorities
    await prioritized_scanner(queue)
    # Start workers
    workers = [
        asyncio.create_task(priority_worker(i, queue))
        for i in range(2)
    ]
    # Wait for processing
    await queue.join()
    # Stop workers
    for _ in range(2):
        await queue.put(STOP)
    await asyncio.gather(*workers)

asyncio.run(priority_queue_demo())
Output:
Added Urgent_Return (priority 1)
Added Regular_Book_A (priority 3)
Added Urgent_Reserve (priority 1)
Added Damaged_Book (priority 2)
Added Regular_Book_B (priority 3)
[Worker 0] Processing Urgent_Return (priority 1)
[Worker 1] Processing Urgent_Reserve (priority 1)
[Worker 0] Processing Damaged_Book (priority 2)
[Worker 1] Processing Regular_Book_A (priority 3)
[Worker 0] Processing Regular_Book_B (priority 3)
"Lower numbers have higher priority," Margaret explained. "Urgent books get processed first."
Pipeline Pattern: Chaining Queues
"Sometimes you need multiple stages," Margaret said. "Like an assembly line."
import asyncio
import random

async def scanner(input_queue, output_queue):
    """Stage 1: Scan books"""
    while True:
        book_id = await input_queue.get()
        if book_id is None:
            input_queue.task_done()
            await output_queue.put(None)  # Signal next stage
            break
        print(f" [Scanner] Scanning {book_id}...")
        await asyncio.sleep(random.uniform(0.3, 0.6))
        # Pass to next stage with scan data
        scanned_data = {"book_id": book_id, "pages": random.randint(100, 500)}
        await output_queue.put(scanned_data)
        input_queue.task_done()

async def indexer(input_queue, output_queue):
    """Stage 2: Index scanned books"""
    while True:
        data = await input_queue.get()
        if data is None:
            input_queue.task_done()
            await output_queue.put(None)  # Signal next stage
            break
        print(f" [Indexer] Indexing {data['book_id']} ({data['pages']} pages)...")
        await asyncio.sleep(random.uniform(0.2, 0.4))
        # Pass to next stage with index data
        indexed_data = {**data, "indexed": True}
        await output_queue.put(indexed_data)
        input_queue.task_done()

async def cataloger(input_queue):
    """Stage 3: Update catalog"""
    while True:
        data = await input_queue.get()
        if data is None:
            input_queue.task_done()
            break
        print(f" [Cataloger] Cataloging {data['book_id']}...")
        await asyncio.sleep(random.uniform(0.1, 0.3))
        input_queue.task_done()

async def pipeline_demo():
    """Demonstrate multi-stage pipeline"""
    scan_queue = asyncio.Queue()
    index_queue = asyncio.Queue()
    catalog_queue = asyncio.Queue()
    # Start pipeline stages
    scanner_task = asyncio.create_task(scanner(scan_queue, index_queue))
    indexer_task = asyncio.create_task(indexer(index_queue, catalog_queue))
    cataloger_task = asyncio.create_task(cataloger(catalog_queue))
    # Feed books into pipeline
    books = [f"Book_{i}" for i in range(5)]
    for book in books:
        await scan_queue.put(book)
    await scan_queue.put(None)  # Signal end of input
    # Wait for pipeline to complete
    await asyncio.gather(scanner_task, indexer_task, cataloger_task)
    print("\nPipeline complete!")

asyncio.run(pipeline_demo())
Output:
[Scanner] Scanning Book_0...
[Scanner] Scanning Book_1...
[Indexer] Indexing Book_0 (234 pages)...
[Scanner] Scanning Book_2...
[Indexer] Indexing Book_1 (456 pages)...
[Cataloger] Cataloging Book_0...
[Scanner] Scanning Book_3...
[Indexer] Indexing Book_2 (189 pages)...
[Cataloger] Cataloging Book_1...
...
Pipeline complete!
"Each stage processes items as they arrive from the previous stage," Margaret said. "Books flow through the pipeline continuously."
Error Handling and Graceful Shutdown
"What if a worker encounters an error?" Timothy asked.
import asyncio
import random

async def unreliable_processor(worker_id, queue, results):
    """Worker that might fail"""
    while True:
        book_id = await queue.get()
        try:
            if book_id is None:
                break  # The finally block below marks the sentinel as done
            # Simulate occasional failures
            if random.random() < 0.2:  # 20% failure rate
                raise ValueError(f"Failed to process {book_id}")
            print(f" [Worker {worker_id}] Successfully processed {book_id}")
            results["success"].append(book_id)
            await asyncio.sleep(0.5)
        except ValueError as e:
            print(f" [Worker {worker_id}] ERROR: {e}")
            results["failed"].append(book_id)
        finally:
            # Exactly one task_done() per get(); calling it twice raises ValueError
            queue.task_done()

async def error_handling_demo():
    """Demonstrate error handling in queue processing"""
    queue = asyncio.Queue()
    results = {"success": [], "failed": []}
    # Start workers
    workers = [
        asyncio.create_task(unreliable_processor(i, queue, results))
        for i in range(3)
    ]
    # Add work
    for i in range(10):
        await queue.put(f"Book_{i}")
    # Wait for all work to complete
    await queue.join()
    # Stop workers
    for _ in range(3):
        await queue.put(None)
    await asyncio.gather(*workers)
    print("\n--- Results ---")
    print(f"Successful: {len(results['success'])}")
    print(f"Failed: {len(results['failed'])}")
    if results['failed']:
        print(f"Failed books: {results['failed']}")

asyncio.run(error_handling_demo())
Output:
[Worker 0] Successfully processed Book_0
[Worker 1] ERROR: Failed to process Book_1
[Worker 2] Successfully processed Book_2
[Worker 0] Successfully processed Book_3
...
--- Results ---
Successful: 8
Failed: 2
Failed books: ['Book_1', 'Book_5']
"Notice the finally block ensures task_done() is always called," Margaret explained. "Even if processing fails, we mark the item as complete so join() doesn't hang."
Queue Patterns Summary
Margaret drew a summary on the whiteboard:
Common Queue Patterns:
- Basic Producer-Consumer
  - One or more producers feed queue
  - One or more consumers process items
  - Decouples producers from consumers
- Bounded Queue (Backpressure)
  - Queue(maxsize=N) limits queue size
  - Producers block when queue is full
  - Prevents memory overflow
- Priority Queue
  - PriorityQueue() processes items by priority
  - Lower numbers = higher priority
  - Use dataclass with order=True
- Pipeline (Multi-Stage)
  - Chain queues together
  - Output of one stage → input of next
  - Each stage processes independently
- Graceful Shutdown
  - Send sentinel value (None) to stop workers
  - Use try/finally to ensure task_done()
  - Always call queue.join() before shutdown
The Takeaway
Timothy closed his laptop, his book scanning system now properly coordinated.
Key insights:
Don't share state between tasks—use queues to pass messages
asyncio.Queue() provides safe coordination between tasks on a single event loop (it is not thread-safe)
await queue.put(item) adds items to the queue
item = await queue.get() retrieves items from the queue
Call queue.task_done() after processing each item
await queue.join() blocks until all items are processed
Producer-consumer pattern decouples work generation from work processing
Bounded queues (maxsize=N) provide automatic backpressure
PriorityQueue processes items by priority (lower number = higher priority)
Pipeline pattern chains queues for multi-stage processing
Always use try/finally to ensure task_done() is called
Send sentinel values (like None) to signal worker shutdown
Multiple producers and consumers can share the same queue
The queue handles all synchronization automatically
Choose queue type based on needs: Queue for FIFO, PriorityQueue for priority, LifoQueue for LIFO
Note: asyncio.Queue is for coordination within a single event loop. For multi-threading use queue.Queue; for multi-processing use multiprocessing.Queue.
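Of the three queue types named above, LifoQueue is the only one not shown earlier. As a brief sketch of the difference in ordering (the `drain` and `compare_orders` helpers are hypothetical names), the same items come out in opposite orders:

```python
import asyncio

async def drain(queue, items):
    """Put every item on the queue, then take them all back out in queue order."""
    for item in items:
        await queue.put(item)
    order = []
    while not queue.empty():
        order.append(await queue.get())
        queue.task_done()
    return order

async def compare_orders():
    books = ["Book_1", "Book_2", "Book_3"]
    fifo = await drain(asyncio.Queue(), books)       # first in, first out
    lifo = await drain(asyncio.LifoQueue(), books)   # last in, first out
    return fifo, lifo

fifo, lifo = asyncio.run(compare_orders())
print("Queue:    ", fifo)
print("LifoQueue:", lifo)
```

All three classes share the same `put()`, `get()`, `task_done()`, and `join()` interface, so switching between them usually means changing only the constructor.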
Margaret and Timothy had transformed his chaotic concurrent system into an organized pipeline. The queue pattern elegantly solved the coordination problem: tasks no longer fought over shared state, they communicated through messages. Workers could be added or removed without changing producer code, and the system naturally throttled itself when consumers couldn't keep up with producers.
As Timothy reviewed the code, he realized that queues weren't just about preventing race conditions—they were about designing systems where components could scale independently, fail gracefully, and communicate clearly. The queue was the contract that made concurrent systems reliable.
Aaron Rose is a software engineer and technology writer at tech-reader.blog and the author of Think Like a Genius.