Stop me if you've heard this one:
You're processing data with AI. You write some async code. It works.
Then you need to:
- Add another analysis
- Make some run in parallel
- Add retry logic
- Track costs
You rewrite 50 lines. Again. And again.
I did this five times before building dagengine.
The Problem
I was analyzing my book manuscript—checking emotional pacing across 30 chapters. Each chapter needed multiple analyses: emotion score, readability, consistency checks.
First attempt: Sequential processing. 45 seconds per chapter × 30 chapters = 22.5 minutes.
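In shape, it looked something like this (hypothetical helper names, just to show the structure; each call hit an LLM API):

```typescript
// Hypothetical stand-ins for the real analysis calls
type Analysis = (chapter: string) => Promise<unknown>;
declare const analyzeEmotion: Analysis;
declare const checkReadability: Analysis;
declare const verifyConsistency: Analysis;

async function analyzeBook(chapters: string[]): Promise<void> {
  for (const chapter of chapters) {
    // Each analysis waits for the previous one, even though they are independent
    await analyzeEmotion(chapter);
    await checkReadability(chapter);
    await verifyConsistency(chapter);
  }
}
```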
Then I tried Promise.all():
```typescript
await Promise.all([
  analyzeEmotion(chapter).catch(retry),
  checkReadability(chapter).catch(retry),
  verifyConsistency(chapter).catch(retry),
  // ... more analyses
])
```
200 lines of orchestration code. Manual retry logic everywhere. Every new analysis meant rewriting coordination.
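The retry helper alone was the kind of thing I kept copy-pasting everywhere (an illustrative reconstruction, not the exact original code):

```typescript
// Illustrative reconstruction of the hand-rolled retry wrapper
async function withRetry<T>(fn: () => Promise<T>, attempts = 3): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < attempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      // Exponential backoff: 1s, 2s, 4s, ...
      await new Promise((resolve) => setTimeout(resolve, 1000 * 2 ** attempt));
    }
  }
  throw lastError;
}
```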
Week 5, refactoring again: This isn't a book problem.
The Pattern
Every AI workflow has the same structure:
- Multiple analyses needed
- Some can run in parallel
- Some depend on others
- All need retries
- All need cost tracking
I was solving a general problem with one-off code.
What I Built
dagengine handles the plumbing. You define what you want analyzed and what depends on what. The engine figures out execution order and parallelization.
Here's the concept:
```typescript
import { DagEngine, Plugin, type PromptContext, type ProviderSelection } from '@dagengine/core';

// Define result types (TypeScript helps!)
interface SentimentResult {
  sentiment: "positive" | "negative" | "neutral";
  score: number;
}

class ReviewAnalyzer extends Plugin {
  constructor() {
    super('review-analyzer', 'Review Analyzer', 'Analyze customer reviews');

    // Define your analysis tasks
    this.dimensions = [
      'filter_spam',      // Stage 1: Filter
      'sentiment',        // Stage 2: Parallel
      'categorize',       // Stage 2: Parallel
      'analyze_category'  // Stage 3: Deep analysis
    ];
  }

  // Declare what depends on what
  defineDependencies(): Record<string, string[]> {
    return {
      sentiment: ['filter_spam'],
      categorize: ['filter_spam'],
      analyze_category: ['categorize']
    };
  }

  // Build prompts (engine calls this for each dimension)
  createPrompt(context: PromptContext): string {
    const content = context.sections[0]?.content || '';

    if (context.dimension === 'filter_spam') {
      return `Is this spam? "${content}"
Return JSON: { "is_spam": boolean, "confidence": 0-1 }`;
    }

    if (context.dimension === 'sentiment') {
      return `Analyze sentiment: "${content}"
Return JSON: { "sentiment": "positive|negative|neutral", "score": 0-1 }`;
    }

    return '';
  }

  // Choose AI model per dimension
  selectProvider(dimension: string): ProviderSelection {
    // Cheap model for filtering (Haiku: $0.80/$4 per 1M tokens)
    if (dimension === 'filter_spam' ||
        dimension === 'sentiment' ||
        dimension === 'categorize') {
      return {
        provider: 'anthropic',
        options: { model: 'claude-3-5-haiku-20241022' }
      };
    }

    // Powerful model for deep analysis (Sonnet: $3/$15 per 1M tokens)
    return {
      provider: 'anthropic',
      options: { model: 'claude-3-7-sonnet-20250219' }
    };
  }

  // Skip expensive analysis on spam
  shouldSkipSectionDimension(context: any): boolean {
    if (context.dimension === 'sentiment' || context.dimension === 'categorize') {
      return context.dependencies.filter_spam?.data?.is_spam === true;
    }
    return false;
  }
}
```
The engine analyzes the dependency graph and automatically:
- Runs independent tasks in parallel (sentiment + categorize; a generic sketch of this stage grouping follows the list)
- Waits for dependencies before execution
- Retries failures with exponential backoff
- Tracks costs per dimension
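Conceptually, the stage grouping is standard topological layering. A minimal generic sketch (not dagengine's actual internals) looks like this:

```typescript
// Generic sketch: group tasks into stages where each stage depends only on earlier ones.
function buildStages(deps: Record<string, string[]>): string[][] {
  const stages: string[][] = [];
  const done = new Set<string>();
  const remaining = new Set(Object.keys(deps));

  while (remaining.size > 0) {
    // A task is ready when all of its dependencies are already done
    const ready = [...remaining].filter((task) =>
      (deps[task] ?? []).every((dep) => done.has(dep))
    );
    if (ready.length === 0) throw new Error('Cycle detected in dependency graph');
    stages.push(ready); // everything in one stage can run in parallel
    for (const task of ready) {
      done.add(task);
      remaining.delete(task);
    }
  }
  return stages;
}

// With the dependencies from the plugin above:
// buildStages({
//   filter_spam: [], sentiment: ['filter_spam'],
//   categorize: ['filter_spam'], analyze_category: ['categorize']
// })
// → [['filter_spam'], ['sentiment', 'categorize'], ['analyze_category']]
```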
How It Executes
The DAG determines execution order:
```
filter_spam (20 reviews)
    ↓
    ├─→ sentiment (parallel)
    └─→ categorize (parallel)
            ↓
group_by_category (10 reviews → 5 groups)
    ↓
analyze_category (5 groups)
```
No manual coordination. No Promise.all() hell.
Skip Logic Saves Money
Here's the key optimization - skip expensive analysis on spam:
```typescript
shouldSkipSectionDimension(context: any): boolean {
  if (context.dimension === 'sentiment' || context.dimension === 'categorize') {
    // Skip downstream analysis when the spam filter flagged this section
    return context.dependencies.filter_spam?.data?.is_spam === true;
  }
  return false;
}
```
Real impact: this skipped 20 API calls (10 spam reviews × 2 downstream dimensions), roughly a 30% reduction in total calls for this run.
Multi-Model Routing
Use cheap models for filtering, expensive models for analysis:
```typescript
selectProvider(dimension: string): ProviderSelection {
  // Cheap: Haiku at $0.80/$4 per 1M tokens
  if (dimension === 'filter_spam' ||
      dimension === 'sentiment' ||
      dimension === 'categorize') {
    return {
      provider: 'anthropic',
      options: { model: 'claude-3-5-haiku-20241022' }
    };
  }

  // Powerful: Sonnet at $3/$15 per 1M tokens
  return {
    provider: 'anthropic',
    options: { model: 'claude-3-7-sonnet-20250219' }
  };
}
```
Real Numbers
Processing 20 customer reviews through this pipeline:
✅ Spam filter detected 10 spam reviews
✅ Skip logic prevented 20 unnecessary API calls
✅ Multi-model routing: Haiku for filtering, Sonnet for deep analysis
✅ Total cost: $0.0282 in 24 seconds
Compare to alternatives:
- Using only Sonnet: $0.094 (3.3× more expensive)
- No skip logic: 20 extra API calls wasted on spam
Key insight: 70% cost savings from smart model routing + skip logic
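The 70% figure is just the ratio of the two totals above; a quick check:

```typescript
// Reproducing the savings figure from the totals above
const routedCost = 0.0282;    // Haiku for filtering + Sonnet for deep analysis
const sonnetOnlyCost = 0.094; // everything on Sonnet
const savings = 1 - routedCost / sonnetOnlyCost;
console.log(`${(savings * 100).toFixed(0)}% cheaper`); // → 70% cheaper
```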
Using the Results
```typescript
// Create and run the engine
const engine = new DagEngine({
  plugin: new ReviewAnalyzer(),
  providers: {
    anthropic: { apiKey: process.env.ANTHROPIC_API_KEY }
  },
  execution: {
    pricing: {
      models: {
        'claude-3-5-haiku-20241022': { inputPer1M: 0.80, outputPer1M: 4.00 },
        'claude-3-7-sonnet-20250219': { inputPer1M: 3.00, outputPer1M: 15.00 }
      }
    }
  }
});

const result = await engine.process(reviews);

// Access results with type safety
result.sections.forEach((section) => {
  const sentiment = section.results.sentiment?.data as SentimentResult | undefined;
  if (!sentiment) return; // skipped sections (e.g. spam) have no sentiment result
  console.log(`Review: ${sentiment.sentiment}`);
});

// Check costs
console.log(`Total cost: $${result.stats.totalCost}`);
console.log(`Total tokens: ${result.stats.totalTokens}`);
```
No spreadsheets. No manual token counting.
Key Features
✅ Automatic parallelization - DAG determines execution order
✅ Skip logic - Avoid wasting API calls on low-quality data
✅ Multi-model routing - Cheap models for filtering, expensive for analysis
✅ Built-in retries - 3 automatic retries with exponential backoff
✅ Cost tracking - Real-time breakdown by dimension and model
✅ Full TypeScript - Type-safe with excellent IDE support
✅ Data transformations - Reshape data between stages (10 reviews → 5 groups; a generic sketch follows this list)
✅ Multi-provider - Anthropic, OpenAI, Google Gemini
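To make the "10 reviews → 5 groups" idea concrete, here is a generic sketch of that kind of reshaping (illustrative only, not dagengine's transformation API; see the docs for the real hook):

```typescript
// Generic sketch of reshaping categorized reviews into per-category groups
interface CategorizedReview {
  text: string;
  category: string;
}

function groupByCategory(reviews: CategorizedReview[]): Record<string, string[]> {
  const groups: Record<string, string[]> = {};
  for (const review of reviews) {
    // Collect review texts under their assigned category
    (groups[review.category] ??= []).push(review.text);
  }
  return groups; // e.g. 10 non-spam reviews collapse into 5 category groups
}
```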
Try It
```bash
npm install @dagengine/core
```
Need help?
💬 Ask a question
📖 Read the docs
🐛 Report bugs
💡 Request features
It's beta. I use it daily. Looking for feedback on what breaks for your use case.
Not asking for trust. Asking you to break it and tell me what's wrong.
What AI workflows are you building that could benefit from parallelization? Drop a comment below! 👇
What's Next?
Want to see more advanced patterns?
📖 Section vs Global dimensions - Handle per-item and cross-item analysis
⚡ High-volume processing - 100 parallel requests with Portkey gateway
Check out the full examples on GitHub.