When processing large batches of items through multiple LLM analyses, you quickly run into orchestration complexity. Consider a customer review pipeline that needs to:
- Run sentiment analysis and topic extraction (independent tasks)
- Generate summaries that depend on both sentiment and topics
- Filter spam before expensive operations
- Handle failures gracefully
- Route to different models based on cost/capability
- Track spending in real-time
## The Problem

The typical progression:

**Sequential approach** - Simple but slow:

```typescript
for (const review of reviews) {
  const sentiment = await runSentiment(review);
  const topics = await runTopics(review);
  const summary = await runSummary(review, sentiment, topics);
}
```
**Parallel approach** - Faster but fragile:

```typescript
const [sentiments, topics] = await Promise.all([
  Promise.all(reviews.map(r => runSentiment(r))),
  Promise.all(reviews.map(r => runTopics(r)))
]);

const summaries = await Promise.all(
  reviews.map((r, i) => runSummary(r, sentiments[i], topics[i]))
);
```
This works until you need error handling, retries, rate limiting, concurrency controls, and cost tracking. At scale, you end up with substantial orchestration code.
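To make "substantial orchestration code" concrete, here is a sketch of the plumbing the parallel version tends to grow. This is hand-rolled and illustrative; `withRetry` and `mapLimit` are hypothetical helpers, not from any library:

```typescript
// Illustrative only: the kind of plumbing you end up maintaining by hand.
async function withRetry<T>(fn: () => Promise<T>, attempts = 3): Promise<T> {
  for (let i = 0; ; i++) {
    try {
      return await fn();
    } catch (err) {
      if (i >= attempts - 1) throw err;
      await new Promise(r => setTimeout(r, 1000 * 2 ** i)); // backoff: 1s, 2s, 4s...
    }
  }
}

// Naive concurrency limiter: process items in fixed-size batches.
async function mapLimit<T, R>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<R>
): Promise<R[]> {
  const results: R[] = [];
  for (let i = 0; i < items.length; i += limit) {
    const batch = items.slice(i, i + limit);
    results.push(...await Promise.all(batch.map(item => withRetry(() => fn(item)))));
  }
  return results;
}
```

And that still leaves rate-limit awareness, per-call cost accounting, and partial-failure handling to write.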
## Existing Solutions
Several tools address this:
- LangChain/LCEL - Has parallel execution with RunnableParallel, good for chaining operations
- Prefect/Temporal - Durable workflow execution with state persistence, excellent for long-running workflows but requires infrastructure
- LangGraph - State-based approach that can model DAGs through state transitions
These are production-ready tools. The gap I found: none of them offers pure dependency-based orchestration focused specifically on LLM batch processing, without infrastructure overhead or state-management abstractions.
## The DAG Approach

The core observation: for batch LLM processing, you're primarily describing analysis dependencies:

```
sentiment ─┐
           ├─→ summary
topics ────┘
```
Dependencies define execution order. Parallelization emerges automatically.
I built dagengine around this principle.
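To see why parallelism falls out for free, here is a minimal sketch of the scheduling idea — illustrative only, not dagengine's actual internals: repeatedly run every dimension whose dependencies are already satisfied, in "waves".

```typescript
// Given a dependency map, compute waves of dimensions that can run in parallel.
type DepGraph = Record<string, string[]>;

function executionWaves(deps: DepGraph, dimensions: string[]): string[][] {
  const done = new Set<string>();
  const waves: string[][] = [];
  while (done.size < dimensions.length) {
    // Everything not yet run whose dependencies are all satisfied runs together.
    const wave = dimensions.filter(
      d => !done.has(d) && (deps[d] ?? []).every(dep => done.has(dep))
    );
    if (wave.length === 0) throw new Error('Cycle in dependency graph');
    wave.forEach(d => done.add(d));
    waves.push(wave);
  }
  return waves;
}

// [['sentiment', 'topics'], ['summary']] — the first wave runs in parallel.
console.log(executionWaves({ summary: ['sentiment', 'topics'] },
                           ['sentiment', 'topics', 'summary']));
```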
## Implementation

Here's the complete working example:

```typescript
import { DagEngine, Plugin } from '@dagengine/core';

class ReviewAnalyzer extends Plugin {
  constructor() {
    super('analyzer', 'Review Analyzer', 'Analyze reviews');
    this.dimensions = ['sentiment', 'topics', 'summary'];
  }

  // Declare dependencies
  defineDependencies() {
    return {
      summary: ['sentiment', 'topics']
    };
  }

  // Define prompts for each dimension
  createPrompt(context) {
    const review = context.sections[0]?.content;

    if (context.dimension === 'sentiment') {
      return `Analyze sentiment: "${review}"
Return JSON: {
  "sentiment": "positive|negative|neutral",
  "confidence": 0.0-1.0
}`;
    }

    if (context.dimension === 'topics') {
      return `Extract topics: "${review}"
Return JSON: {
  "topics": ["topic1", "topic2", ...]
}`;
    }

    if (context.dimension === 'summary') {
      const sentiment = context.dependencies.sentiment.data;
      const topics = context.dependencies.topics.data;
      return `Summarize with context:
- Sentiment: ${sentiment.sentiment} (${sentiment.confidence})
- Topics: ${topics.topics.join(', ')}
- Review: "${review}"
Return JSON: {
  "summary": "...",
  "key_points": ["point1", "point2", ...]
}`;
    }
  }

  // Select model
  selectProvider() {
    return {
      provider: 'anthropic',
      options: { model: 'claude-3-5-haiku-20241022' }
    };
  }
}

// Run
const engine = new DagEngine({
  plugin: new ReviewAnalyzer(),
  providers: {
    anthropic: { apiKey: process.env.ANTHROPIC_API_KEY }
  }
});

const result = await engine.process(reviews);
```
**What happens automatically:**
- ✅ Parallel execution of independent dimensions
- ✅ Dependency waiting
- ✅ Concurrent section processing (configurable batch size)
- ✅ Automatic retries with exponential backoff
- ✅ Cost tracking
- ✅ JSON parsing and validation
**What you control:**
- Dimensions to run
- Dependencies between them
- Prompt generation
- Model selection
## Design Choice: JSON Responses
The engine expects JSON responses. Include "Return JSON: {schema}" in prompts.
**Rationale:**
- Works across all providers without provider-specific syntax
- Reliable parsing without regex
- Clean dependency data access
- Enables skip logic based on structured fields
- Consistent cost tracking
The engine automatically parses responses and makes them available in `context.dependencies[dimension].data`.
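For illustration, inside the summary prompt above, the parsed dependency results look roughly like this. The shape is inferred from the example, not a documented schema:

```typescript
// Roughly what createPrompt sees for the 'summary' dimension.
const context = {
  dimension: 'summary',
  dependencies: {
    sentiment: { data: { sentiment: 'positive', confidence: 0.92 } },
    topics: { data: { topics: ['battery life', 'shipping'] } }
  }
};
```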
## Practical Features

### 1. Skip Logic for Cost Savings
```typescript
class ReviewAnalyzer extends Plugin {
  constructor() {
    super('analyzer', 'Review Analyzer', 'Analyze reviews');
    this.dimensions = [
      'filter_spam',
      'sentiment',
      'topics',
      'summary'
    ];
  }

  defineDependencies() {
    return {
      sentiment: ['filter_spam'],
      topics: ['filter_spam'],
      summary: ['sentiment', 'topics']
    };
  }

  shouldSkipSectionDimension(context) {
    if (context.dimension === 'sentiment' ||
        context.dimension === 'topics') {
      const spam = context.dependencies.filter_spam?.data;
      return spam?.is_spam === true;
    }
    return false;
  }
}
```
In testing with 20 reviews where 10 were spam, this skipped 20 unnecessary API calls.
### 2. Multi-Model Routing

```typescript
selectProvider(context) {
  // Cheap model for filtering
  if (context.dimension === 'filter_spam') {
    return {
      provider: 'anthropic',
      options: { model: 'claude-3-5-haiku-20241022' }
    };
  }

  // More capable model for analysis
  return {
    provider: 'anthropic',
    options: { model: 'claude-3-7-sonnet-20250219' },
    fallbacks: [
      {
        provider: 'openai',
        options: { model: 'gpt-4o' }
      }
    ]
  };
}
```
Using Haiku ($0.80/1M input) for filtering and Sonnet ($3/1M input) for analysis reduces costs compared to using Sonnet for everything.
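For a rough sense of scale (illustrative arithmetic, assuming ~1,000 input tokens per review): filtering 100 reviews on Haiku costs about 100K tokens × $0.80/1M ≈ $0.08 of input, versus ≈ $0.30 on Sonnet — and every review the filter flags as spam also skips its downstream Sonnet calls entirely.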
### 3. Gateway Integration

```typescript
const engine = new DagEngine({
  plugin: new ReviewAnalyzer(),
  providers: {
    anthropic: {
      apiKey: process.env.ANTHROPIC_API_KEY,
      gateway: 'portkey',
      gatewayApiKey: process.env.PORTKEY_API_KEY,
      gatewayConfig: {
        retry: {
          attempts: 5,
          on_status_codes: [429, 500, 502, 503, 504]
        }
      }
    }
  }
});
```
Portkey handles rate limiting and retries transparently.
### 4. Partial Results on Failures

```typescript
const engine = new DagEngine({
  plugin: new ReviewAnalyzer(),
  providers: { /* ... */ },
  execution: {
    maxRetries: 3,
    retryDelay: 1000,
    continueOnError: true
  }
});

// Provide fallback values
class ReviewAnalyzer extends Plugin {
  async handleDimensionFailure(context) {
    return {
      data: {
        sentiment: 'neutral',
        confidence: 0.0
      },
      metadata: {
        fallback: true,
        error: context.error.message
      }
    };
  }
}
```
If one item fails, you still get results for the rest.
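A dependent dimension can also react to fallbacks. The sketch below reuses the `shouldSkipSectionDimension` hook from earlier and assumes the `metadata` returned by `handleDimensionFailure` travels with the dependency — an inference from the shapes above, not documented behavior:

```typescript
// Sketch: skip the summary when its sentiment input is only a fallback placeholder.
shouldSkipSectionDimension(context) {
  if (context.dimension === 'summary') {
    const sentiment = context.dependencies.sentiment;
    return sentiment?.metadata?.fallback === true;
  }
  return false;
}
```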
### 5. Cost Tracking

```typescript
const result = await engine.process(reviews);

console.log(result.costs);
// {
//   totalCost: 0.0282,
//   totalTokens: 12289,
//   byDimension: {
//     filter_spam: {
//       cost: 0.0115,
//       tokens: { input: 6548, output: 1559 }
//     },
//     ...
//   }
// }
```
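One simple use of this (a sketch relying only on the fields shown above): rank dimensions by spend after each batch to see which analysis dominates cost.

```typescript
// Rank dimensions by cost, using only the fields shown above.
const ranked = Object.entries(result.costs.byDimension)
  .sort(([, a], [, b]) => b.cost - a.cost);

for (const [dimension, { cost, tokens }] of ranked) {
  console.log(`${dimension}: $${cost.toFixed(4)} (${tokens.input} in / ${tokens.output} out)`);
}
```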
## Cross-Item Analysis Pattern

For operations that need to aggregate across all items:

```typescript
class CategoryAnalyzer extends Plugin {
  constructor() {
    super('category', 'Category Analyzer', 'Analyze by category');
    this.dimensions = [
      'classify',
      { name: 'group', scope: 'global' },
      'analyze'
    ];
  }

  defineDependencies() {
    return {
      group: ['classify'],
      analyze: ['group']
    };
  }

  createPrompt(context) {
    if (context.dimension === 'classify') {
      return `Classify: "${context.sections[0].content}"
Return JSON: { "category": "product|service|delivery|other" }`;
    }

    if (context.dimension === 'group') {
      // Access all classification results
      const allClassifications = context.sections.map((section, i) => ({
        review: section.content,
        category: context.results.classify[i].data.category
      }));
      return `Group ${allClassifications.length} reviews by category.
${JSON.stringify(allClassifications, null, 2)}
Return JSON: { "categories": [...] }`;
    }

    if (context.dimension === 'analyze') {
      const category = context.sections[0].metadata.category;
      return `Analyze ${category} reviews:
${context.sections[0].content}
Return JSON: { "insights": [...], "themes": [...] }`;
    }
  }

  transformSections(context) {
    if (context.dimension !== 'group') {
      return context.currentSections;
    }

    // Transform: N reviews → M category groups
    const groups = context.result.data.categories;
    return groups.map(group => ({
      content: group.reviews.join('\n\n'),
      metadata: {
        category: group.name,
        count: group.count
      }
    }));
  }
}
```
This handles: classify all items → group by category → analyze each group.
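To make the reshaping concrete, here are illustrative before/after section shapes (assumed from the code above, not actual engine output):

```typescript
// Before 'group': one section per review.
const before = [
  { content: 'Great battery life!' },
  { content: 'Package arrived late.' }
];

// After 'group': one section per category; 'analyze' then runs once per group.
const after = [
  { content: 'Great battery life!', metadata: { category: 'product', count: 1 } },
  { content: 'Package arrived late.', metadata: { category: 'delivery', count: 1 } }
];
```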
## Performance Observations
These numbers are from my testing environment and will vary based on infrastructure, models, and batch sizes:
**Small batch (20 reviews):**
- API calls: ~50
- Time: ~4 seconds
- Pattern: skip logic reduced unnecessary calls

**Medium batch (100 items):**
- API calls: ~290
- Time: ~4 seconds
- Gateway handled rate limits transparently

**Cross-item analysis (10 reviews, 5 categories):**
- API calls: 16 (10 classify + 1 group + 5 analyze)
- Time: ~10 seconds
Your mileage will vary.
## When to Use This Approach

**✅ Good fit:**
- Batch LLM processing with dependencies
- Multi-model workflows
- Need built-in retry/fallback
- Want real-time cost tracking
- Processing reviews, emails, documents
- Requiring structured JSON outputs
**❌ Not a good fit:**
- Complex state machines → Use Temporal or LangGraph
- General data pipelines without LLMs → Use Prefect or Airflow
- Conversational agents with multi-turn state → Use LangChain
- Streaming responses → Not supported (coming)
- Simple sequential workflows → Use async/await
- Long-running jobs needing durable execution → Use Temporal
## Tradeoffs

**Advantages:**
- Declarative dependency model
- Automatic parallelization
- Built-in LLM-specific features
**Limitations:**
- Requires thinking in DAG terms (learning curve)
- All responses must be JSON (by design)
## Current Status
This is beta software. I use it in production, but be aware:
- ✅ Supported: Anthropic, OpenAI, Gemini
- 🚧 Coming: Azure OpenAI, Cohere
- 🚧 In development: Streaming support, enhanced observability
- 🎯 Needed: Better debugging tools for complex DAGs
## Getting Started

```bash
npm install @dagengine/core
```

Minimal example:
```typescript
import { DagEngine, Plugin } from '@dagengine/core';

class SimpleAnalyzer extends Plugin {
  constructor() {
    super('simple', 'Simple Analyzer', 'Analyze items');
    this.dimensions = ['analysis'];
  }

  createPrompt(context) {
    return `Analyze: ${context.sections[0].content}
Return JSON: { "result": "...", "score": 0.0-1.0 }`;
  }

  selectProvider() {
    return {
      provider: 'anthropic',
      options: { model: 'claude-3-5-haiku-20241022' }
    };
  }
}

const engine = new DagEngine({
  plugin: new SimpleAnalyzer(),
  providers: {
    anthropic: { apiKey: process.env.ANTHROPIC_API_KEY }
  }
});

const result = await engine.process([
  { content: 'First item' },
  { content: 'Second item' }
]);
```
Start simple, add features as needed.
## Resources
- 📦 GitHub: github.com/dagengine/dagengine
- 📖 Docs: dagengine.ai
- 💡 Examples: github.com/dagengine/dagengine/tree/main/examples
## Conclusion
For batch LLM processing where you can express the workflow as dependencies between analyses, a DAG approach reduces orchestration code while handling parallelization, retries, and cost tracking automatically.
Whether it fits your use case depends on your specific requirements. The "When to Use" section should help you decide.
Feedback welcome via GitHub discussions or LinkedIn.