Ajay Gupta

AG-UI + LangGraph Streaming: Technical Implementation Guide

🎯 Purpose

This guide shows how to stream events in real time from AI workflows to the UI using the AG-UI protocol with LangGraph StateGraph execution. The approach targets sub-100ms latency for live user feedback during complex AI operations.


πŸš€ Demo Application

A complete working implementation of this architecture is available at:
https://github.com/cimulink/ai-workflow-engine

The repository includes:

  • Pure LangGraph + AG-UI server implementation
  • React frontend with real-time streaming
  • Document processing workflow example
  • Complete setup and deployment instructions

πŸ“‹ AG-UI Protocol Context

AG-UI defines a standardized set of event types for real-time agent-user interaction. Our goal is to adapt our entire LangGraph workflow to generate these predefined events and stream them from backend to frontend for real-time user feedback.

AG-UI Message Types

AG-UI defines several event categories for different aspects of agent communication:

Lifecycle Events

  • RUN_STARTED, RUN_FINISHED, RUN_ERROR
  • STEP_STARTED, STEP_FINISHED

Text Message Events

  • TEXT_MESSAGE_START, TEXT_MESSAGE_CONTENT, TEXT_MESSAGE_END

Tool Call Events

  • TOOL_CALL_START, TOOL_CALL_ARGS, TOOL_CALL_END

State Management Events

  • STATE_SNAPSHOT, STATE_DELTA, MESSAGES_SNAPSHOT

Special Events

  • RAW, CUSTOM
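As a sketch, the event names above can be collected into a small Python enum so the backend emits them type-safely. The enum and the stepName payload field are illustrative assumptions, not taken from the official AG-UI SDK:

```python
from enum import Enum

class AGUIEventType(str, Enum):
    # Lifecycle events
    RUN_STARTED = "RUN_STARTED"
    RUN_FINISHED = "RUN_FINISHED"
    RUN_ERROR = "RUN_ERROR"
    STEP_STARTED = "STEP_STARTED"
    STEP_FINISHED = "STEP_FINISHED"
    # Text message events
    TEXT_MESSAGE_START = "TEXT_MESSAGE_START"
    TEXT_MESSAGE_CONTENT = "TEXT_MESSAGE_CONTENT"
    TEXT_MESSAGE_END = "TEXT_MESSAGE_END"
    # Tool call events
    TOOL_CALL_START = "TOOL_CALL_START"
    TOOL_CALL_ARGS = "TOOL_CALL_ARGS"
    TOOL_CALL_END = "TOOL_CALL_END"
    # State management events
    STATE_SNAPSHOT = "STATE_SNAPSHOT"
    STATE_DELTA = "STATE_DELTA"
    MESSAGES_SNAPSHOT = "MESSAGES_SNAPSHOT"
    # Special events
    RAW = "RAW"
    CUSTOM = "CUSTOM"

# Illustrative payload; the exact field names ("stepName" etc.) are
# assumptions for this sketch, not part of the spec.
event = {"type": AGUIEventType.STEP_STARTED.value, "stepName": "classify_document"}
```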

πŸ—οΈ Architecture Overview

The system combines three key components:

  1. LangGraph StateGraph - Handles workflow orchestration with nodes and conditional edges
  2. AG-UI Protocol - Defines event types for real-time UI communication
  3. HTTP Streaming - Uses Server-Sent Events (SSE) for browser-compatible streaming

High-Level Architecture

[Diagram: high-level architecture overview]

Data Flow Overview

[Diagram: data flow overview]


πŸ”§ Core Technical Components

1. asyncio.Queue: Event Management Hub

The asyncio.Queue acts as a coroutine-safe buffer between LangGraph node execution and HTTP streaming. When a LangGraph node completes, it places events on the queue; the HTTP streaming endpoint continuously reads from the queue and forwards events to the frontend.

Key Benefits:

  • Concurrency Safety: Multiple nodes can emit events concurrently within one event loop (note that asyncio.Queue is coroutine-safe, not thread-safe)
  • Backpressure: A bounded queue (maxsize) prevents memory overflow during heavy processing
  • Order Preservation: Events maintain chronological (FIFO) sequence
  • Non-blocking: Event production and consumption happen independently
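The producer/consumer pattern described above can be sketched in plain asyncio. The node names and event fields here are illustrative, and a None sentinel stands in for end-of-run signaling:

```python
import asyncio

async def run_node(name: str, queue: asyncio.Queue) -> None:
    # Stand-in for a LangGraph node: do some work, then emit an AG-UI event.
    await asyncio.sleep(0)  # placeholder for real async work
    await queue.put({"type": "STEP_FINISHED", "stepName": name})

async def event_stream(queue: asyncio.Queue):
    # Consumer side: drain events until a None sentinel marks the end of the run.
    while (event := await queue.get()) is not None:
        yield event

async def run_pipeline() -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)  # bounded queue -> backpressure

    async def producer() -> None:
        for name in ("extract", "classify", "summarize"):
            await run_node(name, queue)
        await queue.put(None)  # signal completion

    task = asyncio.create_task(producer())
    events = [event async for event in event_stream(queue)]
    await task
    return events
```

Because the queue is bounded, a node that produces events faster than the client can consume them will pause at `queue.put` instead of growing memory without limit.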

2. yield Keyword: Python Streaming Pattern

Python's yield keyword turns a function into a generator, and inside an async def it produces an async generator. Instead of returning all results at once, the function yields events one by one as they are produced. This creates a memory-efficient streaming pipeline where events are processed immediately rather than buffered.

Streaming Benefits:

  • Memory Efficient: Events are yielded as produced, not stored
  • Real-time: Zero buffering delay between event production and consumption
  • Lazy Evaluation: Execution pauses until next event is requested
  • Natural Flow Control: Consumer controls processing pace
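A minimal async generator illustrates these properties. The chunking here is a stand-in for whatever actually produces tokens (an LLM stream, a node result); only the yield mechanics are the point:

```python
import asyncio

async def token_stream(text: str, chunk_size: int = 4):
    # Async generator: each chunk is yielded the moment it is produced.
    # Nothing accumulates in memory, and execution pauses at each yield
    # until the consumer requests the next event (lazy evaluation).
    for i in range(0, len(text), chunk_size):
        await asyncio.sleep(0)  # stand-in for awaiting the next LLM token
        yield {"type": "TEXT_MESSAGE_CONTENT", "delta": text[i:i + chunk_size]}

async def collect() -> str:
    # The consumer drives the pace: each iteration resumes the generator once.
    chunks = [event["delta"] async for event in token_stream("hello world!")]
    return "".join(chunks)
```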

3. Custom AGUIStreamingCheckpointer: State + Events

This extends LangGraph's built-in SqliteSaver checkpointer to automatically emit AG-UI events whenever workflow state changes. Every time a node completes and LangGraph saves a checkpoint, the custom checkpointer also emits a corresponding AG-UI event.

[Diagram: checkpointer emitting state and events]

Checkpointer Advantages:

  • Automatic Events: No manual event emission required in nodes
  • State Consistency: Events always reflect actual LangGraph state
  • Unified Persistence: Database and streaming work together
  • Recovery Support: Failed workflows can resume with complete event history
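A minimal sketch of the pattern, using an in-memory stand-in instead of the real SqliteSaver. LangGraph's actual checkpointer interface has more methods and different signatures, so treat this as illustrative only:

```python
import asyncio

class InMemorySaver:
    """Stand-in for LangGraph's SqliteSaver: just records checkpoints."""

    def __init__(self) -> None:
        self.checkpoints: list = []

    def put(self, config: dict, checkpoint: dict) -> None:
        self.checkpoints.append((config, checkpoint))

class AGUIStreamingCheckpointer(InMemorySaver):
    """Emit a STATE_DELTA event on every checkpoint write.

    Nodes never emit events manually: any state change that gets
    persisted is automatically mirrored to the UI event queue, so the
    stream always reflects actual saved state.
    """

    def __init__(self, queue: asyncio.Queue) -> None:
        super().__init__()
        self.queue = queue

    def put(self, config: dict, checkpoint: dict) -> None:
        super().put(config, checkpoint)  # persist first...
        # ...then emit, so an event is only sent for state that was saved.
        self.queue.put_nowait({"type": "STATE_DELTA", "delta": checkpoint})
```

Persisting before emitting is what keeps events and database state consistent: a crash between the two steps loses an event, never a checkpoint.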

4. HTTP Streaming: Server-Sent Events (SSE)

FastAPI serves AG-UI events using Server-Sent Events format. SSE is a web standard that allows servers to push data to browsers over a single HTTP connection. Each event is formatted as data: {json}\n\n and sent immediately to the client.

SSE Benefits Over WebSocket:

  • Simpler Protocol: Standard HTTP, no connection upgrades needed
  • Auto-Reconnection: Browsers handle reconnection automatically
  • Firewall Friendly: Uses standard HTTP ports, works through proxies
  • One-Way Optimal: Perfect for event streaming (no bidirectional needed)
  • Better Debugging: Standard HTTP tools work (curl, Postman)
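The SSE wire format is simple enough to write by hand, one "data:" line per event followed by a blank line:

```python
import json

def format_sse(event: dict) -> str:
    # SSE wire format: a "data: <payload>" line terminated by a blank line.
    # The browser's EventSource parses each such block as one message.
    return f"data: {json.dumps(event)}\n\n"
```

With FastAPI, these formatted strings would be yielded from an async generator wrapped in a StreamingResponse with media_type="text/event-stream".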

πŸ”„ End-to-End Flow

Complete Sequence Diagram

[Diagram: complete sequence from frontend request to UI update]

Step-by-Step Flow

  1. Frontend Request: React component sends document content via useAgent hook
  2. HTTP Streaming Setup: FastAPI creates async generator for event streaming
  3. LangGraph Execution: Workflow nodes execute with conditional routing
  4. Event Generation: Custom checkpointer emits AG-UI events on state changes
  5. Queue Management: Events flow through asyncio.Queue to HTTP response
  6. Frontend Processing: Browser receives SSE events and updates UI in real-time

Event Flow Visualization

[Diagram: event flow visualization]


βœ… Pros and Cons

Advantages

  • Real-Time UX: Immediate progress feedback, responsive human-in-the-loop
  • True LangGraph: Preserves StateGraph orchestration, conditional edges, checkpointing
  • Scalable: AsyncIO handles concurrent streams, event sourcing audit trail
  • Developer Friendly: Standard HTTP debugging, type-safe events, familiar React patterns

Disadvantages

  • Complexity: Requires understanding async generators, custom checkpointer maintenance
  • Resource Usage: Persistent connections, database growth
  • Error Handling: Stream interruptions, partial failures, connection recovery logic
  • Testing: Async streaming harder to unit test, timing considerations, race conditions

🎯 Implementation Strategy

Progressive Implementation Roadmap

[Diagram: progressive implementation roadmap]

Event Design Patterns

[Diagram: event design patterns]

Error Handling Strategy

  • Graceful Degradation: Emit error events, return partial results
  • Connection Recovery: Frontend auto-reconnection with exponential backoff
  • Event Batching: Reduce network overhead during high-volume periods
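The reconnection schedule is the same whether the client is the browser's EventSource wrapper or a Python consumer; sketched here in Python to match the rest of the post. The base and cap values are illustrative defaults:

```python
import random

def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    # Exponential backoff with full jitter: the delay window doubles with
    # each failed attempt up to a cap, and a random point in the window is
    # chosen so that many clients reconnecting at once don't all retry
    # at the same instant.
    return random.uniform(0, min(cap, base * (2 ** attempt)))
```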

πŸš€ When to Use This Architecture

Perfect For:

  • Document Processing: Multi-step analysis with human review
  • Data Pipelines: Real-time ETL progress tracking
  • AI Agents: Conversational workflows with tool usage
  • Long Tasks: Processes >30 seconds needing progress updates

Consider Alternatives:

  • Simple Request/Response: Single API calls
  • Batch Processing: No real-time requirements
  • Resource Constrained: Limited memory/bandwidth

πŸ’‘ Key Takeaways

This architecture delivers production-ready real-time AI workflow interfaces by combining:

  1. LangGraph's orchestration (StateGraph, nodes, edges)
  2. AG-UI's streaming protocol (real-time events)
  3. HTTP SSE streaming (browser-compatible, simple debugging)
  4. asyncio.Queue + yield (memory-efficient event pipeline)

Success Factors: Design events for user value, implement robust error handling, monitor performance, test streaming behavior, plan for horizontal scaling.

The complexity is justified when user experience and workflow transparency are critical. Start simple and add streaming capabilities as real-time interaction becomes essential.
