Hey builders! Let's talk about something that separates hobby projects from production systems: error handling. When you're running a real-time service with hundreds of concurrent users, errors aren't edge cases; they're everyday reality.
Let me share how we built error handling that actually works at scale, and how it saved us countless 3 AM wake-up calls.
The Reality of Real-Time Systems
Here's what nobody tells you: in a real-time service, everything will fail:
- Networks drop mid-stream
- Clients disconnect without warning
- GPUs run out of memory
- External APIs time out
- Users send garbage data
- Load spikes crash services
Your job isn't to prevent failures (you can't). Your job is to handle them gracefully.
The Error Categorization Framework
First rule: not all errors are created equal. We categorize every error into one of these buckets:
1. Connection Errors (CONN_*)
Problems with network connectivity or WebSocket state.
class ConnectionError(Exception):
    """Base class for connection-related errors"""
    pass

class CONN_CLIENT_DISCONNECT(ConnectionError):
    """Client closed connection normally"""
    code = "CONN_001"
    severity = "INFO"
    user_message = "Connection closed"

class CONN_TIMEOUT(ConnectionError):
    """Connection timed out"""
    code = "CONN_002"
    severity = "WARNING"
    user_message = "Connection timed out. Please reconnect."

class CONN_INVALID_MESSAGE(ConnectionError):
    """Received malformed message"""
    code = "CONN_003"
    severity = "ERROR"
    user_message = "Invalid message format"
2. Riva Errors (RIVA_*)
Problems with the AI service backend.
class RivaError(Exception):
    """Base class for Riva-related errors"""
    pass

class RIVA_CONNECTION_FAILED(RivaError):
    """Cannot connect to Riva service"""
    code = "RIVA_001"
    severity = "CRITICAL"
    user_message = "Service temporarily unavailable"
    retry_strategy = "exponential_backoff"

class RIVA_MODEL_NOT_FOUND(RivaError):
    """Requested model not available"""
    code = "RIVA_002"
    severity = "ERROR"
    user_message = "Language not supported"
    retry_strategy = "no_retry"

class RIVA_QUOTA_EXCEEDED(RivaError):
    """API quota exceeded"""
    code = "RIVA_003"
    severity = "WARNING"
    user_message = "Service capacity reached. Please try again later."
    retry_strategy = "rate_limit"
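The retry_strategy attribute is what drives recovery decisions later on. As a rough illustration (not the exact helper from our codebase; the delay caps and attempt limit are assumptions), a dispatcher keyed on that attribute might look like this:

import asyncio
import random

# Hypothetical helper: turn a RivaError's retry_strategy into a wait-or-give-up
# decision. The limits and delays here are illustrative, not production values.
async def wait_before_retry(error: RivaError, attempt: int) -> bool:
    """Return True if the caller should retry, False if it should give up."""
    strategy = getattr(error, "retry_strategy", "no_retry")

    if strategy == "no_retry":
        return False

    if strategy == "exponential_backoff":
        # 1s, 2s, 4s, ... capped at 30s, with jitter to avoid thundering herds
        delay = min(2 ** attempt, 30) + random.uniform(0, 0.5)
        await asyncio.sleep(delay)
        return attempt < 5

    if strategy == "rate_limit":
        # Back off for a fixed window when quota is exhausted
        await asyncio.sleep(10)
        return True

    return False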
3. Audio Errors (AUDIO_*)
Problems with audio data or processing.
class AudioError(Exception):
    """Base class for audio-related errors"""
    pass

class AUDIO_INVALID_FORMAT(AudioError):
    """Audio format not supported"""
    code = "AUDIO_001"
    severity = "ERROR"
    user_message = "Invalid audio format. Please use PCM16."

class AUDIO_CHUNK_TOO_LARGE(AudioError):
    """Audio chunk exceeds size limit"""
    code = "AUDIO_002"
    severity = "WARNING"
    user_message = "Audio chunk too large"

class AUDIO_PROCESSING_FAILED(AudioError):
    """Failed to process audio"""
    code = "AUDIO_003"
    severity = "ERROR"
    user_message = "Failed to process audio"
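These audio errors are typically raised at the boundary, before a chunk ever touches the model. Here's a minimal validation sketch; the 1 MB limit and the even-length PCM16 check are assumptions for illustration, not our exact thresholds:

# Illustrative guard for incoming audio chunks. MAX_CHUNK_BYTES and the
# even-length check for PCM16 are assumed values, not production settings.
MAX_CHUNK_BYTES = 1024 * 1024  # 1 MB

def validate_audio_chunk(chunk: bytes) -> None:
    """Raise an AudioError subclass if the chunk can't be processed."""
    if not chunk:
        raise AUDIO_INVALID_FORMAT()

    if len(chunk) > MAX_CHUNK_BYTES:
        raise AUDIO_CHUNK_TOO_LARGE()

    # PCM16 samples are 2 bytes each, so a valid chunk has an even length
    if len(chunk) % 2 != 0:
        raise AUDIO_INVALID_FORMAT()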
The Error Handler Pattern
Here's the pattern that saved our sanity:
from typing import Optional, Callable
import logging
from dataclasses import dataclass

@dataclass
class ErrorContext:
    """Context information for error handling"""
    session_id: str
    user_id: Optional[str]
    endpoint: str
    # Error details get defaults so the context can be created up front
    # and filled in when an error is actually handled
    error_code: str = "UNKNOWN"
    severity: str = "INFO"
    original_error: Optional[Exception] = None
    retry_attempt: int = 0

class ErrorHandler:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.error_callbacks = {}

    def register_callback(self, error_class, callback: Callable):
        """Register a callback for specific error types"""
        self.error_callbacks[error_class] = callback

    async def handle(self, error: Exception, context: ErrorContext):
        """Central error handling"""
        # Log with context
        self.logger.log(
            level=self._get_log_level(context.severity),
            msg=f"[{context.session_id}] {context.error_code}: {str(error)}",
            extra={
                "session_id": context.session_id,
                "error_code": context.error_code,
                "severity": context.severity,
                "endpoint": context.endpoint,
                "retry_attempt": context.retry_attempt
            }
        )

        # Execute registered callback if one exists
        callback = self.error_callbacks.get(type(error))
        if callback:
            await callback(error, context)

        # Return user-friendly error message
        return self._get_user_message(error)

    def _get_log_level(self, severity: str) -> int:
        levels = {
            "INFO": logging.INFO,
            "WARNING": logging.WARNING,
            "ERROR": logging.ERROR,
            "CRITICAL": logging.CRITICAL
        }
        return levels.get(severity, logging.ERROR)

    def _get_user_message(self, error: Exception) -> dict:
        """Convert error to user-friendly message"""
        return {
            "event": "error",
            "code": getattr(error, 'code', 'UNKNOWN'),
            "message": getattr(error, 'user_message', 'An error occurred')
        }

# Global error handler instance
error_handler = ErrorHandler()
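The register_callback hook is what lets you attach side effects without cluttering your endpoints. As an example of the wiring (the callback body and the riva_pool name are hypothetical), you might schedule a reconnect whenever the Riva connection drops:

# Hypothetical callback: react to RIVA_CONNECTION_FAILED by scheduling a
# reconnect. riva_pool is a placeholder, not something defined above.
async def on_riva_connection_failed(error: Exception, context: ErrorContext):
    logging.getLogger(__name__).warning(
        f"[{context.session_id}] Scheduling Riva reconnect (attempt {context.retry_attempt})"
    )
    # e.g. await riva_pool.reset()

error_handler.register_callback(RIVA_CONNECTION_FAILED, on_riva_connection_failed)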
Using the Error Handler in WebSocket Endpoints
Now let's put it to work:
import asyncio
import json
import logging

from fastapi import WebSocket, WebSocketDisconnect

logger = logging.getLogger(__name__)

@app.websocket("/transcribe/{session_id}")
async def transcribe_endpoint(websocket: WebSocket, session_id: str):
    context = ErrorContext(
        session_id=session_id,
        user_id=None,  # Set from auth
        endpoint="/transcribe"
    )

    try:
        await websocket.accept()

        # Main processing loop
        while True:
            try:
                message = await websocket.receive_text()
                data = json.loads(message)

                # Process message
                await process_message(session_id, data)

            except json.JSONDecodeError as e:
                # Invalid JSON
                error = CONN_INVALID_MESSAGE()
                context.error_code = error.code
                context.severity = error.severity
                context.original_error = e

                error_msg = await error_handler.handle(error, context)
                await websocket.send_json(error_msg)

            except AudioError as e:
                # Audio processing error
                context.error_code = e.code
                context.severity = e.severity
                context.original_error = e

                error_msg = await error_handler.handle(e, context)
                await websocket.send_json(error_msg)
                # Continue processing (don't close connection)

            except RivaError as e:
                # Riva service error
                context.error_code = e.code
                context.severity = e.severity
                context.original_error = e

                error_msg = await error_handler.handle(e, context)
                await websocket.send_json(error_msg)

                # Decide whether to retry or close
                if e.retry_strategy == "no_retry":
                    break
                elif e.retry_strategy == "exponential_backoff":
                    await asyncio.sleep(2 ** context.retry_attempt)
                    context.retry_attempt += 1

    except WebSocketDisconnect:
        # Normal disconnect - just log
        logger.info(f"[{session_id}] Client disconnected")

    except Exception as e:
        # Unexpected error - log and notify user
        context.error_code = "UNKNOWN"
        context.severity = "CRITICAL"
        context.original_error = e

        logger.exception(f"[{session_id}] Unexpected error: {e}")

        try:
            await websocket.send_json({
                "event": "error",
                "message": "An unexpected error occurred"
            })
        except Exception:
            pass  # Connection might be dead

    finally:
        # Always cleanup
        await cleanup_session(session_id)
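cleanup_session isn't shown in this post; the important property is that it runs in finally, so it has to be safe to call no matter how the connection ended. A hypothetical version (session_store and riva_streams are placeholders, not objects defined above) might look like this:

# Hypothetical cleanup: each step is wrapped so a failure releasing one
# resource doesn't leak the others. session_store and riva_streams are placeholders.
async def cleanup_session(session_id: str) -> None:
    for label, closer in (
        ("riva stream", riva_streams.close),
        ("session state", session_store.remove),
    ):
        try:
            await closer(session_id)
        except Exception:
            logging.getLogger(__name__).exception(
                f"[{session_id}] Failed to release {label} during cleanup"
            )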
Graceful Degradation
When things fail, degrade gracefully instead of crashing:
class TranscriptionService:
    def __init__(self):
        self.primary_riva = RivaClient(primary=True)
        self.fallback_riva = RivaClient(primary=False)
        self.local_cache = TranscriptionCache()

    async def transcribe(self, audio: bytes, session_id: str):
        """Transcribe with fallbacks"""
        # Try primary Riva service
        try:
            return await self.primary_riva.transcribe(audio)
        except RIVA_CONNECTION_FAILED:
            logger.warning(f"[{session_id}] Primary Riva failed, trying fallback")

            # Try fallback service
            try:
                return await self.fallback_riva.transcribe(audio)
            except RIVA_CONNECTION_FAILED:
                logger.error(f"[{session_id}] Both Riva services failed")

                # Check cache for similar audio
                cached_result = self.local_cache.get_similar(audio)
                if cached_result:
                    logger.info(f"[{session_id}] Returning cached result")
                    return cached_result

                # All options exhausted
                raise RIVA_CONNECTION_FAILED(
                    "All transcription services unavailable"
                )
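The get_similar call glosses over a hard problem: real fuzzy matching on raw audio needs fingerprinting and is out of scope here. A deliberately naive stand-in that only matches byte-identical chunks (purely illustrative, not our production cache) could look like this:

import hashlib
from collections import OrderedDict
from typing import Optional

# Naive stand-in for TranscriptionCache: exact-match only, bounded LRU.
# Real "similar audio" matching would need acoustic fingerprinting.
class TranscriptionCache:
    def __init__(self, max_entries: int = 1000):
        self._entries: OrderedDict[str, str] = OrderedDict()
        self._max_entries = max_entries

    def _key(self, audio: bytes) -> str:
        return hashlib.sha256(audio).hexdigest()

    def put(self, audio: bytes, transcript: str) -> None:
        key = self._key(audio)
        self._entries[key] = transcript
        self._entries.move_to_end(key)
        if len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)  # evict the oldest entry

    def get_similar(self, audio: bytes) -> Optional[str]:
        return self._entries.get(self._key(audio))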
Monitoring Integration
Errors should trigger alerts and metrics:
from prometheus_client import Counter, Histogram

# Metrics
error_counter = Counter(
    'transcription_errors_total',
    'Total transcription errors',
    ['error_code', 'severity']
)

error_duration = Histogram(
    'error_handling_duration_seconds',
    'Time spent handling errors'
)

class MonitoredErrorHandler(ErrorHandler):
    async def handle(self, error: Exception, context: ErrorContext):
        with error_duration.time():
            # Count error
            error_counter.labels(
                error_code=context.error_code,
                severity=context.severity
            ).inc()

            # Handle error
            result = await super().handle(error, context)

            # Alert on critical errors
            if context.severity == "CRITICAL":
                await self.send_alert(error, context)

            return result

    async def send_alert(self, error: Exception, context: ErrorContext):
        """Send alert to monitoring system"""
        # Integration with PagerDuty, Slack, etc.
        pass
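send_alert is left as a stub above. One common shape for it, sketched here with httpx and a Slack-style incoming webhook (the URL and payload fields are placeholders), is a fire-and-forget POST that never raises back into the error path:

import httpx

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/PLACEHOLDER"  # placeholder

class AlertingErrorHandler(MonitoredErrorHandler):
    async def send_alert(self, error: Exception, context: ErrorContext):
        """Post critical errors to a chat webhook; never let alerting fail the request."""
        payload = {
            "text": (
                f":rotating_light: {context.error_code} ({context.severity}) "
                f"on {context.endpoint}, session {context.session_id}: {error}"
            )
        }
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                await client.post(SLACK_WEBHOOK_URL, json=payload)
        except httpx.HTTPError:
            self.logger.warning("Failed to deliver alert webhook")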
Client-Side Error Handling
Don't forget the client! Give users actionable feedback:
class TranscriptionClient {
  constructor(sessionId) {
    this.sessionId = sessionId;
    this.connect();
  }

  connect() {
    this.ws = new WebSocket(`ws://api.example.com/transcribe/${this.sessionId}`);

    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);

      if (data.event === 'error') {
        this.handleError(data);
      } else {
        this.handleTranscription(data);
      }
    };
  }

  handleError(error) {
    // Keys match the error codes the server actually sends (the `code` attribute)
    const errorHandlers = {
      'CONN_002': () => {  // CONN_TIMEOUT
        this.showNotification('Connection timeout. Reconnecting...', 'warning');
        this.reconnect();
      },
      'RIVA_003': () => {  // RIVA_QUOTA_EXCEEDED
        this.showNotification('Service busy. Please try again in a moment.', 'warning');
        this.retryAfterDelay(5000);
      },
      'AUDIO_001': () => {  // AUDIO_INVALID_FORMAT
        this.showNotification('Microphone error. Please check your settings.', 'error');
        this.stopRecording();
      },
      'default': () => {
        this.showNotification('An error occurred. Please refresh the page.', 'error');
      }
    };

    const handler = errorHandlers[error.code] || errorHandlers['default'];
    handler();
  }
}
Best Practices
- Always categorize errors: Use consistent error codes
- Log with context: Include session IDs, user IDs, timestamps
- Separate user messages from logs: Users don't need stack traces
- Implement retry strategies: Not all errors should retry the same way
- Monitor error rates: Set alerts for unusual patterns
- Test error paths: Deliberately inject failures in tests (see the sketch after this list)
- Document error codes: Create an error code reference for your team
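On that last point about testing error paths, here's roughly what an injected-failure test can look like, using pytest and unittest.mock to force the primary Riva client to fail. It assumes pytest-asyncio is installed; the class and method names follow the snippets above, and the test itself is a sketch rather than one of ours:

from unittest.mock import AsyncMock

import pytest

@pytest.mark.asyncio
async def test_transcribe_falls_back_when_primary_riva_is_down():
    # Skip the real client setup so the test needs no live Riva connection
    service = TranscriptionService.__new__(TranscriptionService)
    service.primary_riva = AsyncMock()
    service.primary_riva.transcribe.side_effect = RIVA_CONNECTION_FAILED("injected outage")
    service.fallback_riva = AsyncMock()
    service.fallback_riva.transcribe.return_value = "hello world"

    result = await service.transcribe(b"\x00\x01" * 160, session_id="test-session")

    assert result == "hello world"
    service.fallback_riva.transcribe.assert_awaited_once()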
The Results
After implementing this error handling framework:
- Mean Time To Recovery (MTTR) dropped by 70%
- User complaints about cryptic errors dropped to nearly zero
- Production incidents became easier to debug
- Team confidence in deploying changes increased dramatically
Final Thoughts
Good error handling is invisible when it works. Users don't know you just failed over to a backup service, retried a request, or pulled from cache. They just know things worked.
That's the goal: resilience that's transparent to users but visible to operators.
What's your approach to error handling in production? Share your battle stories below!