In the development of OSpark, a universal Agent product powered by MiroMind's open-source project MiroFlow, we encountered a critical challenge: enabling real-time, synchronized streaming responses across multiple clients while optimizing resource utilization. This article dives into the design and implementation of OSparkApi's multi-client subscription-based streaming response system, leveraging MiroFlow's flexible architecture to address the limitations of traditional single-client Server-Sent Events (SSE) solutions.
Background
Server-Sent Events (SSE) have become the backbone of real-time interaction in modern AI chat applications, enabling servers to push incremental updates to clients without constant polling. However, traditional SSE architectures face significant bottlenecks in two key scenarios:
Multi-client synchronization: When users access the same conversation across multiple endpoints (e.g., browser tabs, mobile apps, web interfaces), each client typically initiates a separate SSE connection, leading to redundant AI inference tasks and wasted computational resources.
Disconnection recovery: If a user exits a conversation page while an AI task is still running, re-entering the page should resume real-time updates seamlessly—without restarting the task or losing intermediate state.
To solve these pain points, we designed a streaming response system that supports multi-client subscription, task sharing, and reliable reconnection—all built on the foundation of MiroMind's MiroFlow framework.
Core Challenges
The design of a multi-client streaming system must address four critical requirements:
Resource reuse: Eliminate redundant AI inference by allowing multiple clients to share a single running task.
Event broadcasting: Ensure consistent, real-time updates across all subscribed clients for the same conversation.
Connection management: Gracefully handle client disconnections, reconnections, and cleanups without disrupting ongoing tasks.
Task lifecycle management: Properly create, run, cancel, and clean up tasks while accounting for subscription states.
Architecture Design
The system is composed of two core components—TaskManager and StreamHandler—working in tandem to manage tasks, subscriptions, and event distribution. This architecture decouples task execution from client communication, enabling flexible scaling and robust synchronization.
Core Components
1. TaskManager (Task Orchestrator)
The TaskManager serves as the central hub for managing asynchronous tasks and their subscribers. Its key responsibilities include:
Task registry: Maintaining a mapping of thread_id (conversation identifier) to active AI tasks.
Subscriber set management: For each thread_id, tracking a collection of subscriber queues (one per client connection).
Event broadcasting: Pushing task-generated events to all subscribed client queues.
Thread safety: Using asyncio.Lock to ensure safe concurrent access to shared data structures.
Core data structures:
Task registry: Dict[str, asyncio.Task] (maps thread_id to running tasks)
Subscriber collection: Dict[str, Set[asyncio.Queue]] (maps thread_id to subscriber queues)
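The article doesn’t include OSparkApi’s source, but the data structures and method names above suggest a shape roughly like the following minimal sketch. The method bodies are illustrative assumptions, not the actual implementation; only the names TaskManager, subscribe_to_stream, and unsubscribe_from_stream come from the text:

```python
import asyncio
from typing import Dict, Set

class TaskManager:
    """Minimal sketch of the task registry and subscriber bookkeeping."""

    def __init__(self) -> None:
        self.tasks: Dict[str, asyncio.Task] = {}              # thread_id -> running task
        self.subscribers: Dict[str, Set[asyncio.Queue]] = {}  # thread_id -> client queues
        self.lock = asyncio.Lock()                            # guards both mappings

    async def subscribe_to_stream(self, thread_id: str) -> asyncio.Queue:
        """Create a dedicated queue for one client connection."""
        queue: asyncio.Queue = asyncio.Queue()
        async with self.lock:
            self.subscribers.setdefault(thread_id, set()).add(queue)
        return queue

    async def unsubscribe_from_stream(self, thread_id: str, queue: asyncio.Queue) -> None:
        """Remove one client's queue; the background task keeps running."""
        async with self.lock:
            self.subscribers.get(thread_id, set()).discard(queue)
```

Holding a single asyncio.Lock around both mappings keeps subscribe/unsubscribe atomic with respect to task registration, which matters when clients join while a task is being cleaned up.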
2. StreamHandler (Connection & Subscription Manager)
The StreamHandler handles client connections, subscription logic, and event streaming. Its key functionalities include:
Task existence check: Verifying if a task for a given thread_id is already running.
Subscription mechanism: Allowing new clients to subscribe to existing tasks or initiate new tasks if none exist.
Event serialization & distribution: Converting task events to SSE-compatible format and streaming them to subscribed clients.
Workflows
The system’s behavior adapts to three primary user scenarios, ensuring seamless multi-client interaction and recovery.
Scenario 1: First Client Initiates a Request
A client sends a chat request (without an existing thread_id).
The StreamHandler checks with TaskManager and confirms no active task exists for the new conversation.
A new background task is created to execute AI inference (powered by MiroFlow’s asynchronous processing capabilities).
The background task generates events (e.g., partial chat responses, status updates) and pushes them to an event queue.
The StreamHandler streams events from the queue to the client in SSE format.
Once the first event is generated, the thread_id is dynamically extracted and registered in TaskManager alongside the running task.
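The first-client flow above can be sketched as follows. The names run_inference, stream_sse, and first_client_flow are hypothetical stand-ins — the real background task is driven by MiroFlow, which the article does not show:

```python
import asyncio
import json

async def run_inference(queue: asyncio.Queue) -> None:
    """Stand-in for the MiroFlow-powered background task: emit a few events."""
    for chunk in ("Hel", "lo"):
        await queue.put({"type": "chat.delta", "content": chunk})
    await queue.put(None)  # sentinel: task finished

async def stream_sse(queue: asyncio.Queue):
    """Drain the event queue and yield SSE-formatted frames to the client."""
    while True:
        event = await queue.get()
        if event is None:
            break
        yield f"data: {json.dumps(event)}\n\n"

async def first_client_flow() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(run_inference(queue))       # background inference
    frames = [frame async for frame in stream_sse(queue)]  # SSE streaming to client
    await task
    return frames
```

In a real server, stream_sse would back an SSE response object rather than collect frames into a list.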
Scenario 2: Subsequent Clients Subscribe to the Same Task
A client sends a request with an existing thread_id (e.g., from a second browser tab or mobile device).
The StreamHandler queries TaskManager and detects an active task for the thread_id.
The client subscribes to the existing stream via TaskManager.subscribe_to_stream(thread_id), which creates a new dedicated queue for the client.
All subsequent events from the task are broadcast to the new queue (alongside existing subscriber queues).
The StreamHandler streams events from the client’s queue to the new subscriber in real time.
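The subscribe-or-start decision that distinguishes Scenario 2 from Scenario 1 might look like this sketch. handle_request is a hypothetical method, and registry access is inlined here for brevity — in the article’s design it is delegated to TaskManager:

```python
import asyncio

class StreamHandler:
    """Sketch of the subscribe-or-start branch for an incoming request."""

    def __init__(self) -> None:
        self.tasks = {}        # thread_id -> asyncio.Task
        self.subscribers = {}  # thread_id -> set of client queues

    async def handle_request(self, thread_id: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        task = self.tasks.get(thread_id)
        if task is not None and not task.done():
            # Active task found: join the existing stream, no new inference.
            self.subscribers[thread_id].add(queue)
        else:
            # No active task: register this client and start a new one
            # (asyncio.sleep stands in for the real inference coroutine).
            self.subscribers[thread_id] = {queue}
            self.tasks[thread_id] = asyncio.create_task(asyncio.sleep(0.01))
        return queue
```

The key property is that the second call for the same thread_id adds a queue without creating a second task.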
Scenario 3: Client Disconnection & Reconnection
A client disconnects (e.g., network loss, tab closure), triggering an asyncio.CancelledError.
In the finally block of the streaming coroutine, TaskManager.unsubscribe_from_stream(thread_id, queue) is called to remove the client’s queue from the subscriber set.
The background task continues running, and other subscribed clients are unaffected.
If the client reconnects with the same thread_id before the task completes:
StreamHandler verifies the task is still active.
A new queue is created via subscribe_to_stream().
The client resumes receiving events from the current point in the stream.
When all subscribers have unsubscribed and the task completes, the TaskManager cleans up the task after a short delay (to accommodate late reconnections).
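The try/finally pattern this scenario describes can be sketched as follows. stream_to_client and the injected unsubscribe callback are illustrative; in the article’s design the finally block calls TaskManager.unsubscribe_from_stream directly:

```python
import asyncio

async def stream_to_client(thread_id, queue, unsubscribe, frames):
    """Streaming coroutine: always unsubscribe on exit, even when cancelled."""
    try:
        while True:
            frames.append(await queue.get())
    except asyncio.CancelledError:
        pass  # client dropped the SSE connection mid-stream
    finally:
        unsubscribe(thread_id, queue)  # remove only this client's queue
```

Because cleanup lives in finally, a cancelled connection removes exactly one queue while the background task and every other subscriber continue untouched.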
Key Technical Implementations
1. Publish-Subscribe Event Broadcasting
At the heart of the system is a publish-subscribe (pub/sub) pattern optimized for streaming:
When a task generates a new event, it is serialized into SSE format: data: {JSON-serialized-event}\n\n.
The TaskManager iterates over all subscriber queues for the corresponding thread_id and pushes the event to each queue.
Failed queue pushes (indicating a disconnected client) trigger automatic cleanup of the stale queue, ensuring the subscriber set remains efficient.
This approach guarantees that all subscribed clients receive identical events in real time, while avoiding overhead from dead connections.
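A minimal sketch of this broadcast step, assuming bounded per-client queues where a full queue signals a dead client — the article doesn’t specify how a failed push is detected, so that detail is an assumption here:

```python
import asyncio
import json

def to_sse(event: dict) -> str:
    """Serialize one event into the SSE wire format described above."""
    return f"data: {json.dumps(event)}\n\n"

def broadcast(subscribers: set, event: dict) -> None:
    """Push one event to every subscriber queue; drop queues that reject it."""
    frame = to_sse(event)
    stale = set()
    for queue in subscribers:
        try:
            queue.put_nowait(frame)
        except asyncio.QueueFull:  # treat a full bounded queue as a dead client
            stale.add(queue)
    subscribers -= stale  # automatic cleanup of stale queues
```

Every live subscriber receives the identical serialized frame, and dead queues are pruned as a side effect of the failed push.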
2. Asynchronous Task Management with asyncio
Leveraging Python’s asyncio.Task and MiroFlow’s async capabilities, the system manages background tasks with precision:
Task registration: When a new task is created, its thread_id and asyncio.Task object are stored in the task registry.
Task cancellation: Users can cancel tasks from any client, with TaskManager propagating the cancellation signal to the background task.
Delayed cleanup: After a task completes, it remains in the registry for a configurable window (e.g., 5 minutes) to support reconnections. The task is only removed once the window expires and no subscribers exist.
3. Dynamic thread_id Extraction
For new conversations, the thread_id is not available upfront—it is generated within the first event from the AI task. To handle this:
The StreamHandler monitors the initial events from the task.
Upon extracting the thread_id from the first event, it immediately registers the task and thread_id with TaskManager.
Subsequent events are routed through the pub/sub mechanism to all subscribers.
This dynamic registration ensures seamless task tracking even when thread_id is not known at request time.
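A sketch of this late-registration logic, assuming events are dicts carrying a thread_id field — register_on_first_event is a hypothetical helper, and forwarding to a list stands in for the pub/sub path:

```python
async def register_on_first_event(event_stream, registry: dict, task):
    """Watch the task's event stream; register the thread_id from the first
    event that carries one, then forward everything downstream."""
    forwarded = []
    thread_id = None
    async for event in event_stream:
        if thread_id is None and "thread_id" in event:
            thread_id = event["thread_id"]
            registry[thread_id] = task  # late registration with TaskManager
        forwarded.append(event)         # stand-in for the pub/sub broadcast
    return thread_id, forwarded
```

Until registration happens, the first client is served directly from the task’s own queue, so no event is lost while the thread_id is still unknown.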
4. Robust Reconnection Support
The system’s reconnection feature is designed to minimize user disruption:
A dedicated reconnection API allows clients to send a request with a previously used thread_id.
TaskManager checks if the task is still active (or within the cleanup window).
If valid, a new subscriber queue is created, and the client resumes streaming from the current event sequence.
If the task has completed and been cleaned up, the client receives a "task completed" signal with the full conversation history.
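The reconnection decision can be sketched as follows. reconnect and the history store are hypothetical — the article doesn’t specify where completed conversation history lives, so a plain dict stands in for it:

```python
import asyncio

def reconnect(tasks: dict, subscribers: dict, history: dict, thread_id: str) -> dict:
    """Reconnection endpoint sketch: resume an active task, or return the
    stored history if the task already finished and was cleaned up."""
    task = tasks.get(thread_id)
    if task is not None and not task.done():
        queue: asyncio.Queue = asyncio.Queue()  # fresh subscriber queue
        subscribers.setdefault(thread_id, set()).add(queue)
        return {"status": "resumed", "queue": queue}
    return {"status": "completed", "history": history.get(thread_id, [])}
```

The same endpoint thus covers both halves of the scenario: a live task yields a new queue, a finished one yields the "task completed" payload.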
Advantages & Value Propositions
Resource Efficiency: By sharing a single AI inference task across multiple clients, the system reduces computational overhead by up to 80% in multi-device scenarios—critical for scaling AI Agent products.
Real-Time Synchronization: All subscribed clients receive identical events simultaneously, ensuring consistent conversation states across devices.
Enhanced User Experience: Supports multi-tab/multi-device usage and seamless reconnection after network interruptions, eliminating frustration from lost progress.
Fault Tolerance: Disconnections of individual clients do not affect ongoing tasks or other subscribers, ensuring system stability.
MiroMind MiroFlow Synergy: Built on MiroFlow’s flexible async framework, the system inherits MiroMind’s strengths in task orchestration and event handling, reducing development complexity.
Application Scenarios
Multi-Device Sync: Users can start a conversation on their phone and continue on their laptop, with real-time updates on both devices.
Team Collaboration: Team members can jointly monitor AI tasks (e.g., report generation, data analysis) and receive live progress updates.
Unstable Network Environments: Users in areas with spotty connectivity can reconnect and resume conversations without losing context.
Task Cancellation: A user can initiate a task on one device and cancel it from another, with the cancellation propagated instantly.
Conclusion
By combining MiroMind’s MiroFlow framework with a pub/sub-based architecture, we’ve built a streaming response system that redefines real-time multi-client interaction for AI Agent products. The decoupling of task execution (via TaskManager) and client communication (via StreamHandler) enables efficient resource reuse, robust synchronization, and seamless reconnection—addressing the core pain points of traditional SSE systems.
This design not only enhances the user experience of OSpark but also provides a scalable foundation for future extensions, such as supporting more client types, adding event filtering, or integrating with real-time collaboration features. For teams building AI products with multi-client requirements, this architecture offers a proven, MiroMind-powered solution to deliver responsive, synchronized streaming experiences.