DEV Community

Thesius Code

Posted on • Originally published at datanest-stores.pages.dev

Async Task Queue Toolkit: Task Queue Patterns Guide

Task Queue Patterns Guide

Practical patterns and best practices for building reliable async task queues in production.

Delivery Guarantees

At-Least-Once Delivery

The default mode for this toolkit. A task may be delivered more than once if:

  • The worker crashes after starting execution but before acknowledgment
  • A network partition occurs between the worker and Redis
  • The visibility timeout expires before the task completes

# Tasks are acknowledged AFTER successful execution
await handler(*msg.args, **msg.kwargs)
await broker.ack(msg.task_id)  # Only after success

When to use: Most workloads where occasional duplicate processing is acceptable (sending emails, updating caches, writing to idempotent APIs).
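
To see why acknowledging after execution produces duplicates, here is a minimal in-memory sketch. `InMemoryBroker`, `receive`, and `requeue_unacked` are hypothetical stand-ins and not the toolkit's API; `requeue_unacked` simply simulates a visibility timeout expiring.

```python
import asyncio
from collections import deque
from dataclasses import dataclass, field


@dataclass
class InMemoryBroker:
    """Hypothetical stand-in for a broker; not the toolkit's RedisBroker."""
    pending: deque = field(default_factory=deque)
    in_flight: dict = field(default_factory=dict)

    async def enqueue(self, task_id: str, payload: str) -> None:
        self.pending.append((task_id, payload))

    async def receive(self) -> tuple:
        # The task moves to in-flight and stays there until it is acked.
        task_id, payload = self.pending.popleft()
        self.in_flight[task_id] = payload
        return task_id, payload

    async def ack(self, task_id: str) -> None:
        self.in_flight.pop(task_id, None)

    async def requeue_unacked(self) -> None:
        # Simulates the visibility timeout expiring for every unacked task.
        for task_id, payload in self.in_flight.items():
            self.pending.append((task_id, payload))
        self.in_flight.clear()


async def demo() -> list:
    broker = InMemoryBroker()
    await broker.enqueue("t1", "send-email")
    handled = []

    # First delivery: the handler runs, then the worker "crashes" before ack.
    task_id, payload = await broker.receive()
    handled.append(payload)

    # No ack arrived, so the task becomes visible again.
    await broker.requeue_unacked()

    # Second delivery: the handler runs again and this time the ack succeeds.
    task_id, payload = await broker.receive()
    handled.append(payload)
    await broker.ack(task_id)
    return handled
```

Running `asyncio.run(demo())` shows the same payload handled twice, which is exactly the duplicate that handlers need to tolerate.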

Exactly-Once Semantics

True exactly-once delivery is impossible in distributed systems, but you can achieve exactly-once processing through idempotency:

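@task(retries=3)
async def process_payment(payment_id: str, amount: float) -> dict:
    """Process a payment exactly once using idempotency keys."""
    # Fast path: skip the work if an earlier delivery already recorded it
    existing = await db.payments.find_one({"payment_id": payment_id})
    if existing:
        return {"status": "already_processed", "id": payment_id}

    # The check above is not atomic with the write below, so two concurrent
    # deliveries can both reach this point. The gateway's idempotency key is
    # what prevents a double charge in that case.
    result = await payment_gateway.charge(
        idempotency_key=payment_id,
        amount=amount,
    )
    # Upsert so a concurrent duplicate cannot create a second record
    await db.payments.update_one(
        {"payment_id": payment_id},
        {"$set": {"payment_id": payment_id, **result}},
        upsert=True,
    )
    return result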

Idempotency

Why It Matters

With at-least-once delivery, your task handlers MUST be idempotent: running a handler several times with the same input must leave the system in the same state as running it once.
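
A small sketch of the difference, using a hypothetical in-memory `Ledger` (not part of the toolkit). The `operation_id` argument is an assumption standing in for whatever unique key your task carries.

```python
class Ledger:
    """Hypothetical in-memory account store used only for illustration."""

    def __init__(self) -> None:
        self.balance = 0
        self.applied_ops = set()

    def credit(self, amount: int) -> None:
        # Not idempotent: every delivery changes the balance again.
        self.balance += amount

    def credit_once(self, operation_id: str, amount: int) -> None:
        # Idempotent: a repeated operation_id is ignored.
        if operation_id in self.applied_ops:
            return
        self.applied_ops.add(operation_id)
        self.balance += amount


# Simulate the same task being delivered twice.
naive = Ledger()
naive.credit(50)
naive.credit(50)

safe = Ledger()
safe.credit_once("op-1", 50)
safe.credit_once("op-1", 50)
```

After the duplicate delivery, `naive.balance` has been credited twice while `safe.balance` has been credited once.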

Patterns

1. Database upserts instead of inserts:

# Bad: duplicate inserts
await db.orders.insert_one(order)

# Good: upsert with unique key
await db.orders.update_one(
    {"order_id": order["order_id"]},
    {"$set": order},
    upsert=True,
)

2. Idempotency keys for external APIs:

await stripe.charges.create(
    amount=1000,
    idempotency_key=f"charge-{order_id}",
)

3. Deduplication with Redis:

async def is_duplicate(task_id: str, ttl: int = 3600) -> bool:
    key = f"dedup:{task_id}"
    was_set = await redis.set(key, "1", nx=True, ex=ttl)
    return was_set is None  # None means key already existed
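
One thing to watch with this pattern: if the key is set before the work runs and the worker then fails, a retry would be skipped as a "duplicate". The sketch below releases the key on failure. `FakeRedis` is a hypothetical in-memory stand-in that mimics only `SET ... NX` and `DEL` (it ignores the expiry), and `handle_once` is an illustrative helper, not a toolkit function.

```python
import asyncio


class FakeRedis:
    """Hypothetical in-memory stand-in; ignores expiry for simplicity."""

    def __init__(self) -> None:
        self._data = {}

    async def set(self, key, value, nx=False, ex=None):
        # Mirrors the redis-py convention: None when NX blocks the write.
        if nx and key in self._data:
            return None
        self._data[key] = value
        return True

    async def delete(self, key) -> int:
        return 1 if self._data.pop(key, None) is not None else 0


async def handle_once(client, task_id: str, handler, ttl: int = 3600) -> str:
    key = f"dedup:{task_id}"
    claimed = await client.set(key, "1", nx=True, ex=ttl)
    if claimed is None:
        return "skipped"  # another delivery already claimed this task
    try:
        await handler()
    except Exception:
        # Release the claim so a later retry is not mistaken for a duplicate.
        await client.delete(key)
        raise
    return "processed"
```

The trade-off is that releasing the key reopens a small window for duplicates, so the handler itself should still be idempotent.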

Rate Limiting

Token Bucket (built-in)

The RateLimitMiddleware implements a per-task-name token bucket:

from queue.middleware import MiddlewareChain, RateLimitMiddleware

chain = MiddlewareChain()
chain.add(RateLimitMiddleware(max_per_second=10.0))
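
The middleware's internals are not shown here, but the token bucket idea itself is small. The following is a generic sketch, not the toolkit's implementation; `TokenBucket`, `try_acquire`, and the injectable `clock` parameter are hypothetical names chosen for illustration.

```python
import time


class TokenBucket:
    """Generic token bucket: refills at `rate` tokens/second up to `capacity`."""

    def __init__(self, rate: float, capacity: float, clock=time.monotonic) -> None:
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = capacity  # start full so an initial burst is allowed
        self._last = clock()

    def try_acquire(self, cost: float = 1.0) -> bool:
        # Refill based on the time elapsed since the previous call.
        now = self._clock()
        elapsed = now - self._last
        self._last = now
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)

        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False
```

A per-task-name limiter could keep one bucket per name in a dictionary. The `capacity` controls how large a burst is tolerated, while `rate` sets the sustained throughput.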

External API Rate Limits

For APIs with strict rate limits, combine the middleware with task-level control:

@task(retries=5, timeout=30)
async def call_external_api(endpoint: str, payload: dict) -> dict:
    """Call an external API with rate-limit awareness."""
    response = await http_client.post(endpoint, json=payload)

    if response.status_code == 429:
        retry_after = int(response.headers.get("Retry-After", 60))
        raise RateLimitExceeded(f"Rate limited, retry after {retry_after}s")

    response.raise_for_status()
    return response.json()
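
The `Retry-After` header can carry either a number of seconds or an HTTP date, and `int()` would raise on the date form. A more defensive parser might look like this sketch; `parse_retry_after` and its `default` and `now` parameters are hypothetical helpers, not part of the toolkit.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def parse_retry_after(value, default: float = 60.0, now=None) -> float:
    """Return the wait in seconds for a Retry-After header value."""
    if value is None:
        return default
    value = value.strip()

    # Form 1: delay in whole seconds, e.g. "120".
    if value.isdigit():
        return float(value)

    # Form 2: HTTP date, e.g. "Wed, 21 Oct 2015 07:28:00 GMT".
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return default  # unparseable header: fall back to the default
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)

    now = now or datetime.now(timezone.utc)
    # A date in the past means we can retry right away.
    return max(0.0, (when - now).total_seconds())
```

The returned value could then be attached to the raised exception so whatever schedules the retry can honor it.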

Priority Queue Patterns

Priority Levels

Use numeric priorities to control processing order:

class Priority:
    CRITICAL = 100   # System health, security alerts
    HIGH = 50        # User-facing actions (password reset)
    NORMAL = 10      # Standard background work
    LOW = 1          # Analytics, cleanup, batch jobs

await broker.enqueue("password_reset", priority=Priority.HIGH, ...)
await broker.enqueue("generate_report", priority=Priority.LOW, ...)
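
How the broker orders tasks internally is not shown here. As an illustration of the idea, this in-memory sketch uses a heap so that higher numbers are served first and equal priorities keep their arrival order. `PriorityTaskQueue` is a hypothetical class, not the toolkit's broker.

```python
import heapq
import itertools


class PriorityTaskQueue:
    """In-memory priority queue: highest priority first, FIFO within a level."""

    def __init__(self) -> None:
        self._heap = []
        self._counter = itertools.count()  # tie-breaker preserving arrival order

    def push(self, name: str, priority: int) -> None:
        # heapq is a min-heap, so negate the priority to pop the largest first.
        heapq.heappush(self._heap, (-priority, next(self._counter), name))

    def pop(self) -> str:
        _, _, name = heapq.heappop(self._heap)
        return name

    def __len__(self) -> int:
        return len(self._heap)


queue = PriorityTaskQueue()
queue.push("generate_report", 1)
queue.push("password_reset", 50)
queue.push("send_digest", 10)
queue.push("security_alert", 100)
queue.push("unlock_account", 50)

order = [queue.pop() for _ in range(len(queue))]
```

Here `order` starts with `security_alert`, and `password_reset` comes out before `unlock_account` because it was enqueued first at the same level.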

Starvation Prevention

Low-priority tasks can starve if high-priority tasks arrive continuously. Mitigate with separate queues:

# Use separate queues per priority tier
critical_broker = RedisBroker(queue_name="queue:critical")
standard_broker = RedisBroker(queue_name="queue:standard")
bulk_broker = RedisBroker(queue_name="queue:bulk")
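
Separate queues only prevent starvation if workers actually poll every tier. One option is weighted round-robin polling, sketched below with in-memory deques standing in for the three brokers. `build_schedule`, `drain`, and the weights are hypothetical and would need tuning for a real workload.

```python
from collections import deque


def build_schedule(weights: dict) -> list:
    """Expand {"critical": 3, "bulk": 1} into a repeating polling order."""
    schedule = []
    for name, weight in weights.items():
        schedule.extend([name] * weight)
    return schedule


def drain(queues: dict, weights: dict, max_tasks: int) -> list:
    """Poll queues in weighted round-robin order, skipping empty ones."""
    schedule = build_schedule(weights)
    scheduled_names = set(schedule)
    processed = []
    slot = 0
    # Stop when enough tasks were taken or every polled queue is empty.
    while len(processed) < max_tasks and any(queues[n] for n in scheduled_names):
        name = schedule[slot % len(schedule)]
        slot += 1
        if queues[name]:
            processed.append(queues[name].popleft())
    return processed


queues = {
    "critical": deque(["c1", "c2", "c3", "c4"]),
    "standard": deque(["s1", "s2"]),
    "bulk": deque(["b1"]),
}
weights = {"critical": 3, "standard": 2, "bulk": 1}
first_six = drain(queues, weights, max_tasks=6)
```

With these weights, the bulk task `b1` is served within the first six polls even though a critical task is still waiting.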

Dead Letter Queue

Handling Failed Tasks

Tasks that exhaust all retries are moved to the DLQ for inspection:

from queue.dead_letter import DeadLetterQueue

dlq = DeadLetterQueue(broker=broker, max_size=10_000)

# Inspect recent failures
entries = await dlq.peek(count=20)
for entry in entries:
    print(f"Task: {entry.task_name}, Error: {entry.error}")

# Replay a specific task
new_id = await dlq.replay(task_id="abc-123", retries=3)

# Purge old failures
removed = await dlq.purge()
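
To make the flow concrete, here is an in-memory sketch of a task exhausting its retries and landing in a bounded dead letter store. Everything in it is hypothetical: `BoundedDLQ`, `DeadLetter`, and `run_with_retries` are not toolkit classes, and two behaviors are assumptions: that `retries` counts attempts after the first one, and that a full DLQ drops its oldest entry.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class DeadLetter:
    task_id: str
    task_name: str
    error: str


class BoundedDLQ:
    """Keeps at most max_size entries; the oldest is dropped when full."""

    def __init__(self, max_size: int) -> None:
        self._entries = deque(maxlen=max_size)

    def add(self, entry: DeadLetter) -> None:
        self._entries.append(entry)

    def peek(self, count: int) -> list:
        # Newest failures first.
        if count <= 0:
            return []
        return list(self._entries)[-count:][::-1]

    def __len__(self) -> int:
        return len(self._entries)


def run_with_retries(task_id, task_name, handler, retries: int, dlq: BoundedDLQ) -> bool:
    """Run handler once plus up to `retries` more times, then dead-letter it."""
    last_error = ""
    for _ in range(retries + 1):
        try:
            handler()
            return True
        except Exception as exc:
            last_error = repr(exc)
    dlq.add(DeadLetter(task_id, task_name, last_error))
    return False
```

Storing the last error alongside the task is what makes later inspection and replay practical.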

Monitoring DLQ Growth

Set up alerts when the DLQ grows beyond a threshold:

from queue.monitoring import QueueMonitor

monitor = QueueMonitor(broker=broker)
metrics = await monitor.snapshot()

if metrics.dead_letter_count > 100:
    await alert_ops_team(
        f"DLQ has {metrics.dead_letter_count} entries"
    )

Retry Strategies

Exponential Backoff

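The built-in retry re-enqueues immediately. To approximate exponential backoff, wait inside the handler before re-raising, so the retry mechanism re-enqueues the task only after the delay:

import asyncio

@task(retries=5, timeout=60)
async def resilient_task(data: dict) -> None:
    # Assumes the current attempt number travels in the payload as "_attempt"
    attempt = data.get("_attempt", 0)
    try:
        await do_work(data)
    except TransientError:
        delay = min(2 ** attempt, 300)  # Cap at 5 minutes
        # The sleep keeps this worker slot busy, so keep the delay
        # comfortably below the task timeout
        await asyncio.sleep(delay)
        raise  # Let the retry mechanism handle it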
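
When many tasks fail at the same moment, identical delays make them all retry together. Adding random jitter spreads the retries out. The helper below is a sketch of the common "full jitter" variant; `backoff_delay` and its parameters are hypothetical and not part of the toolkit.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 300.0,
                  rng=random.random) -> float:
    """Full-jitter backoff: a random delay in [0, min(cap, base * 2**attempt)]."""
    ceiling = min(cap, base * (2 ** attempt))
    return rng() * ceiling


# Upper bounds for the first few attempts (rng fixed at 1.0 to show the ceiling).
ceilings = [backoff_delay(n, rng=lambda: 1.0) for n in range(5)]
```

The `rng` parameter exists only so the function can be tested deterministically; in normal use the default `random.random` is enough.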

Circuit Breaker

Combine with a circuit breaker for external services:

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 60):
        self.failures = 0
        self.threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.last_failure = 0.0
        self.state = "closed"  # closed, open, half-open
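
The class above only stores state. One way the transitions could be filled in is sketched below, keeping the same attributes. The methods `allow_request`, `record_success`, `record_failure`, and `call`, the `CircuitOpenError` exception, and the `clock` parameter are all additions for illustration, not the toolkit's API.

```python
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 60,
                 clock=time.monotonic) -> None:
        self.failures = 0
        self.threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.last_failure = 0.0
        self.state = "closed"  # closed, open, half-open
        self._clock = clock

    def allow_request(self) -> bool:
        if self.state == "open":
            # After the cool-down, let a trial request through.
            if self._clock() - self.last_failure >= self.reset_timeout:
                self.state = "half-open"
                return True
            return False
        return True

    def record_success(self) -> None:
        self.failures = 0
        self.state = "closed"

    def record_failure(self) -> None:
        self.failures += 1
        self.last_failure = self._clock()
        # A failed trial request, or too many failures, opens the circuit.
        if self.state == "half-open" or self.failures >= self.threshold:
            self.state = "open"

    async def call(self, func, *args, **kwargs):
        if not self.allow_request():
            raise CircuitOpenError("circuit is open; call rejected")
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.record_failure()
            raise
        self.record_success()
        return result
```

This version lets every request through while half-open. A stricter breaker would admit a single trial request at a time.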

Monitoring Best Practices

Track these key metrics:

  • Queue depth: Number of pending tasks (alert if growing)
  • Processing count: Tasks currently being executed
  • Throughput: Tasks completed per second
  • Latency: Time from enqueue to completion
  • Error rate: Failed tasks per minute
  • DLQ size: Permanently failed tasks

monitor = QueueMonitor(broker=broker)
metrics = await monitor.snapshot()

# Export to your metrics system
statsd.gauge("queue.depth", metrics.depth)
statsd.gauge("queue.throughput", metrics.throughput_per_sec)
statsd.gauge("queue.latency_ms", metrics.avg_latency_ms)
statsd.gauge("queue.dlq_size", metrics.dead_letter_count)
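
If you need to derive throughput and latency yourself, both follow from enqueue and completion timestamps. The sketch below is a hypothetical helper (`summarize` is not a toolkit function) and assumes timestamps in seconds. It also reports a 95th percentile, since averages tend to hide slow outliers.

```python
import math


def summarize(completions: list, window_seconds: float) -> dict:
    """Summarize (enqueued_at, completed_at) pairs observed in a time window."""
    latencies_ms = sorted((done - enq) * 1000.0 for enq, done in completions)
    if not latencies_ms:
        return {"throughput_per_sec": 0.0, "avg_latency_ms": 0.0, "p95_latency_ms": 0.0}

    # Nearest-rank percentile: the smallest value covering 95% of samples.
    rank = max(1, math.ceil(0.95 * len(latencies_ms)))
    return {
        "throughput_per_sec": len(latencies_ms) / window_seconds,
        "avg_latency_ms": sum(latencies_ms) / len(latencies_ms),
        "p95_latency_ms": latencies_ms[rank - 1],
    }


stats = summarize(
    [(0.0, 0.5), (1.0, 1.25), (2.0, 3.0), (3.0, 3.25)],
    window_seconds=4.0,
)
```

In this sample the average latency is 500 ms, while the p95 of 1000 ms exposes the one slow task.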

This is 1 of 14 resources in the Python Developer Pro toolkit. Get the complete [Async Task Queue Toolkit] with all files, templates, and documentation for $39.


Or grab the entire Python Developer Pro bundle (14 products) for $159 — save 30%.


