alfchee
Building Resilient Real-Time Services: Error Handling at Scale

Hey builders! πŸ‘‹ Let's talk about something that separates hobby projects from production systems: error handling. When you're running a real-time service with hundreds of concurrent users, errors aren't edge casesβ€”they're everyday reality.

Let me share how we built error handling that actually works at scale, and how it saved us countless 3 AM wake-up calls.

The Reality of Real-Time Systems

Here's what nobody tells you: in a real-time service, everything will fail:

  • Networks drop mid-stream
  • Clients disconnect without warning
  • GPUs run out of memory
  • External APIs time out
  • Users send garbage data
  • Load spikes crash services

Your job isn't to prevent failures (you can't). Your job is to handle them gracefully.

The Error Categorization Framework

First rule: not all errors are created equal. We categorize every error into one of these buckets:

1. Connection Errors (CONN_*)

Problems with network connectivity or WebSocket state.

class ConnectionError(Exception):
    """Base class for connection-related errors"""
    pass

class CONN_CLIENT_DISCONNECT(ConnectionError):
    """Client closed connection normally"""
    code = "CONN_001"
    severity = "INFO"
    user_message = "Connection closed"

class CONN_TIMEOUT(ConnectionError):
    """Connection timed out"""
    code = "CONN_002"
    severity = "WARNING"
    user_message = "Connection timed out. Please reconnect."

class CONN_INVALID_MESSAGE(ConnectionError):
    """Received malformed message"""
    code = "CONN_003"
    severity = "ERROR"
    user_message = "Invalid message format"

2. Riva Errors (RIVA_*)

Problems with the AI service backend.

class RivaError(Exception):
    """Base class for Riva-related errors"""
    pass

class RIVA_CONNECTION_FAILED(RivaError):
    """Cannot connect to Riva service"""
    code = "RIVA_001"
    severity = "CRITICAL"
    user_message = "Service temporarily unavailable"
    retry_strategy = "exponential_backoff"

class RIVA_MODEL_NOT_FOUND(RivaError):
    """Requested model not available"""
    code = "RIVA_002"
    severity = "ERROR"
    user_message = "Language not supported"
    retry_strategy = "no_retry"

class RIVA_QUOTA_EXCEEDED(RivaError):
    """API quota exceeded"""
    code = "RIVA_003"
    severity = "WARNING"
    user_message = "Service capacity reached. Please try again later."
    retry_strategy = "rate_limit"

3. Audio Errors (AUDIO_*)

Problems with audio data or processing.

class AudioError(Exception):
    """Base class for audio-related errors"""
    pass

class AUDIO_INVALID_FORMAT(AudioError):
    """Audio format not supported"""
    code = "AUDIO_001"
    severity = "ERROR"
    user_message = "Invalid audio format. Please use PCM16."

class AUDIO_CHUNK_TOO_LARGE(AudioError):
    """Audio chunk exceeds size limit"""
    code = "AUDIO_002"
    severity = "WARNING"
    user_message = "Audio chunk too large"

class AUDIO_PROCESSING_FAILED(AudioError):
    """Failed to process audio"""
    code = "AUDIO_003"
    severity = "ERROR"
    user_message = "Failed to process audio"
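The payoff of keeping `code`, `severity`, and `user_message` as class attributes is that any categorized error can be serialized generically. A minimal, self-contained sketch (the `to_payload` helper is hypothetical, not part of the framework above, but mirrors how the handler reads these attributes):

```python
# Hypothetical helper: because error metadata lives on the class,
# one function can serialize any categorized error the same way.
class AudioError(Exception):
    """Base class for audio-related errors"""

class AUDIO_INVALID_FORMAT(AudioError):
    code = "AUDIO_001"
    severity = "ERROR"
    user_message = "Invalid audio format. Please use PCM16."

def to_payload(err: Exception) -> dict:
    # Fall back to safe defaults for uncategorized exceptions
    return {
        "event": "error",
        "code": getattr(err, "code", "UNKNOWN"),
        "message": getattr(err, "user_message", "An error occurred"),
    }

payload = to_payload(AUDIO_INVALID_FORMAT())
# payload["code"] == "AUDIO_001"
```

Uncategorized exceptions still produce a well-formed payload, which keeps the client protocol stable even when something unexpected escapes.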

The Error Handler Pattern

Here's the pattern that saved our sanity:

from typing import Optional, Callable
import logging
from dataclasses import dataclass

@dataclass
class ErrorContext:
    """Context information for error handling"""
    session_id: str
    user_id: Optional[str]
    endpoint: str
    error_code: str = "UNKNOWN"
    severity: str = "INFO"
    original_error: Optional[Exception] = None
    retry_attempt: int = 0

class ErrorHandler:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.error_callbacks = {}

    def register_callback(self, error_class, callback: Callable):
        """Register a callback for specific error types"""
        self.error_callbacks[error_class] = callback

    async def handle(self, error: Exception, context: ErrorContext):
        """Central error handling"""
        # Log with context
        self.logger.log(
            level=self._get_log_level(context.severity),
            msg=f"[{context.session_id}] {context.error_code}: {str(error)}",
            extra={
                "session_id": context.session_id,
                "error_code": context.error_code,
                "severity": context.severity,
                "endpoint": context.endpoint,
                "retry_attempt": context.retry_attempt
            }
        )

        # Execute registered callback if exists
        callback = self.error_callbacks.get(type(error))
        if callback:
            await callback(error, context)

        # Return user-friendly error message
        return self._get_user_message(error)

    def _get_log_level(self, severity: str) -> int:
        levels = {
            "INFO": logging.INFO,
            "WARNING": logging.WARNING,
            "ERROR": logging.ERROR,
            "CRITICAL": logging.CRITICAL
        }
        return levels.get(severity, logging.ERROR)

    def _get_user_message(self, error: Exception) -> dict:
        """Convert error to user-friendly message"""
        return {
            "event": "error",
            "code": getattr(error, 'code', 'UNKNOWN'),
            "message": getattr(error, 'user_message', 'An error occurred')
        }

# Global error handler instance
error_handler = ErrorHandler()
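Registering callbacks lets recovery logic live next to the error taxonomy instead of being copy-pasted into every endpoint. A self-contained sketch of the dispatch (`MiniErrorHandler` is a trimmed-down stand-in for the `ErrorHandler` above, so the example runs on its own):

```python
import asyncio

class RivaError(Exception):
    code = "RIVA_001"
    severity = "CRITICAL"

class MiniErrorHandler:
    """Trimmed-down stand-in for ErrorHandler, showing callback dispatch."""
    def __init__(self):
        self.error_callbacks = {}
        self.alerts = []

    def register_callback(self, error_class, callback):
        self.error_callbacks[error_class] = callback

    async def handle(self, error):
        # Dispatch on the concrete error type, like ErrorHandler.handle
        callback = self.error_callbacks.get(type(error))
        if callback:
            await callback(error)

handler = MiniErrorHandler()

async def on_riva_down(error):
    # In production: trip a circuit breaker, warm the fallback, page on-call
    handler.alerts.append(error.code)

handler.register_callback(RivaError, on_riva_down)
asyncio.run(handler.handle(RivaError()))
```

Note that `dict.get(type(error))` matches the exact class only; if you want callbacks to fire for subclasses too, walk the error's MRO instead.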

Using the Error Handler in WebSocket Endpoints

Now let's put it to work:

import asyncio
import json
import logging

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
logger = logging.getLogger(__name__)

@app.websocket("/transcribe/{session_id}")
async def transcribe_endpoint(websocket: WebSocket, session_id: str):
    context = ErrorContext(
        session_id=session_id,
        user_id=None,  # Set from auth
        endpoint="/transcribe"
    )

    try:
        await websocket.accept()

        # Main processing loop
        while True:
            try:
                message = await websocket.receive_text()
                data = json.loads(message)

                # Process message
                await process_message(session_id, data)

            except json.JSONDecodeError as e:
                # Invalid JSON
                error = CONN_INVALID_MESSAGE()
                context.error_code = error.code
                context.severity = error.severity
                context.original_error = e

                error_msg = await error_handler.handle(error, context)
                await websocket.send_json(error_msg)

            except AudioError as e:
                # Audio processing error
                context.error_code = e.code
                context.severity = e.severity
                context.original_error = e

                error_msg = await error_handler.handle(e, context)
                await websocket.send_json(error_msg)
                # Continue processing (don't close connection)

            except RivaError as e:
                # Riva service error
                context.error_code = e.code
                context.severity = e.severity
                context.original_error = e

                error_msg = await error_handler.handle(e, context)
                await websocket.send_json(error_msg)

                # Decide whether to retry or close
                if e.retry_strategy == "no_retry":
                    break
                elif e.retry_strategy == "exponential_backoff":
                    await asyncio.sleep(2 ** context.retry_attempt)
                    context.retry_attempt += 1

    except WebSocketDisconnect:
        # Normal disconnect - just log
        logger.info(f"[{session_id}] Client disconnected")

    except Exception as e:
        # Unexpected error - log and notify user
        context.error_code = "UNKNOWN"
        context.severity = "CRITICAL"
        context.original_error = e

        logger.exception(f"[{session_id}] Unexpected error: {e}")

        try:
            await websocket.send_json({
                "event": "error",
                "message": "An unexpected error occurred"
            })
        except Exception:
            pass  # Connection might be dead

    finally:
        # Always cleanup
        await cleanup_session(session_id)
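One caveat with the retry branch above: a bare `2 ** retry_attempt` sleep grows without bound and makes every affected client retry in lockstep. A capped, full-jitter variant avoids both problems (a sketch with illustrative defaults, not the exact values we shipped):

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Full-jitter exponential backoff.

    Returns a random delay in [0, min(cap, base * 2**attempt)], so retries
    are bounded and spread out instead of arriving in synchronized waves.
    """
    return random.uniform(0.0, min(cap, base * (2 ** attempt)))

# Usage inside the retry branch:
#     await asyncio.sleep(backoff_delay(context.retry_attempt))
#     context.retry_attempt += 1
```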

Graceful Degradation

When things fail, degrade gracefully instead of crashing:

class TranscriptionService:
    def __init__(self):
        self.primary_riva = RivaClient(primary=True)
        self.fallback_riva = RivaClient(primary=False)
        self.local_cache = TranscriptionCache()

    async def transcribe(self, audio: bytes, session_id: str):
        """Transcribe with fallbacks"""

        # Try primary Riva service
        try:
            return await self.primary_riva.transcribe(audio)
        except RIVA_CONNECTION_FAILED as e:
            logger.warning(f"[{session_id}] Primary Riva failed, trying fallback")

            # Try fallback service
            try:
                return await self.fallback_riva.transcribe(audio)
            except RIVA_CONNECTION_FAILED:
                logger.error(f"[{session_id}] Both Riva services failed")

                # Check cache for similar audio
                cached_result = self.local_cache.get_similar(audio)
                if cached_result:
                    logger.info(f"[{session_id}] Returning cached result")
                    return cached_result

                # All options exhausted
                raise RIVA_CONNECTION_FAILED(
                    "All transcription services unavailable"
                )
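Fallback chains like this work best behind a circuit breaker, so a dead primary stops costing you a full timeout on every request. A minimal counter-based sketch (the thresholds and the `CircuitOpen` name are illustrative):

```python
import time

class CircuitOpen(Exception):
    """Raised by callers when the breaker is open and calls are short-circuited."""

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_after: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at = None

    def allow(self) -> bool:
        if self.opened_at is None:
            return True
        # Half-open: let one probe request through after the cooldown
        return time.monotonic() - self.opened_at >= self.reset_after

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic()

breaker = CircuitBreaker(failure_threshold=2)
for _ in range(2):
    breaker.record_failure()
# Breaker is now open: callers skip the primary instead of waiting on a timeout
```

In `transcribe`, you would check `breaker.allow()` before calling the primary and go straight to the fallback when it returns `False`.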

Monitoring Integration

Errors should trigger alerts and metrics:

from prometheus_client import Counter, Histogram

# Metrics
error_counter = Counter(
    'transcription_errors_total',
    'Total transcription errors',
    ['error_code', 'severity']
)

error_duration = Histogram(
    'error_handling_duration_seconds',
    'Time spent handling errors'
)

class MonitoredErrorHandler(ErrorHandler):
    async def handle(self, error: Exception, context: ErrorContext):
        with error_duration.time():
            # Count error
            error_counter.labels(
                error_code=context.error_code,
                severity=context.severity
            ).inc()

            # Handle error
            result = await super().handle(error, context)

            # Alert on critical errors
            if context.severity == "CRITICAL":
                await self.send_alert(error, context)

            return result

    async def send_alert(self, error: Exception, context: ErrorContext):
        """Send alert to monitoring system"""
        # Integration with PagerDuty, Slack, etc.
        pass
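`send_alert` is left as a stub above; the usual shape is to build a structured payload and POST it to a webhook. A hypothetical payload builder for a Slack-style webhook (the emoji and message format are our choices, not a required schema):

```python
import json

def build_alert(error_code: str, severity: str, session_id: str, endpoint: str) -> str:
    """Build a Slack-style webhook payload for a critical error."""
    return json.dumps({
        "text": f":rotating_light: {severity} {error_code} on {endpoint} (session {session_id})"
    })

payload = build_alert("RIVA_001", "CRITICAL", "abc123", "/transcribe")
```

Keeping the payload builder separate from the HTTP call makes it trivial to unit-test alerting without a network.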

Client-Side Error Handling

Don't forget the client! Give users actionable feedback:

class TranscriptionClient {
    constructor(sessionId) {
        this.sessionId = sessionId;
        this.connect();
    }

    connect() {
        this.ws = new WebSocket(`ws://api.example.com/transcribe/${this.sessionId}`);

        this.ws.onmessage = (event) => {
            const data = JSON.parse(event.data);

            if (data.event === 'error') {
                this.handleError(data);
            } else {
                this.handleTranscription(data);
            }
        };
    }

    handleError(error) {
        const errorHandlers = {
            // Keys match the server-side error codes
            // (CONN_002 = timeout, RIVA_003 = quota, AUDIO_001 = format)
            'CONN_002': () => {
                this.showNotification('Connection timeout. Reconnecting...', 'warning');
                this.reconnect();
            },
            'RIVA_003': () => {
                this.showNotification('Service busy. Please try again in a moment.', 'warning');
                this.retryAfterDelay(5000);
            },
            'AUDIO_001': () => {
                this.showNotification('Microphone error. Please check your settings.', 'error');
                this.stopRecording();
            },
            'default': () => {
                this.showNotification('An error occurred. Please refresh the page.', 'error');
            }
        };

        const handler = errorHandlers[error.code] || errorHandlers['default'];
        handler();
    }
}

Best Practices

  1. Always categorize errors: Use consistent error codes
  2. Log with context: Include session IDs, user IDs, timestamps
  3. Separate user messages from logs: Users don't need stack traces
  4. Implement retry strategies: Not all errors should retry the same way
  5. Monitor error rates: Set alerts for unusual patterns
  6. Test error paths: Deliberately inject failures in tests
  7. Document error codes: Create an error code reference for your team
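Practice 6 in action: with `unittest.mock.AsyncMock` you can inject a backend failure and assert that the fallback path actually runs. A self-contained sketch (`transcribe_with_fallback` is an illustrative simplification, not the service class above):

```python
import asyncio
from unittest.mock import AsyncMock

class RivaError(Exception):
    """Stand-in for the RIVA_* errors above."""

async def transcribe_with_fallback(primary, fallback, audio: bytes):
    # Simplified version of TranscriptionService.transcribe
    try:
        return await primary(audio)
    except RivaError:
        return await fallback(audio)

# Inject a failure in the primary and verify the fallback is used
primary = AsyncMock(side_effect=RivaError("injected"))
fallback = AsyncMock(return_value="hello world")
result = asyncio.run(transcribe_with_fallback(primary, fallback, b"\x00"))
# result == "hello world", and both mocks were awaited exactly once
```

The same pattern covers the other error paths: swap the `side_effect` for a different error class and assert on the payload the user would see.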

The Results

After implementing this error handling framework:

  • Mean Time To Recovery (MTTR) dropped by 70%
  • User complaints about cryptic errors dropped to nearly zero
  • Production incidents became easier to debug
  • Team confidence in deploying changes increased dramatically

Final Thoughts

Good error handling is invisible when it works. Users don't know you just failed over to a backup service, retried a request, or pulled from cache. They just know things worked.

That's the goal: resilience that's transparent to users but visible to operators.

What's your approach to error handling in production? Share your battle stories below! πŸš€
