You have 50 documents and you're running them through an LLM in a loop. The first one finishes at the 2-second mark. The fiftieth finishes at the 100-second mark, not because it's harder, but because it waited in line behind the other 49. Pool, the parallel runner in yait_aichain, runs all 50 at the same time.
The Problem With Loops
Every developer who works with LLMs writes this code eventually:
import os

from yait_aichain.models import Model
from yait_aichain.skills import Skill

skill = Skill(
    model=Model("claude-sonnet-4-6", api_key=os.getenv("ANTHROPIC_API_KEY")),
    input={"messages": [{"role": "user", "parts": [
        "Summarise in two sentences:\n\n{text}"
    ]}]},
)

documents = [{"text": f"Document {i} content..."} for i in range(50)]

# One call at a time: each document waits for the previous one to finish.
results = []
for doc in documents:
    result = skill.run(doc)
    results.append(result)
It works. It's readable. And it's painfully slow.
Each LLM call takes roughly 2 seconds. Multiply that by 50 documents and you're staring at your terminal for about 100 seconds. The calls are completely independent: document 37 doesn't need the result of document 12. Yet document 37 sits idle, waiting its turn. That's a scheduling problem, not a computation problem.
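To make the scheduling point concrete, here is a minimal sketch that uses only the Python standard library, not yait_aichain. `fake_llm_call` is a hypothetical stand-in that sleeps for 50 ms instead of calling a provider, and the thread pool is just one way to overlap independent waits; nothing here is meant to describe how Pool works internally.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_llm_call(doc: dict, latency: float = 0.05) -> str:
    """Hypothetical stand-in for an LLM request: sleeps instead of using the network."""
    time.sleep(latency)
    return f"summary of {doc['text']}"


def run_sequential(docs: list[dict]) -> tuple[list[str], float]:
    """Process documents one after another and report the elapsed time."""
    start = time.perf_counter()
    results = [fake_llm_call(doc) for doc in docs]
    return results, time.perf_counter() - start


def run_concurrent(docs: list[dict]) -> tuple[list[str], float]:
    """Process all documents at once, one worker thread per document."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(docs)) as executor:
        # executor.map() yields results in input order, whatever the finish order.
        results = list(executor.map(fake_llm_call, docs))
    return results, time.perf_counter() - start


docs = [{"text": f"Document {i}"} for i in range(10)]
seq_results, seq_time = run_sequential(docs)
con_results, con_time = run_concurrent(docs)
print(f"sequential: {seq_time:.2f}s  concurrent: {con_time:.2f}s")
```

Both versions do the same work and return the same list. Only the waiting is arranged differently, which is why the concurrent run takes roughly one call's latency instead of ten.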
I ran into this directly while building a task that pulled N files or links and produced a consolidated report. The sequential version was logically fine but just hemorrhaged time. I needed to fire everything at once without rewriting the Skill logic — no new prompt templates, no restructured code, just a different execution model. That's what Pool is.
Pool: Parallel Map for LLM Calls
Pool takes one Skill (or Chain) and a list of inputs, then launches all of them concurrently. Think of it as Array.map() where every element runs in parallel against an LLM.
import os

from yait_aichain.models import Model
from yait_aichain.skills import Skill
from yait_aichain.pool import Pool, DONE, FAILED

skill = Skill(
    model=Model("claude-sonnet-4-6", api_key=os.getenv("ANTHROPIC_API_KEY")),
    input={"messages": [{"role": "user", "parts": [
        "Summarise in two sentences:\n\n{text}"
    ]}]},
)

items = [
    {"text": "Artificial intelligence is transforming..."},
    {"text": "Quantum computing promises..."},
    {"text": "Climate change is accelerating..."},
    {"text": "Blockchain technology enables..."},
    {"text": "Gene editing with CRISPR..."},
]

# Same Skill as before; Pool only changes how the calls are scheduled.
pool = Pool(skill, items=items, max_flows=5)
results = pool.run()

for result in results:
    print(result)

s = pool.status
print(f"done={s[DONE]} failed={s[FAILED]}")
Three things worth noticing:
- The Skill didn't change. Same model, same prompt template, same {text} placeholder. Pool wraps existing logic; it doesn't demand new logic.
- pool.run() returns a list in the same order as the input. Item 0 in, result 0 out. No need to track which response belongs to which document.
- Status tracking is built in. pool.status gives you a dict with DONE and FAILED counts, so you know how many calls succeeded and how many didn't.
The math is straightforward. Five items averaging ~2 seconds each, running concurrently: wall-clock time drops from ~10 seconds to ~2 seconds. The overhead is network jitter and provider-side queuing, not sequential waiting. Scale to 50 items and the gap gets embarrassing. Exact numbers depend on your provider and network conditions, but the shape of the improvement is consistent: as long as max_flows covers the whole batch, you pay for roughly one round-trip, not N.
Controlling Concurrency With max_flows
Running everything at once sounds great until your API provider starts returning 429 errors.
LLM providers enforce rate limits, and those limits vary by model, tier, and account type. Blasting 200 concurrent requests is a reliable way to get throttled regardless of your tier. Check your provider's current documentation before tuning this number — don't just guess.
max_flows is your throttle. It sets the maximum number of calls in flight at any given moment:
pool = Pool(skill, items=documents, max_flows=10)
With max_flows=10, Pool processes 50 documents in waves of 10 concurrent calls. Still dramatically faster than sequential, but it keeps you within reasonable rate-limit bounds. Dial it up or down depending on what your provider will tolerate.
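The idea of a concurrency cap can be illustrated with a plain asyncio semaphore. This is a sketch of the general technique under stated assumptions, not Pool's implementation: `fake_llm_call`, `run_capped`, and the `tracker` dict are hypothetical names, and the call is simulated with a short sleep. Note that a semaphore gives a sliding window (a new call starts as soon as a slot frees up) rather than strict waves; whether Pool behaves one way or the other is not something this example can tell you.

```python
import asyncio


async def fake_llm_call(item: dict, tracker: dict) -> str:
    """Hypothetical stand-in for an LLM request that records how many calls overlap."""
    tracker["in_flight"] += 1
    tracker["peak"] = max(tracker["peak"], tracker["in_flight"])
    await asyncio.sleep(0.01)  # simulated network latency
    tracker["in_flight"] -= 1
    return f"summary of {item['text']}"


async def run_capped(items: list[dict], max_flows: int) -> tuple[list[str], int]:
    """Run every item, never allowing more than max_flows calls in flight."""
    semaphore = asyncio.Semaphore(max_flows)
    tracker = {"in_flight": 0, "peak": 0}

    async def guarded(item: dict) -> str:
        async with semaphore:  # blocks here while max_flows calls are running
            return await fake_llm_call(item, tracker)

    # gather() returns results in the order the awaitables were passed in.
    results = await asyncio.gather(*(guarded(item) for item in items))
    return list(results), tracker["peak"]


documents = [{"text": f"Document {i}"} for i in range(50)]
results, peak = asyncio.run(run_capped(documents, max_flows=10))
print(f"{len(results)} results, peak concurrency {peak}")
```

The recorded peak is what a provider's rate limiter would see, so it is the number to compare against your tier's limits when choosing a cap.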
Failures Don't Sink the Ship
Batch jobs have a classic failure mode: item 23 out of 50 throws an error and the whole run aborts. You fix the issue, restart from scratch, and wait through items 1–22 again. Deeply annoying.
Pool handles this differently. Each item gets its own outcome — DONE or FAILED. One failed call doesn't stop the rest. After pool.run() completes, check pool.status for the breakdown:
s = pool.status
print(f"done={s[DONE]} failed={s[FAILED]}")
# done=48 failed=2
You get 48 good results. You know exactly which 2 failed. Reprocess those two — not the entire batch.
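The pattern behind this (catch the error inside each call, record an outcome per item, then retry only the failures) can be sketched with the standard library. Everything below is an illustration under assumptions: `flaky_call`, `safe_call`, and `run_batch` are hypothetical helpers, the local `DONE` and `FAILED` strings are stand-ins rather than the yait_aichain constants, and the sketch tracks failed indices itself because the snippets above only show the counts in pool.status.

```python
from concurrent.futures import ThreadPoolExecutor

DONE, FAILED = "done", "failed"  # local stand-ins, not the yait_aichain constants


def flaky_call(item: dict) -> str:
    """Hypothetical LLM call that raises for items flagged as bad."""
    if item.get("bad"):
        raise RuntimeError(f"provider error for item {item['id']}")
    return f"summary of item {item['id']}"


def safe_call(item: dict) -> tuple[str, object]:
    """Catch errors inside the worker so one failure cannot abort the batch."""
    try:
        return DONE, flaky_call(item)
    except Exception as exc:
        return FAILED, exc


def run_batch(items: list[dict], max_workers: int = 10) -> list[tuple[str, object]]:
    """Run all items concurrently and return one (state, payload) pair per item."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(safe_call, items))


items = [{"id": i, "bad": i in (7, 23)} for i in range(50)]
outcomes = run_batch(items)

status = {DONE: 0, FAILED: 0}
for state, _ in outcomes:
    status[state] += 1
failed_indices = [i for i, (state, _) in enumerate(outcomes) if state == FAILED]
print(status, failed_indices)

# Retry only the failed items; here the simulated fault is cleared first.
retry_items = [{**items[i], "bad": False} for i in failed_indices]
retry_outcomes = run_batch(retry_items)
```

Because outcomes come back in input order, the index of a failed outcome is also the index of the item to resubmit.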
Pool + Chain: Multi-Step Pipelines in Parallel
Pool isn't limited to a single Skill. It accepts a Chain as its runner, which means multi-step workflows parallelize just as easily.
Here's a real example: fetch a web page, convert it to Markdown, then summarize it — all in parallel across multiple URLs.
import os

from yait_aichain.models import Model
from yait_aichain.skills import Skill
from yait_aichain.chain import Chain
from yait_aichain.pool import Pool, DONE, FAILED
from yait_aichain.tools import convertToMD

fetch = convertToMD()

summarise = Skill(
    model=Model("claude-sonnet-4-6", api_key=os.getenv("ANTHROPIC_API_KEY")),
    input={"messages": [{"role": "user", "parts": [
        "Summarise in one sentence:\n\n{result}"
    ]}]},
)

# Each Chain step is a tuple of (runner, output_key, input_mapping).
# Here: fetch writes its output to "result", mapped from the item's "source" field.
# The summarise Skill then reads {result} from that output key.
per_item = Chain(steps=[
    (fetch, "result", {"input": "source"}),  # fetch page; store as "result"
    summarise,                               # summarise reads {result}
])

items = [
    {"source": "https://fr.lipsum.com"},
    {"source": "https://de.lipsum.com"},
    {"source": "https://es.lipsum.com"},
]

pool = Pool(per_item, items=items, max_flows=3)
results = pool.run()

for item, result in zip(items, results):
    print(f"[{item['source']}]\n{result}\n")

s = pool.status
print(f"done={s[DONE]} failed={s[FAILED]}")
The Chain step tuple has three elements: the runner, the key under which its output is stored, and a mapping from the runner's input parameter to a field on the item. So (fetch, "result", {"input": "source"}) means: run fetch using the item's source field as input, store the output under "result". The summarise Skill then receives {result} via its prompt template. Each URL goes through the full fetch-then-summarize pipeline independently, and Pool runs all three chains at once.
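One way to picture the tuple convention is a toy interpreter that threads a dict through the steps. This is not yait_aichain code and makes no claim about how Chain is implemented: `run_steps`, `fake_fetch`, and `fake_summarise` are hypothetical, and storing a bare step's output under "final" is an assumption made purely for the illustration.

```python
from typing import Any


def run_steps(steps: list, item: dict[str, Any]) -> dict[str, Any]:
    """Toy interpreter for steps written as (runner, output_key, input_mapping)."""
    context = dict(item)  # copy so the caller's item is not mutated
    for step in steps:
        if isinstance(step, tuple):
            runner, output_key, input_mapping = step
            # input_mapping: runner parameter name -> key to read from the context
            kwargs = {param: context[key] for param, key in input_mapping.items()}
            context[output_key] = runner(**kwargs)
        else:
            # Assumption: a bare step sees the whole context; output goes to "final".
            context["final"] = step(context)
    return context


def fake_fetch(input: str) -> str:
    """Stand-in for a page fetcher; the parameter name matches the mapping key."""
    return f"# Page at {input}"


def fake_summarise(context: dict[str, Any]) -> str:
    """Stand-in for a Skill whose prompt template reads {result}."""
    return "summary of [{result}]".format(result=context["result"])


steps = [
    (fake_fetch, "result", {"input": "source"}),  # item["source"] -> fetch(input=...)
    fake_summarise,                               # reads context["result"]
]
out = run_steps(steps, {"source": "https://example.com"})
print(out["final"])
```

Read this way, the mapping {"input": "source"} points from the runner's parameter to the item's field, and the output key is what makes {result} available to the next step.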
When Pool Changes the Math
Consider a weekly report pulling data from 200 sources: summarize each one, then combine. Sequential at ~2 seconds per call: roughly 400 seconds, nearly 7 minutes. With max_flows=20, you process those 200 items in 10 waves: roughly 20 seconds total. What used to mean walking away from the terminal now finishes while you're still looking at the screen.
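The wave arithmetic can be written down as a small helper. It is an idealised estimate that assumes every call takes the same time and that items run in full waves; `estimated_wall_clock` is a hypothetical name, and real runs will add network jitter and provider-side queuing on top.

```python
import math


def estimated_wall_clock(n_items: int, max_flows: int, seconds_per_call: float) -> float:
    """Idealised estimate: ceil(n_items / max_flows) waves, each one call long."""
    if n_items < 0 or max_flows < 1:
        raise ValueError("n_items must be >= 0 and max_flows must be >= 1")
    waves = math.ceil(n_items / max_flows)
    return waves * seconds_per_call


print(estimated_wall_clock(200, 1, 2.0))   # sequential: 400.0
print(estimated_wall_clock(200, 20, 2.0))  # max_flows=20: 20.0
print(estimated_wall_clock(50, 10, 2.0))   # 50 documents, max_flows=10: 10.0
```

The ceiling matters for uneven batches: 45 items with max_flows=20 still need three waves, so the estimate is 6 seconds rather than 4.5.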
The API surface is intentionally small:
- Pool(runner, items, max_flows): runner is a Skill or Chain; items is a list of dicts; max_flows caps concurrency
- pool.run(): executes everything, returns results in input order
- pool.status: returns {DONE: int, FAILED: int} after the run
No async/await boilerplate. No callbacks. Define your Skill, list your inputs, set a concurrency cap, and Pool handles the scheduling.