I was running a parallel evaluation. 200 test cases. Each one spun up its own AsyncAnthropic client, built its messages, and fired messages.create. All 200 started at roughly the same time.
They all failed at roughly the same time too.
The error was 429 Too Many Requests. Rate limit. I had launched a small stampede.
My first fix was await asyncio.sleep(random.uniform(0, 2)) before each call. That spread the load a little. The eval finished. It also took four times longer. What had been a two-minute job turned into eight minutes, and the jitter meant some cases waited the full two seconds for no reason at all.
I tried batching them manually. I split the 200 into groups of 20, ran each group with asyncio.gather, then slept between groups. This worked but now I had batch-size logic tangled into the evaluation code. Change the model and I had to recalibrate the group size by hand.
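For contrast, here is roughly what that manual batching loop looks like. This is a minimal sketch, not the original eval code. fake_call is a hypothetical stand-in for client.messages.create. group_size and pause_s are the hand-tuned knobs that had to be recalibrated whenever the model changed.

```python
import asyncio
import random


async def fake_call(case: int) -> str:
    """Hypothetical stand-in for client.messages.create (no network)."""
    await asyncio.sleep(random.uniform(0, 0.01))
    return f"result {case}"


async def run_in_groups(cases, group_size=20, pause_s=1.0):
    """Manual batching: gather one fixed-size group, then sleep before the next."""
    results = []
    for start in range(0, len(cases), group_size):
        group = cases[start:start + group_size]
        results.extend(await asyncio.gather(*(fake_call(c) for c in group)))
        if start + group_size < len(cases):
            await asyncio.sleep(pause_s)  # hand-tuned pause between groups
    return results


if __name__ == "__main__":
    out = asyncio.run(run_in_groups(list(range(200)), group_size=20, pause_s=0.01))
    print(len(out))  # 200
```

The group size and pause live inside the evaluation code, which is the coupling the dispatcher is meant to remove.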
The underlying problem is coordination. Two hundred coroutines each want to call the API. None of them knows what the others are doing. The fix is a shared dispatcher that knows about all of them.
That is what llmfleet is.
The shape of the fix
You replace direct client calls with FleetDispatcher.submit(). The dispatcher collects calls from all your coroutines and sends them as a coordinated group.
```python
import asyncio

from anthropic import AsyncAnthropic
from llmfleet import FleetDispatcher


async def evaluate_one(fleet, case):
    # Keyword arguments mirror client.messages.create()
    result = await fleet.submit(
        model="claude-sonnet-4-6",
        max_tokens=512,
        messages=[{"role": "user", "content": case["prompt"]}],
    )
    return result.content[0].text


async def main():
    client = AsyncAnthropic()
    cases = load_test_cases()  # your own loader; 200 items here
    async with FleetDispatcher(client) as fleet:
        results = await asyncio.gather(*[
            evaluate_one(fleet, case) for case in cases
        ])
    return results


asyncio.run(main())
```
Each evaluate_one call hits fleet.submit() instead of the client directly. That is the only change to the caller. The dispatcher handles the rest.
You can tune the window:
```python
from llmfleet import FleetConfig, FleetDispatcher

config = FleetConfig(
    gather_window_ms=80,  # wait up to 80ms to collect stragglers
    max_batch_size=50,    # send at most 50 at once
)

async def main(client):
    async with FleetDispatcher(client, config=config) as fleet:
        ...
```
If you want to see what happened after a run:
```python
async def main(client, cases):
    async with FleetDispatcher(client) as fleet:
        results = await asyncio.gather(*[evaluate_one(fleet, case) for case in cases])
        print(fleet.stats.total_calls)
        print(fleet.stats.total_batches)
        print(fleet.stats.avg_batch_size)
```
That is the whole API surface.
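Judging by the names, the three counters should satisfy avg_batch_size = total_calls / total_batches. The sketch below assumes exactly that. FleetStatsSketch and its record() method are hypothetical and are not llmfleet's stats object.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class FleetStatsSketch:
    """Hypothetical stats container; llmfleet's real fleet.stats may differ."""
    batch_sizes: list[int] = field(default_factory=list)

    def record(self, size: int) -> None:
        self.batch_sizes.append(size)  # called once per dispatched batch

    @property
    def total_calls(self) -> int:
        return sum(self.batch_sizes)

    @property
    def total_batches(self) -> int:
        return len(self.batch_sizes)

    @property
    def avg_batch_size(self) -> float:
        # Guard against division by zero before any batch has been sent.
        return self.total_calls / self.total_batches if self.total_batches else 0.0


stats = FleetStatsSketch()
for size in (50, 50, 50, 50):
    stats.record(size)
print(stats.total_calls, stats.total_batches, stats.avg_batch_size)  # 200 4 50.0
```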
What it does NOT do
- It does not route to the Anthropic Message Batches API. That is a separate latency/cost tradeoff. llmfleet targets real-time throughput, not the 50% batch discount.
- It does not work across processes. The dispatcher is in-process only. Cross-process pooling needs something like Redis or SQS in front.
- It does not retry failed calls. If an individual call inside a batch errors, that error surfaces to the caller that submitted it. Pair with llm-retry-py if you need per-call backoff.
- It does not manage token budgets. It dispatches calls; it does not track spend. Pair with token-budget-py if you need a cap.
Inside the lib: the gather window design
The core idea is a short deliberate wait before dispatch.
When your first coroutine calls fleet.submit(), the dispatcher does not fire immediately. It starts a timer. The default is 50 milliseconds. During those 50ms, every other coroutine that calls fleet.submit() gets added to the same pending group. When the timer expires, the whole group dispatches together via asyncio.gather.
```text
t=0ms   coroutine A calls submit()     -- starts the window
t=12ms  coroutine B calls submit()     -- joins the window
t=31ms  coroutines C-F call submit()   -- join the window
t=50ms  window closes, A+B+C+D+E+F dispatch together
t=50ms  coroutine G calls submit()     -- starts a new window
```
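The timeline above maps onto a small amount of asyncio. What follows is a minimal sketch of the idea, not llmfleet's source. GatherWindowDispatcher, its window_ms and max_batch_size parameters, and the fake_create stand-in are all hypothetical.

The flow works like this:
- The first submit() arms a loop.call_later timer.
- Later callers join the pending list.
- When the timer fires, a flush hands the whole group to asyncio.gather.
- If max_batch_size calls are pending before then, it flushes early.
- Each exception goes back to the caller that submitted it.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


class GatherWindowDispatcher:
    """Hypothetical sketch of a gather window; not llmfleet's actual code."""

    def __init__(
        self,
        call: Callable[..., Awaitable[Any]],
        window_ms: float = 50.0,
        max_batch_size: int = 50,
    ) -> None:
        self._call = call  # e.g. client.messages.create in a real setup
        self._window_s = window_ms / 1000
        self._max = max_batch_size
        self._pending: list[tuple[dict[str, Any], asyncio.Future[Any]]] = []
        self._timer: asyncio.TimerHandle | None = None
        self._tasks: set[asyncio.Task[None]] = set()  # keep dispatch tasks alive
        self.batch_sizes: list[int] = []

    async def submit(self, **kwargs: Any) -> Any:
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        self._pending.append((kwargs, fut))
        if len(self._pending) >= self._max:
            self._flush()  # window is full: dispatch early
        elif self._timer is None:
            # The first caller opens the window; later callers just join it.
            self._timer = loop.call_later(self._window_s, self._flush)
        return await fut

    def _flush(self) -> None:
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        batch, self._pending = self._pending, []
        if not batch:
            return
        self.batch_sizes.append(len(batch))
        task = asyncio.get_running_loop().create_task(self._dispatch(batch))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def _dispatch(self, batch) -> None:
        # Send the whole window concurrently; each error goes back to its own caller.
        results = await asyncio.gather(
            *(self._call(**kwargs) for kwargs, _ in batch), return_exceptions=True
        )
        for (_, fut), result in zip(batch, results):
            if fut.done():  # caller was cancelled while waiting
                continue
            if isinstance(result, BaseException):
                fut.set_exception(result)
            else:
                fut.set_result(result)


async def demo():
    async def fake_create(**kwargs: Any) -> str:
        await asyncio.sleep(0.001)  # stand-in for network latency
        return kwargs["prompt"].upper()

    dispatcher = GatherWindowDispatcher(fake_create, window_ms=50, max_batch_size=4)
    results = await asyncio.gather(
        *(dispatcher.submit(prompt=f"case {i}") for i in range(10))
    )
    return results, dispatcher.batch_sizes


if __name__ == "__main__":
    _, sizes = asyncio.run(demo())
    print(sizes)  # [4, 4, 2]
```

With max_batch_size=4, ten simultaneous submits produce two early flushes of four and one timer flush of two. return_exceptions=True is what lets every caller receive its own result or its own error instead of one failure leaving the other futures unresolved.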
This is different from "batch whatever arrives at the exact same millisecond." Real coroutines do not arrive at the exact same instant. There is startup jitter, I/O variance, and scheduling noise. Reactive batching catches the few that line up perfectly and misses the stragglers.
The gather window catches the stragglers. If your 200 evaluation coroutines start within a few hundred milliseconds of each other, the window collects them into a handful of large batches instead of 200 individual calls. Large batches stay well inside rate limits. Individual calls all fire at once and trip the limit.
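A toy model makes the straggler argument concrete. It assumes 200 coroutines whose first submit() calls land evenly 1.5 ms apart. That arrival pattern is made up, not measured, and the model ignores max_batch_size. group_into_windows is a hypothetical helper.

```python
from __future__ import annotations


def group_into_windows(arrivals_ms: list[float], window_ms: float = 50.0) -> list[list[float]]:
    """Hypothetical model: the first arrival opens a window of window_ms, and
    every call arriving before that window closes joins the same batch."""
    batches: list[list[float]] = []
    window_end = float("-inf")
    for t in sorted(arrivals_ms):
        if t >= window_end:  # no open window: this call starts a new one
            batches.append([])
            window_end = t + window_ms
        batches[-1].append(t)
    return batches


# 200 coroutines whose submit() calls land 1.5 ms apart, spread over ~300 ms.
arrivals = [i * 1.5 for i in range(200)]
print([len(b) for b in group_into_windows(arrivals)])  # [34, 34, 34, 34, 34, 30]
```

Under these assumptions, a 50 ms window collapses the 200 calls into six dispatches. Batching only calls that arrive at the exact same instant would leave them as 200 separate sends.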
The tradeoff is latency. The first call in a window waits up to gather_window_ms before it is even sent, on top of the normal API round trip. For an interactive chat path that is unacceptable. For a batch evaluation it is invisible. The default of 50ms was chosen to be negligible for offline work while still collecting most of the coroutines that start together.
If your use case is mixed, you can set a short window (10ms) and accept that batch sizes will be smaller, or run two dispatcher instances: one with a short window for interactive paths and one with a longer window for background work.
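The same toy arrival pattern (200 calls, 1.5 ms apart, made up for illustration) shows the tradeoff numerically. In this hypothetical model, a call waits from its arrival until its window closes, so the first call in each window pays the full gather_window_ms. window_tradeoff is a hypothetical helper, not part of llmfleet.

```python
from __future__ import annotations


def window_tradeoff(arrivals_ms: list[float], window_ms: float) -> tuple[int, float]:
    """Hypothetical model: return (number of dispatches, worst added wait in ms)."""
    batches = 0
    worst_wait = 0.0
    window_end = float("-inf")
    for t in sorted(arrivals_ms):
        if t >= window_end:  # this call opens a new window
            batches += 1
            window_end = t + window_ms
        # Each call sits in the pending group until its window closes.
        worst_wait = max(worst_wait, window_end - t)
    return batches, worst_wait


arrivals = [i * 1.5 for i in range(200)]
for window in (10.0, 50.0):
    print(window, window_tradeoff(arrivals, window))
# prints "10.0 (29, 10.0)" then "50.0 (6, 50.0)"
```

Under these assumptions, a 10 ms window caps the added wait at 10 ms but needs 29 dispatches. A 50 ms window needs only 6, at the cost of up to 50 ms of wait. Real arrival patterns will give different numbers.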
When a window fills to max_batch_size before the timer expires, it dispatches early and starts a new window. This prevents any single window from becoming too large to send cleanly.
When this is useful
- You are running evals, grading pipelines, or data extraction jobs where many coroutines fire at roughly the same time.
- You hit 429 errors when running concurrent tasks and your current fix is jitter sleeps or manual batching loops.
- You want the concurrency logic out of your evaluation code.
- Your system prompt is long and repeated across many calls. Large batches with shared context hit the prompt cache more reliably, because the cache prefix is warm by the time the second call lands.
- You want to observe actual batch sizes to tune your concurrency budget.
When this is NOT what you want
- One coroutine, one call. The dispatcher adds overhead and gives you nothing back. Use AsyncAnthropic directly.
- You need results in under 100ms. The gather window is intentional latency. Turn it off or use a very short window, at the cost of smaller batch sizes.
- You need the 50% Anthropic Batch API discount. That API has a different shape entirely. See anthropic-batch-kit for that path.
Install
```bash
pip install llmfleet
```
Repo: https://github.com/MukundaKatta/llmfleet
v0.1.0 supports Python 3.10 and later. It has no dependencies beyond the Anthropic SDK.
Sibling libraries
| Lib | Boundary | Repo |
|---|---|---|
| anthropic-batch-kit | Async Batch API for latency-insensitive jobs. Different from real-time pooling. | MukundaKatta/anthropic-batch-kit |
| token-budget-py | Shared token/USD budget pool across the fleet. Cap spend before you dispatch. | MukundaKatta/token-budget-py |
| llm-retry-py | Per-call retry with full-jitter backoff after dispatch. llmfleet reduces 429s; llm-retry-py handles the ones that still slip through. | MukundaKatta/llm-retry-py |
| agenttrace | Cost and latency rollup across the fleet. See total tokens, p50/p95 latency, per-model breakdown. | MukundaKatta/AgentTracePy |
The combination that made my eval pipeline clean: llmfleet for dispatch, token-budget-py to prevent runaway spend, agenttrace to see where the time went, and llm-retry-py as a safety net.
What's next
v0.1.0 is Anthropic only. OpenAI and Bedrock have similar rate limit structures, and the dispatcher itself is provider-agnostic; the per-provider adapter is the only missing piece. If you want to add one, the interface is small.
The gather window default of 50ms came from my own eval runs. I would like to see data from other workloads to know whether that number is broadly sensible, or whether the window should adapt automatically based on observed batch sizes. The stats surface is there to make that observable.
If you are running parallel evals and your current fix is sleep(random()), try swapping in the dispatcher and see what happens to your batch sizes. The change to the caller is one line.