DEV Community

Mukunda Rao Katta
Mukunda Rao Katta

Posted on

In-Process Pub/Sub for Agent Events: Decouple Your Observability From Your Logic

Your agent loop has tool calls, LLM calls, cost tracking, logging, and decision recording all mixed together. Adding one more concern — say, real-time alerting for high-cost runs — means modifying the core loop again.

agent-event-bus is an in-process pub/sub system that lets agent components emit events and let observability code subscribe to them, without the emitter knowing who is listening.


The Shape of the Fix

from agent_event_bus import EventBus

bus = EventBus()

# Subscribers registered at startup
@bus.subscribe("agent.tool_call")
def log_tool_call(event: dict) -> None:
    logger.info("tool_call", tool=event["tool"], duration_ms=event["duration_ms"])

@bus.subscribe("agent.cost_update")
def alert_on_high_cost(event: dict) -> None:
    if event["total_usd"] > 1.00:
        send_alert(f"Run {event['run_id'][:8]} exceeded $1.00")

# In your agent loop — no knowledge of subscribers
def execute_tool(name: str, args: dict) -> dict:
    start = time.monotonic()
    result = call_tool(name, args)
    duration_ms = (time.monotonic() - start) * 1000

    bus.publish("agent.tool_call", {
        "tool": name,
        "duration_ms": duration_ms,
        "run_id": str(current_run_id()),
    })

    return result
Enter fullscreen mode Exit fullscreen mode

The agent loop emits events. Observers subscribe. Neither knows about the other.


What It Does NOT Do

agent-event-bus does not distribute events across processes or machines. It is in-process only. For cross-process or cross-machine pub/sub, use Redis pub/sub, Kafka, or a message queue.

It does not guarantee delivery. If a subscriber raises an exception, the bus catches it and logs a warning, then continues. Events are not retried.

It does not persist events. Events that are published before a subscriber registers are lost. If you need persistence, emit to a JSONL file using agent-step-log and consume from there.


Inside the Library

The bus stores subscribers as lists of callables per topic:

class EventBus:
    def __init__(self):
        self._subscribers: dict[str, list[Callable]] = defaultdict(list)
        self._lock = threading.Lock()

    def subscribe(self, topic: str):
        def decorator(fn: Callable) -> Callable:
            with self._lock:
                self._subscribers[topic].append(fn)
            return fn
        return decorator

    def publish(self, topic: str, payload: dict) -> None:
        with self._lock:
            subscribers = list(self._subscribers.get(topic, []))

        for subscriber in subscribers:
            try:
                subscriber({"topic": topic, **payload})
            except Exception as e:
                logger.warning("subscriber_error", topic=topic, error=str(e))

    def unsubscribe(self, topic: str, fn: Callable) -> None:
        with self._lock:
            self._subscribers[topic] = [
                s for s in self._subscribers[topic] if s is not fn
            ]
Enter fullscreen mode Exit fullscreen mode

Async subscribers: AsyncEventBus uses asyncio.gather() to call async subscribers concurrently. Mix-and-match is not supported — use EventBus for sync, AsyncEventBus for async contexts.

Wildcard topics: bus.subscribe("agent.*") matches any topic starting with agent.. Implemented with prefix matching.


When to Use It

Use it when you have more than one observability concern and you do not want to add each one to the core loop directly. The bus keeps the core loop clean. Subscribers handle the side effects.

Use it for cross-cutting concerns: logging, cost alerting, metrics collection, real-time dashboards. These all want to know about agent events but have no business being in the agent loop itself.

Use it for extensible agent frameworks. If you are building a framework that others extend, a bus lets third-party code subscribe to agent events without modifying the framework core.

Skip it for simple agents. If you have one logging subscriber and nothing else, the bus adds indirection without benefit. A direct function call is simpler.


Install

pip install git+https://github.com/MukundaKatta/agent-event-bus
Enter fullscreen mode Exit fullscreen mode
from agent_event_bus import EventBus
from agentsnap import AgentSnap
from agent_decision_log import DecisionLog

bus = EventBus()
snap = AgentSnap()
decisions = DecisionLog(path="./logs/decisions.jsonl")

# Wire up observability subscribers once at startup
@bus.subscribe("agent.llm_call")
def track_usage(event: dict) -> None:
    snap.record(
        run_id=event["run_id"],
        model=event["model"],
        input_tokens=event["input_tokens"],
        output_tokens=event["output_tokens"],
    )

@bus.subscribe("agent.decision")
def record_decision(event: dict) -> None:
    decisions.record(
        run_id=event["run_id"],
        decision=event["decision"],
        why=event["why"],
    )

@bus.subscribe("agent.*")  # catch all
def count_events(event: dict) -> None:
    metrics.increment(f"agent.events.{event['topic']}")

# Agent loop just publishes
def run_agent(task: str) -> str:
    run_id = str(RunId.generate())

    while True:
        response = call_llm(messages)
        bus.publish("agent.llm_call", {
            "run_id": run_id,
            "model": "claude-sonnet-4-6",
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens,
        })

        if response.stop_reason == "end_turn":
            return extract_text(response)
Enter fullscreen mode Exit fullscreen mode

Sibling Libraries

Library What it solves
agentsnap Usage tracking subscriber for cost events
agent-decision-log Decision recording subscriber
agent-run-id Run IDs to include in event payloads
agent-step-log Durable step logging (complements transient bus)
agenttap Wire-level capture (use alongside bus for full picture)

The observability architecture: agent-event-bus for in-process event routing, agenttap for wire-level capture, agent-step-log for durable step records, agent-run-id for correlation.


What's Next

Event replay for testing: bus.capture() context manager that records all published events during a block. Load the recording and replay it to test subscribers. This would make subscriber unit tests much cleaner.

Priority subscribers: call higher-priority subscribers before lower-priority ones. Useful for subscribers that must run before others (auth/security checks before logging).

Dead letter queue: events that could not be delivered because all subscribers raised go to a dead letter queue for inspection. Currently errors are just logged and dropped.


Built as part of the agent-stack family: composable Python primitives for production LLM agents.

Top comments (0)