DEV Community

Humza Tareen
Humza Tareen

Posted on • Originally published at humzakt.github.io

How to Build Idempotent Cloud Tasks Handlers in Python (The Pattern That Eliminated Our Duplicate Record Bugs)

Google Cloud Tasks guarantees at-least-once delivery. That means your handler WILL be called multiple times for the same task. If you're not handling this, you have bugs. You might not know it yet.

This guide shows the exact pattern we use in production to handle 1000s of Cloud Tasks per day with zero duplicates.

The Problem

Here's what happens without idempotency:

# ❌ BAD: This creates duplicates on retry
@app.post("/tasks/score")
async def handle_score_task(request: ScoreRequest):
    result = ScoringResult(
        id=uuid4(),  # New ID every time!
        evaluation_id=request.evaluation_id,
        score=compute_score(request),
    )
    db.add(result)
    db.commit()
    return {"status": "ok"}
Enter fullscreen mode Exit fullscreen mode

Cloud Tasks retries if:

  • Your handler returns 5xx
  • Your handler takes longer than the timeout
  • The network hiccups

Every retry creates a new row with a new UUID. Your data is silently corrupted.

The Fix: 3-Step Idempotency Pattern

Step 1: Deterministic IDs

Generate IDs from the task payload, not randomly:

import hashlib

def compute_task_id(evaluation_id: str, model: str, turn: int) -> str:
    """Same inputs → same ID. Always."""
    payload = f"{evaluation_id}:{model}:{turn}"
    return hashlib.sha256(payload.encode()).hexdigest()[:32]
Enter fullscreen mode Exit fullscreen mode

Step 2: Guarded Upserts

Use ON CONFLICT DO NOTHING:

from sqlalchemy import text

@app.post("/tasks/score")
async def handle_score_task(request: ScoreRequest):
    task_id = compute_task_id(
        request.evaluation_id,
        request.model,
        request.turn,
    )

    result = db.execute(text("""
        INSERT INTO scoring_results (id, evaluation_id, score, model, turn)
        VALUES (:id, :eval_id, :score, :model, :turn)
        ON CONFLICT (id) DO NOTHING
        RETURNING id
    """), {
        "id": task_id,
        "eval_id": request.evaluation_id,
        "score": compute_score(request),
        "model": request.model,
        "turn": request.turn,
    })

    was_inserted = result.fetchone() is not None

    if not was_inserted:
        logger.info(f"Duplicate delivery detected: {task_id}")

    db.commit()
    return {"status": "ok", "duplicate": not was_inserted}
Enter fullscreen mode Exit fullscreen mode

Step 3: Return 200 Even on Duplicates

This is critical. If your handler detects a duplicate and returns 409 (Conflict), Cloud Tasks will retry it. Forever.

# ✅ Always return 200 for idempotent operations
return {"status": "ok", "duplicate": not was_inserted}
Enter fullscreen mode Exit fullscreen mode

Handling Multi-Step Tasks

What if your task does 3 things?

@app.post("/tasks/evaluate")
async def handle_evaluation(request: EvalRequest):
    task_id = compute_task_id(request.evaluation_id, request.model)

    # Step 1: Generate chain-of-thought
    cot_id = f"{task_id}:cot"
    db.execute(text("""
        INSERT INTO cot_results (id, task_id, content)
        VALUES (:id, :task_id, :content)
        ON CONFLICT (id) DO NOTHING
    """), {"id": cot_id, "task_id": task_id, "content": generate_cot(request)})

    # Step 2: Generate critique
    critique_id = f"{task_id}:critique"
    db.execute(text("""
        INSERT INTO critiques (id, task_id, content)
        VALUES (:id, :task_id, :content)
        ON CONFLICT (id) DO NOTHING
    """), {"id": critique_id, "task_id": task_id, "content": generate_critique(request)})

    # Step 3: Store final score
    db.execute(text("""
        INSERT INTO scores (id, task_id, score)
        VALUES (:id, :task_id, :score)
        ON CONFLICT (id) DO NOTHING
    """), {"id": f"{task_id}:score", "task_id": task_id, "score": compute_score(request)})

    db.commit()
    return {"status": "ok"}
Enter fullscreen mode Exit fullscreen mode

Each step has its own deterministic ID. If the handler crashes after step 2, the retry will skip steps 1 and 2 (already exist) and only execute step 3.

Cloud Run Configuration

Don't forget the infrastructure side:

# service.yaml for Cloud Run
spec:
  template:
    metadata:
      annotations:
        run.googleapis.com/execution-environment: gen2
    spec:
      containerConcurrency: 10  # Don't overwhelm the DB
      timeoutSeconds: 3600      # Match your longest task
      containers:
        - resources:
            limits:
              memory: 2Gi
              cpu: "2"
Enter fullscreen mode Exit fullscreen mode

We reduced concurrency from 80 to 10 per instance. Why? Each handler holds a database connection. 80 concurrent handlers × 10 instances = 800 connections. Our Cloud SQL instance can't handle that. 10 × 10 = 100 connections. Much better.

Adding Correlation IDs

For debugging duplicate deliveries:

from uuid import uuid4
import structlog

@app.middleware("http")
async def add_correlation_id(request, call_next):
    # Cloud Tasks sends X-CloudTasks-TaskName header
    task_name = request.headers.get("X-CloudTasks-TaskName", str(uuid4()))
    structlog.contextvars.bind_contextvars(correlation_id=task_name)
    response = await call_next(request)
    return response
Enter fullscreen mode Exit fullscreen mode

Now every log line from a Cloud Tasks handler includes the task name. Duplicate deliveries show up as multiple log entries with the same correlation ID.

Testing Idempotency

def test_handler_is_idempotent():
    """Same request twice → same result, no duplicates."""
    request = ScoreRequest(
        evaluation_id="eval-123",
        model="gpt-4o",
        turn=1,
    )

    # First call
    response1 = client.post("/tasks/score", json=request.dict())
    assert response1.status_code == 200
    assert response1.json()["duplicate"] == False

    # Second call (simulating retry)
    response2 = client.post("/tasks/score", json=request.dict())
    assert response2.status_code == 200
    assert response2.json()["duplicate"] == True

    # Verify only one row exists
    count = db.execute(text("SELECT COUNT(*) FROM scoring_results")).scalar()
    assert count == 1
Enter fullscreen mode Exit fullscreen mode

TL;DR

Problem Solution
Random UUIDs Deterministic IDs from payload
INSERT INSERT ... ON CONFLICT DO NOTHING
Return 409 on duplicate Return 200 always
No logging Correlation IDs from Cloud Tasks headers
High concurrency Cap containerConcurrency (10, not 80)

This pattern handles our entire evaluation pipeline -- thousands of tasks per day, zero duplicates, zero data corruption.


Read the full article on my blog. I write about production GCP patterns — find me at humzakt.github.io.


Top comments (0)