DEV Community

Matthew Hou
Matthew Hou

Posted on

Building Maintainable AI Pipelines with the Task Context Observer Pattern

Building Maintainable AI Pipelines with the Task Context Observer Pattern

When building AI-powered applications, monitoring task execution becomes critical as pipelines grow in complexity. However, adding monitoring logic directly into your business logic creates tight coupling and makes the codebase harder to maintain. This article presents the Task Context Observer pattern—a clean architecture approach that separates monitoring from execution logic.

The Problem

As AI applications scale, we need to:

  1. Track task status (success or failure) throughout the entire process
  2. Monitor what changes at each step
  3. Keep monitoring logic completely separate from business logic

Traditional approaches often mix monitoring code with business logic, creating maintenance nightmares. The Task Context Observer pattern solves this through clean separation of concerns.


Design Goals

The pattern achieves two key goals:

  1. Zero coupling - The monitor system never touches chain logic
  2. Complete visibility - Track the full status of each task from start to finish

Solution: Two-Class Design

The pattern uses two core abstractions:

  1. Context (ctx) - Tracks all information about a task as it flows through the pipeline
  2. Runner (step list) - Defines the sequence of processing steps

Core Concept Diagram

Each task has two important properties:

Diagram 1

Key Concept:

  • Property 1 (Blue): Runner defines WHAT to do (steps) and HOW to monitor (observers)
  • Property 2 (Green): Context contains ALL information about the task throughout the process

Architecture Diagram

Diagram 2

Key Points:

  • Blue (Runner): Orchestrates the step execution
  • Green (Context): Flows through each step, being transformed
  • Yellow (Observer): Monitoring layer, completely separated via notification pattern
  • Dotted lines: Notification/observation (no direct coupling)

Sequence Diagram

Diagram 3


Implementation

1. Context Type

The Context type defines what information flows through your pipeline:

type Ctx = {
  taskName: string;
  input: string;

  prompt?: string;
  rawOutput?: string;
  parsed?: unknown;

  parseOk?: boolean;
};
Enter fullscreen mode Exit fullscreen mode

2. Context View

CtxView provides a summarized view of the context, hiding sensitive fields like prompts and raw outputs from logs:

const view: CtxView<Ctx> = {
  summarize(ctx) {
    // Show only key fields; represent large fields with length/hash
    return {
      taskName: ctx.taskName,
      inputLen: ctx.input.length,
      promptLen: ctx.prompt?.length ?? 0,
      rawLen: ctx.rawOutput?.length ?? 0,
      parseOk: ctx.parseOk ?? null,
      parsedType: ctx.parsed ? typeof ctx.parsed : null,
    };
  },
};
Enter fullscreen mode Exit fullscreen mode

3. Runner (Step List)

The Runner orchestrates step execution and notifies observers:

export interface Step<C> {
  name: string;
  run(ctx: C): Promise<C> | C;
}

export class Runner<C> {
  constructor(
    private readonly steps: Step<C>[],
    private readonly observers: StepObserver<C>[] = []
  ) {}

  async run(runId: string, initial: C): Promise<C> {
    let ctx = initial;

    for (const step of this.steps) {
      const stepName = step.name;
      const before = ctx;

      this.observers.forEach((o) => o.onStart?.(runId, stepName, before));
      const start = Date.now();

      try {
        ctx = await step.run(before);
        const ms = Date.now() - start;
        this.observers.forEach((o) =>
          o.onEnd?.(runId, stepName, before, ctx, ms)
        );
      } catch (err) {
        const ms = Date.now() - start;
        this.observers.forEach((o) =>
          o.onError?.(runId, stepName, before, err, ms)
        );
        throw err;
      }
    }

    return ctx;
  }
}
Enter fullscreen mode Exit fullscreen mode

4. Observer (Monitor)

Observers receive notifications about step execution without coupling to business logic:

export interface StepObserver<C> {
  onStart?(runId: string, stepName: string, ctx: C): void;
  onEnd?(
    runId: string,
    stepName: string,
    before: C,
    after: C,
    latencyMs: number
  ): void;
  onError?(
    runId: string,
    stepName: string,
    before: C,
    err: unknown,
    latencyMs: number
  ): void;
}

function diff(before: Dict, after: Dict): Dict {
  const out: Dict = {};
  const keys = new Set([...Object.keys(before), ...Object.keys(after)]);
  for (const k of keys) {
    const b = before[k];
    const a = after[k];
    if (b !== a) out[k] = { from: b, to: a };
  }
  return out;
}

export class DiffLogger<C> implements StepObserver<C> {
  constructor(private readonly view: CtxView<C>) {}

  onEnd(
    runId: string,
    stepName: string,
    before: C,
    after: C,
    latencyMs: number
  ): void {
    const b = this.view.summarize(before);
    const a = this.view.summarize(after);
    const d = diff(b, a);

    console.log(`\n[${runId}] ✅ ${stepName} (${latencyMs}ms)`);
    if (Object.keys(d).length === 0)
      console.log("  (no ctx changes in summary)");
    else console.log("  changes:", d);
  }

  onError(
    runId: string,
    stepName: string,
    before: C,
    err: unknown,
    latencyMs: number
  ): void {
    console.log(`\n[${runId}] ❌ ${stepName} (${latencyMs}ms) error=`, err);
    // You can also log error type classification here
  }
}
Enter fullscreen mode Exit fullscreen mode

What This Design Accomplishes

The Task Context Observer pattern provides clear separation of concerns through four components:

  1. Runner - Defines which steps to execute and which observers to notify
  2. Step - Contains pure business logic for each processing stage
  3. Context - Carries all task information through the pipeline
  4. Observer - Handles monitoring and logging without coupling to business logic

This separation makes your code easier to:

  • Test - Mock observers without affecting business logic
  • Maintain - Change monitoring without touching core functionality
  • Extend - Add new observers or steps independently
  • Debug - See exactly what changed at each step

Real-World Applications

This pattern works well for:

  • AI task pipelines (prompt generation → LLM call → parsing → validation)
  • Data processing workflows
  • Multi-step API integrations
  • ETL pipelines
  • Any sequential processing that needs monitoring

Conclusion

The Task Context Observer pattern solves a common problem in AI engineering: how to monitor complex pipelines without tangling monitoring logic with business logic. By using immutable context flow and the observer pattern, you get complete visibility into your AI tasks while maintaining clean, testable code.

The pattern scales well as your application grows—adding new monitoring capabilities is as simple as implementing a new observer. Your business logic remains pure and focused on its core responsibility: transforming data.

If you're building AI applications with multi-step pipelines, consider adopting this pattern early. The investment in clean architecture pays dividends as complexity grows.

Top comments (0)