DEV Community

Domonique Luchin
Domonique Luchin

Posted on

How I use Claude Code to dispatch agent tasks from a Supabase queue table

I run 6 AI-powered businesses from a single Vultr VPS. Each business needs different types of agents - some handle customer calls, others process documents, and a few manage data workflows.

The challenge? Coordinating all these agents without chaos.

My solution: a Supabase queue table that feeds tasks to Claude-powered dispatchers. Here's exactly how I built it.

The Queue Table Structure

First, I created a simple queue table in Supabase:

CREATE TABLE task_queue (
  id SERIAL PRIMARY KEY,
  business_id VARCHAR(50) NOT NULL,
  task_type VARCHAR(100) NOT NULL,
  payload JSONB NOT NULL,
  status VARCHAR(20) DEFAULT 'pending',
  priority INTEGER DEFAULT 5,
  created_at TIMESTAMP DEFAULT NOW(),
  assigned_at TIMESTAMP,
  completed_at TIMESTAMP
);

CREATE INDEX idx_queue_status_priority ON task_queue(status, priority DESC);
Enter fullscreen mode Exit fullscreen mode

The business_id field maps to my 6 different companies. Task types include "customer_call", "document_review", "data_sync", and "lead_qualification".

Claude as the Dispatcher

I use Claude's structured output to analyze incoming tasks and assign them to the right agents. Here's my dispatcher function:

import asyncio
from supabase import create_client
from anthropic import Anthropic

class TaskDispatcher:
    def __init__(self):
        self.supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
        self.claude = Anthropic(api_key=CLAUDE_API_KEY)

    async def process_queue(self):
        # Get pending tasks ordered by priority
        response = self.supabase.table('task_queue').select('*').eq('status', 'pending').order('priority', desc=True).limit(10).execute()

        for task in response.data:
            await self.dispatch_task(task)

    async def dispatch_task(self, task):
        # Use Claude to determine the best agent
        prompt = f"""
        Analyze this task and determine the optimal agent assignment:

        Business: {task['business_id']}
        Task Type: {task['task_type']}
        Payload: {task['payload']}

        Return JSON with:
        - agent_type: specific agent category
        - estimated_duration: minutes
        - requirements: list of needed resources
        """

        response = self.claude.messages.create(
            model="claude-3-sonnet-20240229",
            max_tokens=500,
            messages=[{"role": "user", "content": prompt}]
        )

        # Parse Claude's response and route accordingly
        routing_info = json.loads(response.content[0].text)
        await self.assign_to_agent(task, routing_info)
Enter fullscreen mode Exit fullscreen mode

Agent Assignment Logic

Claude doesn't just pick agents randomly. I trained it on my specific business requirements:

  • Real Estate agents: Handle property inquiries, schedule showings
  • Construction agents: Process RFPs, manage vendor communications
  • Document agents: Review contracts, extract key data points
  • Call agents: Handle inbound sales, customer support

The dispatcher updates the queue table with assignment details:

async def assign_to_agent(self, task, routing_info):
    # Mark task as assigned
    self.supabase.table('task_queue').update({
        'status': 'assigned',
        'assigned_at': 'NOW()',
        'agent_type': routing_info['agent_type'],
        'estimated_duration': routing_info['estimated_duration']
    }).eq('id', task['id']).execute()

    # Send to appropriate agent queue
    await self.route_to_agent_system(task, routing_info)
Enter fullscreen mode Exit fullscreen mode

Real Performance Numbers

After 3 months of running this system:

  • Average task assignment time: 2.3 seconds
  • Queue processing rate: 150 tasks per minute
  • Agent utilization: 78% (up from 45% with manual routing)
  • Failed assignments: 0.8%

The key insight: Claude excels at understanding context and making routing decisions that would take me hours to code manually.

Handling Task Failures

Not every agent completes their task successfully. My system automatically retries failed tasks:

async def handle_failed_task(self, task_id, error_reason):
    # Get current task
    task = self.supabase.table('task_queue').select('*').eq('id', task_id).single().execute()

    # Use Claude to analyze the failure
    analysis_prompt = f"""
    Task failed with error: {error_reason}
    Original task: {task.data}

    Should we:
    1. Retry with same agent
    2. Reassign to different agent type
    3. Mark as failed and alert human

    Provide reasoning and recommendation.
    """

    # Get Claude's recommendation and act accordingly
Enter fullscreen mode Exit fullscreen mode

This failure analysis catches 90% of issues before they need human intervention.

Cost Breakdown

Running this dispatcher costs me $23 per month:

  • Claude API calls: $18
  • Supabase compute: $5
  • VPS overhead: $0 (shared with other services)

Compare that to enterprise task management platforms at $200+ monthly.

Your Next Steps

Start simple. Create a basic queue table in Supabase and connect it to Claude for task analysis. You don't need 6 businesses to benefit from intelligent task routing.

The real power comes from teaching Claude your specific business logic through examples and clear prompts. Your agents will work smarter, not just harder.

What tasks are you manually routing that Claude could handle better?

Top comments (0)