DEV Community

AskPaul
AskPaul

Posted on

Building a Multi-Client Subscription Streaming Response System with MiroMind‘s MiroFlow

In the development of OSpark, a universal Agent product powered by MiroMind's open-source project MiroFlow, we encountered a critical challenge: enabling real-time, synchronized streaming responses across multiple clients while optimizing resource utilization. This article dives into the design and implementation of OSparkApi's multi-client subscription-based streaming response system, leveraging MiroFlow's flexible architecture to address the limitations of traditional single-client Server-Sent Events (SSE) solutions.

Background

Server-Sent Events (SSE) have become the backbone of real-time interaction in modern AI chat applications, enabling servers to push incremental updates to clients without constant polling. However, traditional SSE architectures face significant bottlenecks in two key scenarios:

  1. Multi-client synchronization: When users access the same conversation across multiple endpoints (e.g., browser tabs, mobile apps, web interfaces), each client typically initiates a separate SSE connection, leading to redundant AI inference tasks and wasted computational resources.

  2. Disconnection recovery: If a user exits a conversation page while an AI task is still running, re-entering the page should resume real-time updates seamlessly—without restarting the task or losing intermediate state.

To solve these pain points, we designed a streaming response system that supports multi-client subscription, task sharing, and reliable reconnection—all built on the foundation of MiroMind's MiroFlow framework.

Core Challenges

The design of a multi-client streaming system must address four critical requirements:

  1. Resource reuse: Eliminate redundant AI inference by allowing multiple clients to share a single running task.

  2. Event broadcasting: Ensure consistent, real-time updates across all subscribed clients for the same conversation.

  3. Connection management: Gracefully handle client disconnections, reconnections, and cleanups without disrupting ongoing tasks.

  4. Task lifecycle management: Properly create, run, cancel, and clean up tasks while accounting for subscription states.

Architecture Design

The system is composed of two core components—TaskManager and StreamHandler—working in tandem to manage tasks, subscriptions, and event distribution. This architecture decouples task execution from client communication, enabling flexible scaling and robust synchronization.

Core Components

1. TaskManager (Task Orchestrator)

The TaskManager serves as the central hub for managing asynchronous tasks and their subscribers. Its key responsibilities include:

  • Task registry: Maintaining a mapping of thread_id (conversation identifier) to active AI tasks.

  • Subscriber set management: For each thread_id, tracking a collection of subscriber queues (one per client connection).

  • Event broadcasting: Pushing task-generated events to all subscribed client queues.

  • Thread safety: Using asyncio.Lock to ensure safe concurrent access to shared data structures.

Core data structures:

  • Task registry: Dict[str, asyncio.Task] (maps thread_id to running tasks)

  • Subscriber collection: Dict[str, Set[asyncio.Queue]] (maps thread_id to subscriber queues)

2. StreamHandler (Connection & Subscription Manager)

The StreamHandler handles client connections, subscription logic, and event streaming. Its key functionalities include:

  • Task existence check: Verifying if a task for a given thread_id is already running.

  • Subscription mechanism: Allowing new clients to subscribe to existing tasks or initiate new tasks if none exist.

  • Event serialization & distribution: Converting task events to SSE-compatible format and streaming them to subscribed clients.

Workflows

The system’s behavior adapts to three primary user scenarios, ensuring seamless multi-client interaction and recovery.

Scenario 1: First Client Initiates a Request

  1. A client sends a chat request (without an existing thread_id).

  2. The StreamHandler checks with TaskManager and confirms no active task exists for the new conversation.

  3. A new background task is created to execute AI inference (powered by MiroFlow’s asynchronous processing capabilities).

  4. The background task generates events (e.g., partial chat responses, status updates) and pushes them to an event queue.

  5. The StreamHandler streams events from the queue to the client in SSE format.

  6. Once the first event is generated, the thread_id is dynamically extracted and registered in TaskManager alongside the running task.

Scenario 2: Subsequent Clients Subscribe to the Same Task

  1. A client sends a request with an existing thread_id (e.g., from a second browser tab or mobile device).

  2. The StreamHandler queries TaskManager and detects an active task for the thread_id.

  3. The client subscribes to the existing stream via TaskManager.subscribe_to_stream(thread_id), which creates a new dedicated queue for the client.

  4. All subsequent events from the task are broadcast to the new queue (alongside existing subscriber queues).

  5. The StreamHandler streams events from the client’s queue to the new subscriber in real time.

Scenario 3: Client Disconnection & Reconnection

  1. A client disconnects (e.g., network loss, tab closure), triggering an asyncio.CancelledError.

  2. In the finally block of the streaming coroutine, TaskManager.unsubscribe_from_stream(thread_id, queue) is called to remove the client’s queue from the subscriber set.

  3. The background task continues running, and other subscribed clients are unaffected.

  4. If the client reconnects with the same thread_id before the task completes:

  • StreamHandler verifies the task is still active.

  • A new queue is created via subscribe_to_stream().

  • The client resumes receiving events from the current point in the stream.

  1. When all subscribers have unsubscribed and the task completes, the TaskManager cleans up the task after a short delay (to accommodate late reconnections).

Key Technical Implementations

1. Publish-Subscribe Event Broadcasting

At the heart of the system is a publish-subscribe (pub/sub) pattern optimized for streaming:

  • When a task generates a new event, it is serialized into SSE format: data: {JSON-serialized-event}\n\n.

  • The TaskManager iterates over all subscriber queues for the corresponding thread_id and pushes the event to each queue.

  • Failed queue pushes (indicating a disconnected client) trigger automatic cleanup of the stale queue, ensuring the subscriber set remains efficient.

This approach guarantees that all subscribed clients receive identical events in real time, while avoiding overhead from dead connections.

2. Asynchronous Task Management with asyncio

Leveraging Python’s asyncio.Task and MiroFlow’s async capabilities, the system manages background tasks with precision:

  • Task registration: When a new task is created, its thread_id and asyncio.Task object are stored in the task registry.

  • Task cancellation: Users can cancel tasks from any client, with TaskManager propagating the cancellation signal to the background task.

  • Delayed cleanup: After a task completes, it remains in the registry for a configurable window (e.g., 5 minutes) to support reconnections. The task is only removed once the window expires and no subscribers exist.

3. Dynamic thread_id Extraction

For new conversations, the thread_id is not available upfront—it is generated within the first event from the AI task. To handle this:

  • The StreamHandler monitors the initial events from the task.

  • Upon extracting the thread_id from the first event, it immediately registers the task and thread_id with TaskManager.

  • Subsequent events are routed through the pub/sub mechanism to all subscribers.

This dynamic registration ensures seamless task tracking even when thread_id is not known at request time.

4. Robust Reconnection Support

The system’s reconnection feature is designed to minimize user disruption:

  • A dedicated reconnection API allows clients to send a request with a previously used thread_id.

  • TaskManager checks if the task is still active (or within the cleanup window).

  • If valid, a new subscriber queue is created, and the client resumes streaming from the current event sequence.

  • If the task has completed and been cleaned up, the client receives a "task completed" signal with the full conversation history.

Advantages & Value Propositions

  1. Resource Efficiency: By sharing a single AI inference task across multiple clients, the system reduces computational overhead by up to 80% in multi-device scenarios—critical for scaling AI Agent products.

  2. Real-Time Synchronization: All subscribed clients receive identical events simultaneously, ensuring consistent conversation states across devices.

  3. Enhanced User Experience: Supports multi-tab/multi-device usage and seamless reconnection after network interruptions, eliminating frustration from lost progress.

  4. Fault Tolerance: Disconnections of individual clients do not affect ongoing tasks or other subscribers, ensuring system stability.

  5. MiroMind MiroFlow Synergy: Built on MiroFlow’s flexible async framework, the system inherits MiroMind’s strengths in task orchestration and event handling, reducing development complexity.

Application Scenarios

  • Multi-Device Sync: Users can start a conversation on their phone and continue on their laptop, with real-time updates on both devices.

  • Team Collaboration: Team members can jointly monitor AI tasks (e.g., report generation, data analysis) and receive live progress updates.

  • Unstable Network Environments: Users in areas with spotty connectivity can reconnect and resume conversations without losing context.

  • Task Cancellation: A user can initiate a task on one device and cancel it from another, with the cancellation propagated instantly.

Conclusion

By combining MiroMind’s MiroFlow framework with a pub/sub-based architecture, we’ve built a streaming response system that redefines real-time multi-client interaction for AI Agent products. The decoupling of task execution (via TaskManager) and client communication (via StreamHandler) enables efficient resource reuse, robust synchronization, and seamless reconnection—addressing the core pain points of traditional SSE systems.

This design not only enhances the user experience of OSpark but also provides a scalable foundation for future extensions, such as supporting more client types, adding event filtering, or integrating with real-time collaboration features. For teams building AI products with multi-client requirements, this architecture offers a proven, MiroMind-powered solution to deliver responsive, synchronized streaming experiences.

Top comments (0)