# Task Queue Patterns Guide

Practical patterns and best practices for building reliable async task queues in production.
## Delivery Guarantees

### At-Least-Once Delivery

The default mode for this toolkit. A task may be delivered more than once if:

- The worker crashes after starting execution but before acknowledgment
- A network partition occurs between the worker and Redis
- The visibility timeout expires before the task completes
```python
# Tasks are acknowledged AFTER successful execution
await handler(*msg.args, **msg.kwargs)
await broker.ack(msg.task_id)  # Only after success
```

**When to use:** Most workloads where occasional duplicate processing is acceptable (sending emails, updating caches, writing to idempotent APIs).
### Exactly-Once Semantics

True exactly-once delivery is impossible in distributed systems, but you can achieve exactly-once *processing* through idempotency:
```python
@task(retries=3)
async def process_payment(payment_id: str, amount: float) -> dict:
    """Process a payment exactly once using idempotency keys."""
    # Fast-path check for an already-processed payment. Note this
    # check-then-act is racy on its own; the gateway-side idempotency
    # key below is what actually closes the window.
    existing = await db.payments.find_one({"payment_id": payment_id})
    if existing:
        return {"status": "already_processed", "id": payment_id}

    # Process with idempotency key
    result = await payment_gateway.charge(
        idempotency_key=payment_id,
        amount=amount,
    )
    await db.payments.insert_one({"payment_id": payment_id, **result})
    return result
```
## Idempotency

### Why It Matters

With at-least-once delivery, your task handlers MUST be idempotent — running them multiple times with the same input should produce the same result.
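A quick self-check: running a handler twice with the same input must leave the same final state as running it once. A minimal illustration with hypothetical in-memory handlers (not toolkit code):

```python
# Non-idempotent: every delivery mutates state again.
def record_visit(counts: dict, user: str) -> dict:
    counts[user] = counts.get(user, 0) + 1
    return counts

# Idempotent: a keyed write means re-delivery is a no-op.
def record_order(orders: dict, order_id: str, total: float) -> dict:
    orders[order_id] = total  # same input always yields the same end state
    return orders

counts = record_visit(record_visit({}, "u1"), "u1")       # duplicate delivery double-counts
orders = record_order(record_order({}, "o1", 9.99), "o1", 9.99)  # duplicate delivery is harmless
```

If a handler fails this "apply twice" test, at-least-once delivery will eventually corrupt state under retries.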
### Patterns

**1. Database upserts instead of inserts:**

```python
# Bad: duplicate inserts
await db.orders.insert_one(order)

# Good: upsert with unique key
await db.orders.update_one(
    {"order_id": order["order_id"]},
    {"$set": order},
    upsert=True,
)
```
**2. Idempotency keys for external APIs:**

```python
await stripe.charges.create(
    amount=1000,
    idempotency_key=f"charge-{order_id}",
)
```
**3. Deduplication with Redis:**

```python
async def is_duplicate(task_id: str, ttl: int = 3600) -> bool:
    key = f"dedup:{task_id}"
    was_set = await redis.set(key, "1", nx=True, ex=ttl)
    return was_set is None  # None means the key already existed
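The dedup logic can be exercised without a live Redis by mimicking the `SET` command's `NX`/`EX` semantics in memory. A sketch with a hypothetical `FakeRedis` stand-in (synchronous for brevity; production code would use the async snippet above):

```python
import time

class FakeRedis:
    """In-memory stand-in mimicking Redis SET with nx/ex options."""

    def __init__(self):
        self._expiry = {}  # key -> monotonic expiry timestamp

    def set(self, key, value, nx=False, ex=None):
        now = time.monotonic()
        live = key in self._expiry and self._expiry[key] > now
        if nx and live:
            return None  # Redis replies nil when NX blocks the write
        self._expiry[key] = now + (ex if ex is not None else float("inf"))
        return True

def is_duplicate(redis, task_id: str, ttl: int = 3600) -> bool:
    was_set = redis.set(f"dedup:{task_id}", "1", nx=True, ex=ttl)
    return was_set is None

r = FakeRedis()
first = is_duplicate(r, "abc")   # first sighting claims the key
second = is_duplicate(r, "abc")  # key already claimed
```

The TTL bounds memory growth: after an hour the task ID can be seen again, so pick a TTL longer than any plausible redelivery window.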
## Rate Limiting

### Token Bucket (built-in)

The `RateLimitMiddleware` implements a per-task-name token bucket:

```python
from queue.middleware import MiddlewareChain, RateLimitMiddleware

chain = MiddlewareChain()
chain.add(RateLimitMiddleware(max_per_second=10.0))
```
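The algorithm behind the middleware can be sketched as a plain token bucket. This is a simplified stand-in for illustration, not the middleware's actual implementation; the injectable `clock` parameter is an assumption added for testability:

```python
import time

class TokenBucket:
    """Simplified token bucket: refills continuously at `rate` tokens/sec."""

    def __init__(self, rate: float, capacity: float = None, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity if capacity is not None else rate
        self.tokens = self.capacity
        self.clock = clock
        self.updated = clock()

    def try_acquire(self, cost: float = 1.0) -> bool:
        now = self.clock()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False

# Deterministic demo with a frozen fake clock
t = 0.0
bucket = TokenBucket(rate=10.0, clock=lambda: t)
burst = [bucket.try_acquire() for _ in range(12)]  # 10 succeed, then the bucket is empty
t = 0.5                                            # half a second later: 5 tokens refilled
later = [bucket.try_acquire() for _ in range(6)]   # 5 succeed
```

Because the bucket starts full, a burst up to `capacity` passes immediately; sustained load is then throttled to `rate` per second.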
### External API Rate Limits

For APIs with strict rate limits, combine the middleware with task-level control:

```python
@task(retries=5, timeout=30)
async def call_external_api(endpoint: str, payload: dict) -> dict:
    """Call an external API with rate-limit awareness."""
    response = await http_client.post(endpoint, json=payload)
    if response.status_code == 429:
        retry_after = int(response.headers.get("Retry-After", 60))
        raise RateLimitExceeded(f"Rate limited, retry after {retry_after}s")
    response.raise_for_status()
    return response.json()
```
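One caveat with the `int(...)` cast above: `Retry-After` may be either delta-seconds or an HTTP-date, and the date form would raise `ValueError`. A small parser covering both forms (a hypothetical helper, not part of the toolkit):

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

def parse_retry_after(value, default: float = 60.0) -> float:
    """Return a delay in seconds from a Retry-After header value.

    Handles both forms allowed by RFC 9110: delta-seconds ("120")
    and HTTP-date ("Wed, 21 Oct 2015 07:28:00 GMT").
    """
    if value is None:
        return default
    value = value.strip()
    if value.isdigit():
        return float(value)  # delta-seconds form
    try:
        when = parsedate_to_datetime(value)  # HTTP-date form
        return max(0.0, (when - datetime.now(timezone.utc)).total_seconds())
    except (TypeError, ValueError):
        return default  # unparseable header: fall back to a fixed delay

seconds_form = parse_retry_after("120")
past_date = parse_retry_after("Wed, 21 Oct 2015 07:28:00 GMT")  # already elapsed
garbage = parse_retry_after("soon-ish")
```

Clamping past dates to `0.0` means an expired HTTP-date simply allows an immediate retry.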
## Priority Queue Patterns

### Priority Levels

Use numeric priorities to control processing order:

```python
class Priority:
    CRITICAL = 100  # System health, security alerts
    HIGH = 50       # User-facing actions (password reset)
    NORMAL = 10     # Standard background work
    LOW = 1         # Analytics, cleanup, batch jobs

await broker.enqueue("password_reset", priority=Priority.HIGH, ...)
await broker.enqueue("generate_report", priority=Priority.LOW, ...)
```
### Starvation Prevention

Low-priority tasks can starve if high-priority tasks arrive continuously. Mitigate with separate queues:

```python
# Use separate queues per priority tier
critical_broker = RedisBroker(queue_name="queue:critical")
standard_broker = RedisBroker(queue_name="queue:standard")
bulk_broker = RedisBroker(queue_name="queue:bulk")
```
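With separate queues, the worker still needs a polling policy that gives every tier a turn. Weighted polling drains the tiers in a fixed ratio instead of strictly by priority, so the bulk queue always makes progress. A sketch over plain deques standing in for the per-tier brokers (the weights are illustrative, not toolkit defaults):

```python
from collections import deque

def weighted_drain(queues: dict, weights: dict) -> list:
    """Dequeue tasks tier-by-tier in proportion to `weights`.

    `queues` maps tier name -> deque of tasks. Higher-weight tiers get
    more slots per round, but every tier gets at least one slot, so no
    tier starves.
    """
    order = []
    while any(queues.values()):
        for tier, weight in weights.items():
            for _ in range(weight):
                if queues[tier]:
                    order.append(queues[tier].popleft())
    return order

queues = {
    "critical": deque(["c1", "c2"]),
    "standard": deque(["s1", "s2", "s3"]),
    "bulk": deque(["b1", "b2"]),
}
order = weighted_drain(queues, {"critical": 3, "standard": 2, "bulk": 1})
```

Here a 3:2:1 ratio processes `b1` in the first round even though critical and standard work is still pending, which is exactly the starvation protection a strict priority scan lacks.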
## Dead Letter Queue

### Handling Failed Tasks

Tasks that exhaust all retries are moved to the DLQ for inspection:

```python
from queue.dead_letter import DeadLetterQueue

dlq = DeadLetterQueue(broker=broker, max_size=10_000)

# Inspect recent failures
entries = await dlq.peek(count=20)
for entry in entries:
    print(f"Task: {entry.task_name}, Error: {entry.error}")

# Replay a specific task
new_id = await dlq.replay(task_id="abc-123", retries=3)

# Purge old failures
removed = await dlq.purge()
```
### Monitoring DLQ Growth

Set up alerts when the DLQ grows beyond a threshold:

```python
from queue.monitoring import QueueMonitor

monitor = QueueMonitor(broker=broker)
metrics = await monitor.snapshot()
if metrics.dead_letter_count > 100:
    await alert_ops_team(
        f"DLQ has {metrics.dead_letter_count} entries"
    )
```
## Retry Strategies

### Exponential Backoff

The built-in retry re-enqueues immediately. To approximate exponential backoff, delay before re-raising. Note that the sleep holds a worker slot for its full duration and counts toward the task's `timeout`, so keep the cap well below the timeout:

```python
import asyncio

@task(retries=5, timeout=60)
async def resilient_task(data: dict) -> None:
    # "_attempt" is assumed to be threaded through by whatever re-enqueues the task
    attempt = data.get("_attempt", 0)
    try:
        await do_work(data)
    except TransientError:
        delay = min(2 ** attempt, 300)  # Cap at 5 minutes
        await asyncio.sleep(delay)
        raise  # Let the retry mechanism handle it
```
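The delay schedule itself is worth pulling into a pure function so it can be unit-tested, and adding jitter avoids retry stampedes when many tasks fail at the same moment. A sketch using the "full jitter" strategy (an assumption on my part, not toolkit behavior; the `rng` parameter exists only to make the function deterministic in tests):

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 300.0,
                  rng=None) -> float:
    """Exponential backoff with full jitter.

    Returns a delay drawn uniformly from [0, min(cap, base * 2**attempt)],
    so concurrent failures spread their retries instead of synchronizing.
    """
    ceiling = min(cap, base * (2 ** attempt))
    return (rng or random).uniform(0.0, ceiling)

# Seeded demo: delays grow with the attempt number but stay under the cap
delays = [backoff_delay(a, rng=random.Random(42)) for a in range(10)]
```

Full jitter trades a predictable schedule for decorrelated retries, which matters most when a downstream outage fails a whole batch of tasks at once.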
### Circuit Breaker

Combine with a circuit breaker for external services:

```python
class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 60):
        self.failures = 0
        self.threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.last_failure = 0.0
        self.state = "closed"  # closed, open, half-open
```
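The class above only holds state. One possible completion, showing the closed, open, and half-open transitions (a sketch with synchronous methods and an injectable clock for testability; method names are my own, not a toolkit API):

```python
import time

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 60,
                 clock=time.monotonic):
        self.failures = 0
        self.threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.last_failure = 0.0
        self.state = "closed"  # closed, open, half-open
        self.clock = clock

    def allow(self) -> bool:
        """Return True if a call may proceed."""
        if self.state == "open":
            if self.clock() - self.last_failure >= self.reset_timeout:
                self.state = "half-open"  # let a single probe through
                return True
            return False
        return True

    def record_success(self) -> None:
        self.failures = 0
        self.state = "closed"

    def record_failure(self) -> None:
        self.failures += 1
        self.last_failure = self.clock()
        if self.failures >= self.threshold:
            self.state = "open"  # stop calling the failing service

# Deterministic walkthrough with a fake clock
t = 0.0
cb = CircuitBreaker(failure_threshold=2, reset_timeout=30, clock=lambda: t)
cb.record_failure()
cb.record_failure()          # second failure trips the breaker
tripped = cb.allow()         # rejected while open
t = 31.0                     # reset timeout elapses
probe = cb.allow()           # half-open: one probe allowed
cb.record_success()          # probe succeeded: breaker closes again
recovered = cb.state
```

Inside a task handler, check `allow()` before the external call and raise a transient error when it returns False, so the retry machinery backs off instead of hammering a dead service.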
## Monitoring Best Practices

Track these key metrics:

- **Queue depth:** Number of pending tasks (alert if growing)
- **Processing count:** Tasks currently being executed
- **Throughput:** Tasks completed per second
- **Latency:** Time from enqueue to completion
- **Error rate:** Failed tasks per minute
- **DLQ size:** Permanently failed tasks
```python
monitor = QueueMonitor(broker=broker)
metrics = await monitor.snapshot()

# Export to your metrics system
statsd.gauge("queue.depth", metrics.depth)
statsd.gauge("queue.throughput", metrics.throughput_per_sec)
statsd.gauge("queue.latency_ms", metrics.avg_latency_ms)
statsd.gauge("queue.dlq_size", metrics.dead_letter_count)
```
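An average latency gauge hides tail behavior: one slow task barely moves the mean. If you collect per-task enqueue-to-completion samples, exporting a percentile alongside the average catches tail regressions. A simple nearest-rank sketch (a hypothetical helper; sample collection is assumed):

```python
def latency_stats(samples_ms: list) -> dict:
    """Average and p95 latency from a window of enqueue-to-completion samples."""
    if not samples_ms:
        return {"avg_ms": 0.0, "p95_ms": 0.0}
    ordered = sorted(samples_ms)
    # Nearest-rank p95: index 95% of the way through the sorted window
    idx = min(len(ordered) - 1, int(0.95 * len(ordered)))
    return {"avg_ms": sum(ordered) / len(ordered), "p95_ms": ordered[idx]}

stats = latency_stats([10, 20, 30, 40, 1000])  # one outlier dominates the tail
```

Here the average is 220 ms while the p95 is 1000 ms: the percentile surfaces the outlier that the mean smooths over.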
This is 1 of 14 resources in the Python Developer Pro toolkit. Get the complete [Async Task Queue Toolkit] with all files, templates, and documentation for $39.
Or grab the entire Python Developer Pro bundle (14 products) for $159 — save 30%.