Your agent calls three tools, thinks for 20 seconds, then dumps a wall of text. The user stares at a blank screen the entire time. That's a broken experience -- and the number one reason users abandon AI-powered features.
The fix takes under 40 lines of Python. Here's how to stream agent responses -- both text tokens and tool call events -- so users see progress in real time.
## Why Streaming Changes Everything
Blocking agent calls create two problems:
- **Perceived latency.** A 15-second wait feels like a minute. Users don't know if the agent is working or crashed.
- **Timeout risk.** Long-running agents hit HTTP gateway timeouts (typically 30-60s). Streaming keeps the connection alive.
With streaming, users see tool calls firing within 1-2 seconds and text appearing token-by-token immediately after. Same total time, completely different experience.
## The Code
```python
import asyncio

from agents import Agent, Runner, function_tool
from openai.types.responses import ResponseTextDeltaEvent


@function_tool
def lookup_price(ticker: str) -> str:
    """Look up the current price of a stock."""
    prices = {"AAPL": "$198.50", "GOOG": "$176.30", "TSLA": "$245.10"}
    return prices.get(ticker.upper(), f"No data for {ticker}")


agent = Agent(
    name="StockAssistant",
    instructions="You help users check stock prices. Use the lookup_price tool.",
    tools=[lookup_price],
)


async def main():
    result = Runner.run_streamed(agent, input="What's the price of AAPL and GOOG?")
    async for event in result.stream_events():
        if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
            print(event.data.delta, end="", flush=True)
        elif event.type == "run_item_stream_event":
            if event.item.type == "tool_call_item":
                print("\n>> Calling tool...")
            elif event.item.type == "tool_call_output_item":
                print(f">> Tool returned: {event.item.output}")
            elif event.item.type == "message_output_item":
                pass  # Already streaming via raw events above

    print()  # Final newline


if __name__ == "__main__":
    asyncio.run(main())
```
Install and run:
```bash
pip install openai-agents
export OPENAI_API_KEY="sk-..."
python stream_agent.py
```
## What You'll See
```
>> Calling tool...
>> Tool returned: $198.50
>> Calling tool...
>> Tool returned: $176.30
Apple (AAPL) is currently at $198.50 and Alphabet (GOOG)
is at $176.30. Both are showing...
```
Tokens appear one by one as the agent generates them. Tool calls show up the moment they happen -- not after the entire run completes.
## How It Works
`Runner.run_streamed()` replaces `Runner.run()`. Instead of blocking until the agent finishes, it returns a result object immediately. You consume events from `result.stream_events()` as an async iterator.
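The shape of that consumption loop can be exercised without a model call at all. Here's a minimal sketch using a hypothetical `FakeStreamedResult` stand-in (not part of the SDK) that yields events the same way:

```python
import asyncio


class FakeStreamedResult:
    """Hypothetical stand-in for the object Runner.run_streamed() returns:
    it exposes events as an async iterator instead of blocking."""

    def __init__(self, events):
        self._events = events

    async def stream_events(self):
        for event in self._events:
            await asyncio.sleep(0)  # yield control, like real network I/O would
            yield event


async def consume():
    result = FakeStreamedResult(["tok1", "tok2", "tok3"])
    seen = []
    async for event in result.stream_events():
        seen.append(event)  # handle each event the moment it arrives
    return seen


seen = asyncio.run(consume())
print(seen)  # events arrive one at a time, in order
```

The point: your handler code runs once per event as it arrives, rather than once at the end with the full result.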
Three event types matter:
`raw_response_event` fires for every token the LLM generates. Filter for `ResponseTextDeltaEvent` to grab the actual text deltas. Print them with `end=""` and `flush=True` to get the token-by-token effect.

`run_item_stream_event` fires when a complete item is generated -- a tool call, a tool output, or a finished message. This is where you show "Calling tool..." progress indicators.

`agent_updated_stream_event` fires when the current agent changes (during handoffs). You can skip this for single-agent setups.
The key insight: raw events give you real-time text, while item events give you structured milestones. Use both together for the best UX.
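That split maps cleanly onto a single dispatch function. This is a sketch with hypothetical `Fake*` dataclasses standing in for the SDK's event objects (the real ones come from the stream, not from you):

```python
from dataclasses import dataclass


# Hypothetical stand-ins for the SDK's event objects, just to
# illustrate the dispatch logic without a live model call.
@dataclass
class FakeDelta:
    delta: str


@dataclass
class FakeItem:
    type: str
    output: str = ""


@dataclass
class FakeEvent:
    type: str
    data: object = None
    item: object = None


def render(event) -> str:
    """Map one stream event to the text a UI should display."""
    if event.type == "raw_response_event" and hasattr(event.data, "delta"):
        return event.data.delta  # real-time text
    if event.type == "run_item_stream_event":
        if event.item.type == "tool_call_item":
            return "\n>> Calling tool...\n"  # structured milestone
        if event.item.type == "tool_call_output_item":
            return f">> Tool returned: {event.item.output}\n"
    return ""  # ignore everything else (e.g. agent_updated_stream_event)


events = [
    FakeEvent("run_item_stream_event", item=FakeItem("tool_call_item")),
    FakeEvent("run_item_stream_event", item=FakeItem("tool_call_output_item", "$198.50")),
    FakeEvent("raw_response_event", data=FakeDelta("AAPL is ")),
    FakeEvent("raw_response_event", data=FakeDelta("at $198.50.")),
]
transcript = "".join(render(e) for e in events)
print(transcript)
```

Keeping the event-to-text mapping in one pure function like this also makes it trivial to unit-test your streaming UI logic.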
## Streaming in a Web App (FastAPI + SSE)
The CLI example above works for scripts. For web apps, pipe events into Server-Sent Events:
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

from agents import Agent, Runner
from openai.types.responses import ResponseTextDeltaEvent

app = FastAPI()

agent = Agent(
    name="Assistant",
    instructions="You are a helpful assistant.",
)


@app.get("/stream")
async def stream_response(query: str):
    async def event_generator():
        result = Runner.run_streamed(agent, input=query)
        async for event in result.stream_events():
            if event.type == "raw_response_event" and isinstance(
                event.data, ResponseTextDeltaEvent
            ):
                yield f"data: {event.data.delta}\n\n"
            elif event.type == "run_item_stream_event":
                if event.item.type == "tool_call_item":
                    yield "data: [TOOL_CALL]\n\n"
                elif event.item.type == "tool_call_output_item":
                    yield f"data: [TOOL_RESULT:{event.item.output}]\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")
```
Hit `/stream?query=Hello` and consume the SSE stream from your frontend with `EventSource` or a fetch reader.
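Whatever client you use, it needs to undo the framing the endpoint above applies. Here's a sketch of that decoding step in Python -- a parser for the `data:` lines and the `[TOOL_CALL]` / `[TOOL_RESULT:...]` / `[DONE]` markers this particular endpoint emits (the markers are this article's convention, not part of the SSE spec):

```python
def parse_sse(lines):
    """Decode SSE lines from the /stream endpoint into (kind, payload) tuples.

    Assumes one 'data: ...' field per event, matching the generator above.
    """
    for line in lines:
        if not line.startswith("data: "):
            continue  # skip blank separators, comments, keep-alives
        payload = line[len("data: "):]
        if payload == "[DONE]":
            yield ("done", "")
        elif payload == "[TOOL_CALL]":
            yield ("tool_call", "")
        elif payload.startswith("[TOOL_RESULT:") and payload.endswith("]"):
            yield ("tool_result", payload[len("[TOOL_RESULT:"):-1])
        else:
            yield ("text", payload)  # a raw text delta


events = list(parse_sse([
    "data: [TOOL_CALL]",
    "data: [TOOL_RESULT:$198.50]",
    "data: AAPL is at $198.50.",
    "data: [DONE]",
]))
print(events)
```

One caveat worth knowing: raw deltas containing newlines would break this simple one-line-per-event framing, so production code typically JSON-encodes each payload before putting it in the `data:` field.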
## Streaming vs Blocking: The Numbers
| Metric | `Runner.run()` | `Runner.run_streamed()` |
|---|---|---|
| Time to first visible output | 10-30s | 1-2s |
| Connection timeout risk | High (>30s calls fail) | Low (chunked transfer) |
| User perception | "Is it broken?" | "It's working on it" |
| Implementation complexity | 1 line | ~10 extra lines |
Ten extra lines of code. Completely different user experience.
## Quick Tips

- **Don't mix streaming with `.final_output`.** The `result.final_output` is only available after the stream is fully consumed. Read events first, then access it.
- **Handle `None` deltas.** Some chunks have empty deltas -- the `isinstance` check filters those out.
- **Tool approval works too.** If a tool requires human approval, the stream pauses at that point. Check `result.interruptions` after the stream ends.
- **WebSocket alternative.** If your frontend needs bidirectional communication, forward events over a WebSocket instead of SSE. Same event loop, different transport.
## Next Steps
Combine streaming with the patterns from earlier articles in this series -- retry logic, structured output, and human approval gates -- to build agents that are responsive, reliable, and safe.
Building agents that need streaming, tools, and orchestration out of the box? Nebula handles the infrastructure so you can focus on the logic.