FuturMix
How to Build a Multi-Model AI Pipeline in Python (Claude + GPT + DeepSeek)

Building an AI application with a single model is straightforward. Building one that uses the right model for each task — that's where the real engineering happens.

This tutorial walks through building a multi-model AI pipeline in Python that automatically routes requests to Claude, GPT, or DeepSeek based on task complexity, tracks costs, and handles failures gracefully.

Why Multi-Model?

Single-model architectures have a fundamental problem: you're either overpaying for simple tasks or underperforming on complex ones.

  • Claude Opus 4.7 ($5/$25 per 1M tokens) — Best reasoning, but expensive for simple tasks
  • Claude Sonnet 4.6 ($3/$15) — Great balance for most coding tasks
  • GPT-5.5 ($3/$12) — Excellent structured output
  • DeepSeek V3 ($0.27/$1.10) — 10x cheaper, good enough for bulk work

A smart pipeline uses all of them.
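To put those list prices in per-request terms, here's a quick back-of-the-envelope calculation (the 2,000-input / 500-output token counts are just an illustrative assumption):

# Rough cost per call at the list prices above
PRICES = {  # (input, output) in USD per 1M tokens
    "claude-opus-4-7": (5.00, 25.00),
    "claude-sonnet-4-6": (3.00, 15.00),
    "gpt-5.5": (3.00, 12.00),
    "deepseek-chat": (0.27, 1.10),
}

for model, (inp, out) in PRICES.items():
    cost = 2_000 * inp / 1_000_000 + 500 * out / 1_000_000
    print(f"{model}: ${cost:.4f} per call")
# claude-opus-4-7: $0.0225 per call
# claude-sonnet-4-6: $0.0135 per call
# gpt-5.5: $0.0120 per call
# deepseek-chat: $0.0011 per call (~20x cheaper than Opus)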

Architecture Overview

User Request
    │
    ▼
┌──────────────┐
│ Task Router  │ ← Classifies complexity
└──────────────┘
    │
    ├── Complex → Claude Opus 4.7
    ├── Standard → Claude Sonnet 4.6
    ├── Structured → GPT-5.5
    └── Bulk/Simple → DeepSeek V3
    │
    ▼
┌──────────────┐
│ Fallback     │ ← Auto-retry with backup model
└──────────────┘
    │
    ▼
┌──────────────┐
│ Cost Tracker │ ← Log usage per model
└──────────────┘

The Code

Step 1: Unified Client Setup

Since all models are accessible through one OpenAI-compatible endpoint, the client setup is simple:

from openai import OpenAI
import threading
import time
import json

# One client for all models
client = OpenAI(
    base_url="https://api.futurmix.ai/v1",
    api_key="your-api-key"
)

# Model configs with pricing (per 1M tokens)
MODELS = {
    "complex": {
        "name": "claude-opus-4-7",
        "input_cost": 4.50,   # gateway price
        "output_cost": 22.50,
        "max_tokens": 4096
    },
    "standard": {
        "name": "claude-sonnet-4-6",
        "input_cost": 2.70,
        "output_cost": 13.50,
        "max_tokens": 4096
    },
    "structured": {
        "name": "gpt-5.5",
        "input_cost": 2.10,
        "output_cost": 8.40,
        "max_tokens": 4096
    },
    "bulk": {
        "name": "deepseek-chat",
        "input_cost": 0.19,
        "output_cost": 0.77,
        "max_tokens": 4096
    }
}

# Fallback chain: if primary fails, try next
FALLBACK_CHAIN = {
    "claude-opus-4-7": "claude-sonnet-4-6",
    "claude-sonnet-4-6": "gpt-5.5",
    "gpt-5.5": "claude-sonnet-4-6",
    "deepseek-chat": "gpt-5.5"
}

Step 2: Task Router

The router classifies tasks and picks the right model:

def classify_task(prompt: str) -> str:
    """Classify task complexity based on prompt analysis."""

    prompt_lower = prompt.lower()
    word_count = len(prompt.split())

    # Structured output indicators
    structured_keywords = ["json", "extract", "parse", "schema", "csv",
                          "structured", "format as", "return as"]
    if any(kw in prompt_lower for kw in structured_keywords):
        return "structured"

    # Complex task indicators
    complex_keywords = ["refactor", "architect", "design", "debug",
                       "race condition", "optimize", "security audit",
                       "explain why", "trade-offs", "compare approaches"]
    if any(kw in prompt_lower for kw in complex_keywords):
        return "complex"

    # Bulk/simple task indicators
    bulk_keywords = ["generate tests", "unit tests", "add docstrings",
                    "translate", "boilerplate", "template", "lint",
                    "format", "add comments", "rename"]
    if any(kw in prompt_lower for kw in bulk_keywords):
        return "bulk"

    # Default: standard
    # Long prompts (likely complex context) → upgrade
    if word_count > 500:
        return "complex"

    return "standard"
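A few quick sanity checks (illustrative prompts, not a real test suite):

# Spot-check the router against each tier
assert classify_task("Return the results as JSON") == "structured"
assert classify_task("Refactor this module and explain the trade-offs") == "complex"
assert classify_task("Add docstrings to these functions") == "bulk"
assert classify_task("Write a function that reverses a string") == "standard"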

Step 3: Cost Tracker

class CostTracker:
    def __init__(self):
        self.usage_log = []
        self.total_cost = 0.0
        self._lock = threading.Lock()  # log() may run in worker threads (see the batch section)

    def log(self, model: str, input_tokens: int, output_tokens: int):
        # Find model config
        config = None
        for tier in MODELS.values():
            if tier["name"] == model:
                config = tier
                break

        if not config:
            return

        cost = (input_tokens * config["input_cost"] / 1_000_000 +
                output_tokens * config["output_cost"] / 1_000_000)

        entry = {
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost": round(cost, 6),
            "timestamp": time.time()
        }
        with self._lock:
            self.usage_log.append(entry)
            self.total_cost += cost

    def summary(self):
        by_model = {}
        for entry in self.usage_log:
            model = entry["model"]
            if model not in by_model:
                by_model[model] = {"calls": 0, "cost": 0, "tokens": 0}
            by_model[model]["calls"] += 1
            by_model[model]["cost"] += entry["cost"]
            by_model[model]["tokens"] += entry["input_tokens"] + entry["output_tokens"]

        return {
            "total_cost": round(self.total_cost, 4),
            "total_calls": len(self.usage_log),
            "by_model": by_model
        }

tracker = CostTracker()
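A quick smoke test with hypothetical token counts:

demo = CostTracker()
demo.log("claude-sonnet-4-6", input_tokens=1_200, output_tokens=400)
demo.log("deepseek-chat", input_tokens=1_200, output_tokens=400)
print(demo.summary())
# {'total_cost': 0.0092, 'total_calls': 2, 'by_model': {...}}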

Step 4: The Pipeline

def call_model(prompt: str, model_name: str, system_prompt: str | None = None,
               max_retries: int = 2) -> dict:
    """Call a model with automatic fallback."""

    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.append({"role": "user", "content": prompt})

    current_model = model_name
    for attempt in range(max_retries + 1):
        try:
            response = client.chat.completions.create(
                model=current_model,
                messages=messages,
                max_tokens=4096,
                temperature=0.7
            )

            # Track usage
            usage = response.usage
            tracker.log(current_model, usage.prompt_tokens,
                       usage.completion_tokens)

            return {
                "content": response.choices[0].message.content,
                "model": current_model,
                "tokens": {
                    "input": usage.prompt_tokens,
                    "output": usage.completion_tokens
                }
            }

        except Exception as e:
            print(f"[{current_model}] Error: {e}")
            # Try fallback model
            if current_model in FALLBACK_CHAIN:
                current_model = FALLBACK_CHAIN[current_model]
                print(f"  → Falling back to {current_model}")
            else:
                raise

    raise RuntimeError("All models failed for this request")


def pipeline(prompt: str, system_prompt: str | None = None) -> dict:
    """Main pipeline: classify task → route to model → return result."""

    task_type = classify_task(prompt)
    model_config = MODELS[task_type]

    print(f"Task classified as: {task_type}{model_config['name']}")

    result = call_model(prompt, model_config["name"], system_prompt)
    result["task_type"] = task_type
    return result

Step 5: Usage Examples

# Complex architecture task → Claude Opus
result = pipeline(
    "Design a caching strategy for a multi-tenant SaaS application "
    "that handles 10K requests/second. Consider trade-offs between "
    "Redis cluster, local cache, and CDN caching."
)
print(f"Model used: {result['model']}")
# → claude-opus-4-7

# Structured extraction → GPT-5.5
result = pipeline(
    "Extract all function names, parameters, and return types from "
    "this Python file and return as JSON schema."
)
print(f"Model used: {result['model']}")
# → gpt-5.5

# Bulk generation → DeepSeek
result = pipeline(
    "Generate unit tests for all public methods in this class. "
    "Use pytest with parametrize for edge cases."
)
print(f"Model used: {result['model']}")
# → deepseek-chat

# Check costs
print(json.dumps(tracker.summary(), indent=2))

Advanced: Streaming Support

For interactive applications, add streaming:

def stream_pipeline(prompt: str, system_prompt: str | None = None):
    """Streaming version of the pipeline."""

    task_type = classify_task(prompt)
    model_config = MODELS[task_type]

    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    messages.append({"role": "user", "content": prompt})

    stream = client.chat.completions.create(
        model=model_config["name"],
        messages=messages,
        max_tokens=4096,
        stream=True
    )

    for chunk in stream:
        # Skip chunks with no choices (e.g., a trailing usage chunk)
        if chunk.choices and chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content
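Consume it like any Python generator:

for token in stream_pipeline("Summarize this architecture in two sentences."):
    print(token, end="", flush=True)

One caveat: this streaming version never reaches the cost tracker, because usage stats arrive differently on streams. The OpenAI SDK can append a final usage chunk if you pass stream_options={"include_usage": True} to chat.completions.create (worth verifying that your gateway forwards it); that last chunk has empty choices and a populated usage field you can hand to tracker.log() after the loop.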

Advanced: Parallel Processing for Batch Tasks

When processing many items, run them in parallel with the cheap model:

import concurrent.futures

def batch_process(items: list, prompt_template: str,
                  max_workers: int = 5) -> list:
    """Process a batch of items with DeepSeek (cheapest model)."""

    def process_one(item):
        prompt = prompt_template.format(item=item)
        return call_model(prompt, "deepseek-chat")

    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process_one, item): item
                  for item in items}

        for future in concurrent.futures.as_completed(futures):
            item = futures[future]
            try:
                result = future.result()
                results.append({"item": item, "result": result})
            except Exception as e:
                results.append({"item": item, "error": str(e)})

    return results

# Example: generate docstrings for 50 functions
functions = ["def calculate_tax(income, rate):", "def validate_email(email):", ...]
results = batch_process(
    functions,
    "Write a clear docstring for this Python function:\n{item}"
)

Cost Impact

Here's what this architecture saves for a typical development workload (8 hours a day, ~200 API calls a day, averaged over a month):

Approach                        Monthly Cost
Claude Opus for everything      ~$450
Claude Sonnet for everything    ~$270
Smart routing (this pipeline)   ~$85

That's roughly an 80% reduction compared to using Opus for everything (and about 70% compared to Sonnet), because most tasks don't need the most expensive model.

Production Considerations

  1. Add logging — Track which models handle which tasks so you can tune the classifier
  2. Set timeouts — Some models are slower; add timeout parameter to client calls
  3. Rate limiting — Implement token bucket per model to stay under API limits
  4. Caching — Cache responses for identical prompts (hash the prompt + model; see the sketch after this list)
  5. Monitoring — Alert when fallback rate exceeds 5% (indicates provider issues)
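Here's a minimal sketch of item 4: an in-memory cache keyed on a hash of the model, system prompt, and user prompt. In production you'd likely swap the dict for Redis with a TTL, and remember that at temperature 0.7 responses aren't deterministic, so only cache where a repeated answer is acceptable.

import hashlib

_cache: dict[str, dict] = {}

def cached_call(prompt: str, model_name: str,
                system_prompt: str | None = None) -> dict:
    """Wrapper around call_model() that memoizes identical requests."""
    key = hashlib.sha256(
        f"{model_name}|{system_prompt}|{prompt}".encode()
    ).hexdigest()
    if key not in _cache:
        _cache[key] = call_model(prompt, model_name, system_prompt)
    return _cache[key]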

Get Started

This pipeline works with any OpenAI-compatible API. FuturMix gives you one API key for 22+ models at 10-30% off — Claude, GPT, DeepSeek, Gemini, and more.

client = OpenAI(
    base_url="https://api.futurmix.ai/v1",
    api_key="your-key"
)

The full code from this tutorial is ready to copy-paste and customize for your use case.


What's your multi-model setup? Share your routing strategies in the comments.
