The Problem With Wiring Everything Together
You have a Python agent. When it calls a tool, several things need to happen:
- agenttrace needs to record the call and latency
- your budget tracker needs to decrement the tool call count
- your replay system needs to write a JSONL entry
- your Slack notifier should fire if the tool failed
The naive approach is to call each of those directly from your tool dispatch function. That works for one or two integrations. By the time you have five, your tool runner is a mess of imports and conditionals. Adding a sixth means editing core dispatch logic.
The fix is a pub/sub event bus. Publish one event. Let each consumer subscribe independently.
agent-event-bus is a Python library built specifically for this pattern in agent workloads. It gives you sync and async support, wildcard topic matching, and a once() helper for single-shot listeners. No external broker needed. No Redis, no Kafka, no sidecar.
Main Code Example
Install:
```bash
pip install agent-event-bus
```
Wire up a bus, attach subscribers, and publish from your tool runner:
```python
import time

from agent_event_bus import EventBus

bus = EventBus()

# budget_registry, slack, and tool_registry are your own objects, defined elsewhere.

# Subscriber 1: observability
@bus.subscribe("tool.*")
def log_tool_event(topic, event):
    print(f"[trace] {topic} | tool={event['tool']} latency={event.get('latency_ms')}ms")

# Subscriber 2: budget tracker
@bus.subscribe("tool.called")
def decrement_budget(topic, event):
    budget_registry.decrement(event["agent_id"])

# Subscriber 3: single-shot alert on first failure
@bus.once("tool.failed")
def alert_on_first_failure(topic, event):
    slack.send(f"First tool failure this run: {event['tool']}")

# Your tool runner
def run_tool(agent_id, tool_name, args):
    start = time.monotonic()
    bus.publish("tool.called", {"agent_id": agent_id, "tool": tool_name})
    try:
        result = tool_registry[tool_name](**args)
        latency = (time.monotonic() - start) * 1000
        bus.publish("tool.succeeded", {"agent_id": agent_id, "tool": tool_name, "latency_ms": latency})
        return result
    except Exception as exc:
        bus.publish("tool.failed", {"agent_id": agent_id, "tool": tool_name, "error": str(exc)})
        raise
```
Async version with AsyncEventBus:
```python
import asyncio

from agent_event_bus import AsyncEventBus

bus = AsyncEventBus()

# metrics and llm are your own async clients, defined elsewhere.

@bus.subscribe("model.response")
async def handle_response(topic, event):
    await metrics.record_tokens(event["usage"]["total_tokens"])

async def run_agent_turn(messages):
    response = await llm.create(messages=messages)
    await bus.publish("model.response", {
        "usage": response.usage.model_dump(),
        "stop_reason": response.stop_reason,
    })
    return response
```
Wildcard topics use dot-separated segments. tool.* matches tool.called, tool.succeeded, and tool.failed. *.failed matches any failure event across all namespaces.
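To make the matching rule concrete, here is a minimal standalone sketch of segment-wise wildcard matching. It is not the library's implementation. The function name topic_matches is hypothetical, and it assumes that * matches exactly one dot-separated segment, which the library may or may not do for deeper topics.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if a dot-separated pattern matches a topic.

    Assumption: "*" matches exactly one segment. The real library may
    treat multi-segment topics differently.
    """
    pattern_parts = pattern.split(".")
    topic_parts = topic.split(".")
    if len(pattern_parts) != len(topic_parts):
        return False
    return all(p == "*" or p == t for p, t in zip(pattern_parts, topic_parts))


print(topic_matches("tool.*", "tool.called"))     # True
print(topic_matches("*.failed", "model.failed"))  # True
print(topic_matches("tool.*", "model.response"))  # False
```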
What It Does NOT Do
agent-event-bus is an in-process pub/sub bus. It does not:
- Persist events to disk. If the process dies, undelivered events are gone.
- Distribute events across processes or machines. For that you need a real broker.
- Guarantee ordering across subscribers. Subscribers on the same topic fire in registration order, but that is an implementation detail you should not depend on.
- Replay past events to a new subscriber. A subscriber only receives events published after it subscribes.
If you need durability or cross-process delivery, use this library for the in-process leg and write events to a queue or log from a subscriber. agenttap pairs well here. Tap captures the wire-level events; the bus connects in-process consumers.
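One way to add the durable leg is a subscriber that appends each event to a JSONL file. The sketch below is a plain handler factory with the same (topic, event) signature used throughout this post. The name make_jsonl_sink and the record layout are assumptions for illustration, not part of agent-event-bus or agenttap.

```python
import json
import time
from pathlib import Path


def make_jsonl_sink(path):
    """Return a (topic, event) handler that appends one JSON line per event."""
    log_path = Path(path)

    def write_event(topic, event):
        record = {"ts": time.time(), "topic": topic, "event": event}
        # Open in append mode per event: simple, and the file is flushed
        # and closed before the handler returns.
        with log_path.open("a", encoding="utf-8") as fh:
            fh.write(json.dumps(record) + "\n")

    return write_event
```

Because the decorator form is just a function call, you could register the sink with bus.subscribe("tool.*")(make_jsonl_sink("events.jsonl")). Events must be JSON-serializable for this to work, and a crash between publish and write still loses the event.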
Design Reasoning
The core tension in agent observability is: you want rich telemetry everywhere, but you do not want every component to know about every other component.
Direct calls create a dependency graph that grows multiplicatively: with N components that emit events and M consumers, you can end up with N×M direct connections. The bus keeps it linear at N+M. Each component knows about the bus and nothing else.
The wildcard matching (tool.*) is the other key decision. Agent events cluster by namespace naturally. You want budget tracking to fire on any tool event. You want tracing to fire on any event at all. Glob-style wildcards on dot-separated topics are the minimal way to express that without a query language.
once() addresses a common pattern in agent runs: "alert me the first time X happens, then stop." Without it, you either accumulate state in a closure or unsubscribe manually. once() wraps the unsubscription for you.
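The idea behind once() can be sketched in a few lines. MiniBus below is a hypothetical stand-in with exact-match topics only. It is written for illustration and is not how agent-event-bus is implemented internally.

```python
from collections import defaultdict


class MiniBus:
    """Tiny stand-in bus (exact-match topics only), for illustration."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, topic, handler):
        self._handlers[topic].append(handler)

    def unsubscribe(self, topic, handler):
        self._handlers[topic].remove(handler)

    def once(self, topic, handler):
        def wrapper(t, event):
            # Unsubscribe before calling, so the handler cannot fire twice
            # even if it publishes to the same topic itself.
            self.unsubscribe(topic, wrapper)
            handler(t, event)

        self.subscribe(topic, wrapper)

    def publish(self, topic, event):
        # Iterate over a copy: handlers may unsubscribe during delivery.
        for handler in list(self._handlers[topic]):
            handler(topic, event)


bus = MiniBus()
alerts = []
bus.once("tool.failed", lambda topic, event: alerts.append(event["tool"]))
bus.publish("tool.failed", {"tool": "search"})
bus.publish("tool.failed", {"tool": "fetch"})
print(alerts)  # ['search']
```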
Both sync and async buses share the same API. If you are mixing sync tool calls with an async model client, use AsyncEventBus and wrap sync handlers with asyncio.to_thread.
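A small adapter makes the asyncio.to_thread wrapping reusable. This is a sketch under the assumption that async subscribers take (topic, event). The helper name in_thread is hypothetical, and asyncio.to_thread requires Python 3.9 or later.

```python
import asyncio
import functools


def in_thread(sync_handler):
    """Wrap a blocking (topic, event) handler so it can be awaited."""

    @functools.wraps(sync_handler)
    async def async_handler(topic, event):
        # asyncio.to_thread (Python 3.9+) runs the call in a worker thread,
        # so the event loop is not blocked while it executes.
        return await asyncio.to_thread(sync_handler, topic, event)

    return async_handler


seen = []


def record_event(topic, event):
    # Stand-in for a blocking call such as a synchronous HTTP request.
    seen.append((topic, event["tool"]))


async def main():
    handler = in_thread(record_event)
    await handler("tool.called", {"tool": "search"})


asyncio.run(main())
print(seen)  # [('tool.called', 'search')]
```

The wrapped handler runs on a worker thread, so anything it touches should be thread-safe.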
When This Applies (and When It Does Not)
This pattern works well when:
- You have more than two observability or budget consumers wired into tool dispatch
- Your team adds new integrations frequently and you want to do that without touching core dispatch
- You are writing tests and want to assert on events without mocking internals
This pattern is overkill when:
- You have one consumer and it will never grow
- You need cross-process delivery (use a real broker instead)
- Your agent is single-turn and stateless; the overhead of a bus adds no value
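The bus itself adds negligible latency to the hot path. Subscribers are called synchronously (or awaited), so dispatch costs about one direct function call per subscriber, plus whatever work each subscriber does. A slow subscriber therefore slows the publisher, so keep handlers cheap or hand heavy work off to a queue or thread. For most agents the dispatch cost is irrelevant next to LLM and tool call latency.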
Install or Quick-Start
```bash
pip install agent-event-bus
```
Minimal working example:
```python
from agent_event_bus import EventBus

bus = EventBus()

@bus.subscribe("tool.called")
def on_tool(topic, event):
    print(event)

bus.publish("tool.called", {"tool": "search", "query": "python asyncio"})
# prints: {'tool': 'search', 'query': 'python asyncio'}
```
GitHub: MukundaKatta/agent-event-bus
Siblings Table
These libraries are designed to work alongside agent-event-bus in the same stack:
| Library | What it does | Works with this? |
|---|---|---|
| agenttap | Wire-level prompt/response capture to JSONL | Yes: subscribe to bus events and write tap entries |
| agenttrace | Per-run cost and latency aggregation | Yes: subscribe to tool.* and model.* events |
| agentsnap | Agent trace snapshots for replay | Yes: snapshot on bus events at turn boundaries |
| token-budget-pool | Concurrent token and USD budget enforcement | Yes: decrement budget in a tool.called subscriber |
| agent-step-log | Per-step JSONL logger | Yes: write step entries in a model.response subscriber |
Testing With the Bus
One underrated benefit of the bus is how it simplifies unit tests.
Without a bus, testing that your tool runner notified the budget tracker requires either a mock passed into the runner or a real budget tracker instance. Both add boilerplate.
With the bus, you assert on published events directly:
```python
from agent_event_bus import EventBus

def test_tool_failure_publishes_event():
    bus = EventBus()
    captured = []

    @bus.subscribe("tool.failed")
    def capture(topic, event):
        captured.append(event)

    # Inject the test bus into your runner. ToolRunner and raises_error
    # stand in for your own runner class and a tool that always raises.
    runner = ToolRunner(bus=bus, tool_registry={"fail_tool": raises_error})
    try:
        runner.run_tool(agent_id="test", tool_name="fail_tool", args={})
    except Exception:
        pass  # the failure is expected; we only care about the event

    assert len(captured) == 1
    assert captured[0]["tool"] == "fail_tool"
    assert "error" in captured[0]
```
No mocks needed. You subscribe in the test, run the code, assert on the captured events. The bus is the seam.
This pattern works for any consumer. Test that budget decrement fires, test that the trace entry is published, test that the single-shot alert fires exactly once. Each subscriber gets its own isolated assertion.
What is Next
The immediate next feature is ordered delivery guarantees. Today, subscribers on the same topic fire in registration order, but the library makes no guarantee about that. A priority field on subscribe() would make the ordering explicit.
Subscriber error handling is the other open question. Right now, if a subscriber raises, the exception propagates to the publisher. An on_error callback per subscriber would let you isolate failures without wrapping every subscriber in a try/except.
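Until something like that exists, a small decorator can give you the same isolation in user code. The sketch below is not a library feature. The names isolated and on_error and the callback signature (topic, event, exc) are assumptions chosen for illustration.

```python
import functools


def isolated(on_error):
    """Decorator factory: route a subscriber's exceptions to on_error."""

    def decorate(handler):
        @functools.wraps(handler)
        def safe_handler(topic, event):
            try:
                return handler(topic, event)
            except Exception as exc:
                # Swallow the error so the publisher is unaffected.
                on_error(topic, event, exc)
                return None

        return safe_handler

    return decorate


errors = []


@isolated(lambda topic, event, exc: errors.append((topic, str(exc))))
def flaky_notifier(topic, event):
    raise RuntimeError("slack is down")


flaky_notifier("tool.failed", {"tool": "search"})  # does not raise
print(errors)  # [('tool.failed', 'slack is down')]
```

Stacked under a subscribe decorator, the bus would register the safe wrapper instead of the raw handler, so one failing notifier no longer breaks the tool runner.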
Cross-process delivery is a longer-term consideration. The API is compatible with a broker-backed implementation. A future version could drop in a Redis Streams backend without changing the caller.
For the Hermes Agent Challenge sprint, this library is one of four that address the "connect components without coupling them" problem. The others cover citations, conversation persistence, and trace replay. If you are building a multi-component agent and want to add observability without rewriting your tool runner, start here.