The agent had 12 pending sub-tasks. It picked them up in the order they were added. The first 8 were "gather background information" tasks. The last 4 were "draft the executive summary" tasks.
The user needed the executive summary. The agent spent 40 minutes gathering background before touching the summary. A priority queue would have delivered the summary in 10 minutes.
agent-task-queue is a priority queue for agent sub-tasks.
The Shape of the Fix
from agent_task_queue import TaskQueue, Task, Priority
queue = TaskQueue()
queue.push(Task(
id="task-001",
description="Gather competitor pricing data",
priority=Priority.LOW,
payload={"company": "Competitor A"},
))
queue.push(Task(
id="task-002",
description="Draft executive summary",
priority=Priority.HIGH,
payload={"sections": ["overview", "recommendations"]},
))
queue.push(Task(
id="task-003",
description="Research market trends",
priority=Priority.MEDIUM,
payload={"topic": "Q3 growth"},
))
# Always returns highest priority task first
while not queue.is_empty():
task = queue.pop()
print(f"Processing: [{task.priority.name}] {task.description}")
# Output:
# Processing: [HIGH] Draft executive summary
# Processing: [MEDIUM] Research market trends
# Processing: [LOW] Gather competitor pricing data
Tasks are processed in priority order, not insertion order.
What It Does NOT Do
agent-task-queue does not execute tasks. It manages ordering. You pop tasks from the queue and execute them however you like.
It does not persist the queue across process restarts. The queue is in-memory. For durable task queues, use a real task queue system (Celery, Redis Queue, etc.).
It does not handle task dependencies. Task A must complete before Task B can start — that is not expressed here. For dependency graphs, use agent-tool-graph.
Inside the Library
The implementation uses Python's heapq for efficient priority ordering:
import heapq
from dataclasses import dataclass, field
from enum import IntEnum
class Priority(IntEnum):
CRITICAL = 0 # lowest number = highest priority
HIGH = 1
MEDIUM = 2
LOW = 3
BACKGROUND = 4
@dataclass
class Task:
id: str
description: str
priority: Priority
payload: dict = field(default_factory=dict)
created_at: float = field(default_factory=time.monotonic)
class TaskQueue:
def __init__(self):
self._heap = [] # (priority, counter, task)
self._counter = 0 # tiebreaker for equal priorities
self._all_ids = set()
def push(self, task: Task) -> None:
if task.id in self._all_ids:
raise DuplicateTaskError(task.id)
heapq.heappush(self._heap, (task.priority, self._counter, task))
self._counter += 1
self._all_ids.add(task.id)
def pop(self) -> Task:
if not self._heap:
raise EmptyQueueError()
_, _, task = heapq.heappop(self._heap)
self._all_ids.discard(task.id)
return task
def peek(self) -> Task:
if not self._heap:
raise EmptyQueueError()
return self._heap[0][2]
The counter tiebreaker ensures FIFO ordering within the same priority level. Equal-priority tasks are processed in insertion order.
remove(task_id): cancel a pending task before it is processed. Implemented by marking removed IDs in a set and skipping them on pop.
update_priority(task_id, new_priority): change a task's priority while it is in the queue. Implemented by remove + re-push.
When to Use It
Use it for agents that decompose tasks into sub-tasks with different urgency levels. Research agents, planning agents, any agent that generates work items as it runs.
The priority levels are useful for distinguishing:
-
CRITICAL: user-blocking work that must complete before the agent returns -
HIGH: important deliverables the user explicitly requested -
MEDIUM: supporting research and context gathering -
LOW: nice-to-have background information -
BACKGROUND: speculative pre-fetching
Skip it for agents with simple, linear task sequences where each task is equally important. A queue adds overhead that is not worth it for three tasks.
Install
pip install git+https://github.com/MukundaKatta/agent-task-queue
from agent_task_queue import TaskQueue, Task, Priority
queue = TaskQueue()
def process_user_request(request: str) -> str:
# Agent decomposes the request
subtasks = decompose(request)
for subtask in subtasks:
queue.push(Task(
id=f"sub-{uuid4()}",
description=subtask.description,
priority=classify_priority(subtask),
payload=subtask.data,
))
results = []
while not queue.is_empty():
task = queue.pop()
result = execute_subtask(task)
results.append(result)
# High priority tasks may generate more high priority work
if result.followup_tasks:
for ft in result.followup_tasks:
queue.push(Task(id=ft.id, description=ft.desc, priority=Priority.HIGH))
return aggregate_results(results)
Sibling Libraries
| Library | What it solves |
|---|---|
agent-tool-graph |
Declarative tool prerequisites and dependency resolution |
agent-resume |
Checkpoint/resume long-running job processing |
agent-deadline |
Time-bound task processing |
llm-stop-conditions |
Stop the processing loop at task limits |
agent-scratchpad |
Share intermediate state between tasks |
The combination: agent-task-queue for priority ordering, agent-resume for crash recovery (checkpoint completed task IDs), agent-deadline to stop when the time budget runs out.
What's Next
Scheduled tasks: queue.push(task, run_after=time.time() + 300) to queue a task that should not be popped until a future time. Useful for rate-limited subtasks that need to wait before retrying.
Task grouping: a group_id field that lets you cancel or reprioritize all tasks in a group at once. Useful when the user cancels a high-level request and you need to drop all its sub-tasks.
Max queue size with overflow handling: TaskQueue(max_size=100, overflow="drop_lowest") that drops the lowest-priority pending task when capacity is exceeded.
Built as part of the agent-stack family: composable Python primitives for production LLM agents.
Top comments (0)