Mukunda Rao Katta

agent-event-bus: Sync and Async Pub/Sub for Agent Events

The bug that started this

I wired cost tracking directly into the agent loop. At the time it seemed fine. The agent calls a model, the cost tracker updates, the loop continues.

Then the cost tracker threw an exception. The agent stopped.

Not because the agent logic was broken. Not because the model call failed. The agent stopped because an observability component panicked and there was nothing in between.

That is the wrong coupling. A budget tracker should never be able to crash the agent it is watching. Neither should a logger, a tracer, or an alerting hook. These are observers. Observers should not have control-plane authority over what they observe.

That is what agent-event-bus fixes.


The shape of the fix

Before: cost tracking mixed into the loop.

def agent_loop(messages):
    response = model.call(messages)
    cost_tracker.record(response)  # crashes here = agent dies
    return response

After: the agent emits an event. The cost tracker listens.

from agent_event_bus import EventBus

bus = EventBus()

@bus.on("model_responded")
def track_cost(event):
    cost_tracker.record(event["response"])

def agent_loop(messages):
    response = model.call(messages)
    bus.emit("model_responded", {"response": response})
    return response

If track_cost throws, the bus catches it, logs it, and moves on. The agent loop never sees the exception.

Wildcard subscriptions

@bus.on("*")
def log_everything(event_name, event):
    logger.info(f"{event_name}: {event}")

A * listener sees every event. Useful for development, debugging, or a catch-all audit log.
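One way wildcard dispatch could work is sketched below. This is a stand-in written for illustration, not the agent-event-bus source: the class name WildcardSketchBus and its internals are assumptions, and error isolation is left out to keep the dispatch order visible. The only detail taken from the example above is that a * handler receives the event name as well as the payload.

```python
from collections import defaultdict


class WildcardSketchBus:
    """Hypothetical stand-in bus; not the agent-event-bus implementation."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event_name):
        def register(handler):
            self._handlers[event_name].append(handler)
            return handler
        return register

    def emit(self, event_name, event):
        # Exact-name handlers receive only the payload.
        for handler in list(self._handlers.get(event_name, [])):
            handler(event)
        # Wildcard handlers also receive the event name, since they
        # cannot know it from their registration.
        for handler in list(self._handlers.get("*", [])):
            handler(event_name, event)


bus = WildcardSketchBus()
audit_log = []


@bus.on("*")
def audit(event_name, event):
    audit_log.append((event_name, event))


bus.emit("tool_called", {"tool": "search"})
bus.emit("model_responded", {"response": "hi"})
```

After both emits, audit_log holds one (name, payload) pair per event, in emit order.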

One-time listeners

@bus.once("budget_exceeded")
def alert_once(event):
    pagerduty.send_alert(event)

once removes the handler after the first fire. Good for setup completion events or single-shot alerts.
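A common way to get once semantics is a wrapper that unregisters itself before calling the real handler. The sketch below assumes that approach; OnceSketchBus is a hypothetical stand-in, and the library may implement once differently.

```python
from collections import defaultdict


class OnceSketchBus:
    """Hypothetical stand-in bus; not the agent-event-bus implementation."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def once(self, event_name):
        def register(handler):
            def wrapper(event):
                # Unregister first, so a handler that raises still fires only once.
                self._handlers[event_name].remove(wrapper)
                return handler(event)
            self._handlers[event_name].append(wrapper)
            return handler
        return register

    def emit(self, event_name, event):
        # Iterate over a copy: wrappers mutate the list while it is being walked.
        for handler in list(self._handlers.get(event_name, [])):
            handler(event)


bus = OnceSketchBus()
alerts = []


@bus.once("budget_exceeded")
def alert_once(event):
    alerts.append(event["spent"])


bus.emit("budget_exceeded", {"spent": 12.5})
bus.emit("budget_exceeded", {"spent": 13.0})  # no handler left to call
```

Only the first emit reaches alert_once. The second finds an empty handler list.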

Async support

import asyncio
from agent_event_bus import EventBus

bus = EventBus()

@bus.on("tool_called")
async def async_trace(event):
    await tracer.record(event)

async def agent_loop(messages):
    result = await model.call(messages)
    await bus.emit_async("tool_called", {"tool": "search", "args": {}})
    return result

Sync and async listeners can coexist on the same bus. Async emit awaits all async handlers and runs sync handlers inline.
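The mixed sync and async behavior can be sketched with inspect.isawaitable: call every handler, and await the result only if it is awaitable. AsyncSketchBus is a hypothetical stand-in, and it awaits handlers one at a time. Whether the real library awaits sequentially or concurrently is not something this sketch claims to know.

```python
import asyncio
import inspect
import logging
from collections import defaultdict

logger = logging.getLogger("async_sketch_bus")


class AsyncSketchBus:
    """Hypothetical stand-in bus; not the agent-event-bus implementation."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event_name):
        def register(handler):
            self._handlers[event_name].append(handler)
            return handler
        return register

    async def emit_async(self, event_name, event):
        for handler in list(self._handlers.get(event_name, [])):
            try:
                result = handler(event)  # sync handlers finish right here
                if inspect.isawaitable(result):
                    await result  # async handlers return a coroutine to await
            except Exception:
                logger.exception("Handler raised on event %r", event_name)


bus = AsyncSketchBus()
calls = []


@bus.on("tool_called")
def sync_listener(event):
    calls.append(("sync", event["tool"]))


@bus.on("tool_called")
async def async_listener(event):
    await asyncio.sleep(0)  # yield to the event loop once, like real I/O would
    calls.append(("async", event["tool"]))


asyncio.run(bus.emit_async("tool_called", {"tool": "search"}))
```

Both listeners run, in registration order, from a single emit_async call.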


What it does NOT do

  • It does not persist events. The bus is in-memory. If you need a durable event log, pair with agent-replay-trace or write your own handler that appends to a file.
  • It does not route events between processes. This is a single-process coordination point, not a message queue.
  • It does not enforce schema on events. You can emit any dict. Validation is the caller's responsibility.
  • It does not retry failed handlers. If a handler raises, the exception is logged and that handler is not invoked again for that emit call.
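For the first point, a file-appending handler can be very small. The sketch below writes one JSON object per line. The helper name make_jsonl_appender and the file name events.jsonl are made up for this example, and the handler is called directly here so the snippet runs without the bus installed.

```python
import json
import os
import tempfile


def make_jsonl_appender(path):
    """Return a handler that appends each event to `path` as one JSON line."""
    def append_event(event):
        with open(path, "a", encoding="utf-8") as fh:
            # default=str keeps the write from failing on non-JSON values
            # such as datetimes; they are stored as strings.
            fh.write(json.dumps(event, default=str) + "\n")
    return append_event


log_path = os.path.join(tempfile.mkdtemp(), "events.jsonl")
append_event = make_jsonl_appender(log_path)

# With the bus this would be registered instead of called directly:
#     bus.on("model_responded")(append_event)
append_event({"response": "hello", "tokens": 42})
append_event({"response": "world", "tokens": 7})

with open(log_path, encoding="utf-8") as fh:
    lines = [json.loads(line) for line in fh]
```

Opening the file on every event is slow but simple, and it means a crash loses at most the event being written.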

Inside the lib: error isolation

The core design decision is that a crashing handler must not propagate to the agent.

The implementation is straightforward. Each handler is called inside its own try/except. If it raises, the bus logs the exception with the event name and handler name, then continues to the next handler.

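for handler in self._handlers.get(event_name, []):
    try:
        handler(event)
    except Exception as exc:
        # Catching Exception (not BaseException) lets KeyboardInterrupt
        # and SystemExit still propagate.
        # logger.exception records the traceback along with the message.
        logger.exception(
            f"Handler {handler.__name__!r} raised on event {event_name!r}: {exc}"
        )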

No handler can stop the iteration. No handler can raise to the caller of emit.

This is the same pattern you see in browser event systems and in Python's logging module. Handlers are guests. The bus is the host. The host does not let a crashing guest break the party.

The rationale: observability code is some of the most likely code to have bugs. It touches external systems (log aggregators, metrics endpoints, alerting APIs). Those systems fail. If your tracer is down, your agent should keep running. If your cost hook has a bug, your agent should keep running. The agent is the primary concern. The observers are secondary.
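To see the isolation end to end, here is a runnable sketch built around the same try/except loop. SketchBus is a stand-in written for this post, not the library source. It adds two small assumptions of its own: it iterates over a copy of the handler list, and it falls back to repr when a handler has no __name__.

```python
import logging
from collections import defaultdict

logger = logging.getLogger("sketch_bus")


class SketchBus:
    """Hypothetical stand-in bus; not the agent-event-bus implementation."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event_name):
        def register(handler):
            self._handlers[event_name].append(handler)
            return handler
        return register

    def emit(self, event_name, event):
        for handler in list(self._handlers.get(event_name, [])):
            try:
                handler(event)
            except Exception:
                # functools.partial objects and callable instances have no __name__.
                name = getattr(handler, "__name__", repr(handler))
                logger.exception("Handler %r raised on event %r", name, event_name)


bus = SketchBus()
seen = []


@bus.on("model_responded")
def broken_tracker(event):
    raise RuntimeError("metrics endpoint is down")


@bus.on("model_responded")
def healthy_logger(event):
    seen.append(event["response"])


bus.emit("model_responded", {"response": "ok"})  # logs the error, does not raise
```

The broken tracker is registered first and raises every time. The healthy logger behind it still receives the event, and the caller of emit never sees the exception.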


When this is useful

You have multiple cross-cutting concerns. Logging, tracing, budget tracking, and alerting all want to know when something happens in the agent loop. Without a bus, each of these gets injected into the loop explicitly. With a bus, each subscribes independently and the loop just emits.

You want to add or remove observability without touching agent code. Registering a new listener is one decorator. Removing it is one line. The agent loop does not change.

You are writing tests that need to inspect agent behavior. Register a listener in your test, run the agent, check what events were emitted. No mocking of internals required.

events = []
bus.on("tool_called")(lambda e: events.append(e))

run_agent(bus=bus)

assert any(e["tool"] == "search" for e in events)

Your observability handlers call external services. If those services can fail, you want them isolated. The bus gives you that for free.


When NOT to use this

When you need synchronous acknowledgment from a handler before continuing. The bus is fire-and-continue. If you need the agent to wait for a handler to confirm something before moving forward, a callback or an explicit await on a result is cleaner.

When you need event ordering guarantees across processes. Use a real message broker (Kafka, Redis Streams, SQS) for that. The bus is in-process only.

When you need to replay missed events. There is no persistence and no replay. Listeners that register after an event fires do not receive it.

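When handlers must not share or mutate the payload. The bus passes dicts by reference, so payload size is not a problem in itself, but every handler sees the same object and a mutation in one handler is visible to the rest and to the caller. If you need copy-on-emit semantics or schema enforcement, add that at the call site.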
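Copy-on-emit at the call site can be a one-line wrapper around copy.deepcopy. In the sketch below, emit_copy is a hypothetical helper and fake_emit stands in for bus.emit so the snippet runs on its own.

```python
import copy


def emit_copy(emit, event_name, event):
    """Call `emit` with a deep copy so handlers cannot mutate the caller's payload."""
    emit(event_name, copy.deepcopy(event))


received = []


def fake_emit(event_name, event):
    # Stand-in for bus.emit that behaves like a handler mutating its input.
    event["messages"].append("mutated by handler")
    received.append(event)


payload = {"messages": ["hi"]}
emit_copy(fake_emit, "model_responded", payload)
```

The caller's payload is unchanged after the emit. The cost is a full copy per event, which is the part that gets expensive when payloads are large.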


Install

pip install agent-event-bus

Zero dependencies. Supports Python 3.9 and above. 25 tests covering sync, async, wildcard, once, and error isolation.

Source: MukundaKatta/agent-event-bus


Siblings

These libraries in the same stack plug into the event bus as listeners or producers.

Lib | Boundary | Repo
agenttrace | Cost and latency listener subscribes to model_responded events via the bus | MukundaKatta/agenttrace
agent-decision-log | Decision events emitted when the agent chooses a branch | MukundaKatta/agent-decision-log
agentsnap | Snapshot test hooks subscribe to tool call and response events | MukundaKatta/agentsnap
agent-deadline | Timeout events emitted when the agent exceeds its deadline | MukundaKatta/agent-deadline

Each of these sits beside the agent loop rather than inside it. Listeners observe and record. Producers emit and move on. The event bus is the coordination layer that keeps them from being wired into agent logic directly.


What's next

A few things would make this more useful.

Named buses with dependency injection. Right now you instantiate a bus and pass it around. A named registry would let you get the same bus from anywhere in the call stack without explicit passing.

Event schemas. Optional validation on emit so you catch shape mismatches at the call site rather than in the handler. Could be implemented as a decorator on emit or as a separate define_event call.
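Until something like this ships, a call-site check is easy to hand-roll. The sketch below is one possible shape for it, not a preview of a planned API: define_event here and the validate helper are hypothetical, and the check only covers required keys and their types.

```python
_schemas = {}


def define_event(event_name, **fields):
    """Register the required keys and their expected types for one event name."""
    _schemas[event_name] = fields


def validate(event_name, event):
    """Raise at the call site if the payload does not match its schema."""
    for key, expected in _schemas.get(event_name, {}).items():
        if key not in event:
            raise ValueError(f"{event_name!r} event is missing key {key!r}")
        if not isinstance(event[key], expected):
            raise TypeError(
                f"{event_name!r} key {key!r} should be {expected.__name__}, "
                f"got {type(event[key]).__name__}"
            )
    return event  # returned so it can be passed straight to emit


define_event("tool_called", tool=str, args=dict)

ok = validate("tool_called", {"tool": "search", "args": {}})

try:
    validate("tool_called", {"tool": "search"})  # "args" is missing
    error = None
except ValueError as exc:
    error = str(exc)
```

At the call site this would read bus.emit("tool_called", validate("tool_called", payload)), so a shape mismatch fails in the emitter instead of surfacing as a logged handler error.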

Handler priority. Some listeners need to run before others. A simple integer priority on registration would handle most cases.
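A sketch of what integer priority could look like, assuming higher numbers run first and ties keep registration order. PrioritySketchBus and the priority parameter are hypothetical. Nothing like this exists in the current release.

```python
import itertools


class PrioritySketchBus:
    """Hypothetical stand-in bus; not the agent-event-bus implementation."""

    def __init__(self):
        self._handlers = {}
        self._counter = itertools.count()  # tie-breaker: registration order

    def on(self, event_name, priority=0):
        def register(handler):
            entries = self._handlers.setdefault(event_name, [])
            # Negate priority so an ascending sort puts high priorities first.
            entries.append((-priority, next(self._counter), handler))
            entries.sort(key=lambda entry: entry[:2])
            return handler
        return register

    def emit(self, event_name, event):
        for _, _, handler in list(self._handlers.get(event_name, [])):
            handler(event)


bus = PrioritySketchBus()
order = []


@bus.on("model_responded")
def log_response(event):
    order.append("log")


@bus.on("model_responded", priority=10)
def redact_secrets(event):
    order.append("redact")


bus.emit("model_responded", {"response": "hi"})
```

redact_secrets was registered second but runs first because of its higher priority.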

Async-first mode. The current implementation runs async handlers with asyncio.ensure_future or inline depending on context. A strict async-only bus could be cleaner for fully async agent stacks.

None of these are in scope for the current release. The goal was to get the core pattern right: emit events, isolate handlers, keep the agent loop clean. That is what ships.


The agent loop is control flow. Logging, tracing, and cost tracking are not. The event bus keeps them separate without any ceremony. That is the whole idea.
