A DAG-Based Approach to LLM Workflow Orchestration

When processing large batches of items through multiple LLM analyses, you quickly run into orchestration complexity. Consider a customer review pipeline that needs to:

  • Run sentiment analysis and topic extraction (independent tasks)
  • Generate summaries that depend on both sentiment and topics
  • Filter spam before expensive operations
  • Handle failures gracefully
  • Route to different models based on cost/capability
  • Track spending in real-time

The Problem

The typical progression:

Sequential approach - Simple but slow:

for (const review of reviews) {
  const sentiment = await runSentiment(review);
  const topics = await runTopics(review);
  const summary = await runSummary(review, sentiment, topics);
}

Parallel approach - Faster but fragile:

const [sentiments, topics] = await Promise.all([
  Promise.all(reviews.map(r => runSentiment(r))),
  Promise.all(reviews.map(r => runTopics(r)))
]);

const summaries = await Promise.all(
  reviews.map((r, i) => runSummary(r, sentiments[i], topics[i]))
);

This works until you need error handling, retries, rate limiting, concurrency controls, and cost tracking. At scale, you end up with substantial orchestration code.
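
In practice that means hand-rolling helpers for retries and bounded concurrency before any analysis logic exists. A rough sketch of the kind of code that accumulates (illustrative, not production-grade):

// Retry with exponential backoff
async function withRetry(fn, attempts = 3) {
  for (let i = 0; i < attempts; i++) {
    try {
      return await fn();
    } catch (err) {
      if (i === attempts - 1) throw err;
      await new Promise(r => setTimeout(r, 1000 * 2 ** i));
    }
  }
}

// Naive concurrency limiter to avoid hammering rate limits
async function mapLimited(items, limit, fn) {
  const results = [];
  let next = 0;
  await Promise.all(
    Array.from({ length: limit }, async () => {
      while (next < items.length) {
        const i = next++;
        results[i] = await withRetry(() => fn(items[i]));
      }
    })
  );
  return results;
}

And that still says nothing about dependencies between analyses, cost tracking, or partial failures.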

Existing Solutions

Several tools address this:

  • LangChain/LCEL - Has parallel execution with RunnableParallel, good for chaining operations
  • Prefect/Temporal - Durable workflow execution with state persistence, excellent for long-running workflows but requires infrastructure
  • LangGraph - State-based approach that can model DAGs through state transitions

These are production-ready tools. The gap I found: I wanted pure dependency-based orchestration focused specifically on LLM batch processing, without the infrastructure overhead or state management abstractions.

The DAG Approach

The core observation: for batch LLM processing, you're primarily describing analysis dependencies:

sentiment ─┐
           ├─→ summary
topics ────┘

Dependencies define execution order. Parallelization emerges automatically.
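
Conceptually, a dependency map resolves into "waves" of tasks that can run concurrently. The sketch below only illustrates the idea; it is not dagengine's internal code:

// Resolve a dependency map into waves of independently runnable tasks
function toWaves(deps) {
  const remaining = new Set(Object.keys(deps));
  const done = new Set();
  const waves = [];

  while (remaining.size > 0) {
    const wave = [...remaining].filter(task =>
      (deps[task] || []).every(dep => done.has(dep))
    );
    if (wave.length === 0) throw new Error('Cycle detected');
    wave.forEach(task => { remaining.delete(task); done.add(task); });
    waves.push(wave);
  }
  return waves;
}

toWaves({ sentiment: [], topics: [], summary: ['sentiment', 'topics'] });
// → [['sentiment', 'topics'], ['summary']]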

I built dagengine around this principle.

Implementation

Here's the complete working example:

import { DagEngine, Plugin } from '@dagengine/core';

class ReviewAnalyzer extends Plugin {
  constructor() {
    super('analyzer', 'Review Analyzer', 'Analyze reviews');
    this.dimensions = ['sentiment', 'topics', 'summary'];
  }

  // Declare dependencies
  defineDependencies() {
    return {
      summary: ['sentiment', 'topics']
    };
  }

  // Define prompts for each dimension
  createPrompt(context) {
    const review = context.sections[0]?.content;

    if (context.dimension === 'sentiment') {
      return `Analyze sentiment: "${review}"

Return JSON: {
  "sentiment": "positive|negative|neutral",
  "confidence": 0.0-1.0
}`;
    }

    if (context.dimension === 'topics') {
      return `Extract topics: "${review}"

Return JSON: {
  "topics": ["topic1", "topic2", ...]
}`;
    }

    if (context.dimension === 'summary') {
      const sentiment = context.dependencies.sentiment.data;
      const topics = context.dependencies.topics.data;

      return `Summarize with context:
- Sentiment: ${sentiment.sentiment} (${sentiment.confidence})
- Topics: ${topics.topics.join(', ')}
- Review: "${review}"

Return JSON: {
  "summary": "...",
  "key_points": ["point1", "point2", ...]
}`;
    }
  }

  // Select model
  selectProvider() {
    return {
      provider: 'anthropic',
      options: { model: 'claude-3-5-haiku-20241022' }
    };
  }
}

// Run
const engine = new DagEngine({
  plugin: new ReviewAnalyzer(),
  providers: {
    anthropic: { apiKey: process.env.ANTHROPIC_API_KEY }
  }
});

// "reviews" is an array of { content: string } items
const reviews = [
  { content: 'Great product, arrived quickly and works as described.' },
  { content: 'Terrible quality. Broke after two days.' }
];

const result = await engine.process(reviews);

What happens automatically:

  • ✅ Parallel execution of independent dimensions
  • ✅ Dependency waiting
  • ✅ Concurrent section processing (configurable batch size)
  • ✅ Automatic retries with exponential backoff
  • ✅ Cost tracking
  • ✅ JSON parsing and validation

What you control:

  • Dimensions to run
  • Dependencies between them
  • Prompt generation
  • Model selection

Design Choice: JSON Responses

The engine expects JSON responses. Include "Return JSON: {schema}" in prompts.

Rationale:

  • Works across all providers without provider-specific syntax
  • Reliable parsing without regex
  • Clean dependency data access
  • Enables skip logic based on structured fields
  • Consistent cost tracking

The engine automatically parses responses and makes them available in context.dependencies[dimension].data.
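
If a dependency fails or returns unexpected fields, a downstream prompt can guard against it. The optional-chaining defaults below are my own defensive pattern, not something the engine requires:

if (context.dimension === 'summary') {
  // Fall back to safe defaults if a dependency produced no usable data
  const sentiment = context.dependencies.sentiment?.data ?? {};
  const topics = context.dependencies.topics?.data ?? {};

  return `Summarize with context:
- Sentiment: ${sentiment.sentiment ?? 'unknown'} (${sentiment.confidence ?? 'n/a'})
- Topics: ${(topics.topics ?? []).join(', ')}
- Review: "${context.sections[0]?.content}"

Return JSON: { "summary": "...", "key_points": [] }`;
}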

Practical Features

1. Skip Logic for Cost Savings

class ReviewAnalyzer extends Plugin {
  constructor() {
    super('analyzer', 'Review Analyzer', 'Analyze reviews');
    this.dimensions = [
      'filter_spam',
      'sentiment',
      'topics',
      'summary'
    ];
  }

  defineDependencies() {
    return {
      sentiment: ['filter_spam'],
      topics: ['filter_spam'],
      summary: ['sentiment', 'topics']
    };
  }

  shouldSkipSectionDimension(context) {
    if (context.dimension === 'sentiment' || 
        context.dimension === 'topics') {
      const spam = context.dependencies.filter_spam?.data;
      return spam?.is_spam === true;
    }
    return false;
  }
}

In testing with 20 reviews where 10 were spam, this skipped 20 unnecessary API calls (10 spam reviews × 2 downstream dimensions: sentiment and topics).

2. Multi-Model Routing

selectProvider(context) {
  // Cheap model for filtering
  if (context.dimension === 'filter_spam') {
    return {
      provider: 'anthropic',
      options: { model: 'claude-3-5-haiku-20241022' }
    };
  }

  // More capable model for analysis
  return {
    provider: 'anthropic',
    options: { model: 'claude-3-7-sonnet-20250219' },
    fallbacks: [
      { 
        provider: 'openai',
        options: { model: 'gpt-4o' }
      }
    ]
  };
}

Using Haiku ($0.80/1M input) for filtering and Sonnet ($3/1M input) for analysis reduces costs compared to using Sonnet for everything.
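
A rough back-of-envelope for the filtering step alone, with token counts assumed purely for illustration:

// Assume ~600 input tokens per review across 1,000 reviews (illustrative numbers)
const filterInputTokens = 1000 * 600;                // 600K tokens
const haikuCost = (filterInputTokens / 1e6) * 0.80;  // ≈ $0.48
const sonnetCost = (filterInputTokens / 1e6) * 3.00; // ≈ $1.80
// Routing the filter pass to Haiku cuts that step's input cost by roughly 73%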

3. Gateway Integration

const engine = new DagEngine({
  plugin: new ReviewAnalyzer(),
  providers: {
    anthropic: {
      apiKey: process.env.ANTHROPIC_API_KEY,
      gateway: 'portkey',
      gatewayApiKey: process.env.PORTKEY_API_KEY,
      gatewayConfig: {
        retry: { 
          attempts: 5,
          on_status_codes: [429, 500, 502, 503, 504]
        }
      }
    }
  }
});

Portkey handles rate limiting and retries transparently.

4. Partial Results on Failures

const engine = new DagEngine({
  plugin: new ReviewAnalyzer(),
  providers: { /* ... */ },
  execution: {
    maxRetries: 3,
    retryDelay: 1000,
    continueOnError: true
  }
});

// Provide fallback values
class ReviewAnalyzer extends Plugin {
  async handleDimensionFailure(context) {
    return {
      data: { 
        sentiment: 'neutral',
        confidence: 0.0
      },
      metadata: { 
        fallback: true,
        error: context.error.message
      }
    };
  }
}

If one item fails, you still get results for the rest.

5. Cost Tracking

const result = await engine.process(reviews);

console.log(result.costs);
// {
//   totalCost: 0.0282,
//   totalTokens: 12289,
//   byDimension: {
//     filter_spam: { 
//       cost: 0.0115,
//       tokens: { input: 6548, output: 1559 }
//     },
//     ...
//   }
// }

Cross-Item Analysis Pattern

For operations that need to aggregate across all items:

class CategoryAnalyzer extends Plugin {
  constructor() {
    super('category', 'Category Analyzer', 'Analyze by category');
    this.dimensions = [
      'classify',
      { name: 'group', scope: 'global' },
      'analyze'
    ];
  }

  defineDependencies() {
    return {
      group: ['classify'],
      analyze: ['group']
    };
  }

  createPrompt(context) {
    if (context.dimension === 'classify') {
      return `Classify: "${context.sections[0].content}"
Return JSON: { "category": "product|service|delivery|other" }`;
    }

    if (context.dimension === 'group') {
      // Access all classification results
      const allClassifications = context.sections.map((section, i) => ({
        review: section.content,
        category: context.results.classify[i].data.category
      }));

      return `Group ${allClassifications.length} reviews by category.
${JSON.stringify(allClassifications, null, 2)}

Return JSON: { "categories": [...] }`;
    }

    if (context.dimension === 'analyze') {
      const category = context.sections[0].metadata.category;
      return `Analyze ${category} reviews:
${context.sections[0].content}

Return JSON: { "insights": [...], "themes": [...] }`;
    }
  }

  transformSections(context) {
    if (context.dimension !== 'group') {
      return context.currentSections;
    }

    // Transform: N reviews → M category groups
    const groups = context.result.data.categories;
    return groups.map(group => ({
      content: group.reviews.join('\n\n'),
      metadata: { 
        category: group.name,
        count: group.count
      }
    }));
  }
}

This handles: classify all items → group by category → analyze each group.

Performance Observations

These numbers are from my testing environment and will vary based on infrastructure, models, and batch sizes:

Small batch (20 reviews):

  • API calls: ~50
  • Time: ~4 seconds
  • Pattern: Skip logic reduced unnecessary calls

Medium batch (100 items):

  • API calls: ~290
  • Time: ~4 seconds
  • Gateway handled rate limits transparently

Cross-item analysis (10 reviews, 5 categories):

  • API calls: 16 (10 classify + 1 group + 5 analyze)
  • Time: ~10 seconds

Your mileage will vary.

When to Use This Approach

✅ Good fit:

  • Batch LLM processing with dependencies
  • Multi-model workflows
  • Need built-in retry/fallback
  • Want real-time cost tracking
  • Processing reviews, emails, documents
  • Requiring structured JSON outputs

❌ Not a good fit:

  • Complex state machines → Use Temporal or LangGraph
  • General data pipelines without LLMs → Use Prefect or Airflow
  • Conversational agents with multi-turn state → Use LangChain
  • Streaming responses → Not supported (coming)
  • Simple sequential workflows → Use async/await
  • Long-running jobs needing durable execution → Use Temporal

Tradeoffs

Advantages:

  • Declarative dependency model
  • Automatic parallelization
  • Built-in LLM-specific features

Limitations:

  • Requires thinking in DAG terms (learning curve)
  • All responses must be JSON (by design)

Current Status

This is beta software. I use it in production, but be aware:

  • ✅ Supported: Anthropic, OpenAI, Gemini
  • 🚧 Coming: Azure OpenAI, Cohere
  • 🚧 In development: Streaming support, enhanced observability
  • 🎯 Needed: Better debugging tools for complex DAGs

Getting Started

npm install @dagengine/core

Minimal example:

import { DagEngine, Plugin } from '@dagengine/core';

class SimpleAnalyzer extends Plugin {
  constructor() {
    super('simple', 'Simple Analyzer', 'Analyze items');
    this.dimensions = ['analysis'];
  }

  createPrompt(context) {
    return `Analyze: ${context.sections[0].content}
Return JSON: { "result": "...", "score": 0.0-1.0 }`;
  }

  selectProvider() {
    return {
      provider: 'anthropic',
      options: { model: 'claude-3-5-haiku-20241022' }
    };
  }
}

const engine = new DagEngine({
  plugin: new SimpleAnalyzer(),
  providers: {
    anthropic: { apiKey: process.env.ANTHROPIC_API_KEY }
  }
});

const result = await engine.process([
  { content: 'First item' },
  { content: 'Second item' }
]);

Start simple, add features as needed.
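
For example, a natural next step is a second dimension that depends on the first, using the same plugin hooks shown above (the prompts here are only illustrative):

class SimpleAnalyzer extends Plugin {
  constructor() {
    super('simple', 'Simple Analyzer', 'Analyze items');
    this.dimensions = ['analysis', 'report'];
  }

  defineDependencies() {
    return { report: ['analysis'] };
  }

  createPrompt(context) {
    if (context.dimension === 'analysis') {
      return `Analyze: ${context.sections[0].content}
Return JSON: { "result": "...", "score": 0.0-1.0 }`;
    }

    // 'report' runs only after 'analysis' and can use its parsed output
    const analysis = context.dependencies.analysis.data;
    return `Write a one-sentence report for an item scored ${analysis.score}: ${analysis.result}
Return JSON: { "report": "..." }`;
  }

  selectProvider() {
    return {
      provider: 'anthropic',
      options: { model: 'claude-3-5-haiku-20241022' }
    };
  }
}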

Resources

Conclusion

For batch LLM processing where you can express the workflow as dependencies between analyses, a DAG approach reduces orchestration code while handling parallelization, retries, and cost tracking automatically.

Whether it fits your use case depends on your specific requirements; the "When to Use This Approach" section above should help you decide.


Feedback welcome via GitHub discussions or LinkedIn.
