Ivan Holovach
I kept rewriting AI orchestration code, so I built dagengine

Stop me if you've heard this one:

You're processing data with AI. You write some async code. It works.

Then you need to:

  • Add another analysis
  • Make some run in parallel
  • Add retry logic
  • Track costs

You rewrite 50 lines. Again. And again.

I did this five times before building dagengine.


The Problem

I was analyzing my book manuscript—checking emotional pacing across 30 chapters. Each chapter needed multiple analyses: emotion score, readability, consistency checks.

First attempt: Sequential processing. 45 seconds per chapter × 30 chapters = 22.5 minutes.

Then I tried Promise.all():

await Promise.all([
  analyzeEmotion(chapter).catch(retry),
  checkReadability(chapter).catch(retry),
  verifyConsistency(chapter).catch(retry),
  // ... more analyses
])

200 lines of orchestration code. Manual retry logic everywhere. Every new analysis meant rewriting coordination.

Week 5, refactoring again: This isn't a book problem.


The Pattern

Every AI workflow has the same structure:

  • Multiple analyses needed
  • Some can run in parallel
  • Some depend on others
  • All need retries
  • All need cost tracking

I was solving a general problem with one-off code.


What I Built

dagengine handles the plumbing. You define what you want analyzed and what depends on what. The engine figures out execution order and parallelization.

Here's the concept:

import { DagEngine, Plugin, type PromptContext, type ProviderSelection } from '@dagengine/core';

// Define result types (TypeScript helps!)
interface SentimentResult {
  sentiment: "positive" | "negative" | "neutral";
  score: number;
}

class ReviewAnalyzer extends Plugin {
  constructor() {
    super('review-analyzer', 'Review Analyzer', 'Analyze customer reviews');

    // Define your analysis tasks
    this.dimensions = [
      'filter_spam',      // Stage 1: Filter
      'sentiment',        // Stage 2: Parallel
      'categorize',       // Stage 2: Parallel
      'analyze_category'  // Stage 3: Deep analysis
    ];
  }

  // Declare what depends on what
  defineDependencies(): Record<string, string[]> {
    return {
      sentiment: ['filter_spam'],
      categorize: ['filter_spam'],
      analyze_category: ['categorize']
    };
  }

  // Build prompts (engine calls this for each dimension)
  createPrompt(context: PromptContext): string {
    const content = context.sections[0]?.content || '';

    if (context.dimension === 'filter_spam') {
      return `Is this spam? "${content}"
Return JSON: { "is_spam": boolean, "confidence": 0-1 }`;
    }

    if (context.dimension === 'sentiment') {
      return `Analyze sentiment: "${content}"
Return JSON: { "sentiment": "positive|negative|neutral", "score": 0-1 }`;
    }

    // Prompts for 'categorize' and 'analyze_category' elided for brevity
    return '';
  }

  // Choose AI model per dimension
  selectProvider(dimension: string): ProviderSelection {
    // Cheap model for filtering (Haiku: $0.80/$4 per 1M tokens)
    if (dimension === 'filter_spam' || 
        dimension === 'sentiment' || 
        dimension === 'categorize') {
      return {
        provider: 'anthropic',
        options: { model: 'claude-3-5-haiku-20241022' }
      };
    }

    // Powerful model for deep analysis (Sonnet: $3/$15 per 1M tokens)
    return {
      provider: 'anthropic',
      options: { model: 'claude-3-7-sonnet-20250219' }
    };
  }

  // Skip expensive analysis on spam
  shouldSkipSectionDimension(context: any): boolean {
    if (context.dimension === 'sentiment' || context.dimension === 'categorize') {
      const spam = context.dependencies.filter_spam?.data?.is_spam;
      return spam === true; // undefined (no filter result yet) means "don't skip"
    }
    return false;
  }
}

The engine analyzes the dependency graph and automatically:

  • Runs independent tasks in parallel (sentiment + categorize)
  • Waits for dependencies before execution
  • Retries failures with exponential backoff
  • Tracks costs per dimension
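Retry behavior in particular is easy to get subtly wrong by hand. As a rough sketch of what "retries with exponential backoff" means here (illustrative only, not dagengine's actual internals):

```typescript
// Generic retry with exponential backoff: wait 500ms, 1000ms, 2000ms, ...
// between attempts, and rethrow the last error if all attempts fail.
async function withRetry<T>(
  fn: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 500
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      // Double the delay on each failed attempt.
      await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** attempt));
    }
  }
  throw lastError;
}
```

The engine applies this kind of policy to every dimension call so you never write it per-analysis.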

Full code example on GitHub →


How It Executes

The DAG determines execution order (group_by_category here is a data transformation between stages, reshaping 10 non-spam reviews into 5 groups):

filter_spam (20 reviews)
    ↓
    ├─→ sentiment (parallel)
    └─→ categorize (parallel)
    ↓
group_by_category (10 reviews → 5 groups)
    ↓
analyze_category (5 groups)

No manual coordination. No Promise.all() hell.
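The scheduling idea underneath is topological layering. Here's a minimal sketch (illustrative, not dagengine's actual implementation) of how a dependency map like the one returned by defineDependencies() can be turned into layers of tasks that are safe to run in parallel:

```typescript
// Map each dimension to the dimensions it depends on.
type Deps = Record<string, string[]>;

function topoLayers(deps: Deps): string[][] {
  // Collect every node mentioned as a key or as a dependency.
  const nodes = new Set<string>(Object.keys(deps));
  for (const ds of Object.values(deps)) ds.forEach((d) => nodes.add(d));

  const layers: string[][] = [];
  const done = new Set<string>();

  while (done.size < nodes.size) {
    // A node is ready when all of its dependencies are finished.
    const ready = [...nodes].filter(
      (n) => !done.has(n) && (deps[n] ?? []).every((d) => done.has(d))
    );
    if (ready.length === 0) throw new Error('Cycle detected in dependency graph');
    layers.push(ready);
    ready.forEach((n) => done.add(n));
  }
  return layers;
}

// The review pipeline's dependencies from above:
const layers = topoLayers({
  sentiment: ['filter_spam'],
  categorize: ['filter_spam'],
  analyze_category: ['categorize'],
});
// layers[0] → ['filter_spam']
// layers[1] → ['sentiment', 'categorize']  (safe to run in parallel)
// layers[2] → ['analyze_category']
```

Everything within a layer can be fired off concurrently; the next layer only starts once the previous one resolves.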


Skip Logic Saves Money

Here's the key optimization, skipping expensive analysis on spam:

shouldSkipSectionDimension(context: any): boolean {
  if (context.dimension === 'sentiment' || context.dimension === 'categorize') {
    const spam = context.dependencies.filter_spam?.data?.is_spam;
    return spam === true;  // Skip downstream analysis; undefined means not spam
  }
  return false;
}

Real impact: this saved 20 API calls (10 spam reviews × 2 dimensions), roughly a 30% reduction in total calls.
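The arithmetic behind that claim, assuming one API call per review per dimension and the pipeline shape above (20 reviews, 10 spam, 5 category groups):

```typescript
// Counting API calls with and without skip logic.
const reviews = 20;
const spamReviews = 10;
const groups = 5;

const callsWithoutSkip =
  reviews          // filter_spam runs on every review
  + reviews * 2    // sentiment + categorize on every review
  + groups;        // analyze_category once per group

const callsSkipped = spamReviews * 2; // sentiment + categorize never run on spam
const callsWithSkip = callsWithoutSkip - callsSkipped;

console.log(callsSkipped, callsWithSkip, callsWithoutSkip); // 20 45 65
```

20 skipped out of 65 potential calls is where the roughly-30% figure comes from.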


Multi-Model Routing

Use cheap models for filtering, expensive models for analysis:

selectProvider(dimension: string): ProviderSelection {
  // Cheap: Haiku at $0.80/$4 per 1M tokens
  if (dimension === 'filter_spam' || 
      dimension === 'sentiment' || 
      dimension === 'categorize') {
    return {
      provider: 'anthropic',
      options: { model: 'claude-3-5-haiku-20241022' }
    };
  }

  // Powerful: Sonnet at $3/$15 per 1M tokens
  return {
    provider: 'anthropic',
    options: { model: 'claude-3-7-sonnet-20250219' }
  };
}

Real Numbers

Processing 20 customer reviews through this pipeline:

✅ Spam filter detected 10 spam reviews

✅ Skip logic prevented 20 unnecessary API calls

✅ Multi-model routing: Haiku for filtering, Sonnet for deep analysis

Total cost: $0.0282 in 24 seconds

Compare to alternatives:

  • Using only Sonnet: $0.094 (3.3× more expensive)
  • No skip logic: 20 extra API calls wasted on spam

Key insight: 70% cost savings from smart model routing + skip logic
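Both numbers fall straight out of the two measured totals:

```typescript
// Checking the cost comparison from the run above.
const mixedRouting = 0.0282; // Haiku for filtering, Sonnet for deep analysis
const sonnetOnly = 0.094;    // everything routed to Sonnet

const ratio = sonnetOnly / mixedRouting;       // how much more Sonnet-only costs
const savings = 1 - mixedRouting / sonnetOnly; // fraction saved by mixed routing

console.log(ratio.toFixed(1), `${Math.round(savings * 100)}% saved`);
// → 3.3 70% saved
```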

Using the Results

// Create and run engine
const engine = new DagEngine({
  plugin: new ReviewAnalyzer(),
  providers: {
    anthropic: { apiKey: process.env.ANTHROPIC_API_KEY }
  },
  execution: {
    pricing: {
      models: {
        'claude-3-5-haiku-20241022': { inputPer1M: 0.80, outputPer1M: 4.00 },
        'claude-3-7-sonnet-20250219': { inputPer1M: 3.00, outputPer1M: 15.00 }
      }
    }
  }
});

const result = await engine.process(reviews);

// Access results with type safety (spam reviews skip sentiment, so guard for undefined)
result.sections.forEach((section) => {
  const sentiment = section.results.sentiment?.data as SentimentResult | undefined;
  if (sentiment) {
    console.log(`Review: ${sentiment.sentiment}`);
  }
});

// Check costs
console.log(`Total cost: $${result.stats.totalCost}`);
console.log(`Total tokens: ${result.stats.totalTokens}`);

No spreadsheets. No manual token counting.
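The pricing block is what turns token counts into dollars. Assuming the usual price-per-1M-tokens convention (callCost here is an illustrative helper, not part of the dagengine API):

```typescript
// Per-model pricing, as configured in the engine above.
const pricing = {
  'claude-3-5-haiku-20241022': { inputPer1M: 0.80, outputPer1M: 4.00 },
  'claude-3-7-sonnet-20250219': { inputPer1M: 3.00, outputPer1M: 15.00 },
};

// Dollar cost of a single call: tokens × price-per-token.
function callCost(
  model: keyof typeof pricing,
  inputTokens: number,
  outputTokens: number
): number {
  const p = pricing[model];
  return (inputTokens * p.inputPer1M + outputTokens * p.outputPer1M) / 1_000_000;
}

// e.g. a 500-token prompt with a 100-token reply on Haiku:
console.log(callCost('claude-3-5-haiku-20241022', 500, 100)); // ≈ 0.0008
```

The engine does this accounting per dimension and per model, which is what `result.stats.totalCost` rolls up.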

Key Features

Automatic parallelization - DAG determines execution order

Skip logic - Avoid wasting API calls on low-quality data

Multi-model routing - Cheap models for filtering, expensive for analysis

Built-in retries - 3 automatic retries with exponential backoff

Cost tracking - Real-time breakdown by dimension and model

Full TypeScript - Type-safe with excellent IDE support

Data transformations - Reshape data between stages (10 reviews → 5 groups)

Multi-provider - Anthropic, OpenAI, Google Gemini
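To make the "data transformations" point concrete, here's the kind of between-stage reshaping it refers to, sketched as plain TypeScript (the Review type and groupByCategory are illustrative, not dagengine's transformation API):

```typescript
// Reshape per-review results into per-category groups between stages,
// e.g. 10 categorized reviews → 5 category groups for deep analysis.
interface Review {
  id: number;
  category: string;
  text: string;
}

function groupByCategory(reviews: Review[]): Map<string, Review[]> {
  const groups = new Map<string, Review[]>();
  for (const review of reviews) {
    const bucket = groups.get(review.category) ?? [];
    bucket.push(review);
    groups.set(review.category, bucket);
  }
  return groups;
}
```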

Try It

npm install @dagengine/core

Need help?

💬 Ask a question

📖 Read the docs

🐛 Report bugs

💡 Request features

Star on GitHub

It's beta. I use it daily. Looking for feedback on what breaks for your use case.

Not asking for trust. Asking you to break it and tell me what's wrong.

What AI workflows are you building that could benefit from parallelization? Drop a comment below! 👇

What's Next?

Want to see more advanced patterns?

📖 Section vs Global dimensions - Handle per-item and cross-item analysis

High-volume processing - 100 parallel requests with Portkey gateway

Check out the full examples on GitHub.
