Hey there, Python devs! 👋
Let's explore a practical approach to giving users control over stopping those AI-generated responses mid-stream.
The Scenario
Imagine you're building a FastAPI application that uses OpenAI's API. You've got streaming responses working smoothly, but there's one thing missing: the ability for users to stop the stream mid-generation.
The Challenge
Stopping a stream isn't as straightforward as you might think. OpenAI's API keeps pumping out tokens, and you need a clean way to interrupt that flow without breaking your entire application.
The Solution
Here's a killer implementation that'll make your users happy:
```python
import asyncio

from fastapi import FastAPI, WebSocket
from openai import AsyncOpenAI


class StreamController:
    """Holds a simple boolean flag that signals the stream to stop."""

    def __init__(self):
        self.stop_generation = False

    def request_stop(self):
        self.stop_generation = True


class AIResponseGenerator:
    def __init__(self, client: AsyncOpenAI):
        self.client = client
        self.stream_controller = StreamController()

    async def generate_streaming_response(self, prompt: str):
        # Reset the stop flag for this run
        self.stream_controller.stop_generation = False

        try:
            stream = await self.client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": prompt}],
                stream=True,
            )

            full_response = ""
            # The stream returned by AsyncOpenAI is an async iterator
            async for chunk in stream:
                # Check if stop was requested
                if self.stream_controller.stop_generation:
                    break

                content = chunk.choices[0].delta.content
                if content:
                    full_response += content
                    yield content
        except Exception as e:
            print(f"Stream generation error: {e}")

    def stop_stream(self):
        # Trigger the stop mechanism
        self.stream_controller.request_stop()
```
Let's unpack what's happening here:

- `StreamController`: This is our traffic cop. It manages a simple boolean flag to control stream generation.
- `AIResponseGenerator`: The main class that handles AI response streaming.
  - Uses AsyncOpenAI for non-blocking API calls
  - Implements a generator that can be stopped mid-stream
  - Provides a `stop_stream()` method to interrupt generation
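Want to see how this plugs into a real app? Here's a minimal sketch of a WebSocket endpoint that streams tokens to the client while listening for a stop message at the same time. The `/chat` route, the `"stop"` message convention, and the background listener task are assumptions I'm adding for illustration, not part of the snippet above:

```python
app = FastAPI()
client = AsyncOpenAI()  # picks up OPENAI_API_KEY from the environment


@app.websocket("/chat")
async def chat_endpoint(websocket: WebSocket):
    await websocket.accept()
    generator = AIResponseGenerator(client)

    # The first message from the client is the prompt
    prompt = await websocket.receive_text()

    async def listen_for_stop():
        # Any later "stop" message flips the flag the generator checks
        while True:
            message = await websocket.receive_text()
            if message == "stop":
                generator.stop_stream()
                break

    listener = asyncio.create_task(listen_for_stop())
    try:
        async for token in generator.generate_streaming_response(prompt):
            await websocket.send_text(token)
    finally:
        listener.cancel()
        await websocket.close()
```

On the client side, sending the literal string "stop" over the same socket is all it takes; the generator exits on the next chunk it receives.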
Pro Tips
- 🚀 Performance: Tokens reach the caller as soon as they arrive, and the async iteration doesn't block the event loop.
- 🛡️ Error Handling: Includes basic error catching to prevent unexpected crashes.
- 🔧 Flexibility: Easy to adapt to different streaming scenarios (see the sketch just below this list).
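To back up the flexibility point, the same generator can also feed a plain HTTP streaming endpoint instead of a WebSocket. This sketch reuses the `app` and `client` objects from the WebSocket example above; the `/chat-stream` path and its query parameter are just illustrative choices:

```python
from fastapi.responses import StreamingResponse


@app.get("/chat-stream")
async def chat_stream(prompt: str):
    # StreamingResponse consumes the async generator directly,
    # forwarding each token to the client as soon as it's yielded.
    generator = AIResponseGenerator(client)
    return StreamingResponse(
        generator.generate_streaming_response(prompt),
        media_type="text/plain",
    )
```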
Potential Improvements
- Add timeout mechanisms (see the sketch after this list)
- Implement more granular error handling
- Create a more sophisticated stop mechanism for complex streams
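For the timeout idea, one option that builds on the same stop flag is a small wrapper around the generator. The `stream_with_time_budget` helper and its `max_duration` parameter are names I'm making up for this sketch, not part of the class above:

```python
import time


async def stream_with_time_budget(
    generator: AIResponseGenerator, prompt: str, max_duration: float = 30.0
):
    """Consume the token stream, but request a stop once the time budget runs out."""
    started = time.monotonic()
    async for token in generator.generate_streaming_response(prompt):
        yield token
        if time.monotonic() - started > max_duration:
            # Reuse the existing stop mechanism: the generator sees the flag
            # on its next chunk and ends its loop cleanly.
            generator.stop_stream()
```

Any endpoint can iterate over `stream_with_time_budget(...)` exactly the way it would iterate over the raw generator.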
See you next time!