DEV Community

AskPaul
AskPaul

Posted on

OSpark: Building Event-Driven Streaming Responses with MiroMind's MiroFlow Foundation

Introduction

OSparkApi, a universal intelligent agent orchestration system built upon MiroMindAI's open-source MiroFlow project, centers its innovation around one critical capability: sophisticated event-driven streaming response processing. This architecture, rooted in MiroFlow's robust foundations, enables seamless handling of dynamic agent interactions while maintaining exceptional flexibility and scalability.

Core Architectural Design

1. Event System: The Event-Driven Engine

Leveraging MiroFlow's architectural principles, OSparkApi adopts an event-driven architecture where all state changes flow through well-defined event streams, creating a loosely coupled ecosystem where components interact via standardized events.

Core Event Type Definitions:

ThreadStreamEvent = Annotated[
    ThreadCreatedEvent | ThreadUpdatedEvent | ThreadItemAddedEvent | 
    ThreadItemUpdated | ThreadItemDoneEvent | ProgressUpdateEvent | 
    ErrorEvent | NoticeEvent,
    Field(discriminator="type"),
]
Enter fullscreen mode Exit fullscreen mode

Asynchronous Event Streaming:

class AgentContext(BaseModel):
    _events: asyncio.Queue[ThreadStreamEvent | object] = asyncio.Queue()

    async def stream(self, event: ThreadStreamEvent) -> None:
        await self._events.put(event)
Enter fullscreen mode Exit fullscreen mode

This event system—enhanced from MiroFlow's original design—effectively decouples producers from consumers, significantly enhancing extensibility and allowing independent evolution of system components.

2. Store Layer: Persistence Abstraction

Building on MiroFlow's flexible data handling, the Store layer provides a unified storage interface with support for multiple backend implementations, ensuring adaptability in data persistence strategies.

Abstract Storage Interface:

class Store(ABC, Generic[TContext]):
    @abstractmethod
    async def load_thread(self, thread_id: str, context: TContext) -> ThreadMetadata:
        pass

    @abstractmethod
    async def add_thread_item(self, thread_id: str, item: ThreadItem, context: TContext) -> None:
        pass
Enter fullscreen mode Exit fullscreen mode

ID Generation Strategy:

_ID_PREFIXES = {"thread": "thr", "message": "msg", "tool_call": "tc", "workflow": "wf"}
def default_generate_id(item_type: StoreItemType) -> str:
    prefix = _ID_PREFIXES[item_type]
    return f"{prefix}_{uuid.uuid4().hex[:16]}"
Enter fullscreen mode Exit fullscreen mode

This design—extending MiroFlow's storage agnosticism—enables seamless switching between different storage backends without affecting core business logic.

3. Context System: Execution State Management

AgentContext, a critical enhancement to MiroFlow's execution model, serves as the core of response processing, maintaining complete execution context for thread operations and ensuring state consistency across complex interactions.

Core Data Structure:

class AgentContext(BaseModel):
    thread: ThreadMetadata
    store: Store[TContext]
    request_context: TContext

    workflow_item: WorkflowItem | None = None
    client_tool_call: ClientToolCall | None = None
Enter fullscreen mode Exit fullscreen mode

Workflow Lifecycle Management:

async def start_workflow(self, workflow: Any) -> None:
    self.workflow_item = WorkflowItem(
        id=self.generate_id("workflow"),
        created_at=datetime.now(),
        workflow=workflow,
        thread_id=self.thread.id,
    )
    await self.stream(ThreadItemAddedEvent(item=self.workflow_item))

async def end_workflow(self, summary: Optional[Any] = None, expanded: bool = False) -> None:
    await self.stream(ThreadItemDoneEvent(item=self.workflow_item))
    self.workflow_item = None
Enter fullscreen mode Exit fullscreen mode

The Context system—building on MiroFlow's state management capabilities—guarantees state consistency and operational atomicity throughout complex agent interactions.

4. Thread System: Conversation Session Management

Threads represent the core abstraction in OSparkApi, extending MiroFlow's session handling to encapsulate complete conversational sessions and provide a structured container for interaction history.

Thread Data Model:

class ThreadMetadata(BaseModel):
    id: str
    created_at: datetime
    status: ThreadStatus = Field(default_factory=lambda: ActiveStatus())
    title: str | None = None
    user_id: int | None = None
Enter fullscreen mode Exit fullscreen mode

Rich Item Type Hierarchy:

ThreadItem = Annotated[
    UserMessageItem | AssistantMessageItem | ClientToolCallItem | 
    ServerToolCallItem | WidgetItem | WorkflowItem | TaskItem | EndOfTurnItem,
    Field(discriminator="type"),
]
Enter fullscreen mode Exit fullscreen mode

The Thread system—enhanced from MiroFlow's conversation primitives—provides comprehensive lifecycle management for conversations, supporting diverse interaction patterns and content types.

Processing Flow

Streaming Response Handling

Agent response processing follows a standardized flow—evolved from MiroFlow's execution pipeline—with clear separation of concerns between components:

Response Handler Interface:

class AgentResponseHandler(ABC):
    @abstractmethod
    async def stream_agent_response(
        self, context: AgentContext, result: RunResultStreaming
    ) -> AsyncIterator[ThreadStreamEvent]:
        pass
Enter fullscreen mode Exit fullscreen mode

Core Processing Logic:

async def stream_agent_response(self, context: AgentContext, result: RunResultStreaming):
    async for event in result.stream_events():
        thread_event = await self._process_stream_event(event, context)
        if thread_event:
            yield thread_event
Enter fullscreen mode Exit fullscreen mode

Key Processing Capabilities

  • Content Streaming: Real-time delivery of incremental text with annotation and formatting information
  • Tool Invocation: Complete lifecycle management for both client and server tool calls
  • Error Handling: Unified exception capture and structured error event generation
  • Workflow Orchestration: Task state tracking and progress updates throughout execution

Architectural Advantages

Design Principles

  1. Type Safety: Comprehensive use of Pydantic—consistent with MiroFlow's design philosophy—ensures data consistency
  2. Async-First: Native asynchronous support based on asyncio for high-performance operations
  3. Event-Driven: Decoupled architecture enables horizontal scalability and component isolation
  4. Abstract Layering: Clear separation of responsibilities facilitates maintenance and evolution

Extensibility Features

  • Processor Factory: Supports custom processors for different LLM providers, building on MiroFlow's flexibility
  • Storage Plugins: Easy integration with new storage backends through abstract interfaces
  • Event Extension: New functionality can be implemented by adding event types
  • Config-Driven: Flexible configuration management based on OmegaConf

Enterprise-Grade Characteristics

  • Observability: Comprehensive event logging and tracing capabilities
  • Error Recovery: Event-based checkpointing and resume functionality
  • Concurrency Control: Thread-level state isolation for safe parallel execution
  • Permission Management: User-based access control for secure multi-tenant operation

Conclusion

OSparkApi's agent response processing architecture—built upon MiroMind's MiroFlow foundation—represents a mature evolution of modern streaming AI systems. By extending MiroFlow's robust core with a carefully crafted four-layer architecture (Event, Store, Context, and Thread), it delivers a high-performance, extensible, and type-safe response processing system.

This architecture not only enhances MiroFlow's original capabilities but also provides a solid technical foundation for future AI application innovation. It demonstrates strong adaptability across real-time interactions, multimodality, and complex reasoning scenarios, positioning OSparkApi as a robust framework for the next generation of intelligent agent applications—all while honoring its MiroMind heritage.

Top comments (0)