The pipeline that processes your records and the pipeline that traces them are the same pipeline. That sentence should feel wrong — tracing means logging, timing, capturing intermediate state. That's side-effect territory. You'd expect the traced version to be a different thing, wired with callbacks and context objects, behaving subtly differently under pressure.
It isn't. And the way it works changed how I think about composing software around a core that shouldn't be touched.
The Problem With Observability That Modifies What It Observes
You have a pipeline — four stages, composed together, each taking a Result and returning a Result. It works. Now product wants a visualization: show which stage each record passed through, how long each took, where failures forked off the happy path.
The natural instinct is to reach inside. Add a logger to each stage function. Pass a context object through. Wrap each stage in a decorator that captures timing. Every approach has the same shape: you modify the thing you're trying to observe.
And now you have two problems. The production pipeline and the traced pipeline are different code paths. They can diverge. The tracing might swallow an exception. The timing wrapper might change async scheduling. You're not observing the pipeline anymore — you're observing a different pipeline that you hope behaves the same way.
What if the observation points were composed between stages, in the same way the stages themselves are composed? What if the pipeline couldn't tell the difference?
Interleaving Observation
Here's the production pipeline. Four stages, one recovery step, no instrumentation:
export const processTransaction = flowAsync(
  validateRaw,
  flatMapWith(normalize),
  flatMapWith(applyBusinessRules),
  flatMapWith(enrichWithCustomerData),
  orElseWith(recovery),
);
And here's the traced version:
const createTracedPipeline = () => {
  const tracer = createTracer();
  const pipeline = flowAsync(
    validateRaw,
    tapWith((tx: Transaction) => tracer.recordOk("validate", tx)),
    flatMapWith(normalize),
    tapWith((tx: NormalizedTransaction) => tracer.recordOk("normalize", tx)),
    flatMapWith(applyBusinessRules),
    tapWith((tx: NormalizedTransaction) =>
      tracer.recordOk("businessRules", tx),
    ),
    flatMapWith(enrichWithCustomerData),
    tapWith((tx: EnrichedTransaction) => tracer.recordOk("enrich", tx)),
    tapErrWith((error: string) => tracer.recordErr(error)),
    orElseWith(recovery),
  );
  return { pipeline, tracer };
};
Look at what didn't change. Every stage function is identical. The composition order is identical. validateRaw, normalize, applyBusinessRules, enrichWithCustomerData, recovery — all untouched. The only additions are tapWith and tapErrWith calls slotted between stages.
tapWith runs its callback when a value passes through on the ok track, then forwards that value unchanged. tapErrWith does the same on the err track. They're observation points that participate in the composition but not in the logic. The pipeline genuinely cannot tell they're there.
This only works because the composition model treats observation the same way it treats transformation — as a function in the chain. There's no special "middleware" concept, no decorator protocol. A tap is just another step that happens to return its input untouched.
Reconstructing What Happened
The clever part isn't the taps. It's what happens after.
The pipeline returns a single Result — either ok(processedTransaction) or err("some message"). That's the whole output. But the tracer has been quietly accumulating entries as each tap fired. After the pipeline completes, tracer.buildStages() reconstructs the full four-stage trace from those breadcrumbs.
Take a concrete example. Record #2 — bob@example.com, $50 USD, December 2024. It passes validation, passes normalization, then fails business rules because the date is before the 2025 cutoff. The pipeline returns err("Transaction too old").
The tracer's entries array after execution:
[
  { stage: "validate", value: {...}, timestamp: 1.2 },
  { stage: "normalize", value: {...}, timestamp: 1.5 },
]
Two entries. The tapErrWith also captured "Transaction too old" with its own timestamp.
Now buildStages() walks the four-stage order and applies three rules:
- Stage has a recorded entry → ok, duration derived from timestamps.
- First missing stage whose predecessor was ok → err, with the captured error message.
- Everything after that → skipped.
Output: [ok, ok, err, skipped]. Validate ✓ → Normalize ✓ → Business Rules ✗ → Enrich skipped.
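Those three rules fit in a short function. A sketch with the article's stage names — the function shape and types here are illustrative, not the tracer's actual API, and durations (derived from successive timestamps) are omitted:

```typescript
// Illustrative reconstruction of the three rules — not the tracer's API.
type Entry = { stage: string; timestamp: number };
type StageStatus = "ok" | "err" | "skipped";

const STAGE_ORDER = ["validate", "normalize", "businessRules", "enrich"];

function buildStages(entries: Entry[], errorCaptured: boolean): StageStatus[] {
  const recorded = new Set(entries.map((e) => e.stage));
  const statuses: StageStatus[] = [];
  let diverged = false;
  for (const stage of STAGE_ORDER) {
    if (diverged) {
      statuses.push("skipped"); // rule 3: everything after the failure
    } else if (recorded.has(stage)) {
      statuses.push("ok"); // rule 1: a tap fired for this stage
    } else {
      statuses.push(errorCaptured ? "err" : "skipped"); // rule 2: first gap
      diverged = true;
    }
  }
  return statuses;
}
```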
From a single error string and two breadcrumbs, the tracer reconstructed exactly where and when the pipeline diverged. That's what the demo renders in the center panel — not a log dump, but a visual trace showing each record's path through the pipeline, with timing, fork points, and recovery status.
Record #5 shows the recovery case. It passes all stages, then enrichWithCustomerData fails because the customer isn't in the mock database. But the orElseWith recovery catches customer-lookup failures and converts them to partial results. The tracer sees three ok entries, plus the error capture, and the final result is ok({ partial: true }). Four stages visible, enrichment marked with ◐ instead of ✗. Partial success, not failure.
One Schema in Two Contexts
There's a second pattern in the demo that's less surprising but saves real maintenance pain.
In most form + pipeline setups, you define validation constraints twice. Even when tools let you share, there's glue. With Zod and react-hook-form, for example, you share the schema but still need a resolver adapter to bridge the two worlds:
// Same schema, but you still need this adapter
const form = useForm({
  resolver: zodResolver(transactionSchema),
  defaultValues: record.data,
});
In the demo, transactionSchema is declared once and imported directly into both contexts with no adapter at all:
// Pipeline side — validates raw input on entry
export const validateRaw = (raw: unknown): Result<Transaction, string> => {
  const result = validate(raw, transactionSchema);
  // ...
};

// Form side — same schema, no resolver, no translation layer
const form = useForm(transactionSchema, {
  initialValues: record.data,
  onSubmit: (values) => updateRecord(index, values),
});
The email() validator that rejects "not-an-email" in the pipeline is the same validator that shows an inline error when you type it into the form. Change a constraint in one place, both contexts pick it up. The schema also infers the TypeScript type, so there's no separate type definition drifting out of sync.
It's not a breakthrough — it's just what happens when the schema layer is designed to work as both a runtime validator and a form driver from the start, rather than bolting the form integration on after.
Batch Semantics, Three Ways
One more thing the demo makes visible. After the pipeline runs, the right panel applies three aggregation strategies to the same batch of results:
- partition splits successes from failures and keeps both. The ETL default — process what you can, quarantine what you can't.
- combine is all-or-nothing, stops at the first error. For atomic operations where partial success is meaningless.
- combineAll is all-or-nothing but collects every error. For import validation where you want to surface all problems before rejecting.
With try/catch, each of these is a custom accumulation loop with its own subtle bugs. With Result, they're one-liners on results the pipeline already produced. When record #2 fails, combine and combineAll both reject the entire batch. partition shows 4 successes alongside that failure. Same data, different semantics, visible side by side.
What This Pattern Enables
Both the tracing and the schema sharing come from the same design choice: compose around the pipeline rather than modifying it.
The schema is shared by importing one declaration into both contexts — not by building an adapter between two different systems. The tracing is added by interleaving taps into the composition — not by modifying stage functions. The batch semantics are computed by running combinators over existing results — not by changing how those results are produced.
But the interesting question is what else you could slot into those gaps. The same tapWith mechanism that drives a visualization could drive an audit log, a performance budget that rejects slow stages, a diff tracker that compares outputs across pipeline versions, a circuit breaker that trips after N consecutive failures. Each would be composed between stages the same way. Each would leave the pipeline unaware.
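As one example, a consecutive-failure circuit breaker can be written as just another step in the chain. Everything here is hypothetical, sketched against a minimal Result type — none of it is the library's API:

```typescript
// Hypothetical sketch — a circuit breaker as an ordinary pipeline step.
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

function createCircuitBreaker<T>(maxConsecutiveFailures: number) {
  let consecutive = 0;
  let open = false;
  return {
    // Same shape as any other step: Result in, Result out.
    step(r: Result<T, string>): Result<T, string> {
      if (open) return { ok: false, error: "circuit open" };
      if (r.ok) {
        consecutive = 0; // any success resets the streak
      } else if (++consecutive >= maxConsecutiveFailures) {
        open = true; // trip after N consecutive failures
      }
      return r;
    },
    isOpen: () => open,
  };
}
```

Like a tap, it composes between stages; unlike a tap, it deliberately participates in the logic once tripped — which is the point: the same composition seam supports both passive observation and active middleware.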
The constraint — observation can't modify what it observes — turns out to be generative. When you can't reach inside, you find better places to stand.
ETL Pipeline Dashboard on StackBlitz — if StackBlitz appears stuck on "cloning from repo", hit refresh (known StackBlitz bug).
GitHub: @railway-ts/pipelines · @railway-ts/use-form