DEV Community

Daniel Romitelli
Daniel Romitelli

Posted on • Originally published at craftedbydaniel.com

Tracing an Extraction Pipeline Like a Ledger: Trace Nodes, DLQ Boundaries, and Replayable Failures

I noticed the same failure pattern twice: the extraction result was “wrong,” but nothing was actually broken.

The model had simply taken a different branch.

That’s the kind of bug that makes you mistrust everything—your cache, your prompts, even your own memory—because the system looks nondeterministic when it’s really just unobserved. So I built a tracer that turns each extraction run into a ledger: every decision becomes a node, every retry becomes an edge, and every “we chose variant B” becomes a fact I can point at.

Key insight: treat extraction like a traceable workflow, not a single call

The non-obvious move was to stop thinking of “email extraction” as one operation.

In this codebase, extraction is a workflow with multiple steps and multiple tools in play. The platform’s own engineering record calls it a LangGraph 3-node workflow (Extract → Research → Validate), with GPT-5 structured extraction, Azure Document Intelligence for email parsing, and SignalR real-time streaming among the pieces.

When you have that many moving parts, a naive approach—logging “start” and “finish” plus a blob of output—fails for three reasons:

  1. Branching is invisible. If you have prompt variants, fallbacks, or validation branches, you can’t tell which path ran.
  2. Retries look like “it worked eventually.” Without a trace, a retry storm is just a slow request.
  3. DLQ boundaries erase causality. Once a job is routed away, you lose the story unless you persist it.

So the tracer I added is structured around trace nodes. Each node is a single, append-only record of “what happened” at a specific step, with enough data to replay the run.

This is the same idea behind distributed tracing: each span carries the story of a request through a service boundary. The difference is that I'm applying it inside a single pipeline rather than across microservices, which means I get the same causal chain without the operational overhead of a full collector.

The tracer’s schema: trace nodes as first-class records

I keep the tracer schema simple: nodes are shaped for debugging, not for dashboards.

A trace node needs to answer:

  • Where am I in the workflow? (step/node name)
  • What did I feed in? (inputs)
  • What did I get back? (outputs)
  • What did I decide? (branch selection)
  • How expensive was it? (tokens/cost if available)
  • Is this retryable? (attempt count, error)

The codebase explicitly contains an Extraction Workflow Tracer module described as:

“Detailed step-by-step logging for email extraction. Provides complete visibility into the extraction workflow.”

That description is the contract: the tracer is not “some logs,” it’s complete visibility.

A concrete, runnable schema definition

I model a trace node as a dataclass so it’s explicit and serializable.

from __future__ import annotations

import json
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Any, Dict, Optional


@dataclass
class TraceNode:
    """A single step in an extraction run, recorded as an append-only event."""

    trace_id: str
    node_name: str
    timestamp: str

    # What we fed into this step
    inputs: Dict[str, Any]

    # What came back out
    outputs: Dict[str, Any]

    # Branching / decisions
    branch_chosen: Optional[str] = None

    # Retry / failure metadata
    attempt: int = 1
    error: Optional[str] = None

    def to_json(self) -> str:
        return json.dumps(asdict(self), ensure_ascii=False)


def new_trace_node(*, trace_id: str, node_name: str, inputs: Dict[str, Any], outputs: Dict[str, Any]) -> TraceNode:
    return TraceNode(
        trace_id=trace_id,
        node_name=node_name,
        timestamp=datetime.utcnow().isoformat(),
        inputs=inputs,
        outputs=outputs,
    )
Enter fullscreen mode Exit fullscreen mode

What surprised me when I started doing this is how quickly “I think the model did X” turns into “node extract.salary chose branch prompt_variant_b on attempt 2,” which is a totally different kind of conversation.

Where the tracing hooks sit in the extraction pipeline

I hook tracing at the boundaries where information is most likely to disappear:

  • right before/after the extraction call
  • right before/after the research step
  • right before/after validation
  • on every retry attempt
  • at the moment a message is routed to a DLQ

This codebase already gives strong hints about those boundaries:

  • The workflow is managed in app/langgraph_manager.py (called out as the LangGraph workflow file).
  • There’s a dedicated module app/services/extraction_workflow_tracer.py.
  • There’s an app/jobs/embedding_generator.py job that explicitly mentions retry logic and DLQ routing, with content length validation and dimension mismatch detection.

Even though embeddings and extraction are different pipelines, the retry/DLQ pattern is the same: if you don’t trace at the “route to DLQ” boundary, you lose the evidence you need. (For how Dead-Letter Queues behave and why explicitly recording DLQ routing is important for reconstructing message history, see Azure Service Bus dead-letter documentation.)

A minimal, runnable “hook” pattern

This is the core pattern I use: create a node, emit it, then proceed.

import asyncio
from typing import Any, Dict


class TraceSink:
    """A tiny interface: append-only trace storage."""

    async def emit(self, node: TraceNode) -> None:
        raise NotImplementedError


class InMemoryTraceSink(TraceSink):
    def __init__(self) -> None:
        self.nodes = []

    async def emit(self, node: TraceNode) -> None:
        self.nodes.append(node)


async def traced_step(
    *,
    sink: TraceSink,
    trace_id: str,
    node_name: str,
    inputs: Dict[str, Any],
    run
) -> Dict[str, Any]:
    """Run a step and emit a trace node with inputs/outputs."""

    try:
        outputs = await run(inputs)
        node = new_trace_node(trace_id=trace_id, node_name=node_name, inputs=inputs, outputs=outputs)
        await sink.emit(node)
        return outputs
    except Exception as e:
        node = new_trace_node(trace_id=trace_id, node_name=node_name, inputs=inputs, outputs={"ok": False})
        node.error = str(e)
        await sink.emit(node)
        raise
Enter fullscreen mode Exit fullscreen mode

The trick here is boring on purpose: tracing becomes a wrapper you can’t “forget” to do, and the error path is just as structured as the success path.

Example: tracing salary extraction with prompt variant A vs. B

One of the most common fields people care about in recruiting workflows is salary, and it’s also one of the easiest to get subtly wrong.

The reason is that salary appears in multiple forms (ranges, OTE, base-only, “DOE”), and emails often bury it in forwarded text or meeting notes. The engineering record also explicitly mentions the system “handles forwarded emails,” which is exactly the kind of input that creates extraction ambiguity.

So here’s what I trace for a salary extraction step:

  • the raw email content (or a reference to it)
  • the prompt variant used
  • the model response (structured)
  • post-processing output
  • which branch was chosen

Below is a runnable example that produces two trace nodes: one for variant A, one for variant B, and then records which one was selected.

import asyncio
from typing import Dict, Any


async def fake_model_call(*, prompt_variant: str, email_text: str) -> Dict[str, Any]:
    """Stand-in for the structured extraction call."""

    # This function is intentionally simple: the real system uses structured extraction.
    if prompt_variant == "A":
        return {"salary": "$120k-$150k", "raw": "Base $120k-$150k depending on experience"}
    return {"salary": "$150k OTE", "raw": "$150k OTE (base + bonus)"}


def post_process_salary(model_payload: Dict[str, Any]) -> Dict[str, Any]:
    """Stand-in for post-processing tokens/normalization."""

    return {
        "salary": model_payload.get("salary"),
        "post_processing_tokens": ["salary"],
    }


async def trace_salary_extraction() -> None:
    sink = InMemoryTraceSink()
    trace_id = "trace_salary_001"

    email_text = "Comp: Base $120k-$150k depending on experience."

    # Variant A
    model_a = await traced_step(
        sink=sink,
        trace_id=trace_id,
        node_name="extract.salary.model.variantA",
        inputs={"prompt_variant": "A", "email_text": email_text},
        run=lambda inputs: fake_model_call(prompt_variant=inputs["prompt_variant"], email_text=inputs["email_text"]),
    )
    post_a = await traced_step(
        sink=sink,
        trace_id=trace_id,
        node_name="extract.salary.postprocess.variantA",
        inputs={"model_response": model_a},
        run=lambda inputs: asyncio.sleep(0, result=post_process_salary(inputs["model_response"])),
    )

    # Variant B
    model_b = await traced_step(
        sink=sink,
        trace_id=trace_id,
        node_name="extract.salary.model.variantB",
        inputs={"prompt_variant": "B", "email_text": email_text},
        run=lambda inputs: fake_model_call(prompt_variant=inputs["prompt_variant"], email_text=inputs["email_text"]),
    )
    post_b = await traced_step(
        sink=sink,
        trace_id=trace_id,
        node_name="extract.salary.postprocess.variantB",
        inputs={"model_response": model_b},
        run=lambda inputs: asyncio.sleep(0, result=post_process_salary(inputs["model_response"])),
    )

    # Branch selection (example: choose A if it looks like a range)
    chosen = "A" if isinstance(post_a.get("salary"), str) and "-" in post_a["salary"] else "B"
    node = new_trace_node(
        trace_id=trace_id,
        node_name="extract.salary.branch",
        inputs={"variantA": post_a, "variantB": post_b},
        outputs={"chosen": chosen},
    )
    node.branch_chosen = chosen
    await sink.emit(node)

    for n in sink.nodes:
        print(n.to_json())


if __name__ == "__main__":
    asyncio.run(trace_salary_extraction())
Enter fullscreen mode Exit fullscreen mode

I like this example because it shows the exact thing that used to be invisible: I can see both model responses, I can see the post-processing tokens, and I can see the branch selection recorded as its own node.

How traces are stored and indexed

A trace is only useful if it survives the run.

This platform already has multiple persistence layers in play:

  • Redis cache is referenced directly in multiple places (for example, the SubjectLineBandit uses well_shared.cache.redis_manager.get_cache_manager() and loads variants “from Redis”).
  • There’s a Service Bus integration in the Azure service manager (the service manager has an _init_service_bus method).
  • The system uses PostgreSQL + pgvector (called out in CLAUDE.md), and other parts of the platform store operational data in tables like ops_error_stream (from the Error Capture Middleware description).

The tracer’s job is to write a trace record in a way that’s compatible with those layers. The exact storage backend can vary by environment; the important part is the schema stays stable.

Architecture flow (storage + indexing)

flowchart TD
  extractor[ExtractionWorkflow] --> tracer[ExtractionWorkflowTracer]
  tracer --> sink[TraceSink]
  sink --> redis[(Redis Cache)]
  sink --> serviceBus[ServiceBus]
  sink --> postgres[(PostgreSQL)]
  postgres --> opsDash[OpsDashboard]```



The key design choice here is that the tracer emits to an abstraction (`TraceSink`) so I can send the same nodes to Redis for fast lookups, to ServiceBus for asynchronous processing, and to Postgres for durable querying.

## DLQ and retry interactions: making “eventually succeeded” debuggable

Retries are where systems lie.

Not maliciously—just structurally. If you don’t record attempts, the final success overwrites the story. That’s why the Embedding Generator job’s description matters: it explicitly calls out **retry logic and DLQ routing** plus validation failures like **content length validation** and **dimension mismatch detection**.

Those are exactly the kinds of errors that should appear as trace nodes:

- attempt 1 failed: dimension mismatch
- attempt 2 failed: content length
- attempt 3 succeeded
- routed to DLQ (if it never succeeded)

The tracer schema above includes `attempt` and `error` for this reason.

### A runnable retry + DLQ trace pattern



```python
import asyncio
from typing import Any, Dict, Callable


class DeadLetterQueue:
    def __init__(self) -> None:
        self.messages = []

    async def send(self, payload: Dict[str, Any]) -> None:
        self.messages.append(payload)


async def run_with_retry_and_dlq(
    *,
    sink: TraceSink,
    dlq: DeadLetterQueue,
    trace_id: str,
    node_name: str,
    inputs: Dict[str, Any],
    max_attempts: int,
    run: Callable[[Dict[str, Any]], Any],
) -> Any:
    for attempt in range(1, max_attempts + 1):
        try:
            out = await run(inputs)
            node = new_trace_node(trace_id=trace_id, node_name=node_name, inputs=inputs, outputs={"ok": True, "result": out})
            node.attempt = attempt
            await sink.emit(node)
            return out
        except Exception as e:
            node = new_trace_node(trace_id=trace_id, node_name=node_name, inputs=inputs, outputs={"ok": False})
            node.attempt = attempt
            node.error = str(e)
            await sink.emit(node)

    dlq_payload = {"trace_id": trace_id, "node_name": node_name, "inputs": inputs}
    await dlq.send(dlq_payload)

    dlq_node = new_trace_node(
        trace_id=trace_id,
        node_name=f"{node_name}.dlq",
        inputs=inputs,
        outputs={"routed": True},
    )
    await sink.emit(dlq_node)

    raise RuntimeError("Exceeded retry attempts; routed to DLQ")
Enter fullscreen mode Exit fullscreen mode

The non-obvious detail is that I trace the DLQ routing as a node too. If you only enqueue a DLQ message, you’ve still lost the narrative unless you can join it back to the original run by trace_id.

Recording the DLQ event as its own trace node is what preserves causality. Without it, the dead-lettered message sits in a separate queue with no backward pointer to the run that produced it—and the trace_id is the only thing that stitches the story back together.

Replayability: feeding a trace back into a local harness

Once you have a trace, you can do something that feels like cheating: you can reproduce the run without re-running the whole world.

Replayability is a direct consequence of two choices:

  1. I log inputs/outputs per node, not just summaries.
  2. I treat nodes as append-only facts.

When an extraction run fails in production, I don’t want to re-fetch attachments, re-run parsing, or depend on external services just to debug a single prompt variant. I want to take the trace and run a harness locally.

A runnable replay harness

This harness replays nodes by name. In a real system, the handlers would call the actual step functions; here it prints what it would do.

import json
from typing import List, Dict, Any, Callable


def replay_trace(nodes_jsonl: str, handlers: Dict[str, Callable[[Dict[str, Any]], Any]]) -> None:
    """Replay trace nodes from JSONL (one JSON per line)."""

    for line in nodes_jsonl.strip().splitlines():
        node = json.loads(line)
        name = node["node_name"]
        inputs = node.get("inputs", {})

        handler = handlers.get(name)
        if handler is None:
            print(f"[replay] no handler for {name}; skipping")
            continue

        print(f"[replay] running {name}")
        result = handler(inputs)
        print(f"[replay] result: {result}")


if __name__ == "__main__":
    sample = """
{"trace_id":"t1","node_name":"extract.salary.branch","timestamp":"2026-03-12T00:00:00","inputs":{"variantA":{"salary":"$120k-$150k"},"variantB":{"salary":"$150k OTE"}},"outputs":{"chosen":"A"},"branch_chosen":"A","attempt":1,"error":null}
""".strip()

    replay_trace(
        sample,
        handlers={
            "extract.salary.branch": lambda inputs: {"chosen": "A" if "-" in inputs["variantA"]["salary"] else "B"}
        },
    )
Enter fullscreen mode Exit fullscreen mode

The first time I replayed a trace like this, the “nondeterministic” bug turned into a deterministic mismatch between two branches. That’s the moment the tracer paid for itself.

Nuances: why deterministic logging changes everything downstream

I’m careful with the phrase “deterministic” here.

I’m not claiming the model is deterministic. I’m claiming the record of what happened is deterministic: the trace is an immutable timeline of inputs, outputs, and decisions.

That property matters because it supports three safety mechanisms that are otherwise squishy:

Conformal gating

This codebase already has a strong theme of quality gating and budget-aware processing (for example, the a proprietary optimization algorithm system describes telemetry events like a proprietary optimization algorithm.quality_score and a proprietary optimization algorithm.processing_cost, and it includes budget warnings at 80% utilization). A deterministic trace lets you associate a gate decision with the exact node outputs that triggered it.

If a gate flips, I can see the evidence.

Regression detection

When prompts change, regressions often show up as “the output looks different.” Without traces, you don’t know if the input changed, the prompt changed, or the branch changed.

With traces, you can diff node-by-node: variant A response changed; post-processing tokens changed; validation branch changed.

Safer cache invalidation

This platform uses caching in multiple places (Redis is a recurring component, and the Embedding Agent explicitly implements Redis caching with a 24hr TTL). Cache invalidation gets safer when you can tie cached artifacts to the trace nodes that produced them.

If a prompt variant changes, I can invalidate only the nodes that depended on that variant, instead of flushing everything and hoping. Concretely: each cache key includes the node name and a hash of the inputs that produced it. When variant B replaces variant A, I walk the trace graph to find every downstream node whose inputs included variant A's output—post-processing, validation, branch selection—and expire exactly those keys. Everything else survives. This turns cache invalidation from a prayer ("flush it all and re-warm") into a targeted operation with a clear blast radius, which matters a lot when you're running extraction at scale and a full cache miss means re-calling the model for every in-flight job.

Closing

Once I started treating extraction runs as a ledger of trace nodes—complete with retries, DLQ boundaries, and branch decisions—the system stopped feeling like a black box and started feeling like a machine I could actually tune. The real win wasn’t better logs; it was the moment “I think it chose the other prompt” became a node I could point at and replay until it behaved.

Top comments (0)