Egeo Minotti

Building Multi-Stage LLM Pipelines with Job Dependencies

Modern AI apps rarely make a single LLM call. They chain several steps: embed a query, search a vector DB, generate a response. Each step depends on the output of the previous one.

The Pattern

┌─────────┐     ┌─────────┐     ┌──────────┐
│  Embed  │ ──▶ │ Search  │ ──▶ │ Generate │
└─────────┘     └─────────┘     └──────────┘
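
To make the data flow concrete, here is a minimal sketch of the three stages as plain synchronous functions, with no queue involved. The function names, the `EmbedResult` and `SearchResult` interfaces, and the two-sentence corpus are illustrative assumptions; the fakes only stand in for the embedding model, the vector DB, and the LLM.

```typescript
// Hypothetical payload shapes passed from one stage to the next.
interface EmbedResult { text: string; embedding: number[] }
interface SearchResult { query: string; documents: { content: string }[] }

// Stage 1: a fake "embedding" derived from the text length (stand-in for a real model).
function embedStage(text: string): EmbedResult {
  return { text, embedding: [text.length, 1] };
}

// Stage 2: a fake search that returns the first topK documents of a fixed corpus.
function searchStage(prev: EmbedResult, topK: number): SearchResult {
  const corpus = [
    { content: 'Paris is the capital of France.' },
    { content: 'Berlin is the capital of Germany.' },
  ];
  return { query: prev.text, documents: corpus.slice(0, topK) };
}

// Stage 3: builds the prompt an LLM would receive instead of calling one.
function generateStage(prev: SearchResult): string {
  const context = prev.documents.map((d) => d.content).join('\n\n');
  return `Context:\n${context}\n\nQuestion: ${prev.query}`;
}

// Each stage consumes exactly what the previous stage returned.
const finalPrompt = generateStage(searchStage(embedStage('What is the capital of France?'), 1));
console.log(finalPrompt);
```

Because each function only needs the previous stage's return value, the same three steps map directly onto queued jobs that depend on one another.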

The Code

import { Queue, Worker } from 'flashq';
import OpenAI from 'openai';

const openai = new OpenAI();
const queue = new Queue('ai-pipeline');

// Define the pipeline
async function ask(question: string) {
  const embed = await queue.add('embed', { text: question });
  const search = await queue.add('search', { topK: 5 }, { dependsOn: [embed.id] });
  const generate = await queue.add('generate', {}, { dependsOn: [search.id] });

  return queue.finished(generate.id);
}

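// Worker handles all stages.
// `vectorDB` is a placeholder for your vector store client; it is not defined in this snippet.
new Worker('ai-pipeline', async (job) => {
  switch (job.name) {
    // Each case gets its own block so its `const` declarations stay scoped to that stage.
    case 'embed': {
      const embed = await openai.embeddings.create({
        input: job.data.text,
        model: 'text-embedding-3-small'
      });
      // Pass the original text along so later stages can reuse it.
      return { embedding: embed.data[0].embedding, text: job.data.text };
    }

    case 'search': {
      // job.parentResult is the output of the 'embed' job.
      const results = await vectorDB.search({
        vector: job.parentResult.embedding,
        topK: job.data.topK
      });
      return { documents: results, query: job.parentResult.text };
    }

    case 'generate': {
      // job.parentResult is the output of the 'search' job.
      const context = job.parentResult.documents.map(d => d.content).join('\n\n');
      const response = await openai.chat.completions.create({
        model: 'gpt-4',
        messages: [
          { role: 'system', content: 'Answer based on context.' },
          { role: 'user', content: `Context:\n${context}\n\nQuestion: ${job.parentResult.query}` }
        ]
      });
      return response.choices[0].message.content;
    }

    default:
      // Fail loudly instead of silently returning undefined for an unexpected job.
      throw new Error(`Unknown job name: ${job.name}`);
  }
});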

// Usage
const answer = await ask('What is the capital of France?');

https://github.com/egeominotti/flashq
https://flashq.dev/

Key Points

  • dependsOn: [jobId] chains jobs in sequence
  • job.parentResult accesses the previous job's output
  • queue.finished(id) waits for that job to finish; called on the last job, it waits for the entire pipeline
  • A failed job stops the pipeline, so configure attempts to retry transient errors
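
The chaining and retry behavior listed above can be modeled in a few lines. This is a hypothetical in-memory sketch, not flashQ's implementation: `runChain`, `Stage`, and `ChainResult` are made-up names, the stages run synchronously, and `attempts` here simply means the maximum number of tries per stage.

```typescript
// Hypothetical model of a dependency chain; not flashQ's implementation.
type Stage = {
  name: string;
  attempts: number; // maximum tries before the stage counts as failed
  run: (parentResult: unknown) => unknown;
};

type ChainResult =
  | { status: 'completed'; result: unknown; ran: string[] }
  | { status: 'failed'; failedStage: string; ran: string[] };

function runChain(stages: Stage[]): ChainResult {
  const ran: string[] = [];
  let parentResult: unknown = undefined;

  for (const stage of stages) {
    let succeeded = false;
    for (let attempt = 1; attempt <= stage.attempts && !succeeded; attempt++) {
      try {
        // On success, this stage's output becomes the next stage's parent result.
        parentResult = stage.run(parentResult);
        succeeded = true;
      } catch {
        // Ignore the error and retry until the attempts are used up.
      }
    }
    ran.push(stage.name);
    if (!succeeded) {
      // A failed stage stops the chain; later stages never start.
      return { status: 'failed', failedStage: stage.name, ran };
    }
  }
  return { status: 'completed', result: parentResult, ran };
}

// Usage: 'search' fails once, then succeeds on its second attempt.
let searchCalls = 0;
const outcome = runChain([
  { name: 'embed', attempts: 1, run: () => ({ embedding: [0.1, 0.2] }) },
  {
    name: 'search',
    attempts: 3,
    run: () => {
      searchCalls++;
      if (searchCalls < 2) throw new Error('transient failure');
      return { documents: ['doc-a'] };
    },
  },
  {
    name: 'generate',
    attempts: 1,
    run: (parent) => `answer from ${(parent as { documents: string[] }).documents.length} document(s)`,
  },
]);
console.log(outcome);
```

With `attempts: 1` on the search stage, the same chain would stop there and the generate stage would never run.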

Works great for RAG, document processing, content generation, or any multi-step AI workflow.
