Most tutorials show you how to send a task to Celery. Very few show you what
happens when that task runs twice.
In production, tasks run twice constantly — network retries, worker crashes,
accidental re-queues. If your task isn't idempotent, you'll process the same order
twice, send the same email twice, or charge a customer twice.
Here's exactly how I built idempotent task execution in a multi-tenant workload.
The problem
A simplified version of what I was dealing with:
Webhook comes in → task queued → worker processes it
Worker dies mid-execution → task requeued → second worker picks it up
Same task runs twice → duplicate side effects
The naive fix is "just check if it already ran." But that check itself has a race
condition if two workers pick up the same task simultaneously.
The solution: idempotency keys + atomic locking
Every task gets an idempotency key derived from its input — not a random UUID,
but a deterministic hash of the payload. If the same payload comes in twice, it
produces the same key.
import hashlib
import json


def make_idempotency_key(task_name: str, payload: dict) -> str:
    payload_hash = hashlib.sha256(
        json.dumps(payload, sort_keys=True).encode()
    ).hexdigest()[:16]
    return f"task:{task_name}:{payload_hash}"
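A quick way to convince yourself the key is deterministic is to feed it the same payload with the fields in a different order. This is a minimal sketch; the payload fields (`order_id`, `event`) and the task name are made up for illustration, and the function is repeated so the snippet runs on its own.

```python
import hashlib
import json


def make_idempotency_key(task_name: str, payload: dict) -> str:
    payload_hash = hashlib.sha256(
        json.dumps(payload, sort_keys=True).encode()
    ).hexdigest()[:16]
    return f"task:{task_name}:{payload_hash}"


# Hypothetical webhook payloads: same content, different key order.
first = {"order_id": 42, "event": "paid"}
second = {"event": "paid", "order_id": 42}
different = {"order_id": 43, "event": "paid"}

key_a = make_idempotency_key("process_webhook", first)
key_b = make_idempotency_key("process_webhook", second)
key_c = make_idempotency_key("process_webhook", different)

print(key_a == key_b)  # True: sort_keys makes field ordering irrelevant
print(key_a == key_c)  # False: a different payload hashes to a different key
```

Without `sort_keys=True`, the two orderings would serialize to different JSON strings and the duplicate would slip through.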
Before executing, the worker tries to acquire a Redis lock using SET NX EX
(set if not exists, with expiry). Only one worker wins the lock. The others
see the key exists and exit cleanly.
import redis
from celery import Task

r = redis.Redis(host="localhost", port=6379)


class IdempotentTask(Task):
    def __call__(self, *args, **kwargs):
        # Hash positional and keyword arguments together; keying on kwargs
        # alone would give every positional-only call the same key.
        key = make_idempotency_key(self.name, {"args": args, "kwargs": kwargs})
        lock_acquired = r.set(key, "processing", nx=True, ex=300)
        if not lock_acquired:
            # Key exists: another worker is running this or it already completed
            return {"status": "skipped", "reason": "duplicate"}
        try:
            result = self.run(*args, **kwargs)
            r.set(key, "completed", ex=86400)  # mark done for 24 hours
            return result
        except Exception:
            r.delete(key)  # release lock on failure so it can retry
            raise
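To see why the atomic set-if-absent matters, here is a small simulation that runs without a Redis server. `FakeStore` is a hypothetical in-memory stand-in I wrote for illustration; it only imitates the `nx`/`ex` behaviour of `SET` and is not how Redis is implemented. Eight threads race for the same key, and exactly one gets to run.

```python
import threading
import time


class FakeStore:
    """In-memory stand-in for Redis SET with nx/ex; an illustration only."""

    def __init__(self):
        self._data = {}  # key -> (value, expires_at or None)
        self._mutex = threading.Lock()

    def set(self, key, value, nx=False, ex=None):
        with self._mutex:  # the existence check and the write are one atomic step
            now = time.monotonic()
            current = self._data.get(key)
            if current is not None and current[1] is not None and current[1] <= now:
                current = None  # treat an expired key as absent
            if nx and current is not None:
                return None  # redis-py also returns None when NX fails
            expires_at = now + ex if ex is not None else None
            self._data[key] = (value, expires_at)
            return True


def simulate(workers: int = 8) -> list:
    store = FakeStore()
    results = []
    results_mutex = threading.Lock()

    def worker():
        won = store.set("task:process_webhook:abc123", "processing", nx=True, ex=300)
        with results_mutex:
            results.append("ran" if won else "skipped")

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


outcome = simulate()
print(outcome.count("ran"), outcome.count("skipped"))  # 1 7
```

A separate "does the key exist?" read followed by a write would leave a gap between the two calls, and that gap is exactly where two workers can both decide they are first.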
What about the dead-letter queue?
Tasks that keep failing (after your retry limit) should end up in a DLQ: a separate
Redis list or SQS queue depending on your broker. Celery doesn't move them there
on its own, so the task has to do it explicitly. The DLQ is where you debug,
not where you panic.
# celery config
task_acks_late = True  # ack after the task finishes, not on receipt
task_reject_on_worker_lost = True  # requeue if worker dies mid-task
task_serializer = "json"

# per-task retry config
# (app, handle, TransientError, PermanentError and send_to_dlq are defined elsewhere)
@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    queue="default",
)
def process_webhook(self, payload: dict):
    try:
        handle(payload)
    except TransientError as e:
        # exponential backoff: 1s, 2s, 4s (countdown overrides default_retry_delay)
        raise self.retry(exc=e, countdown=2 ** self.request.retries)
    except PermanentError:
        # don't retry; send to DLQ manually
        send_to_dlq(payload)
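The `send_to_dlq` helper isn't shown above, so here is one way it could look for a Redis-list DLQ. Treat it as a sketch under assumptions: the queue name `dlq:webhooks`, the `reason` field, and the extra `client` parameter are all mine, and `InMemoryClient` exists only so the example runs without a server. With real Redis you would pass a `redis.Redis` instance, which has the same `rpush` method.

```python
import json
import time


def send_to_dlq(payload: dict, client, reason: str = "permanent_error",
                queue_name: str = "dlq:webhooks") -> str:
    """Append a failed payload to a list-backed DLQ and return the stored entry.

    `client` is anything with a Redis-style rpush(name, value) method.
    """
    entry = json.dumps(
        {"payload": payload, "reason": reason, "failed_at": time.time()},
        sort_keys=True,
    )
    client.rpush(queue_name, entry)
    return entry


class InMemoryClient:
    """Tiny stand-in so the sketch runs without a Redis server."""

    def __init__(self):
        self.lists = {}

    def rpush(self, name, *values):
        self.lists.setdefault(name, []).extend(values)
        return len(self.lists[name])


client = InMemoryClient()
stored = send_to_dlq({"order_id": 42, "event": "paid"}, client)
print(len(client.lists["dlq:webhooks"]))  # 1
```

Storing the failure reason and a timestamp next to the payload makes the DLQ much easier to debug later, because each entry explains why it is there.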
What I learned
Three things that aren't obvious until you've been burned:
task_acks_late = True is not optional. Default Celery behaviour acknowledges
the task just before a worker starts executing it. If the worker crashes before
finishing, the task is gone. With late acks the message stays unacknowledged
until the task finishes, so the broker can redeliver it if the worker dies.
Lock expiry must be longer than your task timeout. If your task takes 4
minutes and your lock expires in 3, a second worker will pick it up while the
first is still running. Set ex to at least 2× your p99 task duration.
The idempotency key must be deterministic from the payload, not the task ID.
Celery reuses the task ID across retries, but a re-sent webhook or a manual
re-queue creates a new task with a new ID. If you key on task ID, those
duplicates go undetected. Key on the business payload instead.
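The lock-expiry rule of thumb (at least 2× your p99 task duration) is easy to turn into a helper if you log task durations. This is a rough sketch: the function names, the nearest-rank percentile method, and the sample data are my own choices for illustration, not part of the production setup.

```python
import math


def p99(durations: list) -> float:
    """Nearest-rank 99th percentile of observed task durations (seconds)."""
    if not durations:
        raise ValueError("need at least one duration sample")
    ordered = sorted(durations)
    # Integer form of ceil(0.99 * n), giving a 1-based nearest rank
    rank = (99 * len(ordered) + 99) // 100
    return ordered[rank - 1]


def lock_ttl_seconds(durations: list, safety_factor: float = 2.0) -> int:
    """Lock expiry as safety_factor times the p99 duration, rounded up."""
    return math.ceil(safety_factor * p99(durations))


# Hypothetical samples: 100 runs taking 1s, 2s, ..., 100s.
samples = list(range(1, 101))
print(p99(samples))               # 99
print(lock_ttl_seconds(samples))  # 198
```

The resulting number is what you would pass as `ex` when acquiring the lock, recomputed whenever your task durations shift.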
This setup has been running in production handling clinical imaging workloads
for over a year with zero duplicate executions.