DEV Community

alfchee

Logging Strategies for Real-Time Applications: Session Tracking at Scale

Hey builders! πŸ‘‹ Let's talk about something that sounds boring but becomes absolutely critical in production: logging. When you're running hundreds of concurrent sessions, bad logging is the difference between finding bugs in minutes vs. spending days debugging.

Let me share how we built a logging system that actually helps instead of drowning you in noise.

The Real-Time Logging Challenge

Traditional logging advice doesn't work for real-time apps. Here's why:

Traditional app logging:

2024-01-15 10:30:45 INFO Processing request
2024-01-15 10:30:46 ERROR Failed to connect to database
2024-01-15 10:30:47 INFO Processing request

Real-time app with 100 concurrent sessions:

2024-01-15 10:30:45.123 INFO Processing audio
2024-01-15 10:30:45.124 INFO Processing audio
2024-01-15 10:30:45.125 ERROR Connection failed
2024-01-15 10:30:45.126 INFO Processing audio
2024-01-15 10:30:45.127 INFO Processing audio
2024-01-15 10:30:45.128 INFO Processing audio

Which session failed? Good luck finding out.

Strategy #1: Session-Based Logging

Every log entry MUST include session context:

import logging
from contextvars import ContextVar
from typing import Optional

from fastapi import FastAPI, WebSocket

app = FastAPI()

# Context variable for session tracking
session_context: ContextVar[Optional[str]] = ContextVar('session_context', default=None)

class SessionLoggerAdapter(logging.LoggerAdapter):
    """Logger that automatically includes session context"""

    def process(self, msg, kwargs):
        session_id = session_context.get()
        if session_id:
            return f'[{session_id}] {msg}', kwargs
        return msg, kwargs

def get_logger(name: str) -> SessionLoggerAdapter:
    """Get a session-aware logger"""
    base_logger = logging.getLogger(name)
    return SessionLoggerAdapter(base_logger, {})

# Usage in your endpoint
logger = get_logger(__name__)

@app.websocket("/transcribe/{session_id}")
async def transcribe_endpoint(websocket: WebSocket, session_id: str):
    # Set session context for this async task
    session_context.set(session_id)

    logger.info("Session started")  # Logs: [abc-123] Session started

    try:
        await process_transcription(websocket)
    except Exception as e:
        logger.error(f"Transcription failed: {e}")  # Logs: [abc-123] Transcription failed: ...
    finally:
        logger.info("Session ended")  # Logs: [abc-123] Session ended

Now every log line is traceable to a specific session!
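Why this works under concurrency: contextvars gives each asyncio task its own copy of the variable, so parallel sessions never overwrite each other's IDs. A minimal, self-contained sketch of that isolation (standalone names, not the endpoint code above):

```python
import asyncio
from contextvars import ContextVar
from typing import List, Optional

session_context: ContextVar[Optional[str]] = ContextVar("session_context", default=None)

async def handle_session(session_id: str, lines: List[str]) -> None:
    # Each asyncio task sees only its own value of the context variable,
    # even when the tasks interleave on the same event loop
    session_context.set(session_id)
    await asyncio.sleep(0)  # yield control so the two tasks interleave
    lines.append(f"[{session_context.get()}] processing")

async def main() -> List[str]:
    lines: List[str] = []
    await asyncio.gather(
        handle_session("abc-123", lines),
        handle_session("def-456", lines),
    )
    return lines

print(asyncio.run(main()))
```

Each task keeps its own session ID even though both ran concurrently on one event loop.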

Strategy #2: Structured Logging

Stop logging strings. Log structured data:

import logging
import json
from datetime import datetime
from typing import Any, Dict

class StructuredLogger:
    """Logger that outputs structured JSON"""

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)

    def _log(self, level: int, event: str, **kwargs):
        """Log structured data as JSON"""
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "event": event,
            "session_id": session_context.get(),
            **kwargs
        }

        self.logger.log(level, json.dumps(log_data))

    def info(self, event: str, **kwargs):
        self._log(logging.INFO, event, **kwargs)

    def error(self, event: str, error: Exception = None, **kwargs):
        error_data = kwargs
        if error:
            error_data.update({
                "error_type": type(error).__name__,
                "error_message": str(error)
            })
        self._log(logging.ERROR, event, **error_data)

# Usage
logger = StructuredLogger(__name__)

logger.info(
    "audio_received",
    audio_size=len(audio_data),
    sample_rate=16000,
    channels=1
)

# Outputs:
# {"timestamp": "2024-01-15T10:30:45.123456", "event": "audio_received",
#  "session_id": "abc-123", "audio_size": 16000, "sample_rate": 16000, "channels": 1}

Now you can search logs by specific fields!
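Once every line is a JSON object, ad-hoc filtering is a few lines of code. A hypothetical sketch of pulling one session's events out of a log file (the field names match the output format above; the sample lines are made up):

```python
import json
from typing import Iterable, List

# A few log lines as they might appear in a file (one JSON object per line)
raw_logs = [
    '{"event": "audio_received", "session_id": "abc-123", "audio_size": 16000}',
    '{"event": "audio_received", "session_id": "def-456", "audio_size": 8000}',
    '{"event": "session_ended", "session_id": "abc-123"}',
]

def events_for_session(lines: Iterable[str], session_id: str) -> List[str]:
    """Parse each JSON line and keep only the matching session's events."""
    parsed = (json.loads(line) for line in lines)
    return [entry["event"] for entry in parsed if entry.get("session_id") == session_id]

print(events_for_session(raw_logs, "abc-123"))  # ['audio_received', 'session_ended']
```

The same filter is a one-liner in `jq` or a single query in any log aggregator, which is the real payoff of structured output.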

Strategy #3: Correlation IDs

Track requests across multiple services:

from contextvars import ContextVar
from typing import Optional
import uuid

# Correlation ID for tracking across services
correlation_id: ContextVar[Optional[str]] = ContextVar('correlation_id', default=None)

class CorrelatedLogger(StructuredLogger):
    """Logger with correlation ID support"""

    def _log(self, level: int, event: str, **kwargs):
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "event": event,
            "session_id": session_context.get(),
            "correlation_id": correlation_id.get(),
            **kwargs
        }

        self.logger.log(level, json.dumps(log_data))

@app.websocket("/transcribe/{session_id}")
async def transcribe_endpoint(websocket: WebSocket, session_id: str):
    # Generate correlation ID for this request
    corr_id = str(uuid.uuid4())
    correlation_id.set(corr_id)
    session_context.set(session_id)

    logger = CorrelatedLogger(__name__)
    logger.info("session_started")

    # When calling Riva service, pass correlation ID
    await riva_client.transcribe(
        audio_data,
        metadata={"correlation_id": corr_id}
    )

Now you can trace a request from client β†’ your service β†’ Riva β†’ back!

Strategy #4: Performance Logging

Log performance metrics for every operation:

import time
from functools import wraps
from typing import Callable

def log_performance(operation: str):
    """Decorator to log operation performance"""
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            logger = CorrelatedLogger(func.__module__)
            start_time = time.time()

            try:
                result = await func(*args, **kwargs)
                duration = time.time() - start_time

                logger.info(
                    f"{operation}_completed",
                    duration_ms=round(duration * 1000, 2),
                    success=True
                )

                return result

            except Exception as e:
                duration = time.time() - start_time

                logger.error(
                    f"{operation}_failed",
                    duration_ms=round(duration * 1000, 2),
                    success=False,
                    error=e
                )
                raise

        return wrapper
    return decorator

# Usage
@log_performance("audio_transcription")
async def transcribe_audio(audio: bytes, session_id: str) -> str:
    # Transcription logic
    return await riva_client.transcribe(audio)

# Logs:
# {"event": "audio_transcription_completed", "duration_ms": 245.67, "success": true}

Strategy #5: Log Levels by Component

Different components need different log levels:

import logging.config

LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "json": {
            "()": "pythonjsonlogger.jsonlogger.JsonFormatter",
            "format": "%(asctime)s %(levelname)s %(name)s %(message)s"
        }
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "json",
            "stream": "ext://sys.stdout"
        },
        "file": {
            "class": "logging.handlers.RotatingFileHandler",
            "formatter": "json",
            "filename": "logs/app.log",
            "maxBytes": 10485760,  # 10MB
            "backupCount": 5
        }
    },
    "loggers": {
        # Your app - verbose logging
        "app": {
            "level": "DEBUG",
            "handlers": ["console", "file"],
            "propagate": False
        },
        # Riva client - only warnings and errors
        "riva_client": {
            "level": "WARNING",
            "handlers": ["console", "file"],
            "propagate": False
        },
        # Third-party libraries - minimal logging
        "uvicorn": {
            "level": "INFO",
            "handlers": ["console"],
            "propagate": False
        },
        "grpc": {
            "level": "ERROR",
            "handlers": ["console"],
            "propagate": False
        }
    },
    "root": {
        "level": "INFO",
        "handlers": ["console", "file"]
    }
}

# Apply configuration
logging.config.dictConfig(LOGGING_CONFIG)

Strategy #6: Sampling for High-Volume Events

Don't log EVERY audio chunk - sample intelligently:

import random

class SampledLogger(CorrelatedLogger):
    """Logger with sampling support for high-frequency events"""

    def __init__(self, name: str, sample_rate: float = 0.01):
        super().__init__(name)
        self.sample_rate = sample_rate

    def sample(self, event: str, **kwargs):
        """Log with sampling"""
        if random.random() < self.sample_rate:
            self.info(event, sampled=True, **kwargs)

logger = SampledLogger(__name__, sample_rate=0.01)  # Log 1% of events

# Logs roughly 1% of audio chunks (random sampling, not strictly every 100th)
logger.sample(
    "audio_chunk_processed",
    chunk_size=len(chunk),
    total_chunks=chunk_count
)
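One caveat: random sampling can miss a short burst of events entirely. If you literally want one out of every N events, a counter-based sampler is deterministic. A small sketch (this class is illustrative, not from the code above):

```python
class EveryNthSampler:
    """Deterministically passes one out of every N calls, starting with the first."""

    def __init__(self, n: int):
        self.n = n
        self.count = 0

    def should_log(self) -> bool:
        # Pass on calls 0, N, 2N, ... so bursts shorter than N still
        # have a predictable chance of appearing in the logs
        hit = self.count % self.n == 0
        self.count += 1
        return hit

sampler = EveryNthSampler(100)
logged = sum(1 for _ in range(1000) if sampler.should_log())
print(logged)  # 10 of 1000 calls pass the sampler
```

Random sampling is simpler and stateless; the counter variant trades a little state for predictability. Pick based on whether "roughly 1%" or "exactly every 100th" matters to you.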

Strategy #7: Error Context Preservation

When errors happen, log EVERYTHING relevant:

import traceback

class ErrorContextLogger(CorrelatedLogger):
    """Logger with rich error context"""

    def error_with_context(
        self,
        event: str,
        error: Exception,
        **kwargs
    ):
        """Log error with full context"""

        # Build error context
        error_context = {
            "error_type": type(error).__name__,
            "error_message": str(error),
            "error_code": getattr(error, 'code', None),
            "traceback": traceback.format_exc(),
            **kwargs
        }

        self.error(event, **error_context)

# Usage
logger = ErrorContextLogger(__name__)

try:
    await riva_client.transcribe(audio)
except Exception as e:
    logger.error_with_context(
        "transcription_failed",
        error=e,
        audio_size=len(audio),
        sample_rate=sample_rate,
        language=language,
        riva_endpoint=riva_client.endpoint
    )
    raise

Strategy #8: Log Aggregation & Search

Use ELK Stack or Loki for log aggregation:

# Docker Compose for Loki + Grafana
version: '3'
services:
  loki:
    image: grafana/loki:latest
    ports:
      - "3100:3100"
    command: -config.file=/etc/loki/local-config.yaml

  promtail:
    image: grafana/promtail:latest
    volumes:
      - ./logs:/var/log
      - ./promtail-config.yaml:/etc/promtail/config.yaml
    command: -config.file=/etc/promtail/config.yaml

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin

Now you can query logs with LogQL:

# Find all log lines for a specific session (parse the JSON fields first)
{job="transcription-service"} | json | session_id="abc-123"

# Find slow transcriptions
{job="transcription-service"} | json | duration_ms > 1000

# Count errors by type over the last hour
sum by (error_type) (count_over_time({job="transcription-service"} | json | error_type != "" [1h]))

Strategy #9: Monitoring Integration

Connect logs to metrics:

from prometheus_client import Counter, Histogram

# Metrics. Note: session_id is deliberately NOT a label here - labels with
# unbounded values (like session IDs) explode Prometheus cardinality.
# Keep the session ID in the logs, not the metrics.
transcription_requests = Counter(
    'transcription_requests_total',
    'Total transcription requests',
    ['language', 'status']
)

transcription_duration = Histogram(
    'transcription_duration_seconds',
    'Transcription duration',
    ['language']
)

class MonitoredLogger(ErrorContextLogger):
    """Logger integrated with metrics"""

    async def log_transcription(
        self,
        audio: bytes,
        language: str
    ):
        audio_size = len(audio)
        start_time = time.time()

        try:
            result = await transcribe(audio, language)

            # Log success
            self.info(
                "transcription_completed",
                audio_size=audio_size,
                language=language,
                result_length=len(result)
            )

            # Update metrics
            transcription_requests.labels(
                language=language,
                status="success"
            ).inc()

            transcription_duration.labels(
                language=language
            ).observe(time.time() - start_time)

            return result

        except Exception as e:
            # Log failure
            self.error_with_context(
                "transcription_failed",
                error=e,
                audio_size=audio_size,
                language=language
            )

            # Update metrics
            transcription_requests.labels(
                language=language,
                status="error"
            ).inc()

            raise

Best Practices

  1. Always include session/correlation IDs - Makes debugging possible
  2. Use structured logging - JSON is searchable and parseable
  3. Sample high-frequency events - Don't fill disk with audio chunk logs
  4. Log performance metrics - Know what's slow before users complain
  5. Preserve error context - Log everything needed to debug
  6. Set appropriate log levels - Debug in dev, Info in production
  7. Rotate log files - Don't fill up disk
  8. Centralize logs - Use log aggregation for multiple instances
  9. Alert on log patterns - Error rate spikes should trigger alerts
  10. Test your logging - Verify logs are useful during incidents
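Point 10 is easy to automate: attach a capturing handler in a test and assert that the output carries the context you rely on during incidents. A minimal stdlib-only sketch (the class and logger names here are illustrative):

```python
import logging

class ListHandler(logging.Handler):
    """Collects formatted log output in a list so tests can assert on it."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(self.format(record))

# Arrange: wire a logger to the capturing handler
logger = logging.getLogger("app.logging_test")
logger.setLevel(logging.INFO)
handler = ListHandler()
handler.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(handler)

# Act: log the way the application would
logger.info("[abc-123] Session started")

# Assert: the session ID actually survived into the output
assert any("abc-123" in line for line in handler.records)
print(handler.records)
```

pytest's built-in `caplog` fixture does the same job with less setup; the point is that "are my logs debuggable?" is a testable property, not a hope.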

The Results

After implementing these logging strategies:

  • Mean Time to Resolution (MTTR) dropped from hours to minutes
  • Debug sessions became productive instead of frustrating
  • Production incidents were traceable across services
  • Performance bottlenecks became immediately visible
  • Customer support could look up exact session issues

Final Thoughts

Good logging is invisible when everything works, but invaluable when things break. The goal isn't to log everything - it's to log the right things at the right level with the right context.

Think of logs as breadcrumbs for future you. When you're debugging at 3 AM, you'll thank past you for logging that session ID.

What's your logging setup? Any horror stories about debugging without proper logs? Share below! πŸš€
