As we embark on developing our universal Agent product OSpark, we've chosen to build upon MiroMindAI's open-source project MiroFlow. A key limitation we're addressing first is MiroFlow's lack of streaming output support. In this technical blog, we'll detail how we upgraded OSpark's orchestrator to give MiroFlow real-time streaming capabilities.
## Core Changes in the Orchestrator

### 1. New Task Guidance System

```python
def _get_task_guidance(chinese_context: bool = False) -> str:
    """Provides detailed task execution guidance with support for Chinese context"""
```

- Emphasizes comprehensive information collection and transparent reporting
- Offers specialized handling guidance for Chinese-language tasks
- Avoids premature conclusions while preserving all potential candidate answers
### 2. Streaming LLM Calling Capability

```python
async def _handle_llm_call_streaming(self, ...):
    """Supports real-time streaming of LLM responses"""
```

- Returns real-time chunks of LLM-generated content
- Integrates tool call information extraction
- Supports streaming calls for both primary and sub-agents
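The core idea can be sketched as an async generator that forwards content chunks and surfaces tool calls as distinct events. The chunk shape and the `_fake_provider_stream` helper below are our own stand-ins for a real provider's streaming iterator:

```python
import asyncio
from typing import AsyncGenerator


async def _handle_llm_call_streaming(response_chunks) -> AsyncGenerator[dict, None]:
    """Yield LLM content chunks as they arrive; surface tool calls as events.

    Sketch only: `response_chunks` stands in for a provider's streaming response.
    """
    async for chunk in response_chunks:
        if "tool_call" in chunk:
            # Extract tool-call information and emit it as its own event
            yield {"type": "tool_call", "data": chunk["tool_call"]}
        else:
            yield {"type": "content", "data": chunk["text"]}


async def _fake_provider_stream():
    """Stand-in for a real LLM provider stream."""
    for chunk in ({"text": "Hel"}, {"text": "lo"}, {"tool_call": {"name": "search"}}):
        yield chunk


async def _collect():
    return [event async for event in _handle_llm_call_streaming(_fake_provider_stream())]


events = asyncio.run(_collect())
```

Because the function is itself an async generator, the same code path serves both the primary agent and sub-agents: each caller simply iterates the stream it is handed.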
### 3. Streaming Summary Generation

```python
async def _handle_summary_with_context_limit_retry_streaming(self, ...):
    """Streaming version of summary generation with context limit retry support"""
```

- Makes the summary generation process visible in real-time
- Automatically handles context length exceeded issues
- Features an intelligent retry mechanism
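The retry mechanism can be sketched as follows: on a context-length error, truncate the oldest half of the history and try again. `ContextLimitError`, `summarize`, and the halving strategy are illustrative assumptions, not OSpark's exact implementation:

```python
import asyncio


class ContextLimitError(Exception):
    """Raised (in this sketch) when the prompt exceeds the model's context window."""


async def _handle_summary_with_context_limit_retry_streaming(
    summarize, history, max_retries: int = 3
):
    """Stream a summary, truncating history and retrying on context overflow.

    Sketch: `summarize(history)` stands in for the real streaming summary call.
    """
    for _ in range(max_retries):
        try:
            async for chunk in summarize(history):
                yield chunk
            return
        except ContextLimitError:
            # Drop the oldest half of the history and try again
            history = history[len(history) // 2:]
    raise RuntimeError("summary failed: context limit persisted after retries")


async def _fake_summarize(history):
    """Fails when the history is 'too long', otherwise streams one chunk per item."""
    if len(history) > 2:
        raise ContextLimitError
    for item in history:
        yield f"summary of {item}"


async def _collect():
    return [
        chunk
        async for chunk in _handle_summary_with_context_limit_retry_streaming(
            _fake_summarize, ["a", "b", "c", "d"]
        )
    ]


chunks = asyncio.run(_collect())
```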
## Design of the StreamingOrchestrator

### 1. Architectural Design

- Decorator Pattern: instead of replacing the existing orchestrator, we add streaming capabilities on top of it
- Event-Driven: implements streaming interactions based on a complete event system
- Task Management: supports task cancellation, status queries, and lifecycle management
### 2. Core Functionality

```python
async def execute_task_streaming(self, ...) -> AsyncGenerator[StreamEvent, None]:
    """Executes tasks and returns events in a streaming fashion"""
```

Three types of real-time interaction:

- LLM Response Streaming: returns LLM-generated content in real-time
- Tool Execution Transparency: displays tool call start, execution, and completion statuses
- Progress Status Real-time Updates: provides detailed execution steps and progress information
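A skeletal version of how the three interaction types interleave on one stream might look like this. The event dictionaries and step details are illustrative placeholders:

```python
import asyncio
from typing import AsyncGenerator


async def execute_task_streaming(task: str) -> AsyncGenerator[dict, None]:
    """Illustrative skeleton: interleave progress, LLM content, and tool events."""
    yield {"type": "task_started", "task": task}
    yield {"type": "task_progress", "step": 1, "detail": "planning"}
    # LLM content streams out as it is generated
    yield {"type": "llm_response", "content": "Searching for relevant data..."}
    yield {"type": "tool_call_start", "tool": "search"}
    await asyncio.sleep(0)  # real tool execution would happen here
    yield {"type": "tool_call_complete", "tool": "search"}


async def _collect():
    return [event async for event in execute_task_streaming("demo")]


events = asyncio.run(_collect())
```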
### 3. Event System

```python
class EventType(str, Enum):
    TASK_STARTED = "task_started"              # Task initiation
    TASK_PROGRESS = "task_progress"            # Task progress updates
    LLM_RESPONSE = "llm_response"              # LLM-generated content
    TOOL_CALL_START = "tool_call_start"        # Tool invocation beginning
    TOOL_CALL_COMPLETE = "tool_call_complete"  # Tool invocation finished
    ERROR = "error"                            # Error occurrences
```
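Each event type pairs naturally with a small event payload that serializes for the wire. The `StreamEvent` field names (`type`, `data`, `timestamp`) and the `to_json` helper below are our assumptions; the enum is repeated so the sketch is self-contained:

```python
import json
import time
from dataclasses import asdict, dataclass, field
from enum import Enum


class EventType(str, Enum):
    TASK_STARTED = "task_started"
    TASK_PROGRESS = "task_progress"
    LLM_RESPONSE = "llm_response"
    TOOL_CALL_START = "tool_call_start"
    TOOL_CALL_COMPLETE = "tool_call_complete"
    ERROR = "error"


@dataclass
class StreamEvent:
    """One event on the stream; the field names here are illustrative."""
    type: EventType
    data: dict
    timestamp: float = field(default_factory=time.time)

    def to_json(self) -> str:
        payload = asdict(self)
        payload["type"] = self.type.value  # enum -> plain string for the wire
        return json.dumps(payload)


event_json = StreamEvent(EventType.LLM_RESPONSE, {"content": "hello"}).to_json()
```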
## Relationship Between the Two Components

### 1. Collaboration Model: Decorator Pattern

```python
# The streaming orchestrator creates an orchestrator instance internally
orchestrator = Orchestrator(...)
```

- Composition Relationship: the streaming orchestrator contains an instance of the original orchestrator
- Function Enhancement: adds streaming capabilities on top of the original orchestrator
- Interface Compatibility: maintains the original orchestrator's interface unchanged
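The composition relationship can be sketched as a thin wrapper. Both classes here are simplified stand-ins for the real MiroFlow/OSpark classes, with invented method bodies:

```python
class Orchestrator:
    """Stand-in for the original orchestrator (greatly simplified)."""

    def execute_task(self, task: str) -> str:
        return f"result of {task}"


class StreamingOrchestrator:
    """Wraps an Orchestrator, adding streaming without changing its interface."""

    def __init__(self, **kwargs):
        self._inner = Orchestrator(**kwargs)  # composition, not inheritance

    def execute_task(self, task: str) -> str:
        # The original, non-streaming interface remains available unchanged
        return self._inner.execute_task(task)

    def execute_task_streaming(self, task: str):
        # Streaming layer: wrap the core call in lifecycle events
        yield {"type": "task_started", "task": task}
        yield {"type": "llm_response", "content": self._inner.execute_task(task)}


so = StreamingOrchestrator()
events = list(so.execute_task_streaming("demo"))
```

Because the wrapper delegates rather than subclasses, existing callers of `Orchestrator` need no changes, and the streaming layer can evolve independently.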
2. Responsibility Division
Component | Responsibilities |
---|---|
Orchestrator | Core business logic: task execution, LLM calls, tool management |
StreamingOrchestrator | Streaming interaction layer: event management, real-time feedback, task lifecycle |
### 3. Data Flow

API Request → StreamingOrchestrator → Create Orchestrator Instance → Execute Core Logic → Generate Streaming Events → Return in Real-time
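At the "Return in Real-time" end of this flow, each event needs a wire format. One common choice is Server-Sent Events; the formatter below is our own sketch of that option, not a transport mandated by MiroFlow:

```python
import json


def to_sse(event: dict) -> str:
    """Format one stream event as a Server-Sent Events frame."""
    return f"event: {event['type']}\ndata: {json.dumps(event)}\n\n"


frame = to_sse({"type": "llm_response", "content": "hello"})
```

A browser `EventSource` (or any SSE client) can then dispatch on the `event:` field and parse the `data:` line as JSON.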
## Core Value

### 1. User Experience Enhancement

- Transparent Execution: users can observe the AI's thinking process in real-time
- Instant Feedback: real-time visibility of LLM responses, tool execution, and task progress
- Controllability: supports task cancellation and status monitoring
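Cancellation and status queries can be sketched with `asyncio` tasks. The `TaskManager` class and its method names are illustrative, not OSpark's actual API:

```python
import asyncio


class TaskManager:
    """Minimal sketch of cancellation and status queries for streaming tasks."""

    def __init__(self):
        self._tasks: dict[str, asyncio.Task] = {}

    def start(self, task_id: str, coro) -> None:
        self._tasks[task_id] = asyncio.create_task(coro)

    def cancel(self, task_id: str) -> None:
        self._tasks[task_id].cancel()

    def status(self, task_id: str) -> str:
        task = self._tasks[task_id]
        if task.cancelled():
            return "cancelled"
        return "done" if task.done() else "running"


async def _demo():
    mgr = TaskManager()
    mgr.start("t1", asyncio.sleep(60))  # a long-running task
    await asyncio.sleep(0)              # let it start
    running = mgr.status("t1")
    mgr.cancel("t1")
    try:
        await mgr._tasks["t1"]          # wait for cancellation to complete
    except asyncio.CancelledError:
        pass
    return running, mgr.status("t1")


before, after = asyncio.run(_demo())
```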
### 2. Technical Architecture Advantages

- Backward Compatibility: all original functionality is preserved, enabling gradual migration
- Separation of Concerns: clear separation between core logic and interaction experience
- Extensibility: event-driven architecture supports flexible feature expansion
### 3. Practical Applications

- Real-time AI Assistant: web interfaces that display the AI's thinking process in real-time
- Debugging and Monitoring: developers can monitor execution processes and performance metrics in real-time
- Interactive Development: supports task cancellation, parameter adjustment, and result verification
## Conclusion
With the introduction of the StreamingOrchestrator, OSpark achieves a significant transformation from "black box" execution to "transparent" interaction. Using a decorator pattern design, we provide users with an improved interaction experience while maintaining backward compatibility, and offer developers enhanced observability and extensibility. This architecture not only addresses current real-time interaction needs but also lays a solid foundation for future feature expansion.