- 1. Every Agent Starts as a Loop
- 2. Gear up Your Agent
- 3. Pack the Conversation And Carry On
- 4. Beyond the CLI
- 5. Many of Them
- 6. Agents are Running, Your are Sleeping
- 7. More Context! More Context!
Beyond the CLI
Your agent works great in the terminal. But what if you want to talk to it from Telegram? Or your phone? Or another program? Or even multiple of them.
Event-Driven Architecture
To make the agent more scalable, we introduce event-driven architecture before adding more feature.
The pattern is pub/sub, and you already know it. An event bus sits at the center. Messages come in as events, workers process them, responses go out as events.
@dataclass
class InboundEvent:
session_id: str
content: str
source: EventSource
@dataclass
class OutboundEvent:
session_id: str
content: str
error: str | None = None
class EventBus(Worker):
def subscribe(self, event_class, handler):
"""Subscribe a handler to an event class."""
async def publish(self, event: Event) -> None:
"""Publish an event to the internal queue."""
await self._queue.put(event)
async def run(self) -> None:
while True:
event = await self._queue.get()
await self._dispatch(event)
Channels & Agent Worker & Delivery Worker
Three workers form a pipeline:
-
Channel Worker — Receives messages from platforms (CLI, Telegram, WebSocket), publishes
InboundEvents -
Agent Worker — Subscribes to
InboundEvents, runs the agent session, publishesOutboundEvents -
Delivery Worker — Subscribes to
OutboundEvents, routes responses back to the right channel
A channel is an abstraction over a messaging platform. CLI, Telegram, Discord, WebSocket. The channel publishes an InboundEvent to the event bus.
class Channel(ABC):
@abstractmethod
async def run(self, on_message: Callable) -> None:
"""Run the channel. Blocks until stop() is called."""
on_message(message) # Inbound Event to event_bus
@abstractmethod
async def reply(self, content: str, source) -> None:
"""Reply to incoming message."""
The agent worker bridges events and sessions:
class AgentWorker:
def __init__(self, context):
self.context.eventbus.subscribe(InboundEvent, self.dispatch_event)
async def dispatch_event(self, event: InboundEvent):
agent = Agent(agent_def, self.context)
session = agent.resume_session(event.session_id)
response = await session.chat(event.content)
# Publish result
result = OutboundEvent(
session_id=event.session_id,
content=response,
)
await self.context.eventbus.publish(result)
The delivery worker picks up OutboundEvents and sends them back through the appropriate channel:
class DeliveryWorker:
def __init__(self, context):
self.context.eventbus.subscribe(OutboundEvent, self.deliver)
async def deliver(self, event: OutboundEvent):
# Look up which channel this session belongs to
channel = self._get_channel_for_session(event.session_id)
await channel.reply(event.content, event.source)
# Confirm delivery - removes persisted event file
self.context.eventbus.ack(event)
Event Persistence: Don't Lose Messages
What happens if the server crashes after the agent responds but before delivery? The message is lost.
The fix: persist OutboundEvents to disk before dispatching, delete only after successful delivery:
class EventBus(Worker):
async def run(self) -> None:
await self._recover() # Re-dispatch pending events on startup
while True:
event = await self._queue.get()
await self._dispatch(event)
async def _dispatch(self, event: Event) -> None:
await self._persist_outbound(event) # Write to disk first
await self._notify_subscribers(event)
def ack(self, event: Event) -> None:
"""Called by DeliveryWorker after successful delivery."""
filename = f"{event.timestamp}_{event.session_id}.json"
(self.pending_dir / filename).unlink() # Delete persisted file
The flow is complete:
user message → channel → InboundEvent → AgentWorker → OutboundEvent → persist → DeliveryWorker → channel → user → ack → delete.
WebSocket
Sometimes you want code to talk to your agent, not a human. WebSocket provides a programmatic interface.
The WebSocketWorker has two roles:
-
Channel — Receives messages from WebSocket clients, publishes
InboundEvents - Broadcaster — Subscribes to all events, broadcasts them to every connected client
class WebSocketWorker:
def __init__(self, context):
self.clients: Set[WebSocket] = set()
# Broadcaster role: subscribe to ALL events
for event_class in [InboundEvent, OutboundEvent]:
self.context.eventbus.subscribe(event_class, self.handle_event)
# Channel role: receive from clients
async def handle_connection(self, ws: WebSocket) -> None:
self.clients.add(ws)
try:
await self._run_client_loop(ws) # Publishes InboundEvent
finally:
self.clients.discard(ws)
# Broadcaster role: send to all clients
async def handle_event(self, event: Event) -> None:
for client in list(self.clients):
try:
await client.send_json(event_dict)
except Exception:
self.clients.discard(client)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
await context.websocket_worker.handle_connection(websocket)
Next Steps
Previous: Pack the Conversation And Carry On | Next: Many of Them
⭐ Star the repo if you found this series helpful!



Top comments (0)