<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Pavel Goldin</title>
    <description>The latest articles on DEV Community by Pavel Goldin (@wicsion).</description>
    <link>https://dev.to/wicsion</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F3909402%2F1e3079f5-7b6a-4e99-96d1-df4dd699225b.jpg</url>
      <title>DEV Community: Pavel Goldin</title>
      <link>https://dev.to/wicsion</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/wicsion"/>
    <language>en</language>
    <item>
      <title>I Thought I'd Just Call a Blockchain API. It Didn't Work Out That Way.</title>
      <dc:creator>Pavel Goldin</dc:creator>
      <pubDate>Sat, 02 May 2026 18:24:52 +0000</pubDate>
      <link>https://dev.to/wicsion/i-thought-id-just-call-a-blockchain-api-it-didnt-work-out-that-way-4mp</link>
      <guid>https://dev.to/wicsion/i-thought-id-just-call-a-blockchain-api-it-didnt-work-out-that-way-4mp</guid>
      <description>&lt;p&gt;I walked into a crypto project thinking I'd just pull data from a blockchain API. That's not how it went.&lt;br&gt;
I join the project. Stack: FastAPI, PostgreSQL, Redis as Celery broker, Celery workers, Docker, Web3. Startup on a hype wave, real money, architecture built on the fly. I look at the payment processing architecture and my first thought: guys, are you serious? Financial operations with real money, zero idempotency, Redis as broker with no persistence, Web3.py synchronous calls inside Celery tasks.&lt;br&gt;
The conversation was short: here's the task, fix what's there. Deadlines were burning.&lt;/p&gt;

&lt;h2&gt;
  
  
  What Was Broken
&lt;/h2&gt;

&lt;p&gt;First month of prod. A user writes to support: credited twice, withdrew double. I open the logs, clean. Two identical events, both 200, four seconds apart. Both processed. The user got double the balance.&lt;br&gt;
Daily reconciliation with on-chain data showed a discrepancy: several accounts with a balance higher than confirmed transactions should allow. In the first month we found 23 duplicate credits across roughly 180k transactions, around a 0.013% error rate. 23 double credits in a month. Real money, not a metric.&lt;br&gt;
The first thing that surfaced: duplicates from the provider. Alchemy, Infura, and every other blockchain provider operates on at-least-once delivery. On network failure, restart, or under load the provider retries delivery. The provider says so right in the docs. That's not a bug, that's the rules of the game. The provider retries delivery, your code has to survive that without consequences. Ours didn't.&lt;br&gt;
It got worse. Two parallel withdrawal requests read the balance simultaneously, both saw enough funds, both passed validation. Two requests read the balance at the same time, both see enough money, both deduct. Textbook race condition.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;async def withdraw(conn, user_id: int, amount: Decimal):&lt;br&gt;
    balance = await conn.fetchval(&lt;br&gt;
        "SELECT balance FROM users WHERE id = $1", user_id&lt;br&gt;
    )&lt;br&gt;
    if balance &amp;gt;= amount:&lt;br&gt;
        await conn.execute(&lt;br&gt;
            "UPDATE users SET balance = balance - $1 WHERE id = $2",&lt;br&gt;
            amount, user_id&lt;br&gt;
        )&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Then this. Celery with default settings acknowledges the task to the broker at the moment of receipt. A worker dies mid-processing, the event is acknowledged, the DB write never happened. No retry, no DLQ. Worker died, task acknowledged, money never arrived. The user waits and has no idea what happened.&lt;br&gt;
And a separate silent killer: amount gets serialized to JSON through the Celery broker as a float. Decimal("50.1") becomes a JSON float, meaning 50.099999999999994. At scale this accumulates into a real loss. Nobody noticed until they ran the numbers.&lt;br&gt;
Last one: calling .delay() directly from the webhook handler creates a window between the DB write and the queue insertion. If the process dies in that moment, the event hangs in pending with no automatic recovery.&lt;br&gt;
Five problems total. I started fixing them.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;## First Instinct: Redis Distributed Lock&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;SET NX EX on user_id. The pattern is described by Antirez, implemented in 20 minutes. Didn't fly.&lt;br&gt;
Here's the specific scenario that showed up in the logs. A worker acquires the lock in Redis. Starts a transaction in PostgreSQL. Between those two operations, the OOM killer takes out the process. The PostgreSQL transaction rolled back automatically, balance unchanged. The Redis lock hangs for 30 seconds until TTL. After 30 seconds the next worker acquires the lock, sees that the idempotency_key isn't recorded (nobody was around to write it, the transaction rolled back) and processes the event again. Double credit. Both workers are clean in the logs.&lt;br&gt;
The problem isn't the TTL size. The problem is the absence of cross-system atomicity between Redis and PostgreSQL. Redis doesn't work here, no atomicity with PostgreSQL. A code-level check doesn't work either, two workers will both pass the SELECT before the INSERT. The only thing that's atomic by definition: a unique constraint. With money there's no "almost right."&lt;/p&gt;

&lt;p&gt;`CREATE TABLE payment_events (&lt;br&gt;
    event_id     TEXT PRIMARY KEY,&lt;br&gt;
    user_id      INTEGER NOT NULL REFERENCES users(id),&lt;br&gt;
    amount       NUMERIC(38, 18) NOT NULL,&lt;br&gt;
    event_type   TEXT NOT NULL,&lt;br&gt;
    status       TEXT NOT NULL DEFAULT 'pending',&lt;br&gt;
    retry_count  INTEGER NOT NULL DEFAULT 0,&lt;br&gt;
    created_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),&lt;br&gt;
    updated_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),&lt;br&gt;
    CONSTRAINT valid_status CHECK (&lt;br&gt;
        status IN ('pending', 'enqueued', 'processing', 'confirmed', 'failed')&lt;br&gt;
    )&lt;br&gt;
);&lt;/p&gt;

&lt;p&gt;CREATE TABLE balance_events (&lt;br&gt;
    id              BIGSERIAL PRIMARY KEY,&lt;br&gt;
    user_id         INTEGER NOT NULL REFERENCES users(id),&lt;br&gt;
    amount          NUMERIC(38, 18) NOT NULL,&lt;br&gt;
    event_type      TEXT NOT NULL,&lt;br&gt;
    source_event_id TEXT NOT NULL,&lt;br&gt;
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),&lt;br&gt;
    CONSTRAINT uq_balance_events_source UNIQUE (source_event_id, event_type)&lt;br&gt;
);&lt;/p&gt;

&lt;p&gt;CREATE TABLE processed_events (&lt;br&gt;
    idempotency_key TEXT PRIMARY KEY,&lt;br&gt;
    outcome         TEXT NOT NULL DEFAULT 'pending',&lt;br&gt;
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()&lt;br&gt;
);&lt;/p&gt;

&lt;p&gt;CREATE TABLE dead_letter_queue (&lt;br&gt;
    id         BIGSERIAL PRIMARY KEY,&lt;br&gt;
    event_id   TEXT NOT NULL,&lt;br&gt;
    event_type TEXT NOT NULL,&lt;br&gt;
    user_id    INTEGER NOT NULL,&lt;br&gt;
    amount     NUMERIC(38, 18) NOT NULL,&lt;br&gt;
    error      TEXT NOT NULL,&lt;br&gt;
    attempt    INTEGER NOT NULL DEFAULT 1,&lt;br&gt;
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()&lt;br&gt;
);&lt;/p&gt;

&lt;p&gt;ALTER TABLE users&lt;br&gt;
    ADD COLUMN IF NOT EXISTS initial_balance NUMERIC(38, 18) NOT NULL DEFAULT 0,&lt;br&gt;
    ADD CONSTRAINT balance_non_negative CHECK (balance &amp;gt;= 0);&lt;/p&gt;

&lt;p&gt;CREATE INDEX idx_payment_events_pending&lt;br&gt;
    ON payment_events (updated_at, created_at)&lt;br&gt;
    WHERE status = 'pending';&lt;/p&gt;

&lt;p&gt;CREATE INDEX idx_payment_events_enqueued&lt;br&gt;
    ON payment_events (updated_at)&lt;br&gt;
    WHERE status = 'enqueued';&lt;/p&gt;

&lt;p&gt;CREATE INDEX idx_payment_events_processing&lt;br&gt;
    ON payment_events (updated_at)&lt;br&gt;
    WHERE status = 'processing';&lt;/p&gt;

&lt;p&gt;CREATE INDEX idx_balance_events_user_id&lt;br&gt;
    ON balance_events (user_id);&lt;/p&gt;

&lt;p&gt;CREATE INDEX idx_balance_events_created_at&lt;br&gt;
    ON balance_events (created_at DESC);&lt;/p&gt;

&lt;p&gt;CREATE INDEX idx_processed_events_created&lt;br&gt;
    ON processed_events (created_at);&lt;/p&gt;

&lt;p&gt;CREATE INDEX idx_processed_events_pending_stale&lt;br&gt;
    ON processed_events (created_at)&lt;br&gt;
    WHERE outcome = 'pending';&lt;/p&gt;

&lt;p&gt;CREATE INDEX idx_dlq_event_id&lt;br&gt;
    ON dead_letter_queue (event_id);&lt;/p&gt;

&lt;p&gt;CREATE INDEX idx_dlq_created_at&lt;br&gt;
    ON dead_letter_queue (created_at DESC);`&lt;/p&gt;

&lt;p&gt;Three non-obvious decisions in the schema.&lt;br&gt;
NUMERIC(38, 18), not NUMERIC(20, 8). The amount column stores in ETH, not wei. The webhook provider sends an already-converted value. If your provider returns wei, convert at the boundary: amount_eth = Decimal(wei_str) / Decimal(10**18) before passing to _validate_amount. ERC-20 tokens declare their own decimals(): USDC/USDT use 6, WBTC uses 8, DAI/WETH/MKR use 18. ETH in wei is also 10^18. NUMERIC(20, 8) handles USDC/USDT, but physically cannot store 18-decimal tokens, so we take the worst case: NUMERIC(38, 18).&lt;br&gt;
initial_balance is needed for reconciliation. During migration you populate it with the current balance: UPDATE users SET initial_balance = balance WHERE . This means balance_events starts filling from zero, and hot_path_balance_check correctly accounts only for users whose operations have all gone through balance_events. For new systems initial_balance stays at 0.&lt;br&gt;
Separate indexes for pending/enqueued/processing instead of a single status IN (...), because pollers use different access patterns. idx_payment_events_pending is a partial index on (updated_at, created_at) for ORDER BY created_at in enqueue_pending_events, otherwise the planner sorts without an index.&lt;br&gt;
retry_count in payment_events was added to prevent infinite pending -&amp;gt; enqueued cycling during a durable Redis outage. More on that in the degradation section.&lt;/p&gt;

&lt;h2&gt;
  
  
  How It Got Fixed
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Initialization&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;`import os&lt;br&gt;
import uuid&lt;br&gt;
import json&lt;br&gt;
import hmac&lt;br&gt;
import random&lt;br&gt;
import hashlib&lt;br&gt;
import secrets&lt;br&gt;
import threading&lt;br&gt;
import structlog&lt;br&gt;
import psycopg2&lt;br&gt;
import psycopg2.extras&lt;br&gt;
import psycopg2.pool&lt;br&gt;
import redis as redis_lib&lt;br&gt;
from typing import Literal, Optional&lt;br&gt;
import re&lt;br&gt;
from decimal import Decimal, InvalidOperation&lt;br&gt;
from contextvars import ContextVar&lt;br&gt;
from dataclasses import dataclass, field&lt;br&gt;
from datetime import datetime, timezone, timedelta&lt;br&gt;
from celery import shared_task&lt;br&gt;
from celery.exceptions import Ignore, MaxRetriesExceededError&lt;/p&gt;

&lt;p&gt;logger = structlog.get_logger()&lt;/p&gt;

&lt;p&gt;@dataclass(frozen=True)&lt;br&gt;
class Settings:&lt;br&gt;
    DATABASE_URL:   str&lt;br&gt;
    WEBHOOK_SECRET: str&lt;br&gt;
    ETH_RPC_URL:    str&lt;br&gt;
    ALERT_EMAIL:    str&lt;br&gt;
    REDIS_URL:      str = "redis://localhost:6379/0"&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@classmethod
def from_env(cls) -&amp;gt; "Settings":
    required = ("DATABASE_URL", "WEBHOOK_SECRET", "ETH_RPC_URL", "ALERT_EMAIL")
    missing  = [k for k in required if not os.environ.get(k)]
    if missing:
        raise RuntimeError(f"Missing required env vars: {', '.join(missing)}")
    return cls(
        DATABASE_URL   = os.environ["DATABASE_URL"],
        WEBHOOK_SECRET = os.environ["WEBHOOK_SECRET"],
        ETH_RPC_URL    = os.environ["ETH_RPC_URL"],
        ALERT_EMAIL    = os.environ["ALERT_EMAIL"],
        REDIS_URL      = os.environ.get("REDIS_URL", "redis://localhost:6379/0"),
    )
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;settings = Settings.from_env()&lt;/p&gt;

&lt;p&gt;_redis_client = redis_lib.Redis.from_url(&lt;br&gt;
    settings.REDIS_URL,&lt;br&gt;
    decode_responses=True,&lt;br&gt;
    socket_connect_timeout=2,&lt;br&gt;
    socket_timeout=2,&lt;br&gt;
    retry_on_timeout=False,&lt;br&gt;
)`&lt;/p&gt;

&lt;p&gt;send_alert: a rate-limited wrapper around the logger. In prod it's replaced with a PagerDuty/OpsGenie SDK. Identical alert_key values within the cooldown window are suppressed. If no key is provided, sends without rate-limiting, for one-off critical alerts. Never throws exceptions.&lt;br&gt;
_alert_last_sent grows with unique keys. If you generate keys per-event-id (which we do for orphan alerts), over a month that's millions of entries. So when it overflows, we first clean out stale keys, and if there's still no room after cleanup, we suppress new ones. Hacky, yes. But it hasn't fallen over in eight months.&lt;/p&gt;

&lt;p&gt;`_alert_lock = threading.Lock()&lt;br&gt;
_alert_last_sent: dict = {}&lt;br&gt;
MAX_ALERT_KEYS = 1_000&lt;/p&gt;

&lt;p&gt;def send_alert(message: str, alert_key: Optional[str] = None,&lt;br&gt;
               cooldown_seconds: int = 300) -&amp;gt; None:&lt;br&gt;
    try:&lt;br&gt;
        if alert_key is None:&lt;br&gt;
            logger.critical("ALERT", message=message)&lt;br&gt;
            return&lt;br&gt;
        with _alert_lock:&lt;br&gt;
            now = datetime.now(timezone.utc)&lt;br&gt;
            if len(_alert_last_sent) &amp;gt;= MAX_ALERT_KEYS:&lt;br&gt;
                stale_cutoff = now - timedelta(seconds=cooldown_seconds * 2)&lt;br&gt;
                stale = [k for k, v in _alert_last_sent.items() if v &amp;lt; stale_cutoff]&lt;br&gt;
                for k in stale:&lt;br&gt;
                    del _alert_last_sent[k]&lt;br&gt;
                if len(_alert_last_sent) &amp;gt;= MAX_ALERT_KEYS and alert_key not in _alert_last_sent:&lt;br&gt;
                    logger.warning("send_alert suppressed: rate limit dict full",&lt;br&gt;
                                   alert_key=alert_key)&lt;br&gt;
                    return&lt;br&gt;
            last = _alert_last_sent.get(alert_key)&lt;br&gt;
            if last and (now - last).total_seconds() &amp;lt; cooldown_seconds:&lt;br&gt;
                return&lt;br&gt;
            _alert_last_sent[alert_key] = now&lt;br&gt;
        logger.critical("ALERT", message=message, alert_key=alert_key)&lt;br&gt;
    except Exception as e:&lt;br&gt;
        logger.error("send_alert failed", error=str(e))&lt;/p&gt;

&lt;p&gt;class ImproperlyConfigured(RuntimeError):&lt;br&gt;
    pass`&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;Trace ID Through the Entire Chain&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;Each worker gets its own ContextVar automatically. Sharing it between threads is impossible.&lt;/p&gt;

&lt;p&gt;`_trace_id: ContextVar[str] = ContextVar('trace_id', default='')&lt;/p&gt;

&lt;p&gt;def get_trace_id() -&amp;gt; str:&lt;br&gt;
    return _trace_id.get() or 'no-trace'&lt;/p&gt;

&lt;p&gt;def set_trace_id(tid: str) -&amp;gt; None:&lt;br&gt;
    _trace_id.set(tid)&lt;/p&gt;

&lt;p&gt;def new_trace_id() -&amp;gt; str:&lt;br&gt;
    tid = str(uuid.uuid4())&lt;br&gt;
    _trace_id.set(tid)&lt;br&gt;
    return tid&lt;/p&gt;

&lt;p&gt;structlog.configure(&lt;br&gt;
    processors=[&lt;br&gt;
        structlog.processors.add_log_level,&lt;br&gt;
        lambda &lt;em&gt;, _&lt;/em&gt;, event_dict: {**event_dict, "trace_id": get_trace_id()},&lt;br&gt;
        structlog.processors.JSONRenderer(),&lt;br&gt;
    ]&lt;br&gt;
)`&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;Idempotency Key via DB Unique Constraint&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;The key is built from event_id and event_type, written to a separate table with a unique constraint in the same transaction as the balance change.&lt;br&gt;
Redis doesn't work here, no atomicity with PostgreSQL. A code-level check doesn't work either, two workers will both pass SELECT before INSERT. The only thing that's atomic by definition: a unique constraint.&lt;br&gt;
In the first version I used concatenation f"{event_id}::{event_type}". Got a collision when :: appeared in event_id. Tried a NUL separator: f"{event_id}\0{event_type}".encode(). Also a collision: _idempotency_key("a\x00b", "c") == _idempotency_key("a", "b\x00c"), both produce bytes b"a\x00b\x00c". Final version, length-prefix encoding: each field is preceded by a 4-byte length, collisions between fields are structurally impossible.&lt;/p&gt;

&lt;p&gt;`class RetryableError(Exception):&lt;br&gt;
    pass&lt;/p&gt;

&lt;p&gt;class AlreadyProcessedError(Exception):&lt;br&gt;
    pass&lt;/p&gt;

&lt;p&gt;MAX_AMOUNT = Decimal("10") ** 20&lt;/p&gt;

&lt;p&gt;_AMOUNT_RE = re.compile(r"^[0-9]+(.[0-9]+)?([eE][+-]?[0-9]+)?$")&lt;/p&gt;

&lt;p&gt;def _validate_amount(amount) -&amp;gt; Decimal:&lt;br&gt;
    if isinstance(amount, float):&lt;br&gt;
        raise ValueError(&lt;br&gt;
            f"float is not allowed. Pass amount as str from JSON payload. "&lt;br&gt;
            f"Got: {amount!r}"&lt;br&gt;
        )&lt;br&gt;
    if isinstance(amount, str) and amount != amount.strip():&lt;br&gt;
        raise ValueError(&lt;br&gt;
            f"amount contains whitespace: {amount!r}. "&lt;br&gt;
            f"Pass amount without spaces."&lt;br&gt;
        )&lt;br&gt;
    if isinstance(amount, str) and not _AMOUNT_RE.fullmatch(amount):&lt;br&gt;
        raise ValueError(f"invalid amount format: {amount!r}")&lt;br&gt;
    try:&lt;br&gt;
        amount_decimal = Decimal(str(amount))&lt;br&gt;
        if not amount_decimal.is_finite():&lt;br&gt;
            raise ValueError(f"amount must be finite, got {amount_decimal}")&lt;br&gt;
        if amount_decimal &amp;lt;= 0:&lt;br&gt;
            raise ValueError(f"amount must be positive, got {amount_decimal}")&lt;br&gt;
        if amount_decimal.normalize().as_tuple().exponent &amp;lt; -18:&lt;br&gt;
            raise ValueError(f"amount precision exceeds 18 decimals: {amount_decimal}")&lt;br&gt;
        if amount_decimal &amp;gt;= MAX_AMOUNT:&lt;br&gt;
            raise ValueError(&lt;br&gt;
                f"amount exceeds NUMERIC(38,18) capacity: {amount_decimal} &amp;gt;= 10^20"&lt;br&gt;
            )&lt;br&gt;
        return amount_decimal&lt;br&gt;
    except InvalidOperation:&lt;br&gt;
        raise ValueError(f"invalid amount format: {amount!r}")&lt;/p&gt;

&lt;p&gt;def _idempotency_key(event_id: str, event_type: str) -&amp;gt; str:&lt;br&gt;
    a = event_id.encode("utf-8")&lt;br&gt;
    b = event_type.encode("utf-8")&lt;br&gt;
    payload = (&lt;br&gt;
        len(a).to_bytes(4, "big") + a +&lt;br&gt;
        len(b).to_bytes(4, "big") + b&lt;br&gt;
    )&lt;br&gt;
    return hashlib.sha256(payload).hexdigest()`&lt;/p&gt;

&lt;p&gt;Two places in the first version broke on edge cases: MAX_AMOUNT = Decimal("10")&lt;strong&gt;20 - 1 rejected a valid amount, and 50.0000000000000000000 failed as exponent=-19 even though there are no significant digits there. Fixed: 10&lt;/strong&gt;20 without the -1, and normalize() before checking exponent. It's worth writing tests for the edge cases of your own validators. You learn interesting things.&lt;br&gt;
Another surprise from the same category: _validate_amount("+50.1"), _validate_amount("1_000"), and _validate_amount("١٢٣") all return a valid Decimal. Python is tolerant of underscore notation, leading +, and Eastern Arabic numerals. For a financial validator this is undesirable behavior, the input should strictly be [digits].[digits]. Added regex ^[0-9]+(.[0-9]+)?([eE][+-]?[0-9]+)?$ before Decimal(), rejects everything non-standard.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;FSM of Transitions, Single Source of Truth&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;Event status is a deterministic finite state machine. In the beginning there were three places with raw UPDATE payment_events SET status = .... That violated the FSM invariant.&lt;br&gt;
Transitions: pending goes to enqueued via the poller. enqueued to processing when a worker picks up the task. From processing only to confirmed or failed. There's a separate edge, enqueued directly to confirmed, needed for the replay path: when processed_events already contains outcome=success, but the worker crashed after writing that and before transitioning payment_events to confirmed. Without this edge, events would hang in enqueued forever.&lt;/p&gt;

&lt;p&gt;`VALID_TRANSITIONS: dict[str, set[str]] = {&lt;br&gt;
    "pending":    {"enqueued", "failed"},&lt;br&gt;
    "enqueued":   {"processing", "failed", "pending", "confirmed"},&lt;br&gt;
    "processing": {"confirmed", "failed"},&lt;br&gt;
}&lt;/p&gt;

&lt;p&gt;TERMINAL_STATUSES = frozenset({"confirmed", "failed"})&lt;/p&gt;

&lt;p&gt;def transition_event_status(cur, event_id, from_status, to_status):&lt;br&gt;
    if to_status not in VALID_TRANSITIONS.get(from_status, set()):&lt;br&gt;
        raise ValueError(f"invalid transition: {from_status} -&amp;gt; {to_status}")&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;cur.execute(
    "UPDATE payment_events SET status = %s, updated_at = NOW() "
    "WHERE event_id = %s AND status = %s",
    (to_status, event_id, from_status),
)

if cur.rowcount == 0:
    cur.execute("SELECT status FROM payment_events WHERE event_id = %s", (event_id,))
    actual = cur.fetchone()
    actual_status = actual["status"] if actual else "not found"

    if actual_status == to_status:
        logger.info("status already set", event_id=event_id, status=to_status)
        return

    if actual_status in TERMINAL_STATUSES:
        raise AlreadyProcessedError(f"event already terminal: {actual_status}")

    raise RetryableError(
        f"concurrent status transition event_id={event_id} "
        f"expected={from_status} actual={actual_status}"
    )`
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;_mark_event_failed: safely transitions to failed from any non-terminal status. Commits itself, the rule is simple: failed status must land in the DB no matter what. Everything else can wait. Call only on a clean connection, after rollback.&lt;br&gt;
_mark_event_failed commits itself. If you refactor, this will bite you.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;def _mark_event_failed(conn, event_id) -&amp;gt; bool:&lt;br&gt;
    try:&lt;br&gt;
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:&lt;br&gt;
            cur.execute("SET LOCAL lock_timeout = '2s'")&lt;br&gt;
            cur.execute(&lt;br&gt;
                "SELECT status FROM payment_events WHERE event_id = %s FOR UPDATE NOWAIT",&lt;br&gt;
                (event_id,)&lt;br&gt;
            )&lt;br&gt;
            row = cur.fetchone()&lt;br&gt;
            if row is None:&lt;br&gt;
                conn.rollback()&lt;br&gt;
                return False&lt;br&gt;
            current = row["status"]&lt;br&gt;
            if current in TERMINAL_STATUSES:&lt;br&gt;
                conn.rollback()&lt;br&gt;
                return False&lt;br&gt;
            try:&lt;br&gt;
                transition_event_status(cur, event_id, current, "failed")&lt;br&gt;
                conn.commit()&lt;br&gt;
                return True&lt;br&gt;
            except (ValueError, RetryableError, AlreadyProcessedError):&lt;br&gt;
                conn.rollback()&lt;br&gt;
                return False&lt;br&gt;
    except (psycopg2.errors.LockNotAvailable, psycopg2.errors.QueryCanceled):&lt;br&gt;
        try:&lt;br&gt;
            conn.rollback()&lt;br&gt;
        except Exception:&lt;br&gt;
            pass&lt;br&gt;
        return False&lt;br&gt;
    except Exception:&lt;br&gt;
        try:&lt;br&gt;
            conn.rollback()&lt;br&gt;
        except Exception:&lt;br&gt;
            pass&lt;br&gt;
        raise&lt;/code&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;SELECT FOR UPDATE NOWAIT + lock_timeout&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;With a plain FOR UPDATE the worker silently waits for the lock, blocking the thread. NOWAIT eliminates that.&lt;br&gt;
A migration with ALTER TABLE locks the entire table, and lock_timeout = '2s' prevents the worker from hanging for the full duration.&lt;br&gt;
On error codes: lock_timeout throws LockNotAvailable (pgcode 55P03), statement_timeout throws QueryCanceled (pgcode 57014). Confusing them leads to an uncaught exception in production. DeadlockDetected (pgcode 40P01): the transaction was killed by PostgreSQL due to a lock cycle, also transient, also retryable. PostgreSQL picks a victim and rolls back its transaction, a retry resolves the issue. All three need to be caught together.&lt;br&gt;
Connection Pool with Validation&lt;br&gt;
A connection in the pool can be dead: PostgreSQL closes idle connections via tcp_keepalives_idle or idle_in_transaction_session_timeout. Without a check the worker gets a broken TCP and crashes with InterfaceError at a random moment.&lt;br&gt;
_PooledConn: a wrapper around a connection that knows how to return itself to its owner. Via putconn() back to the pool or via close() if the connection was created directly. putconn() is idempotent, the second call is a no-op. getattr doesn't proxy dunder methods, so _PooledConn cannot be used as a context manager. All code works through conn.cursor().&lt;br&gt;
get_validated_conn does three levels of checking without I/O in the normal path: first conn.closed (an in-memory flag), then conn.status (dirty transaction from previous use), and only if status is not STATUS_READY, it runs SELECT 1.&lt;/p&gt;

&lt;p&gt;`class &lt;em&gt;PooledConn:&lt;br&gt;
    def __init&lt;/em&gt;_(self, conn, pool=None):&lt;br&gt;
        self._conn = conn&lt;br&gt;
        self._pool = pool&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def putconn(self, close=False):
    if self._pool is None:
        try:
            self._conn.close()
        except Exception:
            pass
        return
    pool, self._pool = self._pool, None  # idempotent: second call is a no-op
    try:
        pool.putconn(self._conn, close=close)
    except Exception:
        pass

def __getattr__(self, name):
    return getattr(self._conn, name)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;def get_validated_conn(pool: psycopg2.pool.SimpleConnectionPool) -&amp;gt; "_PooledConn":&lt;br&gt;
    try:&lt;br&gt;
        conn = pool.getconn()&lt;br&gt;
    except psycopg2.pool.PoolError as e:&lt;br&gt;
        raise RetryableError(f"DB connection pool exhausted: {e}")&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;if conn.closed != 0:
    try:
        pool.putconn(conn, close=True)
    except Exception:
        pass
    direct = psycopg2.connect(dsn=settings.DATABASE_URL)
    return _PooledConn(direct, pool=None)

if conn.status == psycopg2.extensions.STATUS_IN_TRANSACTION:
    try:
        conn.rollback()
        logger.warning("get_validated_conn: rolled back dirty connection")
    except Exception:
        try:
            pool.putconn(conn, close=True)
        except Exception:
            pass
        direct = psycopg2.connect(dsn=settings.DATABASE_URL)
        return _PooledConn(direct, pool=None)

if conn.status != psycopg2.extensions.STATUS_READY:
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT 1")
    except Exception:
        try:
            pool.putconn(conn, close=True)
        except Exception:
            pass
        direct = psycopg2.connect(dsn=settings.DATABASE_URL)
        return _PooledConn(direct, pool=None)

return _PooledConn(conn, pool)`
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h2&gt;
  
  
  &lt;strong&gt;Deposit and Withdrawal&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;On INSERT INTO processed_events there are two outcomes: success, we continue (first-time path). UniqueViolation, we've seen this event before (replay path).&lt;br&gt;
On the replay path we check outcome. If success, we sync payment_events status with reality. If pending, another worker is mid-transaction, we throw RetryableError for immediate retry instead of waiting for recover_stale_enqueued_events in 3 minutes.&lt;br&gt;
retry_count resets to 0 on successful processing. Without this: event hit retry, processed successfully with retry_count=7, after 14+ days processed_events got cleaned up by TTL, event arrives again via reorg compensation. It starts at 7/10 to DLQ instead of 0/10.&lt;br&gt;
One special case on replay: processed_events says everything is fine, but payment_events knows nothing about this event. That shouldn't happen. Log it, alert, don't panic.&lt;/p&gt;

&lt;p&gt;`def process_deposit_sync(conn, event_id, event_type, user_id, amount):&lt;br&gt;
    amount_decimal  = _validate_amount(amount)&lt;br&gt;
    idempotency_key = _idempotency_key(event_id, event_type)&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
    cur.execute("SET LOCAL lock_timeout = '2s'")
    cur.execute("SET LOCAL statement_timeout = '5s'")

    try:
        cur.execute(
            "INSERT INTO processed_events (idempotency_key, outcome) "
            "VALUES (%s, 'pending')",
            (idempotency_key,),
        )
    except psycopg2.errors.UniqueViolation:
        conn.rollback()
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur2:
            cur2.execute("SET LOCAL lock_timeout = '2s'")
            cur2.execute("SET LOCAL statement_timeout = '5s'")
            cur2.execute(
                "SELECT outcome FROM processed_events WHERE idempotency_key = %s",
                (idempotency_key,),
            )
            row = cur2.fetchone()
            outcome = row["outcome"] if row else "pending"

            if outcome == "success":
                cur2.execute(
                    "SELECT status FROM payment_events WHERE event_id = %s",
                    (event_id,)
                )
                r = cur2.fetchone()
                current = r["status"] if r else None
                if current is None:
                    logger.error(
                        "deposit_replay: orphan event, processed_events "
                        "exists but payment_event not found",
                        event_id=event_id,
                    )
                    send_alert(
                        f"[CRITICAL] orphan deposit event: {event_id}",
                        alert_key=f"orphan_deposit:{event_id}",
                    )
                elif current == "enqueued":
                    transition_event_status(cur2, event_id, "enqueued", "confirmed")
                elif current == "processing":
                    transition_event_status(cur2, event_id, "processing", "confirmed")
                elif current == "confirmed":
                    pass
                else:
                    conn.rollback()
                    raise RetryableError(
                        f"deposit_replay FSM violation: "
                        f"payment_event.status={current!r} with outcome=success "
                        f"for event_id={event_id}"
                    )
                conn.commit()
                return

        conn.rollback()
        raise RetryableError(
            f"deposit idempotency hit with outcome={outcome!r} for event_id={event_id}"
        )

    except (psycopg2.errors.LockNotAvailable, psycopg2.errors.QueryCanceled,
            psycopg2.errors.DeadlockDetected) as e:
        conn.rollback()
        raise RetryableError(f"timeout on deposit for user {user_id}: {e}")

    try:
        transition_event_status(cur, event_id, "enqueued", "processing")

        cur.execute("SELECT id FROM users WHERE id = %s", (user_id,))
        if cur.fetchone() is None:
            conn.rollback()
            raise ValueError(f"user {user_id} not found")

        cur.execute(
            "UPDATE users SET balance = balance + %s WHERE id = %s",
            (amount_decimal, user_id),
        )
        if cur.rowcount == 0:
            conn.rollback()
            raise ValueError(f"user {user_id} disappeared between SELECT and UPDATE")
    except (psycopg2.errors.LockNotAvailable, psycopg2.errors.QueryCanceled,
            psycopg2.errors.DeadlockDetected) as e:
        conn.rollback()
        raise RetryableError(f"lock/timeout on deposit first-time path for user {user_id}: {e}")

    try:
        cur.execute(
            "INSERT INTO balance_events "
            "(user_id, amount, event_type, source_event_id, created_at) "
            "VALUES (%s, %s, %s, %s, NOW())",
            (user_id, amount_decimal, event_type, event_id),
        )
    except psycopg2.errors.UniqueViolation:
        conn.rollback()
        raise Exception(
            f"balance_events duplicate without idempotency key violation "
            f"event_id={event_id}, investigate immediately"
        )

    cur.execute(
        "UPDATE processed_events SET outcome = 'success' WHERE idempotency_key = %s",
        (idempotency_key,),
    )
    cur.execute(
        "UPDATE payment_events SET retry_count = 0 WHERE event_id = %s",
        (event_id,),
    )
    transition_event_status(cur, event_id, "processing", "confirmed")
    conn.commit()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;WithdrawalOutcome = Literal["success", "insufficient_funds"]&lt;/p&gt;

&lt;p&gt;def process_withdrawal_sync(conn, event_id, event_type, user_id, amount) -&amp;gt; WithdrawalOutcome:&lt;br&gt;
    amount_decimal  = _validate_amount(amount)&lt;br&gt;
    idempotency_key = _idempotency_key(event_id, event_type)&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
    cur.execute("SET LOCAL lock_timeout = '2s'")
    cur.execute("SET LOCAL statement_timeout = '5s'")

    try:
        cur.execute(
            "INSERT INTO processed_events (idempotency_key, outcome) "
            "VALUES (%s, 'pending')",
            (idempotency_key,),
        )
    except psycopg2.errors.UniqueViolation:
        conn.rollback()
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as lc:
            lc.execute("SET LOCAL lock_timeout = '2s'")
            lc.execute("SET LOCAL statement_timeout = '5s'")
            lc.execute(
                "SELECT outcome FROM processed_events WHERE idempotency_key = %s",
                (idempotency_key,),
            )
            stored = (lc.fetchone() or {"outcome": "pending"})["outcome"]

            if stored == "success":
                lc.execute(
                    "SELECT status FROM payment_events WHERE event_id = %s",
                    (event_id,)
                )
                r = lc.fetchone()
                current = r["status"] if r else None
                if current is None:
                    logger.error(
                        "withdrawal_replay: orphan event, processed_events "
                        "exists but payment_event not found",
                        event_id=event_id,
                    )
                    send_alert(
                        f"[CRITICAL] orphan withdrawal event: {event_id}",
                        alert_key=f"orphan_withdrawal:{event_id}",
                    )
                elif current == "enqueued":
                    transition_event_status(lc, event_id, "enqueued", "confirmed")
                elif current == "processing":
                    transition_event_status(lc, event_id, "processing", "confirmed")
                elif current == "confirmed":
                    pass
                else:
                    conn.rollback()
                    raise RetryableError(
                        f"withdrawal_replay FSM violation: "
                        f"payment_event.status={current!r} with outcome=success "
                        f"for event_id={event_id}"
                    )
                conn.commit()
                return "success"
            elif stored == "insufficient_funds":
                lc.execute(
                    "SELECT status FROM payment_events WHERE event_id = %s",
                    (event_id,)
                )
                r = lc.fetchone()
                current = r["status"] if r else None
                if current == "enqueued":
                    transition_event_status(lc, event_id, "enqueued", "failed")
                elif current == "processing":
                    transition_event_status(lc, event_id, "processing", "failed")
                elif current is None:
                    logger.error(
                        "withdrawal_replay insufficient_funds: orphan event",
                        event_id=event_id,
                    )
                    send_alert(
                        f"[CRITICAL] orphan withdrawal (insufficient_funds): {event_id}",
                        alert_key=f"orphan_withdrawal_insuf:{event_id}",
                    )
                conn.commit()
                return "insufficient_funds"
            else:
                conn.rollback()
                raise RetryableError(
                    f"withdrawal idempotency hit with outcome={stored!r} "
                    f"for event_id={event_id}"
                )

    except (psycopg2.errors.LockNotAvailable, psycopg2.errors.QueryCanceled,
            psycopg2.errors.DeadlockDetected) as e:
        conn.rollback()
        raise RetryableError(f"lock/timeout on withdrawal INSERT for user {user_id}: {e}")

    try:
        cur.execute(
            "SELECT balance FROM users WHERE id = %s FOR UPDATE NOWAIT",
            (user_id,),
        )
    except (psycopg2.errors.LockNotAvailable, psycopg2.errors.QueryCanceled,
            psycopg2.errors.DeadlockDetected) as e:
        conn.rollback()
        raise RetryableError(f"lock/timeout/deadlock on user lock for user {user_id}: {e}")

    row = cur.fetchone()
    if row is None:
        conn.rollback()
        raise ValueError(f"user {user_id} not found")

    try:
        if row["balance"] &amp;lt; amount_decimal:
            cur.execute(
                "UPDATE processed_events SET outcome = 'insufficient_funds' "
                "WHERE idempotency_key = %s",
                (idempotency_key,),
            )
            transition_event_status(cur, event_id, "enqueued", "failed")
            conn.commit()
            return "insufficient_funds"

        transition_event_status(cur, event_id, "enqueued", "processing")

        cur.execute(
            "UPDATE users SET balance = balance - %s WHERE id = %s",
            (amount_decimal, user_id),
        )

        try:
            cur.execute(
                "INSERT INTO balance_events "
                "(user_id, amount, event_type, source_event_id, created_at) "
                "VALUES (%s, %s, %s, %s, NOW())",
                (user_id, -amount_decimal, event_type, event_id),
            )
        except psycopg2.errors.UniqueViolation:
            conn.rollback()
            raise Exception(
                f"balance_events duplicate without idempotency key violation "
                f"event_id={event_id}, investigate immediately"
            )

        cur.execute(
            "UPDATE processed_events SET outcome = 'success' WHERE idempotency_key = %s",
            (idempotency_key,),
        )
        cur.execute(
            "UPDATE payment_events SET retry_count = 0 WHERE event_id = %s",
            (event_id,),
        )
        transition_event_status(cur, event_id, "processing", "confirmed")
        conn.commit()
        return "success"
    except (psycopg2.errors.LockNotAvailable, psycopg2.errors.QueryCanceled,
            psycopg2.errors.DeadlockDetected) as e:
        conn.rollback()
        raise RetryableError(f"lock/timeout on withdrawal path for user {user_id}: {e}")`
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h2&gt;
  
  
  &lt;strong&gt;Webhook: Outbox Pattern Instead of Direct .delay()&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;Calling .delay() directly from the webhook handler creates a window between the DB write and the queue insertion. If the process dies in that moment, the event hangs in pending forever.&lt;br&gt;
Solution: the webhook only writes to the DB. A separate poller every 5 seconds picks up pending events, atomically changes statuses and commits, and only then puts them into the Celery queue. Commit first, then enqueue. Otherwise the worker starts before the DB knows about enqueued.&lt;br&gt;
Alchemy and Infura only know tx-hash and recipient address. The mapping from to_address to user_id is done via a separate query to the deposit_addresses table. That layer is not in this article, but without it an attacker with the HMAC key can credit money to any user_id. Worth keeping in mind.&lt;br&gt;
verify_webhook_signature accepts raw_body as bytes before JSON parsing. The signature is computed over the original bytes. secrets.compare_digest protects against timing attacks.&lt;/p&gt;

&lt;p&gt;`import asyncpg&lt;br&gt;
from fastapi import FastAPI, Request, HTTPException&lt;br&gt;
from slowapi import Limiter, _rate_limit_exceeded_handler&lt;br&gt;
from slowapi.util import get_remote_address&lt;br&gt;
from slowapi.errors import RateLimitExceeded&lt;/p&gt;

&lt;p&gt;app = FastAPI()&lt;/p&gt;

&lt;p&gt;limiter = Limiter(key_func=get_remote_address, storage_uri=settings.REDIS_URL)&lt;br&gt;
app.state.limiter = limiter&lt;br&gt;
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)&lt;/p&gt;

&lt;p&gt;ALLOWED_EVENT_TYPES = frozenset({"deposit", "airdrop", "withdrawal", "withdrawal_fee"})&lt;/p&gt;

&lt;p&gt;def verify_webhook_signature(raw_body, signature_header, signing_key):&lt;br&gt;
    if not signing_key:&lt;br&gt;
        raise ImproperlyConfigured("WEBHOOK_SECRET is not set")&lt;br&gt;
    if len(signing_key) &amp;lt; 32:&lt;br&gt;
        raise ImproperlyConfigured(&lt;br&gt;
            f"WEBHOOK_SECRET too short: {len(signing_key)} chars, minimum 32"&lt;br&gt;
        )&lt;br&gt;
    if not signature_header:&lt;br&gt;
        return False&lt;br&gt;
    mac = hmac.new(&lt;br&gt;
        key=signing_key.encode("utf-8"),&lt;br&gt;
        msg=raw_body,&lt;br&gt;
        digestmod=hashlib.sha256,&lt;br&gt;
    )&lt;br&gt;
    return secrets.compare_digest(mac.hexdigest(), signature_header)&lt;/p&gt;

&lt;p&gt;@app.post("/webhook/payments")&lt;br&gt;
@limiter.limit("300/minute")&lt;br&gt;
@limiter.limit("30/second")&lt;br&gt;
async def payment_webhook(request: Request):&lt;br&gt;
    raw_body  = await request.body()&lt;br&gt;
    signature = request.headers.get("X-Alchemy-Signature", "")&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;if not verify_webhook_signature(raw_body, signature, settings.WEBHOOK_SECRET):
    raise HTTPException(status_code=401, detail="invalid signature")

trace_id = (
    request.headers.get("X-Request-ID")
    or request.headers.get("X-Alchemy-Request-ID")
    or new_trace_id()
)
set_trace_id(trace_id)

try:
    payload    = json.loads(raw_body)
    event_id   = payload["event_id"]
    event_type = payload["event_type"]
    user_id    = payload["user_id"]
    if not isinstance(payload.get("amount"), str):
        raise HTTPException(status_code=400, detail="amount must be a JSON string, not a number")
    amount_str = payload["amount"]
except (json.JSONDecodeError, KeyError) as e:
    raise HTTPException(status_code=400, detail=f"invalid payload: {e}")

if event_type not in ALLOWED_EVENT_TYPES:
    raise HTTPException(status_code=400, detail=f"unknown event_type: {event_type!r}")

try:
    _validate_amount(amount_str)
except ValueError as e:
    raise HTTPException(status_code=400, detail=f"invalid amount: {e}")

db = request.app.state.db
try:
    async with db.transaction():
        await db.fetchrow(
            "INSERT INTO payment_events (event_id, user_id, amount, event_type, status) "
            "VALUES ($1, $2, $3, $4, 'pending') ON CONFLICT (event_id) DO NOTHING",
            event_id, user_id, amount_str, event_type,
        )
except asyncpg.exceptions.ForeignKeyViolationError:
    logger.warning(
        "orphan webhook event (user not found)",
        event_id=event_id, user_id=user_id,
    )
    raise HTTPException(status_code=400, detail="user not found")

return {"status": "accepted", "trace_id": trace_id}`
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;FastAPI runs in an async event loop. Blocking psycopg2 there would kill throughput, so we use asyncpg in the webhook. Celery workers are separate processes without an event loop, asyncpg there would only add complexity.&lt;br&gt;
enqueue_pending_events uses the same SAVEPOINT pattern as recover_stale_enqueued_events. Without it, a single event with AlreadyProcessedError from a race with the recoverer would roll back the entire batch of 100 events. They'd stay in pending and get picked up on the next tick, but this shows up in the logs as a lost tick. SAVEPOINT sp_enq per event isolates the error.&lt;/p&gt;

&lt;p&gt;`@shared_task(name="enqueue_pending_events")&lt;br&gt;
def enqueue_pending_events() -&amp;gt; dict:&lt;br&gt;
    conn = get_validated_conn(db_pool)&lt;br&gt;
    events_to_enqueue = []&lt;br&gt;
    try:&lt;br&gt;
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:&lt;br&gt;
            cur.execute("""&lt;br&gt;
                SELECT event_id, event_type, user_id, amount&lt;br&gt;
                FROM payment_events&lt;br&gt;
                WHERE status = 'pending'&lt;br&gt;
                  AND updated_at &amp;lt; NOW() - INTERVAL '5 seconds'&lt;br&gt;
                ORDER BY created_at&lt;br&gt;
                LIMIT 100&lt;br&gt;
                FOR UPDATE SKIP LOCKED&lt;br&gt;
            """)&lt;br&gt;
            events = cur.fetchall()&lt;br&gt;
            events_ok = []&lt;br&gt;
            for event in events:&lt;br&gt;
                try:&lt;br&gt;
                    cur.execute("SAVEPOINT sp_enq")&lt;br&gt;
                    transition_event_status(cur, event['event_id'], "pending", "enqueued")&lt;br&gt;
                    cur.execute("RELEASE SAVEPOINT sp_enq")&lt;br&gt;
                    events_ok.append(event)&lt;br&gt;
                except (ValueError, RetryableError, AlreadyProcessedError) as e:&lt;br&gt;
                    cur.execute("ROLLBACK TO SAVEPOINT sp_enq")&lt;br&gt;
                    cur.execute("RELEASE SAVEPOINT sp_enq")&lt;br&gt;
                    logger.warning("enqueue: skipped event",&lt;br&gt;
                                   event_id=event['event_id'], error=str(e))&lt;br&gt;
            conn.commit()&lt;br&gt;
            events_to_enqueue = list(events_ok)&lt;br&gt;
    except Exception:&lt;br&gt;
        try:&lt;br&gt;
            conn.rollback()&lt;br&gt;
        except Exception:&lt;br&gt;
            pass&lt;br&gt;
        logger.exception("enqueue_pending_events: transition failed")&lt;br&gt;
        raise&lt;br&gt;
    finally:&lt;br&gt;
        try:&lt;br&gt;
            conn.putconn()&lt;br&gt;
        except Exception:&lt;br&gt;
            pass&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;enqueued = 0
for event in events_to_enqueue:
    try:
        process_payment_event.apply_async(
            args=[event['event_id'], event['event_type'],
                  event['user_id'], str(event['amount'])],
            kwargs={"trace_id": str(uuid.uuid4())},
        )
        enqueued += 1
    except Exception:
        logger.exception("apply_async failed", event_id=event['event_id'])

logger.info("enqueue_pending_events done", enqueued=enqueued)
return {"enqueued": enqueued}`
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;recover_stale_enqueued_events runs every 2 minutes and finds events stuck in enqueued. After MAX_RECOVERY_ATTEMPTS attempts it transitions to failed + DLQ. SAVEPOINT per event, an error in one doesn't roll back the whole batch.&lt;/p&gt;

&lt;p&gt;`MAX_RECOVERY_ATTEMPTS = 10&lt;/p&gt;

&lt;p&gt;@shared_task(name="recover_stale_enqueued_events")&lt;br&gt;
def recover_stale_enqueued_events() -&amp;gt; dict:&lt;br&gt;
    conn      = get_validated_conn(db_pool)&lt;br&gt;
    recovered = 0&lt;br&gt;
    dlqed     = 0&lt;br&gt;
    try:&lt;br&gt;
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:&lt;br&gt;
            cur.execute("""&lt;br&gt;
                SELECT event_id, event_type, user_id, amount, retry_count&lt;br&gt;
                FROM payment_events&lt;br&gt;
                WHERE status = 'enqueued'&lt;br&gt;
                  AND updated_at &amp;lt; NOW() - INTERVAL '3 minutes'&lt;br&gt;
                ORDER BY created_at&lt;br&gt;
                LIMIT 50&lt;br&gt;
                FOR UPDATE SKIP LOCKED&lt;br&gt;
            """)&lt;br&gt;
            stale = cur.fetchall()&lt;br&gt;
            for event in stale:&lt;br&gt;
                try:&lt;br&gt;
                    cur.execute("SAVEPOINT sp_recover")&lt;br&gt;
                    if event['retry_count'] &amp;gt;= MAX_RECOVERY_ATTEMPTS:&lt;br&gt;
                        transition_event_status(cur, event['event_id'], "enqueued", "failed")&lt;br&gt;
                        cur.execute(&lt;br&gt;
                            "INSERT INTO dead_letter_queue "&lt;br&gt;
                            "(event_id, event_type, user_id, amount, error) "&lt;br&gt;
                            "VALUES (%s, %s, %s, %s, %s)",&lt;br&gt;
                            (event['event_id'], event['event_type'], event['user_id'],&lt;br&gt;
                             str(event['amount']),&lt;br&gt;
                             f"exhausted recovery attempts ({MAX_RECOVERY_ATTEMPTS})")&lt;br&gt;
                        )&lt;br&gt;
                        dlqed += 1&lt;br&gt;
                    else:&lt;br&gt;
                        transition_event_status(cur, event['event_id'], "enqueued", "pending")&lt;br&gt;
                        cur.execute(&lt;br&gt;
                            "UPDATE payment_events SET retry_count = retry_count + 1 "&lt;br&gt;
                            "WHERE event_id = %s",&lt;br&gt;
                            (event['event_id'],)&lt;br&gt;
                        )&lt;br&gt;
                        recovered += 1&lt;br&gt;
                    cur.execute("RELEASE SAVEPOINT sp_recover")&lt;br&gt;
                except Exception as sp_exc:&lt;br&gt;
                    try:&lt;br&gt;
                        cur.execute("ROLLBACK TO SAVEPOINT sp_recover")&lt;br&gt;
                        cur.execute("RELEASE SAVEPOINT sp_recover")&lt;br&gt;
                    except Exception:&lt;br&gt;
                        pass&lt;br&gt;
                    logger.error("recover: event skipped due to error",&lt;br&gt;
                                 event_id=event['event_id'], error=str(sp_exc))&lt;br&gt;
            conn.commit()&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;    if recovered or dlqed:
        logger.warning("recover_stale_enqueued_events",
                      recovered=recovered, dlqed=dlqed)
    if dlqed:
        send_alert(
            f"[WARNING] {dlqed} events exhausted recovery attempts, check DLQ",
            alert_key="recovery_exhausted",
        )
    return {"recovered": recovered, "dlqed": dlqed}
except Exception:
    try:
        conn.rollback()
    except Exception:
        pass
    logger.exception("recover_stale_enqueued_events failed")
    raise
finally:
    try:
        conn.putconn()
    except Exception:
        pass`
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h2&gt;
  
  
  &lt;strong&gt;Celery: acks_late + reject_on_worker_lost + Connection Pool&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;acks_late=True: acknowledge to the broker after processing completes, not at receipt. By default Celery acknowledges immediately: the worker crashes mid-processing, the task is lost.&lt;br&gt;
reject_on_worker_lost=True: on SIGKILL/OOM the task is returned to the queue.&lt;br&gt;
If you create the pool before fork, all workers inherit the same file descriptors. Two processes send requests over the same socket and responses get scrambled. That's why the pool is created in worker_process_init.&lt;/p&gt;

&lt;p&gt;`import os&lt;br&gt;
from celery.signals import worker_process_init&lt;/p&gt;

&lt;p&gt;db_pool = None&lt;br&gt;
_local_breaker = None&lt;/p&gt;

&lt;p&gt;@worker_process_init.connect&lt;br&gt;
def init_worker(**kwargs):&lt;br&gt;
    global db_pool, _local_breaker&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;_local_breaker = _InProcessBreaker()

worker_pool = os.environ.get("CELERY_WORKER_POOL", "prefork")
is_threaded = worker_pool in ("gevent", "eventlet")
pool_class  = (
    psycopg2.pool.ThreadedConnectionPool if is_threaded
    else psycopg2.pool.SimpleConnectionPool
)
db_pool = pool_class(minconn=2, maxconn=10, dsn=settings.DATABASE_URL)
logger.info("worker init done", pool=pool_class.__name__, worker_pool=worker_pool)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;def _get_local_breaker() -&amp;gt; "_InProcessBreaker":&lt;br&gt;
    global _local_breaker&lt;br&gt;
    if _local_breaker is None:&lt;br&gt;
        _local_breaker = _InProcessBreaker()&lt;br&gt;
    return _local_breaker&lt;/p&gt;

&lt;p&gt;BACKOFF_BASE_SEC = 1&lt;br&gt;
BACKOFF_CAP_SEC  = 60&lt;/p&gt;

&lt;p&gt;def jittered_backoff(attempt: int) -&amp;gt; float:&lt;br&gt;
    cap = min(BACKOFF_CAP_SEC, BACKOFF_BASE_SEC * (2 ** attempt))&lt;br&gt;
    return random.uniform(0, cap)`&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;Two-Level DLQ&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;In the first version I used LPUSH and EXPIRE as two separate commands. A crash between them is possible, the key remains without a TTL and lives forever. Fixed with a pipeline.&lt;br&gt;
On the DLQ schema: the table uses BIGSERIAL PRIMARY KEY, not event_id PRIMARY KEY. This allows storing multiple attempts for a single event_id. Consequence: ON CONFLICT (event_id) DO NOTHING is not valid, event_id has no UNIQUE constraint. Each INSERT creates a new record with the full attempt history.&lt;/p&gt;

&lt;p&gt;`DLQ_REDIS_KEY = "dlq:payment_events"&lt;br&gt;
DLQ_REDIS_TTL = 7 * 24 * 3600  # 7 days&lt;/p&gt;

&lt;p&gt;def save_to_dlq_sync(conn, event_id, event_type, user_id, amount, error):&lt;br&gt;
    payload = {&lt;br&gt;
        "event_id": event_id, "event_type": event_type,&lt;br&gt;
        "user_id": user_id, "amount": str(amount),&lt;br&gt;
        "error": error, "trace_id": get_trace_id(),&lt;br&gt;
    }&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;db_ok = False
try:
    with conn.cursor() as cur:
        cur.execute(
            "INSERT INTO dead_letter_queue "
            "(event_id, event_type, user_id, amount, error, created_at) "
            "VALUES (%s, %s, %s, %s, %s, NOW())",
            (event_id, event_type, user_id, str(amount), error)
        )
        conn.commit()
    db_ok = True
except Exception as db_exc:
    logger.error("DLQ postgres write failed", event_id=event_id, error=str(db_exc))
    try:
        conn.rollback()
    except Exception:
        pass

if db_ok:
    return

try:
    pipe = _redis_client.pipeline()
    pipe.lpush(DLQ_REDIS_KEY, json.dumps(payload))
    pipe.expire(DLQ_REDIS_KEY, DLQ_REDIS_TTL)
    pipe.execute()
    logger.warning("DLQ saved to Redis fallback", event_id=event_id)
    return
except redis_lib.RedisError as e:
    logger.error("DLQ redis write failed", event_id=event_id, error=str(e))

logger.critical(
    "DLQ_UNRECOVERABLE",
    event_id=event_id, event_type=event_type,
    user_id=user_id, amount=str(amount),
    error=error, trace_id=get_trace_id(),
    dlq_payload=payload,
)`
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;drain_redis_dlq runs on schedule after a DB incident. failed resets to 0 on each successful INSERT, the counter is consecutive, not total. Alternating successes and failures don't trigger the emergency break, but drained &amp;gt; 0 at the same time.&lt;/p&gt;

&lt;p&gt;`DRAIN_BATCH_SIZE = 500&lt;/p&gt;

&lt;p&gt;@shared_task(name="drain_redis_dlq")&lt;br&gt;
def drain_redis_dlq() -&amp;gt; dict:&lt;br&gt;
    drained = 0&lt;br&gt;
    failed  = 0&lt;br&gt;
    conn    = get_validated_conn(db_pool)&lt;br&gt;
    try:&lt;br&gt;
        for _ in range(DRAIN_BATCH_SIZE):&lt;br&gt;
            raw = _redis_client.rpop(DLQ_REDIS_KEY)&lt;br&gt;
            if raw is None:&lt;br&gt;
                break&lt;br&gt;
            try:&lt;br&gt;
                payload = json.loads(raw)&lt;br&gt;
            except json.JSONDecodeError:&lt;br&gt;
                logger.critical(&lt;br&gt;
                    "drain_redis_dlq: malformed JSON in DLQ, item discarded",&lt;br&gt;
                    raw=raw[:200],&lt;br&gt;
                )&lt;br&gt;
                continue&lt;br&gt;
            try:&lt;br&gt;
                with conn.cursor() as cur:&lt;br&gt;
                    cur.execute(&lt;br&gt;
                        "INSERT INTO dead_letter_queue "&lt;br&gt;
                        "(event_id, event_type, user_id, amount, error, created_at) "&lt;br&gt;
                        "VALUES (%s, %s, %s, %s, %s, NOW())",&lt;br&gt;
                        (payload['event_id'], payload['event_type'],&lt;br&gt;
                         payload['user_id'], payload['amount'], payload['error'])&lt;br&gt;
                    )&lt;br&gt;
                    conn.commit()&lt;br&gt;
                drained += 1&lt;br&gt;
                failed = 0&lt;br&gt;
            except Exception as e:&lt;br&gt;
                try:&lt;br&gt;
                    conn.rollback()&lt;br&gt;
                except Exception:&lt;br&gt;
                    pass&lt;br&gt;
                pipe = _redis_client.pipeline()&lt;br&gt;
                pipe.lpush(DLQ_REDIS_KEY, raw)&lt;br&gt;
                pipe.expire(DLQ_REDIS_KEY, DLQ_REDIS_TTL)&lt;br&gt;
                pipe.execute()&lt;br&gt;
                failed += 1&lt;br&gt;
                logger.error(&lt;br&gt;
                    "drain failed, requeued to head",&lt;br&gt;
                    event_id=payload.get('event_id'), error=str(e),&lt;br&gt;
                )&lt;br&gt;
                if failed &amp;gt;= 10:&lt;br&gt;
                    logger.error("drain aborted after 10 consecutive failures")&lt;br&gt;
                    break&lt;br&gt;
    finally:&lt;br&gt;
        conn.putconn()&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;logger.info("drain_redis_dlq done", drained=drained, failed=failed)
return {"drained": drained, "failed": failed}`
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h2&gt;
  
  
  &lt;strong&gt;Celery Task: Full Version&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;`@shared_task(name="process_payment_event", bind=True, max_retries=5, acks_late=True, reject_on_worker_lost=True)&lt;br&gt;
def process_payment_event(self, event_id, event_type, user_id, amount, trace_id=""):&lt;br&gt;
    set_trace_id(trace_id or new_trace_id())&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;conn      = get_validated_conn(db_pool)
committed = False
conn_ok   = True

try:
    if event_type in ("deposit", "airdrop"):
        process_deposit_sync(conn, event_id, event_type, user_id, amount)
        committed = True
    elif event_type in ("withdrawal", "withdrawal_fee"):
        outcome = process_withdrawal_sync(conn, event_id, event_type, user_id, amount)
        committed = True
        if outcome == "insufficient_funds":
            notify_user_insufficient_funds(user_id)
    else:
        logger.error("unknown event_type", event_type=event_type, event_id=event_id)
        try:
            conn.rollback()
        except Exception:
            pass
        try:
            _mark_event_failed(conn, event_id)
        except Exception as mark_exc:
            logger.error("_mark_event_failed raised",
                        event_id=event_id, error=str(mark_exc))
        save_to_dlq_sync(
            conn, event_id, event_type, user_id, amount,
            f"unknown event_type: {event_type!r}"
        )
        raise Ignore()
except AlreadyProcessedError as exc:
    logger.info("event already processed", event_id=event_id, reason=str(exc))
    raise Ignore()
except RetryableError as exc:
    delay = jittered_backoff(self.request.retries)
    logger.warning(
        "retrying", event_id=event_id,
        attempt=self.request.retries, delay=delay, reason=str(exc),
    )
    try:
        raise self.retry(exc=exc, countdown=delay)
    except MaxRetriesExceededError:
        conn.rollback()
        _mark_event_failed(conn, event_id)
        save_to_dlq_sync(conn, event_id, event_type, user_id, amount,
                         f"retries exhausted: {exc}")
        raise Ignore()
except Ignore:
    raise
except Exception as exc:
    logger.exception("unhandled error", event_id=event_id)
    try:
        conn.rollback()
    except Exception:
        pass
    try:
        _mark_event_failed(conn, event_id)
    except Exception as mark_exc:
        logger.error("_mark_event_failed raised in catch-all",
                    event_id=event_id, error=str(mark_exc))
    try:
        save_to_dlq_sync(conn, event_id, event_type, user_id, amount, str(exc))
    except Exception as dlq_exc:
        logger.critical(
            "DLQ write failed, manual recovery required",
            event_id=event_id, trace_id=get_trace_id(),
            original_error=str(exc), dlq_error=str(dlq_exc),
        )
    self.update_state(state="FAILURE", meta={"error": str(exc)})
    raise Ignore()
finally:
    if not committed:
        try:
            conn.rollback()
        except Exception as rb_exc:
            logger.error("rollback failed", event_id=event_id, error=str(rb_exc))
            conn_ok = False
    conn.putconn(close=not conn_ok)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;def notify_user_insufficient_funds(user_id: int) -&amp;gt; None:&lt;br&gt;
    pass`&lt;/p&gt;

&lt;p&gt;notify_user_insufficient_funds: a stub. In prod you need an outbox record inside process_withdrawal_sync before the final commit. Calling it from here (after commit) means out-of-band: a separate transaction, separate delivery guarantees.&lt;br&gt;
There's a trap hidden here that's specific to at-least-once. The provider sends one event three times. process_payment_event runs three times. The balance won't change (idempotency via unique constraint). But notify gets called three times, the user receives three "insufficient funds" notifications instead of one. DB idempotency doesn't automatically cover side effects outside the transaction.&lt;br&gt;
Second problem with this placement: if notify throws an exception, it lands in the except Exception catch-all, which writes the event to DLQ, even though the money was already deducted correctly and the business transaction committed. That's noise in the DLQ that will hide real incidents.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;Circuit Breaker for Web3 RPC&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;Three-phase: closed, open, half-open, closed. Redis as shared state between instances, in-process breaker as fallback when Redis is unavailable.&lt;br&gt;
_InProcessBreaker: a simple per-process counter with a lock. Opens after RPC_ERROR_THRESHOLD errors, closes automatically after RPC_COOLDOWN_SEC seconds. Needed specifically as a fallback: if Redis itself is unavailable, the circuit breaker shouldn't stop working.&lt;br&gt;
The goal is to run only one test request while the breaker is open. SET nx=True guarantees only the first worker gets permission, and the rest see the probe key is occupied.&lt;/p&gt;

&lt;p&gt;`import web3&lt;br&gt;
from prometheus_client import Counter&lt;/p&gt;

&lt;p&gt;rpc_errors_total = Counter("web3_rpc_errors_total", "Web3 RPC failures", ["method"])&lt;/p&gt;

&lt;p&gt;_w3 = web3.Web3(web3.HTTPProvider(settings.ETH_RPC_URL))&lt;/p&gt;

&lt;p&gt;FINALIZED_CACHE_TTL   = 86_400&lt;br&gt;
PENDING_CACHE_TTL     = 30&lt;br&gt;
CACHE_PREFIX          = "eth:fin:"&lt;br&gt;
RPC_ERROR_THRESHOLD   = 5&lt;br&gt;
RPC_COOLDOWN_SEC      = 60&lt;br&gt;
RPC_ERROR_WINDOW_SEC  = 30&lt;br&gt;
HALF_OPEN_PROBE_KEY   = "circuit:web3:half_open_probe"&lt;br&gt;
HALF_OPEN_PROBE_TTL   = 10&lt;/p&gt;

&lt;p&gt;@dataclass&lt;br&gt;
class _InProcessBreaker:&lt;br&gt;
    _lock:       threading.Lock = field(default_factory=threading.Lock)&lt;br&gt;
    _errors:     int            = 0&lt;br&gt;
    _open_until: "datetime | None" = None&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def is_open(self) -&amp;gt; bool:
    with self._lock:
        if self._open_until is None:
            return False
        if datetime.now(timezone.utc) &amp;gt; self._open_until:
            self._open_until = None
            self._errors = 0
            return False
        return True

def record_error(self) -&amp;gt; None:
    with self._lock:
        self._errors += 1
        if self._errors &amp;gt;= RPC_ERROR_THRESHOLD:
            self._open_until = datetime.now(timezone.utc) + timedelta(seconds=RPC_COOLDOWN_SEC)

def record_success(self) -&amp;gt; None:
    with self._lock:
        self._errors     = 0
        self._open_until = None
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;def _is_circuit_open() -&amp;gt; bool:&lt;br&gt;
    try:&lt;br&gt;
        if _redis_client.get("circuit:web3:open"):&lt;br&gt;
            is_probe = _redis_client.set(&lt;br&gt;
                HALF_OPEN_PROBE_KEY, "1", nx=True, ex=HALF_OPEN_PROBE_TTL&lt;br&gt;
            )&lt;br&gt;
            if is_probe:&lt;br&gt;
                return False&lt;br&gt;
            return True&lt;br&gt;
    except redis_lib.RedisError:&lt;br&gt;
        pass&lt;br&gt;
    return _get_local_breaker().is_open()&lt;/p&gt;

&lt;p&gt;def _record_rpc_error(method: str) -&amp;gt; None:&lt;br&gt;
    rpc_errors_total.labels(method=method).inc()&lt;br&gt;
    _get_local_breaker().record_error()&lt;br&gt;
    try:&lt;br&gt;
        pipe  = _redis_client.pipeline()&lt;br&gt;
        pipe.incr("circuit:web3:errors")&lt;br&gt;
        pipe.expire("circuit:web3:errors", RPC_ERROR_WINDOW_SEC)&lt;br&gt;
        count, _ = pipe.execute()&lt;br&gt;
        if count &amp;gt;= RPC_ERROR_THRESHOLD:&lt;br&gt;
            open_pipe = _redis_client.pipeline()&lt;br&gt;
            open_pipe.setex("circuit:web3:open", RPC_COOLDOWN_SEC, "1")&lt;br&gt;
            open_pipe.set(HALF_OPEN_PROBE_KEY, "1", ex=HALF_OPEN_PROBE_TTL)&lt;br&gt;
            open_pipe.execute()&lt;br&gt;
            logger.critical("web3 circuit breaker opened", count=count)&lt;br&gt;
            send_alert(&lt;br&gt;
                f"[CRITICAL] Web3 RPC circuit breaker OPEN, "&lt;br&gt;
                f"{count} failures in {RPC_ERROR_WINDOW_SEC}s.",&lt;br&gt;
                alert_key="web3_breaker_open",&lt;br&gt;
            )&lt;br&gt;
        else:&lt;br&gt;
            if not _redis_client.exists("circuit:web3:open"):&lt;br&gt;
                _redis_client.delete(HALF_OPEN_PROBE_KEY)&lt;br&gt;
    except redis_lib.RedisError as e:&lt;br&gt;
        logger.warning("circuit breaker state write failed", error=str(e))&lt;/p&gt;

&lt;p&gt;def _record_rpc_success() -&amp;gt; None:&lt;br&gt;
    _get_local_breaker().record_success()&lt;br&gt;
    try:&lt;br&gt;
        _redis_client.delete("circuit:web3:errors")&lt;br&gt;
        _redis_client.delete(HALF_OPEN_PROBE_KEY)&lt;br&gt;
        if _redis_client.delete("circuit:web3:open"):&lt;br&gt;
            logger.info("web3 circuit breaker closed")&lt;br&gt;
            send_alert(&lt;br&gt;
                "[INFO] Web3 RPC circuit breaker CLOSED, recovered",&lt;br&gt;
                alert_key="web3_breaker_closed",&lt;br&gt;
            )&lt;br&gt;
    except redis_lib.RedisError:&lt;br&gt;
        pass&lt;/p&gt;

&lt;p&gt;def is_transaction_finalized(tx_hash: str) -&amp;gt; bool:&lt;br&gt;
    if _is_circuit_open():&lt;br&gt;
        logger.warning("web3 circuit open, skipping", tx_hash=tx_hash)&lt;br&gt;
        return False&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;cache_key = f"{CACHE_PREFIX}{tx_hash}"
try:
    cached = _redis_client.get(cache_key)
    if cached == "1":
        return True
    if cached == "0":
        return False
except redis_lib.RedisError as e:
    logger.warning("redis cache unavailable", tx_hash=tx_hash, error=str(e))

method = "get_transaction_receipt"
try:
    receipt = _w3.eth.get_transaction_receipt(tx_hash)
    if receipt is None:
        return False
    method = "get_block_finalized"
    finalized_block = _w3.eth.get_block("finalized")["number"]
    result = receipt["blockNumber"] &amp;lt;= finalized_block
    _record_rpc_success()
except Exception as e:
    logger.error("eth rpc error", tx_hash=tx_hash, error=str(e))
    _record_rpc_error(method)
    return False

try:
    ttl = FINALIZED_CACHE_TTL if result else PENDING_CACHE_TTL
    _redis_client.setex(cache_key, ttl, "1" if result else "0")
except redis_lib.RedisError as e:
    logger.warning("redis cache write failed", tx_hash=tx_hash, error=str(e))

return result`
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h2&gt;
  
  
  &lt;strong&gt;Monitoring&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;hot_path_balance_check: not a full financial reconciliation, just monitoring of the hot path over the last 10 minutes. users.balance and SUM(balance_events) are read in a single JOIN query, the data snapshot is taken atomically. REPEATABLE_READ here is defensive engineering: it guarantees a consistent snapshot at the transaction level and protects against phantom reads if the transaction grows to multiple statements in the future. A full historical reconciliation requires a nightly job, that's in the backlog.&lt;br&gt;
set_isolation_level can throw if the connection is broken. Without a try/except around it the connection will return to the pool in REPEATABLE_READ, and the next user won't expect that behavior.&lt;/p&gt;

&lt;p&gt;`from prometheus_client import Counter&lt;br&gt;
hot_path_runs = Counter("hot_path_balance_check_runs_total", "Runs of hot path balance check")&lt;/p&gt;

&lt;p&gt;@shared_task(name="hot_path_balance_check")&lt;br&gt;
def hot_path_balance_check() -&amp;gt; None:&lt;br&gt;
    conn    = get_validated_conn(db_pool)&lt;br&gt;
    conn_ok = True&lt;br&gt;
    try:&lt;br&gt;
        conn.set_isolation_level(&lt;br&gt;
            psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ&lt;br&gt;
        )&lt;br&gt;
        try:&lt;br&gt;
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:&lt;br&gt;
                cur.execute("SET LOCAL statement_timeout = '10s'")&lt;br&gt;
                cur.execute("""&lt;br&gt;
                    SELECT&lt;br&gt;
                        u.id,&lt;br&gt;
                        u.balance                                        AS actual_balance,&lt;br&gt;
                        u.initial_balance + COALESCE(SUM(be.amount), 0) AS calculated_balance&lt;br&gt;
                    FROM users u&lt;br&gt;
                    INNER JOIN (&lt;br&gt;
                        SELECT DISTINCT user_id FROM balance_events&lt;br&gt;
                        WHERE created_at &amp;gt; NOW() - INTERVAL '10 minutes'&lt;br&gt;
                    ) recent ON recent.user_id = u.id&lt;br&gt;
                    LEFT JOIN balance_events be ON be.user_id = u.id&lt;br&gt;
                    GROUP BY u.id, u.balance, u.initial_balance&lt;br&gt;
                    HAVING u.balance != u.initial_balance + COALESCE(SUM(be.amount), 0)&lt;br&gt;
                """)&lt;br&gt;
                mismatches = cur.fetchall()&lt;br&gt;
            conn.commit()&lt;br&gt;
        except Exception:&lt;br&gt;
            conn.rollback()&lt;br&gt;
            raise&lt;br&gt;
        finally:&lt;br&gt;
            try:&lt;br&gt;
                conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_DEFAULT)&lt;br&gt;
            except Exception:&lt;br&gt;
                conn_ok = False&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;    hot_path_runs.inc()

    if mismatches:
        send_alert(
            f"[CRITICAL] balance mismatch: {[dict(m) for m in mismatches]}",
            alert_key="balance_mismatch",
        )

except Exception:
    logger.exception("hot_path_balance_check failed")
    try:
        conn.rollback()
    except Exception:
        pass
    send_alert(
        "[WARNING] hot_path_balance_check failed, check worker logs",
        alert_key="balance_check_failed",
    )
    raise
finally:
    conn.putconn(close=not conn_ok)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;@shared_task(name="alert_zombie_events")&lt;br&gt;
def alert_zombie_events() -&amp;gt; None:&lt;br&gt;
    conn = get_validated_conn(db_pool)&lt;br&gt;
    try:&lt;br&gt;
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:&lt;br&gt;
            cur.execute("""&lt;br&gt;
                SELECT event_id, user_id, status, updated_at FROM payment_events&lt;br&gt;
                WHERE (status = 'processing' AND updated_at &amp;lt; NOW() - INTERVAL '5 minutes')&lt;br&gt;
                   OR (status IN ('pending', 'enqueued') AND updated_at &amp;lt; NOW() - INTERVAL '15 minutes')&lt;br&gt;
            """)&lt;br&gt;
            zombies = cur.fetchall()&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;        cur.execute("""
            SELECT COUNT(*) AS dlq_size FROM dead_letter_queue
            WHERE created_at &amp;gt; NOW() - INTERVAL '1 hour'
        """)
        recent_dlq = cur.fetchone()['dlq_size']

        cur.execute("""
            SELECT COUNT(*) AS stuck FROM processed_events
            WHERE outcome = 'pending' AND created_at &amp;lt; NOW() - INTERVAL '10 minutes'
        """)
        stuck_pending = cur.fetchone()['stuck']

    if zombies:
        send_alert(
            f"[WARNING] zombie events: {[z['event_id'] for z in zombies[:20]]}",
            alert_key="zombie_events",
        )
    if recent_dlq &amp;gt; 10:
        send_alert(
            f"[WARNING] {recent_dlq} events in DLQ last hour, investigate",
            alert_key="dlq_flood",
        )
    if stuck_pending:
        send_alert(
            f"[CRITICAL] {stuck_pending} stuck 'pending' rows in processed_events, "
            f"architectural invariant broken, investigate urgently",
            alert_key="processed_events_stuck_pending",
        )
finally:
    try:
        conn.rollback()
    except Exception:
        pass
    conn.putconn()`
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;&lt;strong&gt;Prometheus alerts:&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;`- alert: HotPathBalanceCheckNotRunning&lt;br&gt;
  expr: increase(hot_path_balance_check_runs_total[6m]) == 0&lt;br&gt;
  for: 0m&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;alert: Web3RPCCircuitOpen
expr: increase(web3_rpc_errors_total[5m]) &amp;gt; 5
for: 0m&lt;/li&gt;
&lt;li&gt;alert: CeleryHighRetryRate
expr: rate(celery_tasks_total{state="retry"}[5m])
  / rate(celery_tasks_total{state="success"}[5m]) &amp;gt; 0.1&lt;/li&gt;
&lt;li&gt;alert: CeleryQueueDepth
expr: celery_queue_depth &amp;gt; 500
for: 5m`&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Beat schedule:&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;`from celery.schedules import crontab&lt;/p&gt;

&lt;p&gt;beat_schedule = {&lt;br&gt;
    "enqueue-pending-events":        {"task": "enqueue_pending_events",        "schedule": 5.0},&lt;br&gt;
    "recover-stale-enqueued-events": {"task": "recover_stale_enqueued_events", "schedule": 120.0},&lt;br&gt;
    "cleanup-processed-events":      {"task": "cleanup_processed_events",      "schedule": crontab(hour=3, minute=0)},&lt;br&gt;
    "hot-path-balance-check":        {"task": "hot_path_balance_check",        "schedule": 60.0},&lt;br&gt;
    "alert-zombie-events":           {"task": "alert_zombie_events",           "schedule": 60.0},&lt;br&gt;
    "drain-redis-dlq":               {"task": "drain_redis_dlq",               "schedule": crontab(hour=4, minute=0)},&lt;br&gt;
}`&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;Cleanup&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;processed_events only cleans up terminal outcomes. outcome='pending' is never touched. This protects against silently deleting events mid-processing. The connection is acquired and returned to the pool on each batch, not held for the entire task duration.&lt;br&gt;
A 14-day TTL is safe thanks to UNIQUE (source_event_id, event_type) on balance_events: even if the provider replays an event after 14+ days and processed_events has been cleaned up, a repeat INSERT into balance_events is rejected by the unique constraint. Even if the application fails, the unique constraint on balance_events won't let a duplicate through. The database is the last line of defense.&lt;/p&gt;

&lt;p&gt;`CLEANUP_BATCH_SIZE    = 5_000&lt;br&gt;
CLEANUP_BATCH_PAUSE   = 0.1&lt;br&gt;
CLEANUP_MAX_BATCHES   = 200&lt;br&gt;
CLEANUP_TTL_DAYS      = 14&lt;br&gt;
CLEANUP_SAFE_STATUSES = ("success", "insufficient_funds")&lt;/p&gt;

&lt;p&gt;@shared_task(name="cleanup_processed_events")&lt;br&gt;
def cleanup_processed_events() -&amp;gt; dict:&lt;br&gt;
    import time&lt;br&gt;
    total_deleted = 0&lt;br&gt;
    batches = 0&lt;br&gt;
    for _ in range(CLEANUP_MAX_BATCHES):&lt;br&gt;
        conn = get_validated_conn(db_pool)&lt;br&gt;
        try:&lt;br&gt;
            with conn.cursor() as cur:&lt;br&gt;
                cur.execute("""&lt;br&gt;
                    DELETE FROM processed_events&lt;br&gt;
                    WHERE idempotency_key IN (&lt;br&gt;
                        SELECT idempotency_key FROM processed_events&lt;br&gt;
                        WHERE outcome = ANY(%s)&lt;br&gt;
                          AND created_at &amp;lt; NOW() - (%s * INTERVAL '1 day')&lt;br&gt;
                        LIMIT %s&lt;br&gt;
                        FOR UPDATE SKIP LOCKED&lt;br&gt;
                    )&lt;br&gt;
                """, (list(CLEANUP_SAFE_STATUSES), CLEANUP_TTL_DAYS, CLEANUP_BATCH_SIZE))&lt;br&gt;
                deleted = cur.rowcount&lt;br&gt;
                conn.commit()&lt;br&gt;
        except Exception:&lt;br&gt;
            try:&lt;br&gt;
                conn.rollback()&lt;br&gt;
            except Exception:&lt;br&gt;
                pass&lt;br&gt;
            raise&lt;br&gt;
        finally:&lt;br&gt;
            conn.putconn()&lt;br&gt;
        total_deleted += deleted&lt;br&gt;
        batches += 1&lt;br&gt;
        if deleted &amp;lt; CLEANUP_BATCH_SIZE:&lt;br&gt;
            break&lt;br&gt;
        time.sleep(CLEANUP_BATCH_PAUSE)&lt;br&gt;
    return {"batches": batches, "deleted": total_deleted}`&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;Tests and What They Don't Cover&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;You can't verify idempotency with mocks. The unique constraint has to work exactly as in prod, which means you need a real database. threading doesn't work here, the GIL gets in the way of properly reproducing a race condition. So: multiprocessing.&lt;br&gt;
The balance can add up correctly while balance_events is still duplicated. Check both. That's exactly how it happens: the balance looks fine, everything appears clean, and the duplicates in balance_events only surface when an auditor shows up.&lt;/p&gt;

&lt;p&gt;`import pytest&lt;br&gt;
import psycopg2&lt;br&gt;
import psycopg2.extras&lt;br&gt;
import multiprocessing&lt;br&gt;
from decimal import Decimal&lt;/p&gt;

&lt;p&gt;TEST_DSN = "host=localhost port=5433 dbname=testdb user=testuser password=testuser"&lt;/p&gt;

&lt;p&gt;@pytest.fixture(scope="session")&lt;br&gt;
def db_conn_session():&lt;br&gt;
    conn = psycopg2.connect(TEST_DSN)&lt;br&gt;
    yield conn&lt;br&gt;
    conn.close()&lt;/p&gt;

&lt;p&gt;@pytest.fixture&lt;br&gt;
def db_conn(db_conn_session):&lt;br&gt;
    conn = db_conn_session&lt;br&gt;
    conn.rollback()&lt;br&gt;
    with conn.cursor() as cur:&lt;br&gt;
        cur.execute("""&lt;br&gt;
            TRUNCATE balance_events, processed_events, payment_events,&lt;br&gt;
                     dead_letter_queue, users&lt;br&gt;
            RESTART IDENTITY CASCADE&lt;br&gt;
        """)&lt;br&gt;
        cur.execute(&lt;br&gt;
            "INSERT INTO users (id, balance, initial_balance) VALUES (1, 100.0, 100.0)"&lt;br&gt;
        )&lt;br&gt;
    conn.commit()&lt;br&gt;
    yield conn&lt;br&gt;
    try:&lt;br&gt;
        conn.rollback()&lt;br&gt;
    except Exception:&lt;br&gt;
        pass&lt;/p&gt;

&lt;p&gt;def &lt;em&gt;deposit_worker(dsn, event_id, event_type, user_id, amount, barrier, q):&lt;br&gt;
    conn = psycopg2.connect(dsn)&lt;br&gt;
    try:&lt;br&gt;
        barrier.wait(timeout=10)&lt;br&gt;
        process_deposit_sync(conn, event_id, event_type, user_id, amount)&lt;br&gt;
        q.put(("ok", None))&lt;br&gt;
    except Exception as e:&lt;br&gt;
        q.put(("err", f"{type(e).&lt;/em&gt;&lt;em&gt;name&lt;/em&gt;_}: {e}"))&lt;br&gt;
    finally:&lt;br&gt;
        conn.close()&lt;/p&gt;

&lt;p&gt;def test_duplicate_deposits_produce_single_credit(db_conn):&lt;br&gt;
    """10 workers, ONE event_id, simulating at-least-once delivery."""&lt;br&gt;
    with db_conn.cursor() as cur:&lt;br&gt;
        cur.execute(&lt;br&gt;
            "INSERT INTO payment_events (event_id, user_id, amount, event_type, status) "&lt;br&gt;
            "VALUES ('evt_dup', 1, '50.0', 'deposit', 'enqueued')"&lt;br&gt;
        )&lt;br&gt;
    db_conn.commit()&lt;/p&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;N = 10&lt;br&gt;
barrier = multiprocessing.Barrier(N)&lt;br&gt;
q = multiprocessing.Queue()&lt;br&gt;
workers = [&lt;br&gt;
    multiprocessing.Process(&lt;br&gt;
        target=_deposit_worker,&lt;br&gt;
        args=(TEST_DSN, "evt_dup", "deposit", 1, "50.0", barrier, q),&lt;br&gt;
    ) for _ in range(N)&lt;br&gt;
]&lt;br&gt;
for w in workers: w.start()&lt;br&gt;
for w in workers: w.join(timeout=20)

&lt;p&gt;with db_conn.cursor() as cur:&lt;br&gt;
    cur.execute("SELECT balance FROM users WHERE id = 1")&lt;br&gt;
    balance = cur.fetchone()[0]&lt;br&gt;
    cur.execute("SELECT COUNT(&lt;em&gt;) FROM balance_events")&lt;br&gt;
    be_count = cur.fetchone()[0]&lt;br&gt;
    cur.execute("SELECT COUNT(&lt;/em&gt;) FROM processed_events")&lt;br&gt;
    pe_count = cur.fetchone()[0]&lt;/p&gt;

&lt;p&gt;assert balance == Decimal("150.0"), f"duplication! balance={balance}"&lt;br&gt;
assert be_count == 1, f"balance_events duplicated: {be_count}"&lt;br&gt;
assert pe_count == 1`&lt;br&gt;
&lt;/p&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h2&gt;
&lt;br&gt;
  &lt;br&gt;
  &lt;br&gt;
  &lt;strong&gt;What Tests Don't Cover&lt;/strong&gt;&lt;br&gt;
&lt;/h2&gt;

&lt;p&gt;Throughput under real load. NOWAIT serializes access, throughput is achieved through Celery retry with jittered backoff. Load testing requires a separate stand with Celery workers.&lt;br&gt;
Worker crash mid-transaction. reject_on_worker_lost=True requires killing the Celery worker process and verifying the task is returned to the broker. That's an integration test, it lives separately.&lt;br&gt;
Redis DLQ fallback. Requires real Redis and simulating a PostgreSQL outage. Verified through chaos testing.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;Backpressure and Degradation&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;Under high concurrency on a single user, RetryableError from NOWAIT accumulates. max_retries=5 gives 6 attempts (initial + 5 retries), total worst-case delay up to roughly 63 seconds. If the incoming webhook throughput exceeds the capacity to drain retries, CeleryQueueDepth will start growing.&lt;br&gt;
When the DLQ starts filling up: alert_zombie_events will catch dlq_size &amp;gt; 10 events/hour, that's the first signal. The PostgreSQL dead_letter_queue grows without a limit, theoretically up to disk size. After that it's manual work: diagnose the cause, fix it, replay from the DLQ.&lt;br&gt;
Auto-replay was deliberately not built. An event landed in the DLQ, which means something went wrong. Let a person figure it out before pushing it back through. I don't want the system making its own decisions about what to do with money that already failed once.&lt;br&gt;
If the Redis broker is unavailable, apply_async always fails. The event stays in enqueued, after 3 minutes recover_stale_enqueued_events moves it back to pending and increments retry_count. After MAX_RECOVERY_ATTEMPTS (10 attempts, roughly 30 minutes), the event goes to DLQ + alert.&lt;br&gt;
acks_late and reject_on_worker_lost protect against worker crashes, not broker crashes. If the Redis master goes down, in-flight tasks are lost. appendonly yes + appendfsync everysec, that's the minimum that should be in place. If you genuinely can't afford to lose data: appendfsync always, but throughput will drop.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;On Blockchain Reorgs&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;For Ethereum after the switch to PoS, finalization happens through two checkpoint epochs (roughly 12-15 minutes). The "12 blocks" heuristic from the PoW era doesn't apply anymore. For L2 (Arbitrum, Optimism) the rules are different. This section covers Ethereum L1 only.&lt;br&gt;
The current implementation doesn't handle reorgs. That's out of scope for the first release. During a reorg, the blockchain "rolls back" several blocks. A transaction may land in the new chain unchanged, or it may disappear entirely, effectively cancelled.&lt;br&gt;
The simplest approach: a compensating entry in balance_events with a negative sign, with a separate idempotency key to avoid conflicting with the original event:&lt;/p&gt;

&lt;p&gt;&lt;code&gt;def handle_reorg_event(original_tx_hash: str, user_id: int, amount: Decimal) -&amp;gt; None:&lt;br&gt;
    reorg_event_id  = f"reorg:{original_tx_hash}"&lt;br&gt;
    idempotency_key = _idempotency_key(reorg_event_id, "reorg_compensation")&lt;br&gt;
    # then standard flow through processed_events + balance_events&lt;/code&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;What's in the Backlog&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;The most painful thing right now: notify_user_insufficient_funds is called outside the transaction. It needs an outbox pattern, a record written to an outbox table inside the same transaction as UPDATE users. Without that, under at-least-once delivery the user gets N notifications for one rejection, and when notify fails, DLQ gets records for successfully committed transactions. That's noise that hides real incidents.&lt;br&gt;
Above 50 workers, direct connections hit max_connections, PgBouncer is needed. One mine worth noting before enabling it in transaction pooling mode: hot_path_balance_check uses conn.set_isolation_level(REPEATABLE_READ), a session-level command that PgBouncer in transaction pooling doesn't preserve between transactions. SET LOCAL lock_timeout/statement_timeout work fine (they're transaction-scoped), but isolation level will need to be rewritten as BEGIN ISOLATION LEVEL REPEATABLE READ inline, or use session pooling for that specific task. This is a classic senior-level mine: works in dev, breaks only after enabling PgBouncer in prod.&lt;br&gt;
processed_events keeps growing. Cleanup handles the near term, but partitioning by date becomes inevitable past hundreds of millions of rows.&lt;br&gt;
Further out: Redis broker with appendfsync always, full historical reconciliation via materialized view, full reorg handling implementation, Admin UI for DLQ instead of manual SQL. One trap in the code I haven't touched yet: _mark_event_failed commits itself. When you refactor, it will bite you.&lt;/p&gt;

&lt;h2&gt;
  
  
  &lt;strong&gt;Results&lt;/strong&gt;
&lt;/h2&gt;

&lt;p&gt;The queue draining and the balances matching are two different things. Prometheus shows the first, hot_path_balance_check shows the second. You need both. One alone is not enough.&lt;br&gt;
8 months in production. 0 duplicate credits after deploying the fix, versus 23 in the first month across roughly 180k transactions. Webhook delivery at 100% accounting for provider retry logic.&lt;br&gt;
There's not a single decision here that I designed upfront. Each one closes a hole that had already fired. Idempotency via DB unique constraint, TOCTOU via SELECT FOR UPDATE NOWAIT, lost transactions via acks_late + outbox, FSM via VALID_TRANSITIONS. Each of these in isolation is not a guarantee. Together they cover each other's gaps. Eight months without an incident.&lt;/p&gt;

</description>
      <category>webdev</category>
      <category>backend</category>
      <category>web3</category>
      <category>blockchain</category>
    </item>
  </channel>
</rss>
