Mukunda Rao Katta

Priority Queue for Agent Sub-Tasks: Stop Processing Low-Priority Work First

The agent had 12 pending sub-tasks. It picked them up in the order they were added. The first 8 were "gather background information" tasks. The last 4 were "draft the executive summary" tasks.

The user needed the executive summary. The agent spent 40 minutes gathering background before touching the summary. A priority queue would have delivered the summary in 10 minutes.

agent-task-queue is a priority queue for agent sub-tasks.


The Shape of the Fix

from agent_task_queue import TaskQueue, Task, Priority

queue = TaskQueue()

queue.push(Task(
    id="task-001",
    description="Gather competitor pricing data",
    priority=Priority.LOW,
    payload={"company": "Competitor A"},
))

queue.push(Task(
    id="task-002",
    description="Draft executive summary",
    priority=Priority.HIGH,
    payload={"sections": ["overview", "recommendations"]},
))

queue.push(Task(
    id="task-003",
    description="Research market trends",
    priority=Priority.MEDIUM,
    payload={"topic": "Q3 growth"},
))

# Always returns highest priority task first
while not queue.is_empty():
    task = queue.pop()
    print(f"Processing: [{task.priority.name}] {task.description}")

# Output:
# Processing: [HIGH] Draft executive summary
# Processing: [MEDIUM] Research market trends
# Processing: [LOW] Gather competitor pricing data

Tasks are processed in priority order, not insertion order.


What It Does NOT Do

agent-task-queue does not execute tasks. It manages ordering. You pop tasks from the queue and execute them however you like.

It does not persist the queue across process restarts. The queue is in-memory. For durable task queues, use a real task queue system (Celery, Redis Queue, etc.).

It does not handle task dependencies. A constraint such as "Task A must complete before Task B can start" cannot be expressed here. For dependency graphs, use agent-tool-graph.


Inside the Library

The implementation uses Python's heapq for efficient priority ordering:

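import heapq
import time
from dataclasses import dataclass, field
from enum import IntEnum

# Error types referenced below; plain Exception subclasses are assumed here.
class DuplicateTaskError(Exception):
    pass

class EmptyQueueError(Exception):
    pass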

class Priority(IntEnum):
    CRITICAL = 0  # lowest number = highest priority
    HIGH = 1
    MEDIUM = 2
    LOW = 3
    BACKGROUND = 4

@dataclass
class Task:
    id: str
    description: str
    priority: Priority
    payload: dict = field(default_factory=dict)
    created_at: float = field(default_factory=time.monotonic)

class TaskQueue:
    def __init__(self):
        self._heap = []  # (priority, counter, task)
        self._counter = 0  # tiebreaker for equal priorities
        self._all_ids = set()

    def push(self, task: Task) -> None:
        if task.id in self._all_ids:
            raise DuplicateTaskError(task.id)
        heapq.heappush(self._heap, (task.priority, self._counter, task))
        self._counter += 1
        self._all_ids.add(task.id)

    def pop(self) -> Task:
        if not self._heap:
            raise EmptyQueueError()
        _, _, task = heapq.heappop(self._heap)
        self._all_ids.discard(task.id)
        return task

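    def peek(self) -> Task:
        if not self._heap:
            raise EmptyQueueError()
        return self._heap[0][2]

    def is_empty(self) -> bool:
        # Used by the examples in this post; assumed to mirror the heap state
        return not self._heap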

The counter tiebreaker ensures FIFO ordering within the same priority level. Equal-priority tasks are processed in insertion order.
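To see why the counter matters, here is a minimal standalone sketch using only heapq. The Job class is a hypothetical stand-in for Task, not part of the library. It shows two things: equal-priority entries come out in insertion order, and without the counter the tuple comparison falls through to the task objects, which a plain dataclass cannot order.

```python
import heapq
from dataclasses import dataclass


@dataclass
class Job:
    """Hypothetical stand-in for Task; dataclasses define no ordering by default."""
    name: str


heap = []
for counter, name in enumerate(["first", "second", "third"]):
    # Every entry has the same priority (1), so the counter decides the order.
    heapq.heappush(heap, (1, counter, Job(name)))

order = [heapq.heappop(heap)[2].name for _ in range(3)]
print(order)  # ['first', 'second', 'third']

# Without the counter, a priority tie makes heapq compare the Job objects.
tie_error = None
try:
    bad = []
    heapq.heappush(bad, (1, Job("a")))
    heapq.heappush(bad, (1, Job("b")))
except TypeError as exc:
    tie_error = exc
print(type(tie_error).__name__)  # TypeError
```

So the counter does double duty: it gives stable FIFO ordering, and it guarantees the comparison never reaches the third tuple element.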

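Two further methods are not shown in the listing above:

remove(task_id): cancels a pending task before it is processed. Removed IDs are recorded in a set, and pop skips any entry whose ID is in that set.

update_priority(task_id, new_priority): changes a task's priority while it is in the queue. It is implemented as remove followed by re-push, so the task gets a new counter value and queues behind existing tasks at its new priority level.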
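The lazy-deletion idea behind remove and update_priority can be sketched roughly as follows. This is not the library's code: LazyQueue is a hypothetical mini-queue that stores bare task IDs, and instead of a set of removed IDs it tracks the counter of each task's live entry, a small variation chosen so that removing and re-pushing the same ID behaves correctly.

```python
import heapq


class LazyQueue:
    """Hypothetical mini-queue; entries are (priority, counter, task_id)."""

    def __init__(self):
        self._heap = []
        self._counter = 0
        self._live = {}  # task_id -> counter of its current heap entry

    def push(self, task_id, priority):
        heapq.heappush(self._heap, (priority, self._counter, task_id))
        self._live[task_id] = self._counter
        self._counter += 1

    def remove(self, task_id):
        # O(1): forget the task; its heap entry stays behind as a stale record.
        self._live.pop(task_id)  # raises KeyError for an unknown ID

    def update_priority(self, task_id, new_priority):
        # remove + re-push: the task re-enters with a fresh counter.
        self.remove(task_id)
        self.push(task_id, new_priority)

    def pop(self):
        while self._heap:
            _, counter, task_id = heapq.heappop(self._heap)
            if self._live.get(task_id) == counter:  # skip stale entries
                del self._live[task_id]
                return task_id
        raise IndexError("pop from empty queue")


q = LazyQueue()
q.push("background", 3)
q.push("summary", 1)
q.push("trends", 2)
q.remove("trends")                  # cancelled before it runs
q.update_priority("background", 0)  # promoted ahead of everything
result = [q.pop(), q.pop()]
print(result)  # ['background', 'summary']
```

The trade-off is that stale entries stay in the heap until pop reaches them, so memory is reclaimed lazily in exchange for cheap removal.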


When to Use It

Use it for agents that decompose tasks into sub-tasks with different urgency levels. Research agents, planning agents, any agent that generates work items as it runs.

The priority levels are useful for distinguishing:

  • CRITICAL: user-blocking work that must complete before the agent returns
  • HIGH: important deliverables the user explicitly requested
  • MEDIUM: supporting research and context gathering
  • LOW: nice-to-have background information
  • BACKGROUND: speculative pre-fetching
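One way to map sub-tasks onto these levels is a small classifier function. The sketch below is purely illustrative: SubTask and its boolean flags are hypothetical, and the Priority enum is redefined locally so the snippet runs on its own.

```python
from dataclasses import dataclass
from enum import IntEnum


class Priority(IntEnum):  # mirrors the levels listed above
    CRITICAL = 0
    HIGH = 1
    MEDIUM = 2
    LOW = 3
    BACKGROUND = 4


@dataclass
class SubTask:
    """Hypothetical sub-task record; the flags are illustrative only."""
    description: str
    blocks_user: bool = False
    user_requested: bool = False
    supports_deliverable: bool = False
    speculative: bool = False


def classify_priority(subtask: SubTask) -> Priority:
    # Check the most urgent condition first so it wins when several flags are set.
    if subtask.blocks_user:
        return Priority.CRITICAL
    if subtask.user_requested:
        return Priority.HIGH
    if subtask.supports_deliverable:
        return Priority.MEDIUM
    if subtask.speculative:
        return Priority.BACKGROUND
    return Priority.LOW


summary = SubTask("Draft executive summary", user_requested=True)
prefetch = SubTask("Pre-fetch related reports", speculative=True)
print(classify_priority(summary).name)   # HIGH
print(classify_priority(prefetch).name)  # BACKGROUND
```

In practice the signals would come from however your agent annotates its own work items; the point is only that the decision lives in one place.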

Skip it for agents with simple, linear task sequences where each task is equally important. A queue adds overhead that is not worth it for three tasks.


Install

pip install git+https://github.com/MukundaKatta/agent-task-queue
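
A typical agent loop looks like this. The helpers decompose, classify_priority, execute_subtask, and aggregate_results are placeholders for your agent's own functions:

from uuid import uuid4

from agent_task_queue import TaskQueue, Task, Priority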

queue = TaskQueue()

def process_user_request(request: str) -> str:
    # Agent decomposes the request
    subtasks = decompose(request)

    for subtask in subtasks:
        queue.push(Task(
            id=f"sub-{uuid4()}",
            description=subtask.description,
            priority=classify_priority(subtask),
            payload=subtask.data,
        ))

    results = []
    while not queue.is_empty():
        task = queue.pop()
        result = execute_subtask(task)
        results.append(result)

        # Follow-up work from any task is queued at HIGH priority
        if result.followup_tasks:
            for ft in result.followup_tasks:
                queue.push(Task(id=ft.id, description=ft.desc, priority=Priority.HIGH))

    return aggregate_results(results)

Sibling Libraries

| Library | What it solves |
| --- | --- |
| agent-tool-graph | Declarative tool prerequisites and dependency resolution |
| agent-resume | Checkpoint/resume long-running job processing |
| agent-deadline | Time-bound task processing |
| llm-stop-conditions | Stop the processing loop at task limits |
| agent-scratchpad | Share intermediate state between tasks |

The combination: agent-task-queue for priority ordering, agent-resume for crash recovery (checkpoint completed task IDs), agent-deadline to stop when the time budget runs out.
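The shape of that combination can be sketched with the standard library alone. This does not use the agent-resume or agent-deadline APIs, which are not shown in this post; drain_with_budget and its parameters are hypothetical, and the "checkpoint" is just an in-memory set.

```python
import heapq
import time


def drain_with_budget(heap, run, budget_seconds, completed_ids):
    """Pop (priority, counter, task_id) entries until the heap is empty
    or the time budget is spent. Returns the set of completed IDs."""
    deadline = time.monotonic() + budget_seconds
    while heap and time.monotonic() < deadline:
        _, _, task_id = heapq.heappop(heap)
        if task_id in completed_ids:
            continue  # already finished before a restart
        run(task_id)
        completed_ids.add(task_id)  # a real checkpoint would persist this
    return completed_ids


heap = []
pending = [(3, "pricing"), (1, "summary"), (2, "trends")]
for counter, (priority, task_id) in enumerate(pending):
    heapq.heappush(heap, (priority, counter, task_id))

ran = []
done = drain_with_budget(heap, ran.append, budget_seconds=60,
                         completed_ids={"trends"})
print(ran)  # ['summary', 'pricing']
```

Because the highest-priority work runs first, whatever is left unprocessed when the budget expires is by construction the least important.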


What's Next

Scheduled tasks: queue.push(task, run_after=time.time() + 300) to queue a task that should not be popped until a future time. Useful for rate-limited subtasks that need to wait before retrying.
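Since this feature is not built yet, the following is only one possible design, not the planned implementation. ScheduledQueue is hypothetical: it keeps delayed tasks in a second heap ordered by run_after and promotes them into the ready heap once their time arrives. The clock is injectable so the example runs without sleeping.

```python
import heapq
import time


class ScheduledQueue:
    """Hypothetical sketch of run_after support using two heaps."""

    def __init__(self, clock=time.time):
        self._ready = []    # (priority, counter, task_id)
        self._waiting = []  # (run_after, counter, priority, task_id)
        self._counter = 0
        self._clock = clock

    def push(self, task_id, priority, run_after=None):
        if run_after is None or run_after <= self._clock():
            heapq.heappush(self._ready, (priority, self._counter, task_id))
        else:
            heapq.heappush(
                self._waiting, (run_after, self._counter, priority, task_id)
            )
        self._counter += 1

    def pop(self):
        now = self._clock()
        # Promote every waiting task whose scheduled time has arrived.
        while self._waiting and self._waiting[0][0] <= now:
            _, counter, priority, task_id = heapq.heappop(self._waiting)
            heapq.heappush(self._ready, (priority, counter, task_id))
        if not self._ready:
            return None  # nothing is eligible yet
        return heapq.heappop(self._ready)[2]


now = [1000.0]  # fake clock so the example is deterministic
q = ScheduledQueue(clock=lambda: now[0])
q.push("retry-api-call", priority=0, run_after=now[0] + 300)
q.push("draft-summary", priority=1)

first = q.pop()   # 'draft-summary': the retry is not eligible yet
second = q.pop()  # None: only the delayed task remains
now[0] += 300
third = q.pop()   # 'retry-api-call'
print(first, second, third)
```

Note that a delayed task does not jump the line when it becomes eligible; it simply competes at its own priority from that point on.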

Task grouping: a group_id field that lets you cancel or reprioritize all tasks in a group at once. Useful when the user cancels a high-level request and you need to drop all its sub-tasks.
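As a rough illustration of how group cancellation could work, here is a hypothetical GroupedQueue. It is a sketch under the assumption that cancellation reuses the same skip-on-pop approach described for remove; the names and signatures are not the library's.

```python
import heapq
from collections import defaultdict


class GroupedQueue:
    """Hypothetical sketch of group_id-based cancellation."""

    def __init__(self):
        self._heap = []  # (priority, counter, task_id)
        self._counter = 0
        self._groups = defaultdict(set)  # group_id -> pending task IDs
        self._group_of = {}              # task_id -> group_id
        self._cancelled = set()

    def push(self, task_id, priority, group_id):
        heapq.heappush(self._heap, (priority, self._counter, task_id))
        self._counter += 1
        self._groups[group_id].add(task_id)
        self._group_of[task_id] = group_id

    def cancel_group(self, group_id):
        """Cancel every pending task in the group; returns how many."""
        ids = self._groups.pop(group_id, set())
        for task_id in ids:
            del self._group_of[task_id]
        self._cancelled |= ids
        return len(ids)

    def pop(self):
        while self._heap:
            _, _, task_id = heapq.heappop(self._heap)
            if task_id in self._cancelled:
                self._cancelled.discard(task_id)
                continue  # belongs to a cancelled group
            self._groups[self._group_of.pop(task_id)].discard(task_id)
            return task_id
        return None


q = GroupedQueue()
q.push("report-outline", 1, group_id="request-A")
q.push("report-sources", 2, group_id="request-A")
q.push("email-draft", 3, group_id="request-B")

cancelled = q.cancel_group("request-A")  # user cancelled request A
remaining = [q.pop(), q.pop()]
print(cancelled, remaining)  # 2 ['email-draft', None]
```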

Max queue size with overflow handling: TaskQueue(max_size=100, overflow="drop_lowest") that drops the lowest-priority pending task when capacity is exceeded.
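A drop_lowest policy could look something like the sketch below. BoundedQueue is hypothetical and assumes max_size is at least 1. It also makes one design choice the description leaves open: when the incoming task is itself the lowest priority, the incoming task is the one dropped.

```python
import heapq


class BoundedQueue:
    """Hypothetical sketch of overflow='drop_lowest'; max_size must be >= 1."""

    def __init__(self, max_size):
        self._heap = []  # (priority, counter, task_id)
        self._counter = 0
        self._max_size = max_size

    def push(self, task_id, priority):
        """Return the ID that was dropped, or None if nothing was."""
        entry = (priority, self._counter, task_id)
        self._counter += 1
        if len(self._heap) < self._max_size:
            heapq.heappush(self._heap, entry)
            return None
        # Largest tuple = largest priority number, newest on ties. O(n) scan.
        worst = max(self._heap)
        if entry > worst:
            return task_id  # the incoming task is the lowest priority
        self._heap.remove(worst)
        self._heap.append(entry)
        heapq.heapify(self._heap)  # restore the heap invariant
        return worst[2]

    def pop(self):
        return heapq.heappop(self._heap)[2]


q = BoundedQueue(max_size=2)
q.push("prefetch", 4)
q.push("summary", 1)
dropped_first = q.push("trends", 2)          # evicts 'prefetch'
dropped_second = q.push("more-prefetch", 4)  # rejected on arrival
kept = [q.pop(), q.pop()]
print(dropped_first, dropped_second, kept)
# prefetch more-prefetch ['summary', 'trends']
```

Eviction here costs O(n) per overflowing push, which is acceptable for queues of a few hundred tasks; a larger queue would want a different structure.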


Built as part of the agent-stack family: composable Python primitives for production LLM agents.
