DEV Community

AskPaul
Posted on

OSpark Orchestrator: Enabling Streaming Output for MiroMind's MiroFlow

As we embark on developing our universal Agent product OSpark, we've chosen to build on MiroMindAI's open-source project MiroFlow. The first limitation we're addressing is MiroFlow's lack of streaming output. In this post, we detail how we've enhanced MiroFlow to deliver real-time streaming through an upgrade to OSpark's orchestrator.

Core Changes in the Orchestrator

1. New Task Guidance System

```python
def _get_task_guidance(chinese_context: bool = False) -> str:
    """Provides detailed task execution guidance with support for Chinese context"""
```
  • Emphasizes comprehensive information collection and transparent reporting
  • Offers specialized handling guidance for Chinese-language tasks
  • Avoids premature conclusions while preserving all potential candidate answers

2. Streaming LLM Calling Capability

```python
async def _handle_llm_call_streaming(self, ...):
    """Supports real-time streaming of LLM responses"""
```
  • Returns real-time chunks of LLM-generated content
  • Integrates tool call information extraction
  • Supports streaming calls for both primary and sub-agents
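A streaming LLM call is naturally an async generator. The sketch below fakes the model backend with canned chunks; a real implementation would wrap a streaming API request (e.g. an OpenAI-style `stream=True` call) and additionally watch the chunks for tool-call fragments:

```python
import asyncio
from typing import AsyncGenerator


async def _handle_llm_call_streaming(prompt: str) -> AsyncGenerator[str, None]:
    """Sketch: yield LLM output chunk by chunk as it arrives.

    The canned chunks below stand in for a real streaming backend.
    """
    for chunk in ("Searching ", "for sources... ", "done."):
        await asyncio.sleep(0)  # yield control, as real network I/O would
        yield chunk


async def collect(prompt: str) -> str:
    # Consumers simply `async for` over the generator.
    return "".join([c async for c in _handle_llm_call_streaming(prompt)])
```

The same generator shape works for both the primary agent and sub-agents, since each is just another consumer of the chunk stream.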

3. Streaming Summary Generation

```python
async def _handle_summary_with_context_limit_retry_streaming(self, ...):
    """Streaming version of summary generation with context limit retry support"""
```
  • Makes the summary generation process visible in real-time
  • Automatically handles context length exceeded issues
  • Features an intelligent retry mechanism

Design of the StreamingOrchestrator

1. Architectural Design

  • Decorator Pattern: Instead of replacing the existing orchestrator, we add streaming capabilities on top of it
  • Event-Driven: Implements streaming interactions based on a complete event system
  • Task Management: Supports task cancellation, status queries, and lifecycle management

2. Core Functionality

```python
async def execute_task_streaming(self, ...) -> AsyncGenerator[StreamEvent, None]:
    """Executes tasks and returns events in a streaming fashion"""
```

Three Types of Real-time Interactions:

  • LLM Response Streaming: Returns LLM-generated content in real-time
  • Tool Execution Transparency: Displays tool call start, execution, and completion statuses
  • Real-time Progress Updates: Provides detailed execution steps and progress information as they happen

3. Event System

```python
class EventType(str, Enum):
    TASK_STARTED = "task_started"             # Task initiation
    TASK_PROGRESS = "task_progress"           # Task progress updates
    LLM_RESPONSE = "llm_response"             # LLM-generated content
    TOOL_CALL_START = "tool_call_start"       # Tool invocation beginning
    TOOL_CALL_COMPLETE = "tool_call_complete" # Tool invocation finished
    ERROR = "error"                           # Error occurrences
```
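The enum pairs naturally with a small event envelope. In the sketch below, the `StreamEvent` field names and the `to_sse` helper are our assumptions rather than OSpark's actual API; the enum is repeated so the snippet is self-contained:

```python
import json
from dataclasses import dataclass
from enum import Enum


class EventType(str, Enum):
    TASK_STARTED = "task_started"
    TASK_PROGRESS = "task_progress"
    LLM_RESPONSE = "llm_response"
    TOOL_CALL_START = "tool_call_start"
    TOOL_CALL_COMPLETE = "tool_call_complete"
    ERROR = "error"


@dataclass
class StreamEvent:
    """Hypothetical event envelope carried through the stream."""
    type: EventType
    data: dict

    def to_sse(self) -> str:
        # One `data:` line per event, in Server-Sent Events framing.
        return f"data: {json.dumps({'type': self.type.value, **self.data})}\n\n"
```

Because `EventType` subclasses `str`, event types serialize to plain strings with no extra conversion on the wire.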

Relationship Between the Two Components

1. Collaboration Model: Decorator Pattern

```python
# Streaming orchestrator creates an orchestrator instance internally
orchestrator = Orchestrator(...)
```
  • Composition Relationship: The streaming orchestrator contains an instance of the original orchestrator
  • Function Enhancement: Adds streaming capabilities on top of the original orchestrator
  • Interface Compatibility: Maintains the original orchestrator's interface unchanged
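In miniature, the composition looks like the sketch below; both class bodies are illustrative stand-ins rather than the real implementations:

```python
import asyncio


class Orchestrator:
    """Stand-in for the original, non-streaming orchestrator."""

    async def execute_task(self, task: str) -> str:
        return f"result for {task!r}"


class StreamingOrchestrator:
    """Adds streaming on top of an Orchestrator it owns (composition)."""

    def __init__(self):
        # The original orchestrator is created internally and left unchanged.
        self.orchestrator = Orchestrator()

    async def execute_task_streaming(self, task: str):
        yield {"type": "task_started", "task": task}
        result = await self.orchestrator.execute_task(task)  # core logic untouched
        yield {"type": "task_progress", "result": result}
```

Because the inner orchestrator's interface never changes, existing non-streaming callers keep working while new callers opt into the event stream.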

2. Responsibility Division

| Component | Responsibilities |
| --- | --- |
| Orchestrator | Core business logic: task execution, LLM calls, tool management |
| StreamingOrchestrator | Streaming interaction layer: event management, real-time feedback, task lifecycle |

3. Data Flow

```
API Request → StreamingOrchestrator → Create Orchestrator Instance → Execute Core Logic → Generate Streaming Events → Return in Real-time
```
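End to end, that flow can be sketched as an async pipeline in which each event is serialized the moment it is produced. The event shapes are assumptions, and the Server-Sent Events framing is one plausible transport:

```python
import asyncio
import json


async def fake_orchestrator_events(task: str):
    # Stand-in for StreamingOrchestrator.execute_task_streaming(...).
    yield {"type": "task_started", "task": task}
    yield {"type": "llm_response", "chunk": "partial answer"}
    yield {"type": "task_progress", "step": "final"}


async def handle_request(task: str):
    """Per request: run the orchestrator and flush each event to the
    client as soon as it arrives, in SSE framing."""
    async for event in fake_orchestrator_events(task):
        yield f"data: {json.dumps(event)}\n\n"
```

A web framework's streaming response type can consume `handle_request` directly, so nothing is buffered until the task finishes.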

Core Value

1. User Experience Enhancement

  • Transparent Execution: Users can observe the AI's real-time thinking process
  • Instant Feedback: Real-time visibility of LLM responses, tool execution, and task progress
  • Controllability: Supports task cancellation and status monitoring

2. Technical Architecture Advantages

  • Backward Compatibility: All original functionalities are preserved, enabling gradual migration
  • Separation of Concerns: Clear separation between core logic and interaction experience
  • Extensibility: Event-driven architecture supports flexible feature expansion

3. Practical Applications

  • Real-time AI Assistant: Web interfaces that display the AI's thinking process in real-time
  • Debugging and Monitoring: Developers can monitor execution processes and performance metrics in real-time
  • Interactive Development: Supports task cancellation, parameter adjustment, and result verification

Conclusion

With the introduction of the StreamingOrchestrator, OSpark achieves a significant transformation from "black box" execution to "transparent" interaction. Using a decorator pattern design, we provide users with an improved interaction experience while maintaining backward compatibility, and offer developers enhanced observability and extensibility. This architecture not only addresses current real-time interaction needs but also lays a solid foundation for future feature expansion.
