
Stop Waiting 30 Seconds: How to Stream AI Agent Output in Python

Your agent calls three tools, thinks for 20 seconds, then dumps a wall of text. The user stares at a blank screen the entire time. That's a broken experience -- and a leading reason users abandon AI-powered features.

The fix takes under 40 lines of Python. Here's how to stream agent responses -- both text tokens and tool call events -- so users see progress in real time.

Why Streaming Changes Everything

Blocking agent calls create two problems:

  1. Perceived latency. A 15-second wait feels like a minute. Users don't know if the agent is working or crashed.
  2. Timeout risk. Long-running agents hit HTTP gateway timeouts (typically 30-60s). Streaming keeps the connection alive.

With streaming, users see tool calls firing within 1-2 seconds and text appearing token-by-token immediately after. Same total time, completely different experience.

The Code

import asyncio
from agents import Agent, Runner, function_tool
from openai.types.responses import ResponseTextDeltaEvent


@function_tool
def lookup_price(ticker: str) -> str:
    """Look up the current price of a stock."""
    prices = {"AAPL": "$198.50", "GOOG": "$176.30", "TSLA": "$245.10"}
    return prices.get(ticker.upper(), f"No data for {ticker}")


agent = Agent(
    name="StockAssistant",
    instructions="You help users check stock prices. Use the lookup_price tool.",
    tools=[lookup_price],
)


async def main():
    result = Runner.run_streamed(agent, input="What's the price of AAPL and GOOG?")

    async for event in result.stream_events():
        if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
            print(event.data.delta, end="", flush=True)
        elif event.type == "run_item_stream_event":
            if event.item.type == "tool_call_item":
                print("\n>> Calling tool...")
            elif event.item.type == "tool_call_output_item":
                print(f">> Tool returned: {event.item.output}")
            elif event.item.type == "message_output_item":
                pass  # Already streaming via raw events above

    print()  # Final newline


if __name__ == "__main__":
    asyncio.run(main())

Install and run:

pip install openai-agents
export OPENAI_API_KEY="sk-..."
python stream_agent.py

What You'll See

>> Calling tool...
>> Tool returned: $198.50
>> Calling tool...
>> Tool returned: $176.30
Apple (AAPL) is currently at $198.50 and Alphabet (GOOG)
is at $176.30. Both are showing...

Tokens appear one by one as the agent generates them. Tool calls show up the moment they happen -- not after the entire run completes.

How It Works

Runner.run_streamed() replaces Runner.run(). Instead of blocking until the agent finishes, it returns a result object immediately. You consume events from result.stream_events() as an async iterator.

Three event types matter:

raw_response_event fires for every token the LLM generates. Filter for ResponseTextDeltaEvent to grab the actual text deltas. Print them with end="" and flush=True to get the token-by-token effect.

run_item_stream_event fires when a complete item is generated -- a tool call, a tool output, or a finished message. This is where you show "Calling tool..." progress indicators.

agent_updated_stream_event fires when the current agent changes (during handoffs). You can skip this for single-agent setups.

The key insight: raw events give you real-time text, while item events give you structured milestones. Use both together for the best UX.
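The split can be sketched without the SDK at all. In this illustrative dispatcher, the event dicts are simplified stand-ins for the SDK's typed event objects -- the shapes are invented for this example, but the dispatch logic mirrors the loop above:

```python
# Illustrative sketch only: these dict "events" are simplified stand-ins
# for the SDK's typed event classes, invented for this example.

def dispatch(events):
    """Collect streamed text from raw deltas and milestones from items."""
    text_parts = []
    milestones = []
    for event in events:
        if event["type"] == "raw_response_event":
            # Raw events carry token deltas: accumulate them for live text.
            text_parts.append(event["delta"])
        elif event["type"] == "run_item_stream_event":
            # Item events mark structured milestones (tool calls, outputs).
            milestones.append(event["item_type"])
    return "".join(text_parts), milestones


events = [
    {"type": "run_item_stream_event", "item_type": "tool_call_item"},
    {"type": "run_item_stream_event", "item_type": "tool_call_output_item"},
    {"type": "raw_response_event", "delta": "AAPL is "},
    {"type": "raw_response_event", "delta": "at $198.50."},
]
text, milestones = dispatch(events)
print(text)        # AAPL is at $198.50.
print(milestones)  # ['tool_call_item', 'tool_call_output_item']
```

The same two-channel idea carries directly into the web version below: deltas drive the visible text, item events drive progress indicators.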

Streaming in a Web App (FastAPI + SSE)

The CLI example above works for scripts. For web apps, pipe events into Server-Sent Events:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent

app = FastAPI()

agent = Agent(
    name="Assistant",
    instructions="You are a helpful assistant.",
)


@app.get("/stream")
async def stream_response(query: str):
    async def event_generator():
        result = Runner.run_streamed(agent, input=query)
        async for event in result.stream_events():
            if event.type == "raw_response_event" and isinstance(
                event.data, ResponseTextDeltaEvent
            ):
                yield f"data: {event.data.delta}\n\n"
            elif event.type == "run_item_stream_event":
                if event.item.type == "tool_call_item":
                    yield "data: [TOOL_CALL]\n\n"
                elif event.item.type == "tool_call_output_item":
                    yield f"data: [TOOL_RESULT:{event.item.output}]\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")

Hit /stream?query=Hello and consume the SSE stream from your frontend with EventSource or a fetch reader.
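On the wire, each SSE event is a `data:` line followed by a blank line. A minimal parser for the frames this endpoint emits might look like the sketch below (illustrative only -- in a browser you'd use EventSource, and a production Python client would use an SSE library):

```python
def parse_sse(raw: str) -> list[str]:
    """Split raw SSE text into its data payloads.

    Each event is "data: <payload>" followed by a blank line;
    this keeps only the payloads, in order.
    """
    payloads = []
    for frame in raw.split("\n\n"):
        for line in frame.splitlines():
            if line.startswith("data: "):
                payloads.append(line[len("data: "):])
    return payloads


raw = "data: Hello\n\ndata: [TOOL_CALL]\n\ndata: [DONE]\n\n"
print(parse_sse(raw))  # ['Hello', '[TOOL_CALL]', '[DONE]']
```

A real client should also stop reading when it sees the `[DONE]` sentinel the endpoint emits.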

Streaming vs Blocking: The Numbers

| Metric | Runner.run() | Runner.run_streamed() |
| --- | --- | --- |
| Time to first visible output | 10-30s | 1-2s |
| Connection timeout risk | High (>30s calls fail) | Low (chunked transfer) |
| User perception | "Is it broken?" | "It's working on it" |
| Implementation complexity | 1 line | ~10 extra lines |

Ten extra lines of code. Completely different user experience.

Quick Tips

  • Don't read .final_output mid-stream. result.final_output is only populated after the stream is fully consumed. Read all the events first, then access it.
  • Handle None deltas. Some chunks have empty deltas -- the isinstance check filters those out.
  • Tool approval works too. If a tool requires human approval, the stream pauses at that point. Check result.interruptions after the stream ends.
  • WebSocket alternative. If your frontend needs bidirectional communication, forward events over a WebSocket instead of SSE. Same event loop, different transport.
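The None-delta tip can be made concrete with a small guard. This is a sketch with a hypothetical Chunk stand-in, not the SDK's class -- in the real loop the isinstance check on ResponseTextDeltaEvent already does most of this filtering:

```python
def safe_delta(data) -> str:
    """Return the text delta from an event payload, or '' if absent/None."""
    delta = getattr(data, "delta", None)  # tolerate payloads without .delta
    return delta if isinstance(delta, str) else ""


class Chunk:
    """Hypothetical stand-in for a streamed chunk, for illustration."""
    def __init__(self, delta):
        self.delta = delta


chunks = [Chunk("Hel"), Chunk(None), Chunk("lo"), object()]
print("".join(safe_delta(c) for c in chunks))  # Hello
```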

Next Steps

Combine streaming with the patterns from earlier articles in this series -- retry logic, structured output, and human approval gates -- to build agents that are responsive, reliable, and safe.

Building agents that need streaming, tools, and orchestration out of the box? Nebula handles the infrastructure so you can focus on the logic.
