Hey builders! Let's talk about something that sounds boring but becomes absolutely critical in production: logging. When you're running hundreds of concurrent sessions, bad logging is the difference between finding bugs in minutes vs. spending days debugging.
Let me share how we built a logging system that actually helps instead of drowns you in noise.
The Real-Time Logging Challenge
Traditional logging advice doesn't work for real-time apps. Here's why:
Traditional app logging:
2024-01-15 10:30:45 INFO Processing request
2024-01-15 10:30:46 ERROR Failed to connect to database
2024-01-15 10:30:47 INFO Processing request
Real-time app with 100 concurrent sessions:
2024-01-15 10:30:45.123 INFO Processing audio
2024-01-15 10:30:45.124 INFO Processing audio
2024-01-15 10:30:45.125 ERROR Connection failed
2024-01-15 10:30:45.126 INFO Processing audio
2024-01-15 10:30:45.127 INFO Processing audio
2024-01-15 10:30:45.128 INFO Processing audio
Which session failed? Good luck finding out.
Strategy #1: Session-Based Logging
Every log entry MUST include session context:
import logging
import uuid
from contextvars import ContextVar
from typing import Optional
# Context variable for session tracking
session_context: ContextVar[Optional[str]] = ContextVar('session_context', default=None)
class SessionLoggerAdapter(logging.LoggerAdapter):
"""Logger that automatically includes session context"""
def process(self, msg, kwargs):
session_id = session_context.get()
if session_id:
return f'[{session_id}] {msg}', kwargs
return msg, kwargs
def get_logger(name: str) -> SessionLoggerAdapter:
"""Get a session-aware logger"""
base_logger = logging.getLogger(name)
return SessionLoggerAdapter(base_logger, {})
# Usage in your endpoint
logger = get_logger(__name__)
@app.websocket("/transcribe/{session_id}")
async def transcribe_endpoint(websocket: WebSocket, session_id: str):
# Set session context for this async task
session_context.set(session_id)
logger.info("Session started") # Logs: [abc-123] Session started
try:
await process_transcription(websocket)
except Exception as e:
logger.error(f"Transcription failed: {e}") # Logs: [abc-123] Transcription failed: ...
finally:
logger.info("Session ended") # Logs: [abc-123] Session ended
Now every log line is traceable to a specific session!
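One subtle point worth knowing: ContextVar values are copied into any task you spawn, so background work started inside a session keeps the same tag. A minimal sketch (the keepalive helper below is hypothetical, just to illustrate the propagation):
import asyncio

async def send_keepalive(websocket: WebSocket):
    # asyncio.create_task snapshots the current context (PEP 567),
    # so session_context.get() still returns this session's ID here
    while True:
        logger.info("Keepalive sent")  # logs: [abc-123] Keepalive sent
        await asyncio.sleep(30)

async def process_transcription(websocket: WebSocket):
    keepalive = asyncio.create_task(send_keepalive(websocket))
    try:
        ...  # read audio chunks, stream transcripts back
    finally:
        keepalive.cancel()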
Strategy #2: Structured Logging
Stop logging strings. Log structured data:
import logging
import json
from datetime import datetime
from typing import Any, Dict
class StructuredLogger:
"""Logger that outputs structured JSON"""
def __init__(self, name: str):
self.logger = logging.getLogger(name)
def _log(self, level: int, event: str, **kwargs):
"""Log structured data as JSON"""
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"event": event,
"session_id": session_context.get(),
**kwargs
}
self.logger.log(level, json.dumps(log_data))
def info(self, event: str, **kwargs):
self._log(logging.INFO, event, **kwargs)
def error(self, event: str, error: Exception = None, **kwargs):
error_data = kwargs
if error:
error_data.update({
"error_type": type(error).__name__,
"error_message": str(error)
})
self._log(logging.ERROR, event, **error_data)
# Usage
logger = StructuredLogger(__name__)
logger.info(
"audio_received",
audio_size=len(audio_data),
sample_rate=16000,
channels=1
)
# Outputs:
# {"timestamp": "2024-01-15T10:30:45.123Z", "event": "audio_received",
# "session_id": "abc-123", "audio_size": 16000, "sample_rate": 16000, "channels": 1}
Now you can search logs by specific fields!
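Even before you wire up a log aggregation stack, JSON-lines logs are easy to slice with a few lines of Python. A quick sketch, assuming one JSON object per line in logs/app.log:
import json

def find_events(path: str, **filters):
    """Yield log entries whose fields match all of the given filters."""
    with open(path) as f:
        for line in f:
            try:
                entry = json.loads(line)
            except json.JSONDecodeError:
                continue  # skip any non-JSON lines
            if all(entry.get(key) == value for key, value in filters.items()):
                yield entry

# All failed transcriptions for one session
for entry in find_events("logs/app.log", session_id="abc-123", event="transcription_failed"):
    print(entry["timestamp"], entry.get("error_message"))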
Strategy #3: Correlation IDs
Track requests across multiple services:
from contextvars import ContextVar
import uuid
from typing import Optional
# Correlation ID for tracking across services
correlation_id: ContextVar[Optional[str]] = ContextVar('correlation_id', default=None)
class CorrelatedLogger(StructuredLogger):
"""Logger with correlation ID support"""
def _log(self, level: int, event: str, **kwargs):
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"event": event,
"session_id": session_context.get(),
"correlation_id": correlation_id.get(),
**kwargs
}
self.logger.log(level, json.dumps(log_data))
@app.websocket("/transcribe/{session_id}")
async def transcribe_endpoint(websocket: WebSocket, session_id: str):
# Generate correlation ID for this request
corr_id = str(uuid.uuid4())
correlation_id.set(corr_id)
session_context.set(session_id)
logger = CorrelatedLogger(__name__)
logger.info("session_started")
# When calling Riva service, pass correlation ID
await riva_client.transcribe(
audio_data,
metadata={"correlation_id": corr_id}
)
Now you can trace a request from client → your service → Riva → back!
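On the receiving side, the downstream service just pulls the ID out of the incoming metadata and sets the same context variable before it logs anything. A rough sketch (the handler name and metadata shape are assumptions, not Riva's actual API):
async def handle_transcribe_request(audio_data: bytes, metadata: dict):
    # Reuse the caller's correlation ID if present, otherwise start a new one
    corr_id = metadata.get("correlation_id") or str(uuid.uuid4())
    correlation_id.set(corr_id)

    logger = CorrelatedLogger(__name__)
    logger.info("downstream_request_received", audio_size=len(audio_data))
    # ... every log line from here on carries the same correlation_id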
Strategy #4: Performance Logging
Log performance metrics for every operation:
import time
from functools import wraps
from typing import Callable
def log_performance(operation: str):
"""Decorator to log operation performance"""
def decorator(func: Callable):
@wraps(func)
async def wrapper(*args, **kwargs):
logger = CorrelatedLogger(func.__module__)
start_time = time.time()
try:
result = await func(*args, **kwargs)
duration = time.time() - start_time
logger.info(
f"{operation}_completed",
duration_ms=round(duration * 1000, 2),
success=True
)
return result
except Exception as e:
duration = time.time() - start_time
logger.error(
f"{operation}_failed",
duration_ms=round(duration * 1000, 2),
success=False,
error=e
)
raise
return wrapper
return decorator
# Usage
@log_performance("audio_transcription")
async def transcribe_audio(audio: bytes, session_id: str) -> str:
# Transcription logic
return await riva_client.transcribe(audio)
# Logs:
# {"event": "audio_transcription_completed", "duration_ms": 245.67, "success": true}
Strategy #5: Log Levels by Component
Different components need different log levels:
import logging.config
LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"json": {
"()": "pythonjsonlogger.jsonlogger.JsonFormatter",
"format": "%(timestamp)s %(level)s %(name)s %(message)s"
}
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": "json",
"stream": "ext://sys.stdout"
},
"file": {
"class": "logging.handlers.RotatingFileHandler",
"formatter": "json",
"filename": "logs/app.log",
"maxBytes": 10485760, # 10MB
"backupCount": 5
}
},
"loggers": {
# Your app - verbose logging
"app": {
"level": "DEBUG",
"handlers": ["console", "file"],
"propagate": False
},
# Riva client - only warnings and errors
"riva_client": {
"level": "WARNING",
"handlers": ["console", "file"],
"propagate": False
},
# Third-party libraries - minimal logging
"uvicorn": {
"level": "INFO",
"handlers": ["console"],
"propagate": False
},
"grpc": {
"level": "ERROR",
"handlers": ["console"],
"propagate": False
}
},
"root": {
"level": "INFO",
"handlers": ["console", "file"]
}
}
# Apply configuration
logging.config.dictConfig(LOGGING_CONFIG)
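One gotcha: RotatingFileHandler won't create the logs/ directory for you, and dictConfig raises if it can't open logs/app.log. Create the directory before applying the config:
from pathlib import Path

# Run this before logging.config.dictConfig(LOGGING_CONFIG)
Path("logs").mkdir(parents=True, exist_ok=True)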
Strategy #6: Sampling for High-Volume Events
Don't log EVERY audio chunk - sample intelligently:
import random
class SampledLogger(CorrelatedLogger):
"""Logger with sampling support for high-frequency events"""
def __init__(self, name: str, sample_rate: float = 0.01):
super().__init__(name)
self.sample_rate = sample_rate
def sample(self, event: str, **kwargs):
"""Log with sampling"""
if random.random() < self.sample_rate:
self.info(event, sampled=True, **kwargs)
logger = SampledLogger(__name__, sample_rate=0.01) # Log 1% of events
# Log roughly 1 in 100 audio chunks (randomly sampled)
logger.sample(
"audio_chunk_processed",
chunk_size=len(chunk),
total_chunks=chunk_count
)
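If you'd rather have deterministic sampling (exactly every Nth event) instead of a random 1%, a simple per-event counter works too. A sketch (EveryNthLogger is not part of the code above, just an alternative):
import itertools

class EveryNthLogger(CorrelatedLogger):
    """Logs only every Nth call per event name - a deterministic alternative to random sampling"""
    def __init__(self, name: str, every: int = 100):
        super().__init__(name)
        self.every = every
        self._counters = {}  # event name -> itertools.count
    def sample(self, event: str, **kwargs):
        counter = self._counters.setdefault(event, itertools.count(1))
        n = next(counter)
        if n % self.every == 0:
            self.info(event, sampled=True, sample_index=n, **kwargs)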
Strategy #7: Error Context Preservation
When errors happen, log EVERYTHING relevant:
import traceback
class ErrorContextLogger(CorrelatedLogger):
"""Logger with rich error context"""
def error_with_context(
self,
event: str,
error: Exception,
**kwargs
):
"""Log error with full context"""
# Build error context
error_context = {
"error_type": type(error).__name__,
"error_message": str(error),
"error_code": getattr(error, 'code', None),
"traceback": traceback.format_exc(),
**kwargs
}
self.error(event, **error_context)
# Usage
logger = ErrorContextLogger(__name__)
try:
await riva_client.transcribe(audio)
except Exception as e:
logger.error_with_context(
"transcription_failed",
error=e,
audio_size=len(audio),
sample_rate=sample_rate,
language=language,
riva_endpoint=riva_client.endpoint
)
raise
Strategy #8: Log Aggregation & Search
Use ELK Stack or Loki for log aggregation:
# Docker Compose for Loki + Grafana
version: '3'
services:
loki:
image: grafana/loki:latest
ports:
- "3100:3100"
command: -config.file=/etc/loki/local-config.yaml
promtail:
image: grafana/promtail:latest
volumes:
- ./logs:/var/log
- ./promtail-config.yaml:/etc/promtail/config.yaml
command: -config.file=/etc/promtail/config.yaml
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
Now you can query logs with LogQL:
# Find all errors for a specific session
{job="transcription-service"} |= "session_id=abc-123" | json | level="ERROR"
# Find slow transcriptions
{job="transcription-service"} | json | duration_ms > 1000
# Count errors by type
sum by (error_type) (count_over_time({job="transcription-service"} | json | level="ERROR" [1h]))
Strategy #9: Monitoring Integration
Connect logs to metrics:
from prometheus_client import Counter, Histogram
# Metrics
transcription_requests = Counter(
'transcription_requests_total',
'Total transcription requests',
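# Careful: session_id is an unbounded label and can explode metric cardinality at scale - consider dropping it in production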
['session_id', 'language', 'status']
)
transcription_duration = Histogram(
'transcription_duration_seconds',
'Transcription duration',
['language']
)
class MonitoredLogger(ErrorContextLogger):
"""Logger integrated with metrics"""
@log_performance("transcription")
async def log_transcription(
self,
session_id: str,
language: str,
audio_data: bytes
):
start_time = time.time()
try:
result = await transcribe(audio_data, language)
# Log success
self.info(
"transcription_completed",
audio_size=len(audio_data),
language=language,
result_length=len(result)
)
# Update metrics
transcription_requests.labels(
session_id=session_id,
language=language,
status="success"
).inc()
transcription_duration.labels(
language=language
).observe(time.time() - start_time)
return result
except Exception as e:
# Log failure
self.error_with_context(
"transcription_failed",
error=e,
audio_size=len(audio_data),
language=language
)
# Update metrics
transcription_requests.labels(
session_id=session_id,
language=language,
status="error"
).inc()
raise
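For Prometheus to actually scrape these metrics, you still need to expose them. With FastAPI, one option is mounting prometheus_client's ASGI app (a sketch - /metrics is just the conventional path):
from prometheus_client import make_asgi_app

# Expose every registered Counter/Histogram at /metrics for Prometheus to scrape
app.mount("/metrics", make_asgi_app())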
Best Practices
- Always include session/correlation IDs - Makes debugging possible
- Use structured logging - JSON is searchable and parseable
- Sample high-frequency events - Don't fill disk with audio chunk logs
- Log performance metrics - Know what's slow before users complain
- Preserve error context - Log everything needed to debug
- Set appropriate log levels - Debug in dev, Info in production
- Rotate log files - Don't fill up disk
- Centralize logs - Use log aggregation for multiple instances
- Alert on log patterns - Error rate spikes should trigger alerts
- Test your logging - Verify logs are useful during incidents
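On that last point, even a tiny test catches regressions like someone dropping the session prefix. A pytest sketch using the caplog fixture, reusing get_logger and session_context from Strategy #1:
import logging

def test_logs_include_session_id(caplog):
    session_context.set("test-session-123")
    logger = get_logger(__name__)
    with caplog.at_level(logging.INFO):
        logger.info("Session started")
    # The SessionLoggerAdapter should have prefixed the message with the session ID
    assert any("test-session-123" in record.getMessage() for record in caplog.records)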
The Results
After implementing these logging strategies:
- Mean Time to Resolution (MTTR) dropped from hours to minutes
- Debug sessions became productive instead of frustrating
- Production incidents were traceable across services
- Performance bottlenecks became immediately visible
- Customer support could look up exact session issues
Final Thoughts
Good logging is invisible when everything works, but invaluable when things break. The goal isn't to log everything - it's to log the right things at the right level with the right context.
Think of logs as breadcrumbs for future you. When you're debugging at 3 AM, you'll thank past you for logging that session ID.
What's your logging setup? Any horror stories about debugging without proper logs? Share below!