Introduction
"Ingest external data once a night", "aggregate user events in real-time" — data pipeline design requires choosing between batch/stream and ensuring data quality. Generate designs with Claude Code.
CLAUDE.md Data Pipeline Rules
```markdown
## Data Pipeline Design Rules

### Batch vs Stream
- Batch: nightly ETL, report generation (delay tolerable, high volume)
- Stream: real-time aggregation, alerts (low latency required)
- Lambda architecture: combine batch and stream

### Data Quality
- Validate schemas (Zod or dbt) to guarantee quality
- Check for nulls, duplicates, and anomalies
- Record quality scores in metadata for downstream use

### Error Handling
- Pipelines are idempotent (re-runs produce the same result)
- Save failed records to a dead-letter queue (DLQ), then skip and continue
- Save processing checkpoints so a run can resume where it stopped
```
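The least obvious rule here is the quality-score metadata. Below is a minimal sketch of what such a helper could look like; everything in it (the `computeQualityScore` name, the penalty weights, the `QualityReport` shape) is an assumption for illustration, not part of the generated pipeline that follows.

```typescript
// src/pipeline/quality.ts (hypothetical helper, not part of the generated pipeline below)
export interface QualityReport {
  score: number;        // 0.0 (unusable) to 1.0 (clean)
  nullFields: string[]; // fields that were null or undefined
  isDuplicate: boolean; // id already seen in this run
}

// Assumed scoring weights; tune per dataset. The caller maintains `seenIds`
// across the run so duplicates can be detected.
export function computeQualityScore(
  record: Record<string, unknown>,
  seenIds: Set<string>,
): QualityReport {
  const nullFields = Object.entries(record)
    .filter(([, value]) => value === null || value === undefined)
    .map(([key]) => key);
  const isDuplicate = typeof record.id === 'string' && seenIds.has(record.id);

  // Start from 1.0 and subtract a penalty for each issue found.
  let score = 1.0;
  score -= nullFields.length * 0.1; // 0.1 per missing field
  if (isDuplicate) score -= 0.5;

  return { score: Math.max(0, score), nullFields, isDuplicate };
}
```

Storing this report alongside each row lets downstream jobs filter on `score` (for example, excluding rows below 0.7 from revenue reports) instead of silently consuming bad data.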
Generated ETL Pipeline
```typescript
// src/pipeline/orderEtl.ts
import { z } from 'zod';
// Project-local modules (assumed): Prisma client, logger, source API client,
// transform step, and a custom error class for validation failures.
import { prisma } from '../lib/prisma';
import { logger } from '../lib/logger';
import { fetchExternalOrders } from './source';
import { transformOrder } from './transform';
import { DataQualityError } from './errors';
import type { PipelineOptions, PipelineResult } from './types';

const SourceOrderSchema = z.object({
  id: z.string(),
  external_order_id: z.string(),
  amount: z.number().positive(),
  currency: z.enum(['USD', 'EUR', 'JPY', 'GBP']),
  category_code: z.string(),
  created_at: z.string().datetime(),
  customer_email: z.string().email().nullable(),
});

export async function runOrderEtl(runId: string, options: PipelineOptions): Promise<PipelineResult> {
  const { batchSize, startDate, endDate } = options;

  // Resume from the checkpoint if a previous run with this runId crashed.
  const checkpoint = await prisma.etlCheckpoint.findUnique({ where: { runId } });
  let offset = checkpoint?.lastProcessedOffset ?? 0;
  let processedCount = 0;
  let failedCount = 0;

  while (true) {
    const rawOrders = await fetchExternalOrders({ startDate, endDate, offset, limit: batchSize });
    if (rawOrders.length === 0) break;

    const results = await Promise.allSettled(rawOrders.map(async (raw) => {
      // Validate at the pipeline entry; reject the record, not the batch.
      const parsed = SourceOrderSchema.safeParse(raw);
      if (!parsed.success) {
        throw new DataQualityError(`Schema validation failed: ${JSON.stringify(parsed.error.errors)}`);
      }
      const transformed = await transformOrder(parsed.data, runId);

      // Idempotent upsert: re-running the same batch yields the same rows.
      await prisma.analyticsOrder.upsert({
        where: { externalOrderId: transformed.externalOrderId },
        create: transformed,
        update: { amountJpy: transformed.amountJpy, processedAt: transformed.processedAt, etlRunId: runId },
      });
      return transformed;
    }));

    for (const [index, result] of results.entries()) {
      if (result.status === 'fulfilled') {
        processedCount++;
      } else {
        failedCount++;
        // Save the failed record to the DLQ, then continue with the rest.
        await prisma.etlDlq.create({
          data: {
            runId,
            sourceRecord: rawOrders[index],
            error: String(result.reason?.message ?? result.reason),
            failedAt: new Date(),
          },
        });
      }
    }

    // Update the checkpoint so a crashed run resumes from this offset.
    offset += rawOrders.length;
    await prisma.etlCheckpoint.upsert({
      where: { runId },
      create: { runId, lastProcessedOffset: offset, updatedAt: new Date() },
      update: { lastProcessedOffset: offset, updatedAt: new Date() },
    });
    logger.info({ runId, offset, processedCount, failedCount }, 'ETL batch completed');
  }

  return { runId, processedCount, failedCount };
}
```
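The pipeline delegates to `transformOrder`, which isn't shown. Here is a minimal sketch under stated assumptions: the snake_case source fields map onto the camelCase columns used in the upsert above, and amounts are normalized to JPY with a placeholder rate table (`FX_RATES_TO_JPY` is hypothetical; a real pipeline would load daily FX rates from a reference table or API).

```typescript
// src/pipeline/transform.ts (hypothetical sketch; field names inferred from the upsert above)

// Static placeholder rates; replace with a daily FX-rate lookup in production.
const FX_RATES_TO_JPY: Record<string, number> = { USD: 150, EUR: 162, JPY: 1, GBP: 190 };

interface SourceOrder {
  id: string;
  external_order_id: string;
  amount: number;
  currency: 'USD' | 'EUR' | 'JPY' | 'GBP';
  category_code: string;
  created_at: string;
  customer_email: string | null;
}

export async function transformOrder(order: SourceOrder, runId: string) {
  return {
    externalOrderId: order.external_order_id,
    amountJpy: Math.round(order.amount * FX_RATES_TO_JPY[order.currency]),
    categoryCode: order.category_code,
    orderedAt: new Date(order.created_at),
    processedAt: new Date(),
    etlRunId: runId,
  };
}
```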
ETL Scheduler (GitHub Actions)
```yaml
# .github/workflows/etl-pipeline.yml
name: ETL pipeline
on:
  schedule:
    - cron: '0 2 * * *' # Daily at 2 AM UTC
jobs:
  run-etl:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-node@v4
        with:
          node-version: 20
      - name: Run ETL
        run: |
          RUN_ID="etl-$(date +%Y%m%d)"
          node scripts/runEtl.js --runId "$RUN_ID" --startDate "$(date -d 'yesterday' +%Y-%m-%d)" --endDate "$(date +%Y-%m-%d)"
        env:
          DATABASE_URL: ${{ secrets.ANALYTICS_DB_URL }}
      - name: Check DLQ count
        run: |
          DLQ_COUNT=$(node scripts/getDlqCount.js --runId "etl-$(date +%Y%m%d)")
          if [ "$DLQ_COUNT" -gt 100 ]; then echo "Too many DLQ items"; exit 1; fi
        env:
          DATABASE_URL: ${{ secrets.ANALYTICS_DB_URL }}
```
Summary
Designing data pipelines with Claude Code:
- CLAUDE.md — batch/stream selection criteria, idempotent execution, DLQ policy
- Zod schema validation — check source data quality at the pipeline entry
- Checkpoints — resume after a crash without re-processing completed batches
- DLQ — save failed records, skip and continue, fix manually later (see the replay sketch below)
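"Fix manually later" typically means a small replay script. A hypothetical sketch, assuming the `etlDlq` table has `id` and `resolvedAt` columns and reusing the `transformOrder` sketch from above:

```typescript
// scripts/replayDlq.ts (hypothetical sketch; assumes etlDlq has id and resolvedAt columns)
import { prisma } from '../src/lib/prisma';
import { transformOrder } from '../src/pipeline/transform';

export async function replayDlq(runId: string) {
  const items = await prisma.etlDlq.findMany({ where: { runId, resolvedAt: null } });
  for (const item of items) {
    // After the source data (or the schema) has been fixed, push the record
    // back through the same transform + idempotent upsert path.
    const transformed = await transformOrder(item.sourceRecord as any, `${runId}-replay`);
    await prisma.analyticsOrder.upsert({
      where: { externalOrderId: transformed.externalOrderId },
      create: transformed,
      update: transformed,
    });
    // Mark the DLQ row as handled so repeated replays stay idempotent too.
    await prisma.etlDlq.update({ where: { id: item.id }, data: { resolvedAt: new Date() } });
  }
}
```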
*Review data pipeline designs with the **Code Review Pack (¥980)** using /code-review at prompt-works.jp*
myouga (@myougatheaxo) — Axolotl VTuber.