<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Adrian obungu</title>
    <description>The latest articles on DEV Community by Adrian obungu (@adrian_obungu_05a35126e1f).</description>
    <link>https://dev.to/adrian_obungu_05a35126e1f</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.us-east-2.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F2049744%2Fc8324a50-23f6-4cb6-9b03-e31595cc80f5.png</url>
      <title>DEV Community: Adrian obungu</title>
      <link>https://dev.to/adrian_obungu_05a35126e1f</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/adrian_obungu_05a35126e1f"/>
    <language>en</language>
    <item>
      <title>Building a Highly Concurrent Webhook Processing Pipeline: Lessons from the Synapse Reconciliation Engine</title>
      <dc:creator>Adrian obungu</dc:creator>
      <pubDate>Wed, 24 Jun 2026 20:06:38 +0000</pubDate>
      <link>https://dev.to/adrian_obungu_05a35126e1f/building-a-highly-concurrent-webhook-processing-pipeline-lessons-from-the-synapse-reconciliation-5gok</link>
      <guid>https://dev.to/adrian_obungu_05a35126e1f/building-a-highly-concurrent-webhook-processing-pipeline-lessons-from-the-synapse-reconciliation-5gok</guid>
      <description>&lt;p&gt;Financial webhooks are deceptively simple on the surface. A third-party payment gateway sends an HTTP POST to your endpoint. You process it. You respond. The transaction is recorded.&lt;/p&gt;

&lt;p&gt;In practice, this description conceals a category of engineering problems that only reveal themselves under production conditions: duplicate delivery during network instability, malformed payloads from legacy client versions, floating-point arithmetic errors in financial calculations, and cascading failures when a downstream compliance API degrades. Each of these failure modes, left unaddressed, produces outcomes ranging from duplicate tax invoices to silent data corruption in a financial ledger.&lt;/p&gt;

&lt;p&gt;The Synapse Reconciliation Engine was built to bridge two of East Africa's most critical financial infrastructure components — Safaricom's M-Pesa Daraja API and the Kenya Revenue Authority's eTIMS compliance gateway. This article documents the architectural decisions made at each stage of the ingress-to-compliance pipeline, with a focus on the concurrency model, schema integrity, and graceful degradation strategy that define its production posture.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Ingress &amp;amp; Idempotency&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Fast-Acknowledgement Contract&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Safaricom's Daraja API imposes a hard constraint on webhook consumers: the callback endpoint must return an HTTP 200 OK within a narrow timeout window. If the response is delayed (by a slow database write, a blocked event loop, or an outbound API call), Daraja treats the delivery as failed and retries. Under network instability, this retry behaviour compounds: a single payment event can generate three, five, or more duplicate callback deliveries in rapid succession.&lt;/p&gt;

&lt;p&gt;The naive architectural response is to handle deduplication at the database layer. This is incorrect. By the time a duplicate reaches the database, the application has already consumed event loop time parsing the payload, acquired a connection from the Postgres pool, and potentially triggered an outbound call to the KRA eTIMS API. The cost of the duplicate has already been paid.&lt;/p&gt;

&lt;p&gt;The correct response is to enforce the idempotency guarantee at the ingress boundary, before any downstream I/O occurs.&lt;/p&gt;

&lt;p&gt;Synapse implements this using a Redis SET NX (set if not exists) command with a 24-hour TTL, keyed on Daraja's CheckoutRequestID — a globally unique identifier assigned per STK Push transaction. The operation is atomic by design: Redis guarantees that only one caller can set a given key, regardless of concurrent arrivals. If the key already exists, the callback is a duplicate and is dropped in sub-millisecond time. The HTTP 200 is returned immediately in both cases, satisfying Daraja's acknowledgement contract without triggering any downstream processing.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from fastapi import APIRouter, BackgroundTasks, Request

router = APIRouter()

# MpesaWebhookPayload and process_compliance_pipeline are defined elsewhere in the project
@router.post("/callback")
async def mpesa_callback(
    payload: MpesaWebhookPayload,
    background_tasks: BackgroundTasks,
    request: Request
):
    checkout_id = payload.Body.stkCallback.CheckoutRequestID
    # True only for the first delivery of this CheckoutRequestID
    is_novel = await request.app.state.storage.check_idempotency(checkout_id)

    if not is_novel:
        return {"ResultCode": 0, "ResultDesc": "Duplicate ignored"}

    # Scheduled to run after the response has been sent
    background_tasks.add_task(process_compliance_pipeline, payload, request)
    return {"ResultCode": 0, "ResultDesc": "Accepted"}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The BackgroundTasks dispatch is the architectural boundary between the acknowledgement contract and the compliance pipeline. The event loop returns the HTTP 200 before the background task begins execution, which means Daraja's timeout window is never at risk regardless of downstream latency.&lt;/p&gt;
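&lt;p&gt;The body of &lt;code&gt;check_idempotency&lt;/code&gt; is not shown in this article. A minimal sketch of how such a check could look is given below, assuming a client whose &lt;code&gt;set&lt;/code&gt; method accepts &lt;code&gt;nx&lt;/code&gt; and &lt;code&gt;ex&lt;/code&gt; arguments the way redis-py's asyncio client does. The &lt;code&gt;InMemoryClient&lt;/code&gt; class, the &lt;code&gt;idem:&lt;/code&gt; key prefix, and the function signature are hypothetical and exist only so the example runs without a Redis server.&lt;/p&gt;

```python
import asyncio
import time

IDEMPOTENCY_TTL_SECONDS = 24 * 60 * 60  # the 24-hour TTL described in the article


class InMemoryClient:
    """Hypothetical stand-in that mimics set(name, value, nx=..., ex=...)."""

    def __init__(self):
        self._expiry = {}

    async def set(self, name, value, nx=False, ex=None):
        now = time.monotonic()
        expires_at = self._expiry.get(name)
        live = expires_at is not None and expires_at > now
        if nx and live:
            return None  # with NX, an existing key blocks the write
        self._expiry[name] = now + ex if ex is not None else float("inf")
        return True


async def check_idempotency(client, checkout_id):
    """Return True for the first delivery of checkout_id, False for a duplicate."""
    key = f"idem:{checkout_id}"  # key prefix is an assumption
    created = await client.set(key, "1", nx=True, ex=IDEMPOTENCY_TTL_SECONDS)
    return bool(created)


async def demo():
    client = InMemoryClient()
    first = await check_idempotency(client, "ws_CO_123")   # made-up ID
    second = await check_idempotency(client, "ws_CO_123")  # simulated retry
    return first, second


first_seen, duplicate_seen = asyncio.run(demo())
print(first_seen, duplicate_seen)
```

&lt;p&gt;Because the write and the existence check happen in one command, two concurrent deliveries of the same ID cannot both observe a missing key.&lt;/p&gt;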

&lt;p&gt;&lt;strong&gt;Structural Integrity at the Boundary&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Before idempotency is checked, the payload must be structurally valid. Pydantic v2 enforces this at the function signature level. Any payload that fails schema validation (missing required fields, incorrect types, or malformed nested structures) is rejected with an HTTP 422 Unprocessable Entity before it reaches the application layer. This is not merely a convenience; it is a security boundary. Unvalidated financial payloads reaching business logic are a source of both data corruption and potential injection vectors.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Schema Transformation&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The E.164 Normalisation Problem&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Daraja's STK Push webhook delivers the customer's phone number as a metadata item within a dynamic key-value array. The format of this value is inconsistent across client versions and network conditions. Observed formats include 0712345678, +254712345678, 254712345678, 0112345678, and integers coerced to floats such as 254712345678.0, a consequence of JSON serialisers treating numeric strings as numbers in some SDK implementations.&lt;/p&gt;

&lt;p&gt;Passing any of these raw values downstream creates two problems. First, the KRA eTIMS schema requires a strict E.164 format (254...). Second, normalising the value inside an async coroutine by passing a pattern string to the module-level re functions adds a cache lookup on every invocation, and a full recompile whenever the pattern has been evicted from the re module's cache. On a hot path under high concurrency, that overhead is avoidable.&lt;/p&gt;

&lt;p&gt;Synapse addresses both problems with a pre-compiled regex pattern, compiled once at module import time:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import re
from typing import Any

# Optional +254, 254, or 0 prefix, then nine digits starting with 1 or 7
PHONE_REGEX = re.compile(r"^(?:\+?254|0)?([17]\d{8})$")

def safe_normalize_phone(raw_phone: Any, fallback: str = "254700000000") -&amp;gt; str:
    val_str = str(raw_phone).strip().replace(" ", "").replace("-", "")
    # Undo float coercion such as 254712345678.0
    if val_str.endswith(".0"):
        val_str = val_str[:-2]
    match = PHONE_REGEX.match(val_str)
    return "254" + match.group(1) if match else fallback
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The &lt;strong&gt;.endswith(".0")&lt;/strong&gt; strip handles the float-coercion edge case before the regex is applied. The fallback value ensures the pipeline does not abort on an unparseable phone number. This is a deliberate trade-off that prioritises ledger completeness over strict rejection, since the transaction itself is valid even if the phone number is malformed.&lt;/p&gt;
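&lt;p&gt;A quick way to sanity-check the normaliser is to run it over the formats listed earlier. The sketch below repeats the function so that it runs on its own, and assumes the plus sign in the pattern is escaped so that it matches a literal "+". The sample inputs are made up for illustration.&lt;/p&gt;

```python
import re
from typing import Any

# Assumption: the "+" is escaped so it matches a literal plus sign
PHONE_REGEX = re.compile(r"^(?:\+?254|0)?([17]\d{8})$")


def safe_normalize_phone(raw_phone: Any, fallback: str = "254700000000") -> str:
    val_str = str(raw_phone).strip().replace(" ", "").replace("-", "")
    if val_str.endswith(".0"):
        val_str = val_str[:-2]
    match = PHONE_REGEX.match(val_str)
    return "254" + match.group(1) if match else fallback


samples = [
    "0712345678",
    "+254712345678",
    "254712345678",
    "0112345678",
    254712345678.0,   # float-coerced integer
    "not a phone",    # unparseable, falls back
]
normalized = [safe_normalize_phone(s) for s in samples]
for raw, clean in zip(samples, normalized):
    print(repr(raw), "->", clean)
```

&lt;p&gt;One thing this makes visible: the fallback is itself a well-formed number, so downstream code cannot tell a substituted value from a real one unless the substitution is logged or flagged separately.&lt;/p&gt;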

&lt;p&gt;&lt;strong&gt;Financial Precision and the Decimal Boundary&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Daraja's Amount field is delivered as a JSON number. Python's json module parses a JSON number with a fractional part or exponent as float, and one without as int, so an amount can arrive as a binary float. This is a correctness problem for financial systems. IEEE 754 binary floating-point cannot represent most decimal fractions exactly: 0.1 + 0.2 evaluates to 0.30000000000000004. At scale, these rounding errors accumulate into ledger discrepancies that are difficult to audit and impossible to explain to a regulator.&lt;/p&gt;

&lt;p&gt;The Synapse schema transformer converts the Amount field to Decimal at the boundary, using Python's decimal module, which provides correctly rounded decimal floating-point arithmetic with configurable precision:&lt;/p&gt;

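&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from decimal import Decimal, ROUND_HALF_UP

# raw_amount is the Amount value extracted from the callback metadata
amount_decimal = Decimal(str(raw_amount)).quantize(
    Decimal("0.01"), rounding=ROUND_HALF_UP
)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;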

&lt;p&gt;The conversion via str() is deliberate. Passing a float directly to Decimal() would copy the float's exact binary value, imprecision included. Converting to a string first hands the decimal module the shortest decimal representation that round-trips to that float, which for typical monetary amounts is the value as it was written in the payload.&lt;/p&gt;

&lt;p&gt;The ROUND_HALF_UP rounding mode aligns with standard accounting conventions and KRA's invoice precision requirements.&lt;/p&gt;
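&lt;p&gt;The difference between the two construction paths is easy to see in isolation. The following sketch uses only the standard library; the helper name &lt;code&gt;to_money&lt;/code&gt; and the sample values are illustrative and not taken from the Synapse codebase.&lt;/p&gt;

```python
from decimal import Decimal, ROUND_HALF_UP


def to_money(raw_amount):
    """Parse via str() and round to two places with ROUND_HALF_UP."""
    return Decimal(str(raw_amount)).quantize(
        Decimal("0.01"), rounding=ROUND_HALF_UP
    )


direct = Decimal(0.1)        # copies the binary float, digit for digit
via_str = Decimal(str(0.1))  # parses the text "0.1"

print(direct)   # a long expansion slightly above 0.1
print(via_str)  # 0.1

# 2.675 is stored as a float slightly below 2.675, so the two paths round differently
print(to_money(2.675))  # 2.68
print(Decimal(2.675).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP))  # 2.67
```

&lt;p&gt;The last two lines show why the rounding mode alone is not enough: the same rounding rule yields a different cent value depending on how the Decimal was constructed.&lt;/p&gt;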

&lt;p&gt;&lt;strong&gt;Mapping Dynamic Arrays to a Strict Schema&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Daraja's CallbackMetadata.Item is a list of key-value objects rather than a fixed schema. Fields like Amount, MpesaReceiptNumber, and PhoneNumber must be extracted by iterating the array and matching on the Name key. This dynamic structure is incompatible with the strict, typed payload required by the eTIMS saveVscu endpoint.&lt;/p&gt;

&lt;p&gt;The ETIMSTransformer class encapsulates this mapping logic, producing a fully validated Pydantic model as output:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP

class ETIMSTransformer:
    @staticmethod
    def transform_daraja_to_etims(payload: MpesaWebhookPayload) -&amp;gt; ETIMSInvoicePayload:
        # Flatten the Name/Value array into a dict for keyed lookup
        items = {
            item.Name: item.Value
            for item in payload.Body.stkCallback.CallbackMetadata.Item
        }
        return ETIMSInvoicePayload(
            invoiceNumber=items["MpesaReceiptNumber"],
            # Same rounding mode as the Decimal boundary described above
            totalAmount=Decimal(str(items["Amount"])).quantize(
                Decimal("0.01"), rounding=ROUND_HALF_UP
            ),
            phoneNumber=safe_normalize_phone(items.get("PhoneNumber", "")),
            # Naive UTC timestamp; note utcnow() is deprecated since Python 3.12
            transactionDate=datetime.utcnow().isoformat()
        )
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The transformer has no side effects and depends only on its input and the current clock, which makes it straightforward to test in isolation from the rest of the pipeline.&lt;/p&gt;
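&lt;p&gt;To make the shape of the input concrete, here is a small sketch of the same flattening step applied to a plain dictionary. The sample callback and all of its values are made up, and &lt;code&gt;flatten_metadata&lt;/code&gt; is a hypothetical helper that works on raw dicts rather than the Pydantic models used by Synapse.&lt;/p&gt;

```python
def flatten_metadata(callback):
    """Turn the Name/Value item list into a dict keyed by Name."""
    items = callback["Body"]["stkCallback"]["CallbackMetadata"]["Item"]
    # Defensive: an item without a "Value" key maps to None instead of raising
    return {item["Name"]: item.get("Value") for item in items}


# Made-up sample in the nested shape described above
sample_callback = {
    "Body": {
        "stkCallback": {
            "CheckoutRequestID": "ws_CO_0001",
            "CallbackMetadata": {
                "Item": [
                    {"Name": "Amount", "Value": 1250.0},
                    {"Name": "MpesaReceiptNumber", "Value": "ABC123XYZ"},
                    {"Name": "PhoneNumber", "Value": 254712345678},
                ]
            },
        }
    }
}

flat = flatten_metadata(sample_callback)
print(flat["MpesaReceiptNumber"], flat["Amount"], flat["PhoneNumber"])
```

&lt;p&gt;Once the list is a dict, required fields can be read by key and optional ones with &lt;code&gt;.get&lt;/code&gt;, which is the distinction the transformer relies on.&lt;/p&gt;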

&lt;p&gt;&lt;strong&gt;Graceful Degradation&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;The Shared Connection Pool&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Outbound HTTP calls to the KRA eTIMS API are the highest-latency operation in the pipeline. Under high concurrency, naive instantiation of a new httpx.AsyncClient per request would risk exhausting the system's file descriptor limit and would pay for a fresh TLS handshake on every call. Synapse avoids this by instantiating a single shared httpx.AsyncClient during the FastAPI lifespan context, configured with explicit connection pool limits:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import httpx

limits = httpx.Limits(
    max_connections=200,
    max_keepalive_connections=100,
    keepalive_expiry=30.0
)
http_client = httpx.AsyncClient(limits=limits, timeout=httpx.Timeout(10.0))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The keepalive_expiry of 30 seconds ensures that idle connections are not held open indefinitely, while max_keepalive_connections=100 maintains a warm pool for burst traffic — avoiding TLS handshake overhead on repeated calls to the same endpoint.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Concurrency Capping with asyncio.Semaphore&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;A shared connection pool prevents file descriptor exhaustion, but it does not prevent the application from flooding the KRA API with more concurrent requests than the downstream service can handle. An asyncio.Semaphore provides a soft concurrency cap at the application layer:&lt;/p&gt;

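&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import asyncio
import httpx

# At most 50 coroutines may hold the semaphore at once
ETIMS_SEMAPHORE = asyncio.Semaphore(50)

async def submit_to_etims(payload: ETIMSInvoicePayload, client: httpx.AsyncClient):
    async with ETIMS_SEMAPHORE:
        # outbound call executes here
        ...
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;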

&lt;p&gt;The semaphore value of 50 is a deliberate configuration choice: it is low enough to protect the downstream API from being overwhelmed, but high enough to sustain meaningful throughput during peak transaction windows. This value should be tuned against observed KRA API rate limits once production credentials are available.&lt;/p&gt;
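&lt;p&gt;The effect of the cap can be observed without any network calls. The sketch below is a standalone illustration, not Synapse code: it launches 200 simulated submissions, replaces the outbound request with a short sleep, and records how many were in flight at once. The names &lt;code&gt;run_capped&lt;/code&gt; and &lt;code&gt;worker&lt;/code&gt; are hypothetical.&lt;/p&gt;

```python
import asyncio


async def run_capped(n_tasks, cap):
    """Run n_tasks simulated submissions and return the peak number in flight."""
    semaphore = asyncio.Semaphore(cap)
    in_flight = 0
    peak = 0

    async def worker():
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stands in for the outbound eTIMS call
            in_flight -= 1

    await asyncio.gather(*(worker() for _ in range(n_tasks)))
    return peak


peak_in_flight = asyncio.run(run_capped(n_tasks=200, cap=50))
print(peak_in_flight)
```

&lt;p&gt;All 200 coroutines are scheduled immediately, but only 50 are ever past the semaphore at the same time; the rest wait without holding a connection.&lt;/p&gt;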

&lt;p&gt;&lt;strong&gt;Exponential Backoff with Jitter&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Transient failures from the KRA eTIMS API (network timeouts, 502 responses during deployments, TLS renegotiation errors) are expected in any production environment. A naive retry loop without delay creates a retry storm: all failed requests retry simultaneously, amplifying load on an already-degraded service.&lt;/p&gt;

&lt;p&gt;Synapse implements exponential backoff with additive random jitter on the retry loop:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;for attempt in range(1, MAX_RETRIES + 1):
    try:
        # mode="json" converts Decimal fields to JSON-safe values;
        # a plain model_dump() would leave Decimal objects the JSON encoder rejects
        response = await client.post(ETIMS_URL, json=payload.model_dump(mode="json"))
        response.raise_for_status()
        return
    except (httpx.RequestError, httpx.HTTPStatusError) as exc:
        if attempt == MAX_RETRIES:
            logger.error("eTIMS submission permanently failed", exc_info=exc)
            raise
        # Exponential delay (2s, 4s, 8s, ...) plus up to 1s of random jitter
        sleep_duration = (2 ** attempt) + random.uniform(0, 1)
        await asyncio.sleep(sleep_duration)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The &lt;strong&gt;random.uniform(0, 1)&lt;/strong&gt; jitter term desynchronises retries across concurrent coroutines. Without it, all coroutines that failed at the same moment would retry at the same moment — recreating the spike that caused the failure in the first place. The jitter term spreads retries across a one-second window, smoothing the load curve on the downstream service.&lt;/p&gt;
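&lt;p&gt;To see what delays the loop actually produces, the schedule can be computed on its own. The sketch below assumes &lt;code&gt;MAX_RETRIES = 4&lt;/code&gt;, a value this article does not state, and places the loop's additive jitter next to the "full jitter" variant, which is a different technique that draws the whole delay at random. Both helper names and the 30-second cap are hypothetical.&lt;/p&gt;

```python
import random

MAX_RETRIES = 4  # assumed value; not stated in the article


def additive_jitter_delay(attempt, rng):
    """Delay in the style of the loop above: 2**attempt seconds plus up to 1s."""
    return (2 ** attempt) + rng.uniform(0, 1)


def full_jitter_delay(attempt, rng, cap=30.0):
    """Full-jitter variant: uniform between 0 and the capped exponential."""
    return rng.uniform(0, min(cap, 2 ** attempt))


rng = random.Random(42)  # seeded only so the demo is repeatable

# A sleep follows every failed attempt except the last one
attempts = range(1, MAX_RETRIES)
additive = [additive_jitter_delay(a, rng) for a in attempts]
full = [full_jitter_delay(a, rng) for a in attempts]

for a, d_add, d_full in zip(attempts, additive, full):
    print(f"attempt {a}: additive={d_add:.2f}s full={d_full:.2f}s")
```

&lt;p&gt;With additive jitter every retry still waits at least the exponential floor, and retries are spread over one second. With full jitter the spread grows with each attempt, at the cost of some retries firing almost immediately.&lt;/p&gt;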

&lt;p&gt;&lt;strong&gt;Enterprise Readiness&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;The decisions documented here (atomic idempotency at ingress, pre-compiled normalisation, Decimal precision at the financial boundary, shared connection pooling, semaphore-capped concurrency, and jittered backoff) are not independent optimisations. They form a coherent philosophy: enforce guarantees as early as possible, minimise the blast radius of any single failure, and design every component to degrade gracefully rather than catastrophically.&lt;/p&gt;

&lt;p&gt;A system that drops duplicate webhooks silently, normalises dirty data without aborting, and retries compliance submissions with controlled backoff is not merely more reliable than one that does not. It is qualitatively different — it is a system that can be operated at scale, audited after an incident, and extended without fear of hidden coupling.&lt;/p&gt;

&lt;p&gt;The Synapse Reconciliation Engine is not yet at full production deployment. The remaining gaps — merchant authentication, a dead letter queue for permanently failed eTIMS submissions, and live KRA sandbox validation — are well-understood and bounded. The architectural foundation, however, is production-grade. That distinction matters when the system processes financial transactions that carry tax compliance obligations.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;The Synapse Reconciliation Engine is an open architecture middleware layer for the East African digital payments ecosystem. The repository is available at github.com/Adrian-Obungu/synapse-reconciliation-engine.&lt;/em&gt;&lt;/p&gt;

</description>
      <category>webdev</category>
      <category>python</category>
      <category>architecture</category>
      <category>opensource</category>
    </item>
  </channel>
</rss>
