From Flask to FastAPI: Why I Migrated My Real-Time Speech Service

Hey fellow developers! 🚀 Let me tell you about one of the most impactful decisions I made in my career: migrating a critical real-time speech transcription service from Flask to FastAPI. If you're building anything that needs to handle concurrent connections, real-time data, or just wants to write more maintainable Python code, this story might just change how you approach your next project.

The Problem: Flask Was Holding Us Back

Picture this: We had a Flask-based speech-to-text service using Flask-SocketIO for WebSockets. It worked, but the cracks were showing:

  • Concurrency nightmares: Maxing out at ~50 concurrent sessions
  • Blocking operations everywhere: Every WebSocket message tied up a thread
  • Complex WebSocket handling: Flask-SocketIO added layers of abstraction that made debugging hell
  • No type safety: Manual validation led to runtime errors in production

Our users were complaining about dropped connections and slow response times. We needed a better foundation.

The Solution: FastAPI to the Rescue

FastAPI wasn't just an upgrade; it was a complete paradigm shift. Here's why it transformed our service:

1. Native Async Support That Actually Works

Before (Flask + Flask-SocketIO):

from flask import Flask
from flask_socketio import SocketIO, emit

app = Flask(__name__)
socketio = SocketIO(app)

@socketio.on('start_transcription')
def handle_transcription(data):
    # This blocks the entire thread!
    result = riva_client.transcribe_audio(data['audio'])
    emit('transcription_result', result)

After (FastAPI):

import json

from fastapi import FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(title="Speech Service", version="1.0.0")
app.add_middleware(CORSMiddleware, allow_origins=["*"])

@app.websocket("/transcribe/{session_id}")
async def transcribe_endpoint(websocket: WebSocket, session_id: str):
    await websocket.accept()

    async for message in websocket.iter_text():
        data = json.loads(message)
        if data["type"] == "start_transcription":
            # Non-blocking async processing!
            await handle_transcription_async(session_id, data)
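To try this snippet locally, note that FastAPI apps run on an ASGI server such as Uvicorn. A minimal sketch, assuming the code above lives in main.py:

# Serve the app with Uvicorn (assumes: pip install uvicorn)
import uvicorn

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)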

2. Performance That Scales

The numbers don't lie:

Metric              | Flask (Before)  | FastAPI (After) | Improvement
--------------------|-----------------|-----------------|--------------
Concurrent Sessions | ~50             | 150+            | 3x increase
Response Latency    | 150-200ms       | 80-120ms        | 35% faster
CPU Usage           | High (blocking) | Low (async)     | 60% reduction
Memory Usage        | Moderate        | Optimized       | 25% reduction

3. Type Safety with Pydantic

Before:

def start_transcription(data):
    language = data.get('language', 'en-US')  # Runtime errors waiting to happen
    if 'audio' not in data:
        return {'error': 'Missing audio'}

After:

from pydantic import BaseModel

class TranscriptionRequest(BaseModel):
    language: str = "en-US"
    enable_automatic_punctuation: bool = True
    enable_interim_results: bool = True
    max_alternatives: int = 1

@app.post("/start-transcription")
async def start_transcription(request: TranscriptionRequest):
    # Type-safe, validated data - no more runtime surprises!
    return await process_transcription(request)
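To see what that validation buys you, here's a quick sketch of how Pydantic rejects malformed input at the boundary (the exact error wording differs between Pydantic v1 and v2):

from pydantic import ValidationError

try:
    # max_alternatives must be an int; "three" fails validation immediately
    TranscriptionRequest(language="en-US", max_alternatives="three")
except ValidationError as e:
    print(e)  # Reports exactly which field failed and why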

How to Start Your FastAPI Journey Right

1. Project Structure That Scales

speech-service/
├── app/
│   ├── __init__.py
│   ├── main.py          # FastAPI app instance
│   ├── config.py        # Settings management
│   ├── routes/
│   │   ├── __init__.py
│   │   ├── transcription.py
│   │   └── health.py
│   ├── services/
│   │   ├── __init__.py
│   │   └── transcription_service.py
│   ├── models/
│   │   ├── __init__.py
│   │   └── schemas.py   # Pydantic models
│   └── core/
│       ├── __init__.py
│       ├── riva_client.py
│       └── logging.py
├── tests/
├── requirements.txt
└── Dockerfile

2. Core Application Setup

A solid foundation is key. Here's how to set up your main.py for scalability and maintainability.

main.py:

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.config import settings
from app.routes import transcription, health
import logging

logger = logging.getLogger(__name__)

app = FastAPI(
    title=settings.PROJECT_NAME,
    version=settings.VERSION,
    description="Real-time speech transcription service"
)

# CORS is essential for web clients to connect from different origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Routers keep your API endpoints organized and decoupled from the main app
app.include_router(health.router, prefix="/api/v1")
app.include_router(transcription.router, prefix="/api/v1")

@app.on_event("startup")
async def startup_event():
    # Use startup events to initialize resources that live for the entire application lifecycle.
    # This is the perfect place to warm up ML models, establish database connection pools,
    # or connect to other services.
    logger.info("Starting up the speech service...")

@app.on_event("shutdown")
async def shutdown_event():
    # Shutdown events are critical for graceful termination.
    # Always clean up resources like database connections or file handlers here
    # to prevent resource leaks.
    logger.info("Shutting down gracefully...")
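The health router registered above isn't shown elsewhere in this article, so here's a minimal hypothetical sketch of routes/health.py; a simple liveness endpoint is usually enough to start:

# routes/health.py - minimal liveness endpoint (hypothetical sketch)
from fastapi import APIRouter

router = APIRouter()

@router.get("/health")
async def health_check():
    # Served at /api/v1/health given the prefix used in main.py
    return {"status": "ok"}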

3. Configuration Management

config.py:

from pydantic import BaseSettings  # Pydantic v1; on v2, import from pydantic_settings instead

class Settings(BaseSettings):
    PROJECT_NAME: str = "Speech Service"
    VERSION: str = "1.0.0"
    API_V1_STR: str = "/api/v1"

    # Server settings
    HOST: str = "0.0.0.0"
    PORT: int = 8000

    # CORS
    ALLOWED_ORIGINS: list = ["http://localhost:3000", "https://yourapp.com"]

    # Service limits
    MAX_CONCURRENT_SESSIONS: int = 150
    SESSION_TIMEOUT_MINUTES: int = 30

    # Riva settings
    RIVA_SERVER: str = "grpc.nvcf.nvidia.com:443"
    RIVA_API_KEY: str

    class Config:
        env_file = ".env"

settings = Settings()
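Because RIVA_API_KEY has no default, Settings() fails fast when the key is missing from both the environment and .env, at startup rather than mid-request. A sketch of what this looks like in practice (the key value is a placeholder for illustration):

# Settings resolve from the environment first, then .env, then defaults
import os

os.environ["RIVA_API_KEY"] = "your-key-here"  # hypothetical value
os.environ["PORT"] = "9000"                   # overrides the default 8000

# Import after setting env vars, since settings is built at import time
from app.config import settings
print(settings.PORT)         # 9000
print(settings.RIVA_SERVER)  # grpc.nvcf.nvidia.com:443 (default)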

4. WebSocket Implementation Best Practices

Showing a bunch of code isn't enough. Let's break down why this implementation is robust and production-ready. These are the best practices that will save you from late-night debugging sessions:

  1. Centralized Logic in a Service Layer: The TranscriptionService handles all business logic. The WebSocket endpoint is only responsible for receiving messages and managing the connection lifecycle. This separation of concerns is crucial for testing and maintainability.
  2. Graceful Connection Handling: The entire connection's logic is wrapped in a try...except...finally block. This is non-negotiable for production services.
  3. Specific Exception for Disconnects: We explicitly catch WebSocketDisconnect. This allows you to handle a client closing the connection normally, without treating it as an unexpected error.
  4. Guaranteed Cleanup with finally: The finally block ensures that cleanup_session is always called, whether the client disconnects gracefully, an error occurs, or the connection times out. This prevents resource leaks (like orphaned sessions or lingering connections to Riva).
  5. Clear Session Management: A unique session_id is used to track each client. This is fundamental for managing state, routing messages, and logging in a concurrent environment.
  6. Asynchronous Operations: Every I/O-bound call (e.g., websocket.accept(), receive_text(), send_json(), and all service calls) uses await, ensuring the server remains non-blocking and can handle other clients while waiting.

Here is the code that puts all these practices into action:

routes/transcription.py:

from fastapi import APIRouter, WebSocket, WebSocketDisconnect
import json
import logging
from app.services.transcription_service import TranscriptionService
from app.models.schemas import TranscriptionConfig

router = APIRouter()
transcription_service = TranscriptionService()
logger = logging.getLogger(__name__)

@router.websocket("/transcribe/{session_id}")
async def transcribe_websocket(websocket: WebSocket, session_id: str):
    await websocket.accept()

    try:
        # Initialize session
        session = await transcription_service.create_session(session_id, websocket)

        while True:
            # Receive JSON control messages (text frames); binary audio
            # would arrive via websocket.receive_bytes() in a fuller version
            message = await websocket.receive_text()
            data = json.loads(message)

            if data["type"] == "start_transcription":
                config = TranscriptionConfig(**data["config"])
                await transcription_service.start_transcription(session, config)
                await websocket.send_json({
                    "event": "transcription_started",
                    "session_id": session_id
                })

            elif data["type"] == "stop_transcription":
                await transcription_service.stop_transcription(session)
                break

    except WebSocketDisconnect:
        logger.info(f"Client {session_id} disconnected gracefully.")
    except Exception as e:
        logger.error(f"An unexpected error occurred in session {session_id}: {e}")
        try:
            await websocket.send_json({
                "event": "error",
                "message": "An unexpected error occurred. Please try reconnecting."
            })
        except RuntimeError:
            # The socket may already be closed; there's nothing left to send
            pass
    finally:
        # This block is CRITICAL. It guarantees that resources are released.
        logger.info(f"Cleaning up resources for session {session_id}.")
        await transcription_service.cleanup_session(session_id)
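To exercise this endpoint end to end, here's a small client sketch using the websockets library; the localhost URL and the demo-session id are assumptions for illustration:

# Minimal test client (assumes: pip install websockets, server on
# localhost:8000, and the /api/v1 prefix from main.py)
import asyncio
import json
import websockets

async def main():
    uri = "ws://localhost:8000/api/v1/transcribe/demo-session"
    async with websockets.connect(uri) as ws:
        await ws.send(json.dumps({
            "type": "start_transcription",
            "config": {"language": "en-US"}
        }))
        print(json.loads(await ws.recv()))  # expect a transcription_started event
        await ws.send(json.dumps({"type": "stop_transcription"}))

asyncio.run(main())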

5. Service Layer for Business Logic

Your business logic is the heart of your application. Keep it clean, organized, and decoupled from the API layer.

services/transcription_service.py:

from typing import Dict
import asyncio
import logging
from fastapi import WebSocket
from app.core.riva_client import RivaClient
from app.models.schemas import TranscriptionConfig, Session

logger = logging.getLogger(__name__)

class TranscriptionService:
    def __init__(self):
        self.riva_client = RivaClient()
        self.active_sessions: Dict[str, Session] = {}
        # An asyncio.Lock is essential to prevent race conditions when modifying
        # shared state (like the active_sessions dict) from concurrent tasks.
        self._lock = asyncio.Lock()

    async def create_session(self, session_id: str, websocket: WebSocket) -> Session:
        async with self._lock:
            if session_id in self.active_sessions:
                raise ValueError(f"Session {session_id} already exists")

            session = Session(id=session_id, websocket=websocket)
            self.active_sessions[session_id] = session
            logger.info(f"Session {session_id} created.")
            return session

    async def start_transcription(self, session: Session, config: TranscriptionConfig):
        # Connect to Riva and start streaming
        await self.riva_client.start_transcription_stream(
            session_id=session.id,
            config=config
        )

        # Launching a background task with create_task is the key to non-blocking processing.
        # The main WebSocket loop can continue to handle other messages while this runs.
        session.processing_task = asyncio.create_task(self._process_results(session))

    async def _process_results(self, session: Session):
        """Background task to handle Riva streaming results and send them to the client."""
        try:
            async for result in self.riva_client.stream_results(session.id):
                # Forward results directly to the client's WebSocket
                await session.websocket.send_json(result)
        except Exception as e:
            logger.error(f"Error processing results for {session.id}: {e}")
            await session.websocket.send_json({"event": "error", "message": "Result processing failed."})

    async def cleanup_session(self, session_id: str):
        async with self._lock:
            if session_id in self.active_sessions:
                session = self.active_sessions.pop(session_id)
                if session.processing_task:
                    session.processing_task.cancel() # Ensure background tasks are stopped
                await self.riva_client.disconnect(session_id)
                logger.info(f"Successfully cleaned up session {session_id}.")
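One detail worth making explicit: the Session model imported from schemas.py holds live objects (a WebSocket and an asyncio.Task), which plain Pydantic models won't accept without extra configuration. The article doesn't show schemas.py, so here's one plausible sketch that uses a dataclass for Session:

# models/schemas.py - hypothetical sketch, not shown in the original service
from dataclasses import dataclass
from typing import Optional
import asyncio

from fastapi import WebSocket
from pydantic import BaseModel

class TranscriptionConfig(BaseModel):
    language: str = "en-US"
    enable_automatic_punctuation: bool = True
    enable_interim_results: bool = True
    max_alternatives: int = 1

# A dataclass (not a Pydantic model) because it carries live runtime
# objects that shouldn't be validated or serialized
@dataclass
class Session:
    id: str
    websocket: WebSocket
    processing_task: Optional[asyncio.Task] = None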

Key Takeaways for Your FastAPI Journey

1. Start with Type Safety

Pydantic models catch errors at the API boundary. Don't skip this: it's your first line of defense.

2. Structure for Scalability

Separate concerns: routes for API endpoints, services for business logic, models for data validation.

3. Async Everything

If you're building real-time services, async is non-negotiable. FastAPI makes it natural.

4. Handle WebSocket Lifecycle Properly

Connections drop, errors happen. Always implement proper cleanup and error handling.

5. Monitor and Observe

FastAPI's auto-generated docs are great, but add health checks, metrics, and structured logging.
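The project tree above includes core/logging.py but this article never shows it; a minimal sketch of what it might contain, so every session_id shows up in timestamped, parseable log lines:

# core/logging.py - hypothetical sketch; call configure_logging() at startup
import logging

def configure_logging(level: int = logging.INFO) -> None:
    # One consistent format across all module loggers
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
    )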

The Results Speak for Themselves

After migration:

  • Zero downtime deployment
  • 3x more concurrent users
  • 35% faster response times
  • Development velocity increased by 50%
  • Runtime errors reduced by 80%

Ready to Make the Switch?

If you're still using Flask for real-time applications, I urge you: make the switch to FastAPI. The initial learning curve is worth every second. Your users will thank you, your team will thank you, and future-you will definitely thank you.

Start small: pick one service, migrate it to FastAPI, and measure the improvements. The results will convince you.

What's your experience with FastAPI? Have you migrated from Flask? Share in the comments; I'd love to hear your stories! 🚀


This article is based on real-world experience building production speech AI services. The code examples are simplified for clarity but demonstrate production-ready patterns.
