DEV Community

myougaTheAxo

Designing Data Pipelines with Claude Code: ETL, Stream Processing, Data Quality Checks

Introduction

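Requirements such as "ingest external data once a night" or "aggregate user events in real time" force two decisions in data pipeline design: whether to process in batch or as a stream, and how to ensure data quality. This post puts those rules in CLAUDE.md and has Claude Code generate the design from them.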


CLAUDE.md Data Pipeline Rules

## Data Pipeline Design Rules

### Batch vs Stream
- Batch: nightly ETL, report generation (tolerable delay, high volume)
- Stream: real-time aggregation, alerts (low latency required)
- Lambda Architecture: combine batch and stream

### Data Quality
- Schema validation (Zod or dbt) to guarantee quality
- Check for nulls, duplicates, anomalies
- Record quality scores in metadata (for downstream use)

### Error Handling
- Pipelines are idempotent (same result on re-run)
- Failed records saved to DLQ (skip and continue)
- Save processing checkpoints (resume from where it stopped)
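
The generated code in this post covers only the batch side of the "Batch vs Stream" rule. As a rough illustration of the stream side (real-time aggregation), here is a minimal sketch of a tumbling-window counter. The `UserEvent` shape and the `TumblingWindowCounter` class are hypothetical names for illustration, not something Claude Code produced for this post.

```typescript
// Hypothetical event shape; field names are assumptions for illustration.
interface UserEvent {
  userId: string;
  type: string;
  timestampMs: number; // event time in epoch milliseconds
}

// Counts events per (window, type) using fixed-size, non-overlapping windows.
// A stream consumer would call add() as each event arrives and read a window
// once it has closed, for example to drive an alert.
class TumblingWindowCounter {
  private readonly counts = new Map<string, number>();
  private readonly windowMs: number;

  constructor(windowMs: number) {
    this.windowMs = windowMs;
  }

  add(event: UserEvent): void {
    // Round the event time down to the start of its window.
    const windowStart = Math.floor(event.timestampMs / this.windowMs) * this.windowMs;
    const key = `${windowStart}:${event.type}`;
    this.counts.set(key, (this.counts.get(key) ?? 0) + 1);
  }

  get(windowStart: number, type: string): number {
    return this.counts.get(`${windowStart}:${type}`) ?? 0;
  }
}
```

With a 60-second window, events at 0 ms and 59,999 ms fall into the same window and an event at 60,000 ms starts the next one. A production stream job would also need to handle late events and evict closed windows, which this sketch leaves out.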
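
The "Data Quality" rules ask for null, duplicate, and anomaly checks plus a quality score in metadata, but the post does not show what that might look like. The following is a minimal sketch under stated assumptions: the `OrderRecord` shape, the `assessQuality` function, the `maxAmount` threshold, and the scoring formula (share of records with no flagged issue) are all illustrative choices, not part of the generated pipeline.

```typescript
// Hypothetical record shape, loosely based on the order fields used in this post.
interface OrderRecord {
  externalOrderId: string;
  amount: number;
  customerEmail: string | null;
}

interface QualityReport {
  total: number;
  nullEmails: number;
  duplicates: number;
  anomalies: number;
  score: number; // 1.0 means no issues were flagged; the formula is illustrative
}

function assessQuality(records: OrderRecord[], maxAmount: number): QualityReport {
  const seenIds = new Set<string>();
  let nullEmails = 0;
  let duplicates = 0;
  let anomalies = 0;
  let flagged = 0;

  for (const record of records) {
    const isNull = record.customerEmail === null;
    const isDuplicate = seenIds.has(record.externalOrderId);
    // "Anomaly" here is a simple range check; real pipelines may use statistics.
    const isAnomaly =
      !Number.isFinite(record.amount) || record.amount <= 0 || record.amount > maxAmount;

    seenIds.add(record.externalOrderId);
    if (isNull) nullEmails++;
    if (isDuplicate) duplicates++;
    if (isAnomaly) anomalies++;
    if (isNull || isDuplicate || isAnomaly) flagged++;
  }

  const total = records.length;
  const score = total === 0 ? 1 : (total - flagged) / total;
  return { total, nullEmails, duplicates, anomalies, score };
}
```

The returned report could be stored alongside the run metadata so that downstream consumers can decide whether a given run is trustworthy enough to use.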

Generated ETL Pipeline

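// src/pipeline/orderEtl.ts
import { z } from 'zod';
// The imports below are assumed project modules. The post does not show where
// prisma, logger, or the helpers are defined, so adjust the paths to your repo.
import { prisma } from '../lib/prisma';
import { logger } from '../lib/logger';
import { fetchExternalOrders } from './fetchExternalOrders';
import { transformOrder } from './transformOrder';
import { DataQualityError } from './errors';
import type { PipelineOptions, PipelineResult } from './types';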

const SourceOrderSchema = z.object({
  id: z.string(),
  external_order_id: z.string(),
  amount: z.number().positive(),
  currency: z.enum(['USD', 'EUR', 'JPY', 'GBP']),
  category_code: z.string(),
  created_at: z.string().datetime(),
  customer_email: z.string().email().nullable(),
});

export async function runOrderEtl(runId: string, options: PipelineOptions): Promise<PipelineResult> {
  const { batchSize, startDate, endDate } = options;

  // Resume from checkpoint
  const checkpoint = await prisma.etlCheckpoint.findUnique({ where: { runId } });
  let offset = checkpoint?.lastProcessedOffset ?? 0;
  let processedCount = 0;
  let failedCount = 0;

  while (true) {
    const rawOrders = await fetchExternalOrders({ startDate, endDate, offset, limit: batchSize });
    if (rawOrders.length === 0) break;

    const results = await Promise.allSettled(rawOrders.map(async (raw) => {
      const parsed = SourceOrderSchema.safeParse(raw);
      // `issues` is the canonical ZodError property and works across Zod versions
      if (!parsed.success) throw new DataQualityError(`Schema validation failed: ${JSON.stringify(parsed.error.issues)}`);

      const transformed = await transformOrder(parsed.data, runId);

      // Idempotent upsert
      await prisma.analyticsOrder.upsert({
        where: { externalOrderId: transformed.externalOrderId },
        create: transformed,
        update: { amountJpy: transformed.amountJpy, processedAt: transformed.processedAt, etlRunId: runId },
      });

      return transformed;
    }));

    for (const [index, result] of results.entries()) {
      if (result.status === 'fulfilled') {
        processedCount++;
      } else {
        failedCount++;
        // Save failed records to DLQ
        await prisma.etlDlq.create({
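          // result.reason is not guaranteed to be an Error, so guard before reading .message
          data: { runId, sourceRecord: rawOrders[index], error: result.reason instanceof Error ? result.reason.message : String(result.reason), failedAt: new Date() },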
        });
      }
    }

    // Update checkpoint (for crash recovery)
    offset += rawOrders.length;
    await prisma.etlCheckpoint.upsert({
      where: { runId },
      create: { runId, lastProcessedOffset: offset, updatedAt: new Date() },
      update: { lastProcessedOffset: offset, updatedAt: new Date() },
    });

    logger.info({ runId, offset, processedCount, failedCount }, 'ETL batch completed');
  }

  return { runId, processedCount, failedCount };
}
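
To make the interplay of idempotent upserts, the DLQ, and checkpoints easier to see, here is a minimal in-memory sketch of the same control flow. It is a simplification under stated assumptions: `EtlState`, `newState`, and `runEtlInMemory` are hypothetical names, plain maps and arrays stand in for the `analyticsOrder`, `etlDlq`, and `etlCheckpoint` tables, and a `maxBatches` parameter simulates a crash.

```typescript
interface RawOrder {
  externalOrderId: string;
  amount: number;
}

// In-memory stand-ins for the three tables used by the pipeline.
interface EtlState {
  orders: Map<string, number>; // externalOrderId -> amount (upsert target)
  dlq: RawOrder[];             // failed records parked for later inspection
  offset: number;              // checkpoint: number of source rows already handled
}

function newState(): EtlState {
  return { orders: new Map<string, number>(), dlq: [], offset: 0 };
}

// Processes batches starting at the checkpoint. maxBatches simulates a crash by
// stopping early; calling the function again resumes from state.offset.
function runEtlInMemory(
  source: RawOrder[],
  state: EtlState,
  batchSize: number,
  maxBatches: number = Infinity,
): void {
  let batches = 0;
  while (state.offset < source.length && batches < maxBatches) {
    const batch = source.slice(state.offset, state.offset + batchSize);
    for (const raw of batch) {
      if (!(raw.amount > 0)) {
        state.dlq.push(raw); // bad record: park it and keep going
        continue;
      }
      // Upsert by natural key, so reprocessing a record cannot create a duplicate.
      state.orders.set(raw.externalOrderId, raw.amount);
    }
    state.offset += batch.length; // advance the checkpoint after the whole batch
    batches++;
  }
}
```

Running once to completion and running with an interruption followed by a resume leave the same rows in `orders`. One caveat worth checking in the generated code: DLQ rows are written with `create`, so a crash between the DLQ write and the checkpoint update could duplicate DLQ rows on resume. Keying DLQ rows on the run ID plus a source identifier would be one way to make that step idempotent as well.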

ETL Scheduler (GitHub Actions)

# .github/workflows/etl-pipeline.yml
on:
  schedule:
    - cron: '0 2 * * *'  # Daily at 2AM UTC

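jobs:
  run-etl:
    runs-on: ubuntu-latest
    env:
      DATABASE_URL: ${{ secrets.ANALYTICS_DB_URL }}  # shared by both steps
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-node@v4
        with:
          node-version: 20  # assumed version; match your project
      - run: npm ci  # assumes a package-lock.json is committed

      - name: Run ETL
        run: |
          RUN_ID="etl-$(date +%Y%m%d)"
          node scripts/runEtl.js --runId "$RUN_ID" --startDate "$(date -d 'yesterday' +%Y-%m-%d)" --endDate "$(date +%Y-%m-%d)"

      - name: Check DLQ count
        run: |
          DLQ_COUNT=$(node scripts/getDlqCount.js --runId "etl-$(date +%Y%m%d)")
          if [ "$DLQ_COUNT" -gt 100 ]; then echo "Too many DLQ items"; exit 1; fi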
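
The workflow calls `date` separately for the run ID, the date range, and the DLQ check, so a job that crosses midnight UTC could look up the DLQ under a different run ID than the one it wrote. One possible way to avoid that is to derive all three values from a single timestamp inside the script. The sketch below assumes a hypothetical `buildRunWindow` helper; it is not part of the `scripts/runEtl.js` shown in the post.

```typescript
interface RunWindow {
  runId: string;
  startDate: string; // YYYY-MM-DD, inclusive, UTC
  endDate: string;   // YYYY-MM-DD, UTC
}

// Derives the run ID and date range from one timestamp so they cannot drift apart.
function buildRunWindow(now: Date): RunWindow {
  const dayMs = 24 * 60 * 60 * 1000;
  // toISOString() is always UTC, so slicing gives the UTC calendar date.
  const toIsoDate = (d: Date): string => d.toISOString().slice(0, 10);

  const endDate = toIsoDate(now);
  const startDate = toIsoDate(new Date(now.getTime() - dayMs));
  const runId = `etl-${endDate.replace(/-/g, '')}`;
  return { runId, startDate, endDate };
}
```

For a run started at 2024-03-01T02:00:00Z this yields the run ID `etl-20240301` with the range 2024-02-29 to 2024-03-01. The script could print the run ID for the later DLQ step to reuse instead of recomputing it.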

Summary

Design Data Pipelines with Claude Code:

  1. CLAUDE.md — batch/stream selection criteria, idempotent execution, DLQ policy
  2. Zod schema validation — check source data quality at pipeline entry
  3. Checkpoints — resume from crash without re-processing
  4. DLQ — save failed records, skip and continue, fix manually later

Review data pipeline designs with **Code Review Pack (¥980)** using `/code-review`, available at prompt-works.jp.

myouga (@myougatheaxo) — Axolotl VTuber.
