Google Cloud Tasks guarantees at-least-once delivery. That means your handler WILL be called multiple times for the same task. If you're not handling this, you have bugs. You might not know it yet.
This guide shows the exact pattern we use in production to handle thousands of Cloud Tasks per day with zero duplicates.
## The Problem
Here's what happens without idempotency:
```python
# ❌ BAD: This creates duplicates on retry
@app.post("/tasks/score")
async def handle_score_task(request: ScoreRequest):
    result = ScoringResult(
        id=uuid4(),  # New ID every time!
        evaluation_id=request.evaluation_id,
        score=compute_score(request),
    )
    db.add(result)
    db.commit()
    return {"status": "ok"}
```
Cloud Tasks retries if:
- Your handler returns 5xx
- Your handler takes longer than the timeout
- The network hiccups
Every retry creates a new row with a new UUID. Your data is silently corrupted.
## The Fix: 3-Step Idempotency Pattern

### Step 1: Deterministic IDs
Generate IDs from the task payload, not randomly:
```python
import hashlib

def compute_task_id(evaluation_id: str, model: str, turn: int) -> str:
    """Same inputs → same ID. Always."""
    payload = f"{evaluation_id}:{model}:{turn}"
    return hashlib.sha256(payload.encode()).hexdigest()[:32]
```
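A quick sanity check makes the property concrete: identical inputs always hash to the same ID, and changing any field changes it.

```python
import hashlib

def compute_task_id(evaluation_id: str, model: str, turn: int) -> str:
    payload = f"{evaluation_id}:{model}:{turn}"
    return hashlib.sha256(payload.encode()).hexdigest()[:32]

a = compute_task_id("eval-123", "gpt-4o", 1)
b = compute_task_id("eval-123", "gpt-4o", 1)
c = compute_task_id("eval-123", "gpt-4o", 2)

assert a == b       # same payload, same ID, on any machine, at any time
assert a != c       # changing any field produces a different ID
assert len(a) == 32
```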
### Step 2: Guarded Upserts

Use `ON CONFLICT DO NOTHING`:
```python
from sqlalchemy import text

@app.post("/tasks/score")
async def handle_score_task(request: ScoreRequest):
    task_id = compute_task_id(
        request.evaluation_id,
        request.model,
        request.turn,
    )
    result = db.execute(text("""
        INSERT INTO scoring_results (id, evaluation_id, score, model, turn)
        VALUES (:id, :eval_id, :score, :model, :turn)
        ON CONFLICT (id) DO NOTHING
        RETURNING id
    """), {
        "id": task_id,
        "eval_id": request.evaluation_id,
        "score": compute_score(request),
        "model": request.model,
        "turn": request.turn,
    })
    was_inserted = result.fetchone() is not None
    if not was_inserted:
        logger.info(f"Duplicate delivery detected: {task_id}")
    db.commit()
    return {"status": "ok", "duplicate": not was_inserted}
```
### Step 3: Return 200 Even on Duplicates

This is critical. If your handler detects a duplicate and returns 409 (Conflict), Cloud Tasks treats it as a failure and keeps retrying until the queue's retry limit is exhausted.
```python
# ✅ Always return 200 for idempotent operations
return {"status": "ok", "duplicate": not was_inserted}
```
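The retry behavior itself lives on the queue, not the handler. As a sketch of how you might cap it (the queue name `score-queue` is a placeholder; check `gcloud tasks queues update --help` for the flags available in your gcloud version):

```shell
# Bound retries and back off between attempts (hypothetical queue name)
gcloud tasks queues update score-queue \
  --max-attempts=10 \
  --min-backoff=5s \
  --max-backoff=300s \
  --max-doublings=4
```

With idempotent handlers, generous retry limits are safe; the handler absorbs the duplicates.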
## Handling Multi-Step Tasks
What if your task does 3 things?
```python
@app.post("/tasks/evaluate")
async def handle_evaluation(request: EvalRequest):
    # Assumes EvalRequest carries the same fields compute_task_id expects
    task_id = compute_task_id(request.evaluation_id, request.model, request.turn)

    # Step 1: Generate chain-of-thought
    cot_id = f"{task_id}:cot"
    db.execute(text("""
        INSERT INTO cot_results (id, task_id, content)
        VALUES (:id, :task_id, :content)
        ON CONFLICT (id) DO NOTHING
    """), {"id": cot_id, "task_id": task_id, "content": generate_cot(request)})
    db.commit()  # Commit per step so partial progress survives a crash

    # Step 2: Generate critique
    critique_id = f"{task_id}:critique"
    db.execute(text("""
        INSERT INTO critiques (id, task_id, content)
        VALUES (:id, :task_id, :content)
        ON CONFLICT (id) DO NOTHING
    """), {"id": critique_id, "task_id": task_id, "content": generate_critique(request)})
    db.commit()

    # Step 3: Store final score
    db.execute(text("""
        INSERT INTO scores (id, task_id, score)
        VALUES (:id, :task_id, :score)
        ON CONFLICT (id) DO NOTHING
    """), {"id": f"{task_id}:score", "task_id": task_id, "score": compute_score(request)})
    db.commit()
    return {"status": "ok"}
```
Each step has its own deterministic ID, and each step commits before the next begins. If the handler crashes after step 2, the rows from steps 1 and 2 survive, so the retry's inserts for them are no-ops and only step 3 writes anything new. One caveat: `ON CONFLICT DO NOTHING` only skips the write. `generate_cot()` and `generate_critique()` still run on every retry, so if those calls are expensive, check for an existing row before regenerating.
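Here is a minimal, self-contained sketch of that check-before-compute variant, using sqlite3's `INSERT OR IGNORE` as a stand-in for Postgres's `ON CONFLICT DO NOTHING` (table and function names are illustrative):

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE cot_results (id TEXT PRIMARY KEY, content TEXT)")

calls = 0

def generate_cot() -> str:
    global calls
    calls += 1  # stands in for an expensive LLM call
    return "chain of thought"

def run_step(row_id: str) -> None:
    # Skip the expensive work entirely if the row already exists
    if db.execute("SELECT 1 FROM cot_results WHERE id = ?", (row_id,)).fetchone():
        return
    db.execute("INSERT OR IGNORE INTO cot_results (id, content) VALUES (?, ?)",
               (row_id, generate_cot()))
    db.commit()

run_step("task-1:cot")  # first delivery: generates and inserts
run_step("task-1:cot")  # retry: row exists, generation is skipped
```

The existence check costs one indexed `SELECT` per step, which is usually far cheaper than regenerating the content.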
## Cloud Run Configuration
Don't forget the infrastructure side:
```yaml
# service.yaml for Cloud Run
spec:
  template:
    metadata:
      annotations:
        run.googleapis.com/execution-environment: gen2
    spec:
      containerConcurrency: 10  # Don't overwhelm the DB
      timeoutSeconds: 3600      # Match your longest task
      containers:
        - resources:
            limits:
              memory: 2Gi
              cpu: "2"
```
We reduced concurrency from 80 to 10 per instance. Why? Each handler holds a database connection. 80 concurrent handlers × 10 instances = 800 connections. Our Cloud SQL instance can't handle that. 10 × 10 = 100 connections. Much better.
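The back-of-envelope math generalizes: worst-case connections ≈ concurrency per instance × max instances × connections per request. A quick sanity check (function name is our own, for illustration):

```python
def peak_db_connections(concurrency: int, max_instances: int,
                        conns_per_request: int = 1) -> int:
    """Worst-case simultaneous DB connections from Cloud Run."""
    return concurrency * max_instances * conns_per_request

assert peak_db_connections(80, 10) == 800  # the original setting
assert peak_db_connections(10, 10) == 100  # after capping concurrency
```

Run this against your Cloud SQL `max_connections` limit before settling on a concurrency value.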
## Adding Correlation IDs
For debugging duplicate deliveries:
```python
from uuid import uuid4

import structlog

@app.middleware("http")
async def add_correlation_id(request, call_next):
    # Cloud Tasks sends the X-CloudTasks-TaskName header
    task_name = request.headers.get("X-CloudTasks-TaskName", str(uuid4()))
    structlog.contextvars.bind_contextvars(correlation_id=task_name)
    response = await call_next(request)
    return response
```
Now every log line from a Cloud Tasks handler includes the task name. Duplicate deliveries show up as multiple log entries with the same correlation ID.
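If you're on stdlib logging rather than structlog, the same effect can be sketched with a contextvars-backed filter (class and logger names here are illustrative, and the task name is a made-up example):

```python
import contextvars
import logging

correlation_id = contextvars.ContextVar("correlation_id", default="-")

class CorrelationFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        # Attach the current correlation ID to every record
        record.correlation_id = correlation_id.get()
        return True

handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(correlation_id)s %(message)s"))
handler.addFilter(CorrelationFilter())

logger = logging.getLogger("tasks")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# In the middleware, set the ID from the Cloud Tasks header:
correlation_id.set("projects/p/locations/l/queues/q/tasks/task-123")
logger.info("scoring started")  # log line now carries the task name
```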
## Testing Idempotency
```python
def test_handler_is_idempotent():
    """Same request twice → same result, no duplicates."""
    request = ScoreRequest(
        evaluation_id="eval-123",
        model="gpt-4o",
        turn=1,
    )

    # First call
    response1 = client.post("/tasks/score", json=request.dict())
    assert response1.status_code == 200
    assert response1.json()["duplicate"] is False

    # Second call (simulating a retry)
    response2 = client.post("/tasks/score", json=request.dict())
    assert response2.status_code == 200
    assert response2.json()["duplicate"] is True

    # Verify only one row exists
    count = db.execute(text("SELECT COUNT(*) FROM scoring_results")).scalar()
    assert count == 1
```
## TL;DR
| Problem | Solution |
|---|---|
| Random UUIDs | Deterministic IDs from payload |
| INSERT | INSERT ... ON CONFLICT DO NOTHING |
| Return 409 on duplicate | Return 200 always |
| No logging | Correlation IDs from Cloud Tasks headers |
| High concurrency | Cap containerConcurrency (10, not 80) |
This pattern handles our entire evaluation pipeline -- thousands of tasks per day, zero duplicates, zero data corruption.
Read the full article on my blog. I write about production GCP patterns — find me at humzakt.github.io.