Your agent loop has tool calls, LLM calls, cost tracking, logging, and decision recording all mixed together. Adding one more concern — say, real-time alerting for high-cost runs — means modifying the core loop again.
agent-event-bus is an in-process pub/sub system that lets agent components emit events and lets observability code subscribe to them, without the emitter knowing who is listening.
The Shape of the Fix
```python
import time

from agent_event_bus import EventBus

# logger (structlog-style, keyword arguments), send_alert, call_tool, and
# current_run_id are assumed to be your application's own helpers.
bus = EventBus()

# Subscribers registered at startup
@bus.subscribe("agent.tool_call")
def log_tool_call(event: dict) -> None:
    logger.info("tool_call", tool=event["tool"], duration_ms=event["duration_ms"])

@bus.subscribe("agent.cost_update")
def alert_on_high_cost(event: dict) -> None:
    if event["total_usd"] > 1.00:
        send_alert(f"Run {event['run_id'][:8]} exceeded $1.00")

# In your agent loop: no knowledge of subscribers
def execute_tool(name: str, args: dict) -> dict:
    start = time.monotonic()
    result = call_tool(name, args)
    duration_ms = (time.monotonic() - start) * 1000
    bus.publish("agent.tool_call", {
        "tool": name,
        "duration_ms": duration_ms,
        "run_id": str(current_run_id()),
    })
    return result
```
The agent loop emits events. Observers subscribe. Neither knows about the other.
What It Does NOT Do
agent-event-bus does not distribute events across processes or machines. It is in-process only. For cross-process or cross-machine pub/sub, use Redis pub/sub, Kafka, or a message queue.
It does not guarantee delivery. If a subscriber raises an exception, the bus catches it and logs a warning, then continues. Events are not retried.
It does not persist events. Events that are published before a subscriber registers are lost. If you need persistence, emit to a JSONL file using agent-step-log and consume from there.
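The last two limits are easy to see with a stand-in. The sketch below is not the library's code: `TinyBus` is a hypothetical minimal bus that mimics the catch-and-log behavior described above, so the example runs without installing anything.

```python
import logging
from collections import defaultdict
from typing import Callable

logger = logging.getLogger("tiny_bus")


class TinyBus:
    """Hypothetical stand-in for an in-process bus (not the library's code)."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, topic: str, fn: Callable[[dict], None]) -> None:
        self._subscribers[topic].append(fn)

    def publish(self, topic: str, payload: dict) -> None:
        for fn in list(self._subscribers.get(topic, [])):
            try:
                fn({"topic": topic, **payload})
            except Exception as exc:
                # Logged and dropped: no retry, no re-raise.
                logger.warning("subscriber_error topic=%s error=%s", topic, exc)


bus = TinyBus()
received: list[int] = []


def flaky(event: dict) -> None:
    raise RuntimeError("subscriber is down")


def collector(event: dict) -> None:
    received.append(event["step"])


# Published before anyone subscribed: no subscriber will ever see this event.
bus.publish("agent.step", {"step": 1})

bus.subscribe("agent.step", flaky)
bus.subscribe("agent.step", collector)

# flaky raises, the bus logs a warning, and collector still runs.
bus.publish("agent.step", {"step": 2})
```

After this runs, `received` holds only step 2. Step 1 is gone because nothing was listening when it was published, and the failure in `flaky` never reaches the publisher.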
Inside the Library
The bus stores subscribers as lists of callables per topic:
```python
import threading
from collections import defaultdict
from typing import Callable

# logger is assumed to be a structlog-style logger (keyword arguments)


class EventBus:
    def __init__(self):
        self._subscribers: dict[str, list[Callable]] = defaultdict(list)
        self._lock = threading.Lock()

    def subscribe(self, topic: str):
        def decorator(fn: Callable) -> Callable:
            with self._lock:
                self._subscribers[topic].append(fn)
            return fn
        return decorator

    def publish(self, topic: str, payload: dict) -> None:
        # Copy the list under the lock, then call subscribers outside it
        with self._lock:
            subscribers = list(self._subscribers.get(topic, []))
        for subscriber in subscribers:
            try:
                subscriber({"topic": topic, **payload})
            except Exception as e:
                logger.warning("subscriber_error", topic=topic, error=str(e))

    def unsubscribe(self, topic: str, fn: Callable) -> None:
        with self._lock:
            self._subscribers[topic] = [
                s for s in self._subscribers[topic] if s is not fn
            ]
```
Async subscribers: AsyncEventBus uses asyncio.gather() to call async subscribers concurrently. Mix-and-match is not supported — use EventBus for sync, AsyncEventBus for async contexts.
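The post does not show AsyncEventBus itself, so here is a rough sketch of how concurrent fan-out with asyncio.gather() can look. `SketchAsyncBus` is a hypothetical class, not the library's API, and passing `return_exceptions=True` (so one failing subscriber does not hide the others' results) is an assumption about the desired behavior.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable

AsyncSubscriber = Callable[[dict], Awaitable[None]]


class SketchAsyncBus:
    """Hypothetical async bus; illustrates gather-based fan-out only."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[AsyncSubscriber]] = defaultdict(list)

    def subscribe(self, topic: str):
        def decorator(fn: AsyncSubscriber) -> AsyncSubscriber:
            self._subscribers[topic].append(fn)
            return fn
        return decorator

    async def publish(self, topic: str, payload: dict) -> list[BaseException]:
        event = {"topic": topic, **payload}
        subscribers = list(self._subscribers.get(topic, []))
        # Run all subscribers concurrently; exceptions come back as results.
        results = await asyncio.gather(
            *(fn(event) for fn in subscribers),
            return_exceptions=True,
        )
        return [r for r in results if isinstance(r, BaseException)]


bus = SketchAsyncBus()
order: list[str] = []


@bus.subscribe("agent.llm_call")
async def slow(event: dict) -> None:
    await asyncio.sleep(0.05)
    order.append("slow")


@bus.subscribe("agent.llm_call")
async def fast(event: dict) -> None:
    await asyncio.sleep(0.01)
    order.append("fast")


@bus.subscribe("agent.llm_call")
async def broken(event: dict) -> None:
    raise ValueError("bad payload")


errors = asyncio.run(bus.publish("agent.llm_call", {"model": "example-model"}))
```

Because the subscribers run concurrently, `fast` finishes before `slow` even though it was registered second, and the `ValueError` from `broken` is returned to the caller of `publish` instead of cancelling the other two.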
Wildcard topics: bus.subscribe("agent.*") matches any topic that starts with the prefix "agent.". It is implemented with prefix matching (the simplified publish listing above shows only the exact-topic lookup).
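One way prefix matching could be wired into publish is sketched below. The `matches` helper and `WildcardBus` class are hypothetical names for illustration, and the real library may do this differently.

```python
from collections import defaultdict
from typing import Callable


def matches(pattern: str, topic: str) -> bool:
    """Return True if a subscription pattern covers a published topic."""
    if pattern.endswith("*"):
        # "agent.*" becomes the prefix "agent."
        return topic.startswith(pattern[:-1])
    return pattern == topic


class WildcardBus:
    """Hypothetical bus that checks every pattern on each publish."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, pattern: str, fn: Callable[[dict], None]) -> None:
        self._subscribers[pattern].append(fn)

    def publish(self, topic: str, payload: dict) -> int:
        event = {"topic": topic, **payload}
        delivered = 0
        for pattern, fns in list(self._subscribers.items()):
            if matches(pattern, topic):
                for fn in list(fns):
                    fn(event)
                    delivered += 1
        return delivered


bus = WildcardBus()
seen: list[str] = []
bus.subscribe("agent.*", lambda e: seen.append(e["topic"]))
bus.subscribe("agent.tool_call", lambda e: seen.append("exact:" + e["topic"]))

bus.publish("agent.tool_call", {})  # matches the wildcard and the exact topic
bus.publish("agent.llm_call", {})   # matches the wildcard only
bus.publish("agentic.other", {})    # does not start with "agent."
```

The trade-off in this sketch is that publish scans every registered pattern instead of doing a single dictionary lookup, which is fine for a handful of patterns but worth revisiting if there are many.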
When to Use It
Use it when you have more than one observability concern and you do not want to add each one to the core loop directly. The bus keeps the core loop clean. Subscribers handle the side effects.
Use it for cross-cutting concerns: logging, cost alerting, metrics collection, real-time dashboards. These all want to know about agent events but have no business being in the agent loop itself.
Use it for extensible agent frameworks. If you are building a framework that others extend, a bus lets third-party code subscribe to agent events without modifying the framework core.
Skip it for simple agents. If you have one logging subscriber and nothing else, the bus adds indirection without benefit. A direct function call is simpler.
Install
```bash
pip install git+https://github.com/MukundaKatta/agent-event-bus
```
```python
from agent_event_bus import EventBus
from agentsnap import AgentSnap
from agent_decision_log import DecisionLog

# RunId is assumed to come from agent-run-id; metrics, call_llm, and
# extract_text are assumed to be your application's own helpers.
bus = EventBus()
snap = AgentSnap()
decisions = DecisionLog(path="./logs/decisions.jsonl")

# Wire up observability subscribers once at startup
@bus.subscribe("agent.llm_call")
def track_usage(event: dict) -> None:
    snap.record(
        run_id=event["run_id"],
        model=event["model"],
        input_tokens=event["input_tokens"],
        output_tokens=event["output_tokens"],
    )

@bus.subscribe("agent.decision")
def record_decision(event: dict) -> None:
    decisions.record(
        run_id=event["run_id"],
        decision=event["decision"],
        why=event["why"],
    )

@bus.subscribe("agent.*")  # catch all
def count_events(event: dict) -> None:
    metrics.increment(f"agent.events.{event['topic']}")

# Agent loop just publishes
def run_agent(task: str) -> str:
    run_id = str(RunId.generate())
    # Conversation state passed to call_llm (message shape is assumed)
    messages = [{"role": "user", "content": task}]
    while True:
        response = call_llm(messages)
        bus.publish("agent.llm_call", {
            "run_id": run_id,
            "model": "claude-sonnet-4-6",
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens,
        })
        if response.stop_reason == "end_turn":
            return extract_text(response)
        # Otherwise: handle tool calls and append the results to messages
```
Sibling Libraries
| Library | What it solves |
|---|---|
| agentsnap | Usage tracking subscriber for cost events |
| agent-decision-log | Decision recording subscriber |
| agent-run-id | Run IDs to include in event payloads |
| agent-step-log | Durable step logging (complements transient bus) |
| agenttap | Wire-level capture (use alongside bus for full picture) |
The observability architecture: agent-event-bus for in-process event routing, agenttap for wire-level capture, agent-step-log for durable step records, agent-run-id for correlation.
What's Next
Event replay for testing: bus.capture() context manager that records all published events during a block. Load the recording and replay it to test subscribers. This would make subscriber unit tests much cleaner.
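Since bus.capture() does not exist yet, the following is only a guess at what it might look like. `CapturingBus`, its `capture()` method, and the `replay` helper are all hypothetical names, and the sketch assumes recordings are plain lists of event dicts.

```python
from collections import defaultdict
from contextlib import contextmanager
from typing import Callable, Iterator


class CapturingBus:
    """Hypothetical bus with a capture() helper; not the library's API."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Callable[[dict], None]]] = defaultdict(list)
        self._recordings: list[list[dict]] = []

    def subscribe(self, topic: str, fn: Callable[[dict], None]) -> None:
        self._subscribers[topic].append(fn)

    def publish(self, topic: str, payload: dict) -> None:
        event = {"topic": topic, **payload}
        for recording in self._recordings:
            recording.append(dict(event))  # copy so later mutation is not recorded
        for fn in list(self._subscribers.get(topic, [])):
            fn(event)

    @contextmanager
    def capture(self) -> Iterator[list[dict]]:
        recording: list[dict] = []
        self._recordings.append(recording)
        try:
            yield recording
        finally:
            # Remove by identity so nested captures with equal contents are safe.
            self._recordings = [r for r in self._recordings if r is not recording]


def replay(events: list[dict], subscriber: Callable[[dict], None]) -> None:
    """Feed recorded events to a subscriber, one at a time."""
    for event in events:
        subscriber(dict(event))


bus = CapturingBus()
with bus.capture() as recorded:
    bus.publish("agent.cost_update", {"run_id": "run-1", "total_usd": 0.40})
    bus.publish("agent.cost_update", {"run_id": "run-1", "total_usd": 1.25})
bus.publish("agent.cost_update", {"run_id": "run-1", "total_usd": 9.99})  # outside the block

alerts: list[str] = []


def alert_on_high_cost(event: dict) -> None:
    if event["total_usd"] > 1.00:
        alerts.append(event["run_id"])


replay(recorded, alert_on_high_cost)
```

Only the two events published inside the `with` block are recorded, so replaying them triggers exactly one alert. A subscriber test could then assert on `alerts` without running an agent at all.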
Priority subscribers: call higher-priority subscribers before lower-priority ones. Useful for subscribers that must run before others (auth/security checks before logging).
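As a rough illustration of the idea, ordering can be kept by sorting subscribers at registration time. `PriorityBus` and its `priority` parameter are hypothetical, and "higher number runs first" is an assumption.

```python
import itertools
from collections import defaultdict
from typing import Callable

Subscriber = Callable[[dict], None]


class PriorityBus:
    """Hypothetical bus that calls higher-priority subscribers first."""

    def __init__(self) -> None:
        # Each entry is (negated priority, registration order, callable).
        self._subscribers: dict[str, list[tuple[int, int, Subscriber]]] = defaultdict(list)
        self._counter = itertools.count()

    def subscribe(self, topic: str, fn: Subscriber, priority: int = 0) -> None:
        entries = self._subscribers[topic]
        entries.append((-priority, next(self._counter), fn))
        # Sort on the two numeric fields only; callables are not orderable.
        entries.sort(key=lambda entry: entry[:2])

    def publish(self, topic: str, payload: dict) -> None:
        event = {"topic": topic, **payload}
        for _, _, fn in list(self._subscribers.get(topic, [])):
            fn(event)


bus = PriorityBus()
calls: list[str] = []

bus.subscribe("agent.tool_call", lambda e: calls.append("log"))
bus.subscribe("agent.tool_call", lambda e: calls.append("security"), priority=10)
bus.subscribe("agent.tool_call", lambda e: calls.append("metrics"))

bus.publish("agent.tool_call", {"tool": "search"})
```

Here `security` runs first despite being registered second, and subscribers with equal priority keep their registration order.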
Dead letter queue: events that could not be delivered because every subscriber raised an exception go to a dead letter queue for inspection. Currently, errors are just logged and dropped.
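A minimal sketch of that behavior, under the assumption that "could not be delivered" means there was at least one subscriber and every one of them raised. `DeadLetterBus` and its `dead_letters` attribute are hypothetical names.

```python
from collections import defaultdict, deque
from typing import Callable


class DeadLetterBus:
    """Hypothetical bus that keeps events every subscriber failed on."""

    def __init__(self, max_dead_letters: int = 100) -> None:
        self._subscribers: dict[str, list[Callable[[dict], None]]] = defaultdict(list)
        # Bounded so a persistently failing subscriber cannot grow memory forever.
        self.dead_letters: deque = deque(maxlen=max_dead_letters)

    def subscribe(self, topic: str, fn: Callable[[dict], None]) -> None:
        self._subscribers[topic].append(fn)

    def publish(self, topic: str, payload: dict) -> bool:
        """Return False if the event was dead-lettered, True otherwise."""
        event = {"topic": topic, **payload}
        subscribers = list(self._subscribers.get(topic, []))
        errors: list[str] = []
        for fn in subscribers:
            try:
                fn(event)
            except Exception as exc:
                errors.append(repr(exc))
        # Dead-letter only when there were subscribers and all of them failed.
        if subscribers and len(errors) == len(subscribers):
            self.dead_letters.append({"event": event, "errors": errors})
            return False
        return True


bus = DeadLetterBus()
delivered: list[dict] = []


def always_fails(event: dict) -> None:
    raise RuntimeError("sink unavailable")


bus.subscribe("agent.decision", always_fails)
first = bus.publish("agent.decision", {"decision": "retry"})  # sole subscriber fails

bus.subscribe("agent.decision", delivered.append)
second = bus.publish("agent.decision", {"decision": "stop"})  # one subscriber succeeds
```

The first event lands in `bus.dead_letters` along with the error that caused it. The second does not, because at least one subscriber handled it.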
Built as part of the agent-stack family: composable Python primitives for production LLM agents.