The web scraping tool hung. The API it called was down. No timeout was set. The agent waited. And waited. The user's 30-second deadline had passed by the time the tool gave up.
Most agents have a timeout on the LLM call. Few have timeouts on individual tool calls. tool-timeout-wrap adds per-tool timeouts with a clean exception on breach.
The Shape of the Fix
```python
import requests
from tool_timeout_wrap import timeout_wrap, ToolTimeoutError

# Wrap tools at definition time
@timeout_wrap(seconds=10.0)
def scrape_webpage(url: str) -> str:
    return requests.get(url, timeout=30).text  # internal timeout is secondary

@timeout_wrap(seconds=5.0)
def call_external_api(endpoint: str, data: dict) -> dict:
    return requests.post(endpoint, json=data).json()

# Tools now enforce their own timeouts
try:
    content = scrape_webpage("https://example.com")
except ToolTimeoutError as e:
    print(f"Timed out after {e.timeout_seconds}s")
    content = "[scraping unavailable]"
```
Each tool has its own timeout. Slow tools do not delay fast tools. Hung tools surface as a ToolTimeoutError the caller can handle, instead of stalling the agent.
What It Does NOT Do
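tool-timeout-wrap does not cancel in-flight HTTP requests in sync tools. When the timeout expires, the wrapper raises ToolTimeoutError, but the worker thread keeps running until the call's own timeout or completion, because Python threads cannot be stopped from outside. Async tools fare better: asyncio.wait_for cancels the awaited coroutine on timeout, provided the coroutine honors cancellation. For truly cancellable work, use async code with asyncio.timeout() (Python 3.11+) or the tool's own cancellation mechanism.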
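The sync limitation can be seen with the standard library alone. The sketch below does not use tool-timeout-wrap; `slow_tool` and the timings are made up for illustration. The caller gives up after 50 ms, yet the worker thread still runs to completion.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout

finished = threading.Event()

def slow_tool() -> str:
    """Hypothetical tool that takes longer than its budget."""
    time.sleep(0.3)
    finished.set()  # set only if the thread ran to completion
    return "done"

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(slow_tool)
try:
    future.result(timeout=0.05)
    timed_out = False
except FuturesTimeout:
    timed_out = True
finally:
    executor.shutdown(wait=False)  # do not block on the still-running worker

still_running = not finished.is_set()  # the caller moved on; the thread did not stop
finished.wait(timeout=2.0)             # the worker finishes on its own schedule
```

This is why the internal `timeout=` on calls such as `requests.get` still matters: it bounds how long the abandoned thread lingers.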
It does not retry after timeout. Use llm-retry-py for retry logic. tool-timeout-wrap handles the timeout; retry is a separate concern.
It does not apply to async tools automatically. Pass is_async=True to use the async variant.
Inside the Library
The sync implementation uses concurrent.futures.ThreadPoolExecutor; the async variant uses asyncio.wait_for:

```python
import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout

def timeout_wrap(seconds: float, is_async: bool = False):
    def decorator(fn):
        if is_async:
            @functools.wraps(fn)
            async def async_wrapper(*args, **kwargs):
                try:
                    # wait_for cancels the awaited coroutine when the timeout expires
                    return await asyncio.wait_for(fn(*args, **kwargs), timeout=seconds)
                except asyncio.TimeoutError:
                    raise ToolTimeoutError(tool_name=fn.__name__, timeout_seconds=seconds)
            return async_wrapper

        @functools.wraps(fn)
        def sync_wrapper(*args, **kwargs):
            # No `with` block here: leaving it calls shutdown(wait=True), which
            # would block until fn returns and defeat the timeout.
            executor = ThreadPoolExecutor(max_workers=1)
            future = executor.submit(fn, *args, **kwargs)
            try:
                return future.result(timeout=seconds)
            except FuturesTimeout:
                raise ToolTimeoutError(tool_name=fn.__name__, timeout_seconds=seconds)
            finally:
                executor.shutdown(wait=False)  # return now; the worker finishes on its own
        return sync_wrapper
    return decorator
```
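ToolTimeoutError carries tool_name and timeout_seconds for structured logging:

```python
try:
    content = scrape_webpage("https://example.com")
except ToolTimeoutError as e:
    # Keyword fields assume a structlog-style logger;
    # stdlib logging would need extra={...} instead.
    logger.warning("tool_timeout",
                   tool=e.tool_name,
                   timeout_seconds=e.timeout_seconds)
```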
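For readers who want to see what such an exception might look like, here is a minimal sketch. The two attribute names come from the post; the base class, the message format, and everything else are assumptions, not the library's actual definition.

```python
class ToolTimeoutError(Exception):
    """Sketch of a timeout exception with structured fields (assumed shape)."""

    def __init__(self, tool_name: str, timeout_seconds: float):
        self.tool_name = tool_name
        self.timeout_seconds = timeout_seconds
        super().__init__(f"tool {tool_name!r} timed out after {timeout_seconds}s")


err = ToolTimeoutError(tool_name="scrape_webpage", timeout_seconds=10.0)

# The fields can be passed to any logger or returned to the model as-is
log_fields = {"tool": err.tool_name, "timeout_seconds": err.timeout_seconds}
```

Keeping the fields as attributes rather than parsing them out of the message is what makes the logging call above possible.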
The @timeout_wrap(seconds=N) decorator form is the primary interface. A function form timeout_wrap_fn(fn, seconds=N) is also available for wrapping tools you do not own.
When to Use It
Use it for every tool that makes a network call, reads a file, or does any I/O that could hang. As a rule of thumb, set the timeout to 2-3x the upper end of the tool's expected latency in normal operation, with a floor of a few seconds for very fast calls:
- Fast API calls (< 1s expected): 5s timeout
- Web scraping (1-5s expected): 15s timeout
- External AI/ML inference (5-15s expected): 30s timeout
- Database queries (< 1s expected): 5s timeout
If you are not sure what the expected latency is, set a conservative default (10-15s) and adjust based on observation.
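The rule of thumb above can be written down as a small helper. This is a hypothetical function, not part of tool-timeout-wrap; the 5-second floor and the default multiplier are assumptions chosen so the output reproduces the four budgets listed above.

```python
def suggest_timeout(expected_max_latency_s: float,
                    multiplier: float = 3.0,
                    floor_s: float = 5.0) -> float:
    """Hypothetical helper: scale the expected worst case, but never go below a floor."""
    return max(floor_s, expected_max_latency_s * multiplier)


# Upper end of each expected-latency range from the list above
TOOL_TIMEOUTS = {
    "fast_api": suggest_timeout(1.0),                    # floor applies
    "web_scraping": suggest_timeout(5.0),                # 3x of 5s
    "ml_inference": suggest_timeout(15.0, multiplier=2.0),  # 2x of 15s
    "database": suggest_timeout(1.0),                    # floor applies
}
```

The resulting values can be fed to `@timeout_wrap(seconds=...)` so the budgets live in one place instead of being scattered across decorators.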
Use it in combination with agent-deadline for two-level timeout enforcement: the deadline on the outer loop, tool timeouts on individual calls. The tool timeout is specific (this tool has this budget), the deadline is global (the entire run has this budget).
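The post does not say whether the two libraries coordinate, so the following only illustrates the arithmetic of two-level budgets. `Deadline` is a hypothetical stand-in written for this example, not the agent-deadline API.

```python
import time


class Deadline:
    """Hypothetical run-level deadline (not the agent-deadline API)."""

    def __init__(self, total_seconds: float):
        self._expires_at = time.monotonic() + total_seconds

    def remaining(self) -> float:
        return max(0.0, self._expires_at - time.monotonic())


def effective_timeout(tool_timeout_s: float, deadline: Deadline) -> float:
    """A tool never gets more time than the run has left."""
    return min(tool_timeout_s, deadline.remaining())


deadline = Deadline(total_seconds=30.0)
budget_for_scrape = effective_timeout(15.0, deadline)  # tool budget is the tighter limit
budget_for_batch = effective_timeout(60.0, deadline)   # run deadline is the tighter limit
```

Taking the minimum means that a tool with a generous budget cannot push the run past its global deadline.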
Install
```
pip install git+https://github.com/MukundaKatta/tool-timeout-wrap
```

```python
import requests
from tool_timeout_wrap import timeout_wrap, timeout_wrap_fn, ToolTimeoutError

# market_api, search_api, and TOOLS below stand in for your own objects

# Wrap your own functions
@timeout_wrap(seconds=8.0)
def get_stock_price(ticker: str) -> float:
    return market_api.get_price(ticker)

# Wrap third-party functions
safe_requests_get = timeout_wrap_fn(requests.get, seconds=10.0)

# Async tools
@timeout_wrap(seconds=5.0, is_async=True)
async def async_search(query: str) -> list:
    return await search_api.search(query)

# In your tool execution handler
def execute_tool(name: str, args: dict) -> dict:
    tool_fn = TOOLS[name]  # all already wrapped with timeout_wrap
    try:
        return tool_fn(**args)
    except ToolTimeoutError as e:
        return {
            "error": "timeout",
            "tool": e.tool_name,
            "timeout_seconds": e.timeout_seconds,
        }
```
Sibling Libraries
| Library | What it solves |
|---|---|
| agent-deadline | Wall-clock deadline for the entire agent run |
| llm-retry-py | Retry after timeout or other errors |
| tool-error-classify | Classify timeout as ErrorKind.TIMEOUT |
| llm-stop-conditions | Stop the loop after N timeouts (via custom condition) |
| agent-health-check | Pre-flight check that tools are responding |
The timeout stack: tool-timeout-wrap per tool, agent-deadline for the full run, tool-error-classify to classify timeout errors for the retry decision.
What's Next
Adaptive timeouts: track p95 latency per tool over recent calls and set the timeout dynamically. If scrape_webpage usually takes 2s but is starting to creep toward 4s, the adaptive timeout would widen automatically.
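Since this feature is not built yet, the following is only one way the idea could be sketched. `AdaptiveTimeout`, its parameters, and the nearest-rank percentile are all assumptions made for illustration.

```python
from collections import deque


class AdaptiveTimeout:
    """Hypothetical p95-based timeout tracker (not part of tool-timeout-wrap)."""

    def __init__(self, default_s: float = 10.0, window: int = 100,
                 multiplier: float = 2.0, min_samples: int = 20,
                 ceiling_s: float = 60.0):
        self._latencies = deque(maxlen=window)  # keeps only the most recent calls
        self._default_s = default_s
        self._multiplier = multiplier
        self._min_samples = min_samples
        self._ceiling_s = ceiling_s

    def record(self, latency_s: float) -> None:
        self._latencies.append(latency_s)

    def current(self) -> float:
        n = len(self._latencies)
        if n < self._min_samples:
            return self._default_s  # too little data to trust a percentile
        ordered = sorted(self._latencies)
        rank = (95 * n + 99) // 100  # nearest-rank p95, computed with integers
        p95 = ordered[rank - 1]
        return min(self._ceiling_s, p95 * self._multiplier)


tracker = AdaptiveTimeout()
before = tracker.current()      # no data yet: falls back to the default
for _ in range(100):
    tracker.record(2.0)
steady = tracker.current()      # p95 of 2s, doubled
for _ in range(100):
    tracker.record(4.0)
drifted = tracker.current()     # window now holds only the slower calls
```

The ceiling is a deliberate choice in this sketch: without it, a tool that is degrading toward a hang would keep widening its own budget.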
Timeout metrics: expose a counter per tool of how many times it timed out. Feed this into your observability stack to detect tools that are becoming unreliable.
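Until the library exposes such a counter, something similar can be approximated with a second decorator. This sketch is an assumption about how it might look; it catches the builtin `TimeoutError` as a stand-in so that it runs without tool-timeout-wrap installed, and `flaky_search` is a made-up tool.

```python
import functools
from collections import Counter

timeout_counts = Counter()  # tool name -> number of timeouts observed


def count_timeouts(fn):
    """Hypothetical decorator that counts timeouts per tool and re-raises them."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except TimeoutError:  # stand-in; real code would catch ToolTimeoutError
            timeout_counts[fn.__name__] += 1
            raise
    return wrapper


@count_timeouts
def flaky_search(query: str) -> list:
    raise TimeoutError("simulated timeout")


for _ in range(2):
    try:
        flaky_search("agents")
    except TimeoutError:
        pass  # the caller still sees the error; the counter is a side channel
```

In real use the counting decorator would sit outside `@timeout_wrap`, and `timeout_counts` would be exported to whatever metrics system is already in place.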
Circuit breaker integration: after N consecutive timeouts from the same tool, open a circuit that rejects calls immediately instead of waiting for the timeout each time. This is a performance optimization for tools that are consistently unavailable.
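A minimal sketch of the idea, under stated assumptions: `TimeoutCircuitBreaker` and `CircuitOpenError` are hypothetical names, the builtin `TimeoutError` stands in for ToolTimeoutError, and the single trial call after the cooldown is one common design, not something the post specifies.

```python
import time


class CircuitOpenError(Exception):
    """Raised when the breaker rejects a call without running the tool."""


class TimeoutCircuitBreaker:
    """Hypothetical breaker that opens after N consecutive timeouts."""

    def __init__(self, threshold: int = 3, cooldown_s: float = 30.0):
        self._threshold = threshold
        self._cooldown_s = cooldown_s
        self._consecutive = 0
        self._opened_at = None

    def call(self, fn, *args, **kwargs):
        if self._opened_at is not None:
            if time.monotonic() - self._opened_at < self._cooldown_s:
                raise CircuitOpenError(f"{fn.__name__} circuit is open")
            self._opened_at = None  # cooldown elapsed: allow one trial call
        try:
            result = fn(*args, **kwargs)
        except TimeoutError:  # stand-in for ToolTimeoutError
            self._consecutive += 1
            if self._consecutive >= self._threshold:
                self._opened_at = time.monotonic()
            raise
        self._consecutive = 0  # any success ends the streak
        return result


attempts = {"count": 0}

def down_tool() -> str:
    attempts["count"] += 1
    raise TimeoutError("simulated timeout")

breaker = TimeoutCircuitBreaker(threshold=3, cooldown_s=30.0)
outcomes = []
for _ in range(5):
    try:
        breaker.call(down_tool)
    except CircuitOpenError:
        outcomes.append("rejected")
    except TimeoutError:
        outcomes.append("timeout")
```

After the third timeout the tool is no longer invoked, so the last two calls fail immediately instead of each waiting out a full timeout.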
Built as part of the agent-stack family: composable Python primitives for production LLM agents.