Why Proper Logging Matters
Before diving into technical details, let's understand why proper logging is important:
- Enables effective debugging in production
- Provides insights into application behavior
- Facilitates performance monitoring
- Helps track security incidents
- Supports compliance requirements
- Improves maintenance efficiency
Quick Start with Python Logging
For those new to Python logging, here's a basic example using logging.basicConfig:
# Simple python logging example
import logging
# Basic logger in python example
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Create a logger
logger = logging.getLogger(__name__)
# Logger in python example
logger.info("This is an information message")
logger.warning("This is a warning message")
This example demonstrates the basics of the logging module in python and shows how to use python logger logging in your application.
Getting Started with Python's Logging Module
Basic Setup
Let's start with a simple logging configuration:
import logging
# Basic configuration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Your first logger
logger = logging.getLogger(__name__)
# Using the logger
logger.info("Application started")
logger.warning("Watch out!")
logger.error("Something went wrong")
Understanding Log Levels
Python logging comes with five standard levels:
Level | Numeric Value | When to Use |
---|---|---|
DEBUG | 10 | Detailed information for diagnosing problems |
INFO | 20 | General operational events |
WARNING | 30 | Something unexpected happened |
ERROR | 40 | More serious problem |
CRITICAL | 50 | Program may not be able to continue |
Beyond print() Statements
Why choose logging over print statements?
- Severity levels for better categorization
- Timestamp information
- Source information (file, line number)
- Configurable output destinations
- Production-ready filtering
- Thread safety
Configuring Your Logging System
Basic Configuration Options
logging.basicConfig(
filename='app.log',
filemode='w',
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.DEBUG,
datefmt='%Y-%m-%d %H:%M:%S'
)
Advanced Configuration
For more complex applications:
config = {
'version': 1,
'formatters': {
'detailed': {
'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO',
'formatter': 'detailed'
},
'file': {
'class': 'logging.FileHandler',
'filename': 'app.log',
'level': 'DEBUG',
'formatter': 'detailed'
}
},
'loggers': {
'myapp': {
'handlers': ['console', 'file'],
'level': 'DEBUG',
'propagate': True
}
}
}
logging.config.dictConfig(config)
Working with Advanced Logging
Structured Logging
Structured logging provides a consistent, machine-readable format that's essential for log analysis and monitoring. For a comprehensive overview of structured logging patterns and best practices, check out the structured logging guide. Let's implement structured logging in Python:
import json
import logging
from datetime import datetime
class JSONFormatter(logging.Formatter):
def __init__(self):
super().__init__()
def format(self, record):
# Create base log record
log_obj = {
"timestamp": self.formatTime(record, self.datefmt),
"name": record.name,
"level": record.levelname,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno
}
# Add exception info if present
if record.exc_info:
log_obj["exception"] = self.formatException(record.exc_info)
# Add custom fields from extra
if hasattr(record, "extra_fields"):
log_obj.update(record.extra_fields)
return json.dumps(log_obj)
# Usage Example
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
# Log with extra fields
logger.info("User logged in", extra={"extra_fields": {"user_id": "123", "ip": "192.168.1.1"}})
Error Management
Proper error logging is crucial for debugging production issues. Here's a comprehensive approach:
import traceback
import sys
from contextlib import contextmanager
class ErrorLogger:
def __init__(self, logger):
self.logger = logger
@contextmanager
def error_context(self, operation_name, **context):
"""Context manager for error logging with additional context"""
try:
yield
except Exception as e:
# Capture the current stack trace
exc_type, exc_value, exc_traceback = sys.exc_info()
# Format error details
error_details = {
"operation": operation_name,
"error_type": exc_type.__name__,
"error_message": str(exc_value),
"context": context,
"stack_trace": traceback.format_exception(exc_type, exc_value, exc_traceback)
}
# Log the error with full context
self.logger.error(
f"Error in {operation_name}: {str(exc_value)}",
extra={"error_details": error_details}
)
# Re-raise the exception
raise
# Usage Example
logger = logging.getLogger(__name__)
error_logger = ErrorLogger(logger)
with error_logger.error_context("user_authentication", user_id="123", attempt=2):
# Your code that might raise an exception
authenticate_user(user_id)
Concurrent Logging
When logging in multi-threaded applications, you need to ensure thread safety:
import threading
import logging
from queue import Queue
from logging.handlers import QueueHandler, QueueListener
def setup_thread_safe_logging():
"""Set up thread-safe logging with a queue"""
# Create the queue
log_queue = Queue()
# Create handlers
console_handler = logging.StreamHandler()
file_handler = logging.FileHandler('app.log')
# Create queue handler and listener
queue_handler = QueueHandler(log_queue)
listener = QueueListener(
log_queue,
console_handler,
file_handler,
respect_handler_level=True
)
# Configure root logger
root_logger = logging.getLogger()
root_logger.addHandler(queue_handler)
# Start the listener in a separate thread
listener.start()
return listener
# Usage
listener = setup_thread_safe_logging()
def worker_function():
logger = logging.getLogger(__name__)
logger.info(f"Worker thread {threading.current_thread().name} starting")
# Do work...
logger.info(f"Worker thread {threading.current_thread().name} finished")
# Create and start threads
threads = [
threading.Thread(target=worker_function)
for _ in range(3)
]
for thread in threads:
thread.start()
Logging in Different Environments
Different application environments require specific logging approaches. Whether you're working with web applications, microservices, or background tasks, each environment has unique logging requirements and best practices. Let's explore how to implement effective logging across various deployment scenarios.
Web Application Logging
Django Logging Configuration
Here's a comprehensive Django logging setup:
# settings.py
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'verbose': {
'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
'style': '{',
},
'simple': {
'format': '{levelname} {message}',
'style': '{',
},
},
'filters': {
'require_debug_true': {
'()': 'django.utils.log.RequireDebugTrue',
},
},
'handlers': {
'console': {
'level': 'INFO',
'filters': ['require_debug_true'],
'class': 'logging.StreamHandler',
'formatter': 'simple'
},
'file': {
'level': 'ERROR',
'class': 'logging.FileHandler',
'filename': 'django-errors.log',
'formatter': 'verbose'
},
'mail_admins': {
'level': 'ERROR',
'class': 'django.utils.log.AdminEmailHandler',
'include_html': True,
}
},
'loggers': {
'django': {
'handlers': ['console'],
'propagate': True,
},
'django.request': {
'handlers': ['file', 'mail_admins'],
'level': 'ERROR',
'propagate': False,
},
'myapp': {
'handlers': ['console', 'file'],
'level': 'INFO',
}
}
}
Flask Logging Setup
Flask provides its own logging system that can be customized:
import logging
from logging.handlers import RotatingFileHandler
from flask import Flask, request
app = Flask(__name__)
def setup_logger():
# Create formatter
formatter = logging.Formatter(
'[%(asctime)s] %(levelname)s in %(module)s: %(message)s'
)
# File Handler
file_handler = RotatingFileHandler(
'flask_app.log',
maxBytes=10485760, # 10MB
backupCount=10
)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
# Add request context
class RequestFormatter(logging.Formatter):
def format(self, record):
record.url = request.url
record.remote_addr = request.remote_addr
return super().format(record)
# Configure app logger
app.logger.addHandler(file_handler)
app.logger.setLevel(logging.INFO)
return app.logger
# Usage in routes
@app.route('/api/endpoint')
def api_endpoint():
app.logger.info(f'Request received from {request.remote_addr}')
# Your code here
return jsonify({'status': 'success'})
FastAPI Logging Practices
FastAPI can leverage Python's logging with some middleware enhancements:
from fastapi import FastAPI, Request
from typing import Callable
import logging
import time
app = FastAPI()
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Middleware for request logging
@app.middleware("http")
async def log_requests(request: Request, call_next: Callable):
start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time
log_dict = {
"url": str(request.url),
"method": request.method,
"client_ip": request.client.host,
"duration": f"{duration:.2f}s",
"status_code": response.status_code
}
logger.info(f"Request processed: {log_dict}")
return response
# Example endpoint with logging
@app.get("/items/{item_id}")
async def read_item(item_id: int):
logger.info(f"Retrieving item {item_id}")
# Your code here
return {"item_id": item_id}
Microservices Logging
For microservices, distributed tracing and correlation IDs are essential:
import logging
import contextvars
from uuid import uuid4
# Create context variable for trace ID
trace_id_var = contextvars.ContextVar('trace_id', default=None)
class TraceIDFilter(logging.Filter):
def filter(self, record):
trace_id = trace_id_var.get()
record.trace_id = trace_id if trace_id else 'no_trace'
return True
def setup_microservice_logging(service_name):
logger = logging.getLogger(service_name)
# Create formatter with trace ID
formatter = logging.Formatter(
'%(asctime)s - %(name)s - [%(trace_id)s] - %(levelname)s - %(message)s'
)
# Add handlers with trace ID filter
handler = logging.StreamHandler()
handler.setFormatter(formatter)
handler.addFilter(TraceIDFilter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
# Usage in microservice
logger = setup_microservice_logging('order_service')
def process_order(order_data):
# Generate or get trace ID from request
trace_id_var.set(str(uuid4()))
logger.info("Starting order processing", extra={
'order_id': order_data['id'],
'customer_id': order_data['customer_id']
})
# Process order...
logger.info("Order processed successfully")
Background Task Logging
For background tasks, we need to ensure proper log handling and rotation:
from logging.handlers import RotatingFileHandler
import logging
import threading
from datetime import datetime
class BackgroundTaskLogger:
def __init__(self, task_name):
self.logger = logging.getLogger(f'background_task.{task_name}')
self.setup_logging()
def setup_logging(self):
# Create logs directory if it doesn't exist
import os
os.makedirs('logs', exist_ok=True)
# Setup rotating file handler
handler = RotatingFileHandler(
filename=f'logs/task_{datetime.now():%Y%m%d}.log',
maxBytes=5*1024*1024, # 5MB
backupCount=5
)
# Create formatter
formatter = logging.Formatter(
'%(asctime)s - [%(threadName)s] - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def log_task_status(self, status, **kwargs):
"""Log task status with additional context"""
extra = {
'thread_id': threading.get_ident(),
'timestamp': datetime.now().isoformat(),
**kwargs
}
self.logger.info(f"Task status: {status}", extra=extra)
# Usage example
def background_job():
logger = BackgroundTaskLogger('data_processing')
try:
logger.log_task_status('started', job_id=123)
# Do some work...
logger.log_task_status('completed', records_processed=1000)
except Exception as e:
logger.logger.error(f"Task failed: {str(e)}", exc_info=True)
Common Logging Patterns and Solutions
Request ID Tracking
Implementing request tracking across your application:
import logging
from contextlib import contextmanager
import threading
import uuid
# Store request ID in thread-local storage
_request_id = threading.local()
class RequestIDFilter(logging.Filter):
def filter(self, record):
record.request_id = getattr(_request_id, 'id', 'no_request_id')
return True
@contextmanager
def request_context(request_id=None):
"""Context manager for request tracking"""
if request_id is None:
request_id = str(uuid.uuid4())
old_id = getattr(_request_id, 'id', None)
_request_id.id = request_id
try:
yield request_id
finally:
if old_id is None:
del _request_id.id
else:
_request_id.id = old_id
# Setup logging with request ID
def setup_request_logging():
logger = logging.getLogger()
formatter = logging.Formatter(
'%(asctime)s - [%(request_id)s] - %(levelname)s - %(message)s'
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
handler.addFilter(RequestIDFilter())
logger.addHandler(handler)
return logger
# Usage example
logger = setup_request_logging()
def process_request(data):
with request_context() as request_id:
logger.info("Processing request", extra={
'data': data,
'operation': 'process_request'
})
# Process the request...
logger.info("Request processed successfully")
User Activity Logging
Track user actions securely:
import logging
from datetime import datetime
from typing import Dict, Any
class UserActivityLogger:
def __init__(self):
self.logger = logging.getLogger('user_activity')
self.setup_logger()
def setup_logger(self):
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - '
'[User: %(user_id)s] %(message)s'
)
handler = logging.FileHandler('user_activity.log')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def log_activity(
self,
user_id: str,
action: str,
resource: str,
details: Dict[str, Any] = None
):
"""Log user activity with context"""
activity_data = {
'timestamp': datetime.utcnow().isoformat(),
'user_id': user_id,
'action': action,
'resource': resource,
'details': details or {}
}
self.logger.info(
f"User performed {action} on {resource}",
extra={
'user_id': user_id,
'activity_data': activity_data
}
)
return activity_data
# Usage example
activity_logger = UserActivityLogger()
def update_user_profile(user_id: str, profile_data: dict):
try:
# Update profile logic here...
activity_logger.log_activity(
user_id=user_id,
action='update_profile',
resource='user_profile',
details={
'updated_fields': list(profile_data.keys()),
'source_ip': '192.168.1.1'
}
)
except Exception as e:
activity_logger.logger.error(
f"Profile update failed for user {user_id}",
extra={'user_id': user_id, 'error': str(e)},
exc_info=True
)
Troubleshooting and Debugging
Effective troubleshooting of logging issues requires understanding common problems and their solutions. This section covers the most frequent challenges developers face when implementing logging and provides practical solutions for debugging logging configurations.
Common Logging Issues
Missing Log Entries
# Common problem: Logs not appearing due to incorrect log level
import logging
# Wrong way
logger = logging.getLogger(__name__)
logger.debug("This won't appear") # No handler and wrong level
# Correct way
def setup_proper_logging():
logger = logging.getLogger(__name__)
# Set the base logger level
logger.setLevel(logging.DEBUG)
# Create and configure handler
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
# Add formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
# Add handler to logger
logger.addHandler(handler)
return logger
Performance Bottlenecks
import logging
import time
from functools import wraps
class PerformanceLoggingHandler(logging.Handler):
def __init__(self):
super().__init__()
self.log_times = []
def emit(self, record):
self.log_times.append(time.time())
if len(self.log_times) > 1000:
time_diff = self.log_times[-1] - self.log_times[0]
if time_diff < 1: # More than 1000 logs per second
print(f"Warning: High logging rate detected: {len(self.log_times)/time_diff} logs/second")
self.log_times = self.log_times[-100:]
def log_performance(logger):
"""Decorator to measure and log function performance"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
execution_time = end_time - start_time
logger.info(
f"Function {func.__name__} took {execution_time:.4f} seconds",
extra={
'execution_time': execution_time,
'function_name': func.__name__
}
)
return result
return wrapper
return decorator
Common Logging Pitfalls and Solutions
Configuration Issues
# Common Mistake 1: Not setting the log level properly
logger = logging.getLogger(__name__) # Will not show DEBUG messages
logger.debug("This won't appear")
# Solution: Configure both logger and handler levels
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
# Common Mistake 2: Incorrect basicConfig usage
logging.basicConfig(level=logging.INFO) # Called after handler was added - won't work
logger.info("Message")
# Solution: Configure logging before creating loggers
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info("Message")
Memory and Resource Issues
# Common Mistake 3: Creating handlers in loops
def process_item(item): # DON'T DO THIS
logger = logging.getLogger(__name__)
handler = logging.FileHandler('app.log') # Memory leak!
logger.addHandler(handler)
logger.info(f"Processing {item}")
# Solution: Create handlers outside loops
logger = logging.getLogger(__name__)
handler = logging.FileHandler('app.log')
logger.addHandler(handler)
def process_item(item):
logger.info(f"Processing {item}")
# Common Mistake 4: Not closing file handlers
handler = logging.FileHandler('app.log')
logger.addHandler(handler)
# ... later
# handler.close() # Forgotten!
# Solution: Use context manager
from contextlib import contextmanager
@contextmanager
def log_file_handler(filename):
handler = logging.FileHandler(filename)
logger.addHandler(handler)
try:
yield
finally:
handler.close()
logger.removeHandler(handler)
Format String and Performance Issues
# Common Mistake 5: Inefficient string formatting
logger.error("Error occurred: {}".format(error)) # Inefficient
logger.error(f"Error occurred: {error}") # Less efficient for logging
# Solution: Use %-formatting for better performance
logger.error("Error occurred: %s", error)
# Common Mistake 6: Expensive operations in debug statements
logger.debug(f"Complex data: {expensive_operation()}") # Operation runs even if debug is disabled
# Solution: Use lazy evaluation
logger.debug("Complex data: %s", expensive_operation) if logger.isEnabledFor(logging.DEBUG) else None
Handler Configuration Traps
# Common Mistake 7: Multiple handler addition
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
logger.addHandler(handler)
logger.addHandler(handler) # Duplicate handler!
# Solution: Check for existing handlers
if not logger.handlers:
handler = logging.StreamHandler()
logger.addHandler(handler)
# Common Mistake 8: Incorrect propagation handling
child_logger = logging.getLogger('parent.child')
parent_logger = logging.getLogger('parent')
# Messages appear twice due to propagation
# Solution: Control propagation explicitly
child_logger.propagate = False # If you don't want propagation
Thread Safety Considerations
# Common Mistake 9: Unsafe handler modification
def setup_logging(): # DON'T DO THIS
logger = logging.getLogger()
logger.handlers.clear() # Unsafe in multi-threaded environment
# Solution: Use lock for handler modifications
import threading
_handler_lock = threading.Lock()
def setup_logging():
with _handler_lock:
logger = logging.getLogger()
existing_handlers = logger.handlers.copy()
for handler in existing_handlers:
logger.removeHandler(handler)
handler.close()
Configuration File Issues
# Common Mistake 10: Hardcoded paths in config
LOGGING = {
'handlers': {
'file': {
'filename': '/absolute/path/app.log', # Breaks portability
}
}
}
# Solution: Use environment variables or relative paths
import os
LOGGING = {
'handlers': {
'file': {
'filename': os.getenv('LOG_FILE', 'app.log'),
}
}
}
Testing Your Logging
Unit Testing with Logs
import unittest
from unittest.mock import patch
import logging
import io
class TestLogging(unittest.TestCase):
def setUp(self):
# Create logger for testing
self.logger = logging.getLogger('test_logger')
self.logger.setLevel(logging.DEBUG)
# Create string IO for capturing log output
self.log_output = io.StringIO()
# Create handler that writes to string IO
self.handler = logging.StreamHandler(self.log_output)
self.handler.setLevel(logging.DEBUG)
# Add handler to logger
self.logger.addHandler(self.handler)
def test_log_level_filtering(self):
"""Test that log levels are properly filtered"""
self.logger.debug("Debug message")
self.logger.info("Info message")
log_contents = self.log_output.getvalue()
self.assertIn("Debug message", log_contents)
self.assertIn("Info message", log_contents)
def test_exception_logging(self):
"""Test exception logging functionality"""
try:
raise ValueError("Test error")
except ValueError:
self.logger.exception("An error occurred")
log_contents = self.log_output.getvalue()
self.assertIn("An error occurred", log_contents)
self.assertIn("ValueError: Test error", log_contents)
self.assertIn("Traceback", log_contents)
def tearDown(self):
# Clean up
self.handler.close()
self.log_output.close()
Testing with Mocked Loggers
import unittest
from unittest.mock import patch, MagicMock
from your_module import YourClass
class TestWithMockedLogger(unittest.TestCase):
@patch('logging.getLogger')
def test_logger_calls(self, mock_get_logger):
# Create mock logger
mock_logger = MagicMock()
mock_get_logger.return_value = mock_logger
# Create instance of your class
instance = YourClass()
# Perform some operation that should log
instance.do_something()
# Assert logging calls
mock_logger.info.assert_called_with(
"Operation completed",
extra={'operation': 'do_something'}
)
# Check number of calls
self.assertEqual(mock_logger.error.call_count, 0)
self.assertEqual(mock_logger.info.call_count, 1)
@patch('logging.getLogger')
def test_error_logging(self, mock_get_logger):
mock_logger = MagicMock()
mock_get_logger.return_value = mock_logger
instance = YourClass()
# Trigger an error condition
instance.problematic_operation()
# Verify error was logged
mock_logger.error.assert_called_once()
args, kwargs = mock_logger.error.call_args
self.assertIn("Operation failed", args[0])
Alternative Logging Solutions
Loguru
Loguru provides a simpler logging interface with powerful features out of the box:
from loguru import logger
import sys
# Basic Loguru configuration
logger.remove() # Remove default handler
logger.add(
sys.stdout,
colorize=True,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"
)
logger.add(
"app_{time}.log",
rotation="500 MB",
retention="10 days",
compression="zip"
)
# Advanced Loguru usage
from loguru import logger
import contextlib
# Custom format for structured logging
logger.add(
"structured.json",
format="{time} {level} {message}",
serialize=True # Enables JSON formatting
)
# Context manager for transaction logging
@contextlib.contextmanager
def transaction_context(transaction_id: str):
logger.info(f"Starting transaction {transaction_id}")
try:
yield
logger.info(f"Transaction {transaction_id} completed successfully")
except Exception as e:
logger.exception(f"Transaction {transaction_id} failed")
raise
# Example usage
def process_order(order_id: str):
with transaction_context(f"order_{order_id}"):
logger.debug("Processing order", order_id=order_id)
# Order processing logic
logger.info("Order processed", status="completed", order_id=order_id)
Structlog
Structlog is excellent for structured logging with context:
import structlog
import time
from typing import Any, Dict
# Configure structlog
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer()
],
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
wrapper_class=structlog.BoundLogger,
cache_logger_on_first_use=True,
)
# Create logger instance
logger = structlog.get_logger()
class StructuredLogging:
def __init__(self):
self.logger = structlog.get_logger()
def with_context(self, **context) -> 'StructuredLogging':
"""Create a new logger with additional context"""
self.logger = self.logger.bind(**context)
return self
def log_operation(self, operation: str, **kwargs):
"""Log an operation with timing and context"""
start_time = time.time()
try:
result = operation(**kwargs)
duration = time.time() - start_time
self.logger.info(
"operation_completed",
operation_name=operation.__name__,
duration=duration,
status="success",
**kwargs
)
return result
except Exception as e:
duration = time.time() - start_time
self.logger.error(
"operation_failed",
operation_name=operation.__name__,
duration=duration,
error=str(e),
error_type=type(e).__name__,
**kwargs
)
raise
# Usage example
structured_logger = StructuredLogging()
def process_user_data(user_id: str, data: Dict[str, Any]):
logger = structured_logger.with_context(
user_id=user_id,
service="user_service"
)
logger.logger.info("processing_user_data", data_size=len(data))
# Process data...
logger.logger.info("user_data_processed", status="success")
Python-JSON-Logger
For JSON-formatted logging:
from pythonjsonlogger import jsonlogger
import logging
import datetime
class CustomJsonFormatter(jsonlogger.JsonFormatter):
def add_fields(self, log_record, record, message_dict):
super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
# Add custom fields
log_record['timestamp'] = datetime.datetime.now(datetime.UTC).isoformat()
log_record['level'] = record.levelname
log_record['logger'] = record.name
if hasattr(record, 'props'):
log_record.update(record.props)
def setup_json_logging():
logger = logging.getLogger()
handler = logging.StreamHandler()
formatter = CustomJsonFormatter(
'%(timestamp)s %(level)s %(name)s %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
# Usage example
logger = setup_json_logging()
def log_event(event_type: str, **props):
logger.info(
f"Event: {event_type}",
extra={
'props': {
'event_type': event_type,
'timestamp': datetime.datetime.now(datetime.UTC).isoformat(),
**props
}
}
)
Best Practices and Guidelines
Logging Standards
# Example of implementing logging standards
import logging
from typing import Optional, Dict, Any
class StandardizedLogger:
def __init__(self, service_name: str):
self.logger = logging.getLogger(service_name)
self.service_name = service_name
self._setup_logging()
def _setup_logging(self):
"""Setup standardized logging configuration"""
formatter = logging.Formatter(
'[%(asctime)s] - %(name)s - %(levelname)s - %(message)s'
' {service: %(service)s, trace_id: %(trace_id)s}'
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def _log(self, level: str, message: str, trace_id: Optional[str] = None, **kwargs):
"""Standardized log method"""
extra = {
'service': self.service_name,
'trace_id': trace_id or 'no_trace',
**kwargs
}
getattr(self.logger, level)(
message,
extra=extra
)
def info(self, message: str, **kwargs):
self._log('info', message, **kwargs)
def error(self, message: str, **kwargs):
self._log('error', message, **kwargs)
Performance Optimization
import logging
from functools import lru_cache
from typing import Dict, Any
import queue
import threading
class PerformantLogger:
def __init__(self):
self.log_queue = queue.Queue(maxsize=1000)
self.running = True
self._start_worker()
def _start_worker(self):
"""Start background worker for async logging"""
def worker():
while self.running:
try:
record = self.log_queue.get(timeout=1)
self._process_log(record)
except queue.Empty:
continue
self.worker_thread = threading.Thread(target=worker, daemon=True)
self.worker_thread.start()
@lru_cache(maxsize=100)
def _format_message(self, template: str, **kwargs) -> str:
"""Cache commonly used log message templates"""
return template.format(**kwargs)
def _process_log(self, record: Dict[str, Any]):
"""Process log record"""
# Actual logging logic here
pass
def log(self, level: str, message: str, **kwargs):
"""Asynchronous logging method"""
record = {
'level': level,
'message': self._format_message(message, **kwargs),
'extra': kwargs
}
try:
self.log_queue.put_nowait(record)
except queue.Full:
# Handle queue full scenario
pass
Case Studies
Real-world Implementation: E-commerce Platform
import logging
from datetime import datetime
from typing import Dict, Any
class EcommerceLogger:
def __init__(self):
self.logger = logging.getLogger('ecommerce')
self._setup_logging()
def _setup_logging(self):
# Configure handlers for different components
self._add_transaction_handler()
self._add_inventory_handler()
self._add_user_activity_handler()
def _add_transaction_handler(self):
handler = logging.FileHandler('transactions.log')
handler.setFormatter(
logging.Formatter(
'%(asctime)s - %(transaction_id)s - %(message)s'
)
)
self.logger.addHandler(handler)
def log_order(self, order_id: str, user_id: str, amount: float):
"""Log order processing"""
self.logger.info(
f"Processing order {order_id}",
extra={
'transaction_id': order_id,
'user_id': user_id,
'amount': amount,
'timestamp': datetime.utcnow().isoformat()
}
)
# Example usage
class OrderProcessor:
def __init__(self):
self.logger = EcommerceLogger()
def process_order(self, order_data: Dict[str, Any]):
try:
# Log order initiation
self.logger.log_order(
order_data['order_id'],
order_data['user_id'],
order_data['amount']
)
# Process order...
# Log success
self.logger.logger.info(
f"Order {order_data['order_id']} processed successfully"
)
except Exception as e:
self.logger.logger.error(
f"Order processing failed: {str(e)}",
exc_info=True
)
Microservices Architecture Example
import logging
import json
from typing import Dict, Any
from datetime import datetime
class MicroserviceLogger:
def __init__(self, service_name: str):
self.service_name = service_name
self.logger = self._configure_logger()
def _configure_logger(self):
logger = logging.getLogger(self.service_name)
# JSON formatter for structured logging
formatter = logging.Formatter(
json.dumps({
'timestamp': '%(asctime)s',
'service': '%(service_name)s',
'level': '%(levelname)s',
'message': '%(message)s',
'trace_id': '%(trace_id)s',
'additional_data': '%(additional_data)s'
})
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def log(self, level: str, message: str, trace_id: str, **kwargs):
"""Log with distributed tracing context"""
extra = {
'service_name': self.service_name,
'trace_id': trace_id,
'additional_data': json.dumps(kwargs)
}
getattr(self.logger, level)(
message,
extra=extra
)
# Example usage in a microservice
class PaymentService:
def __init__(self):
self.logger = MicroserviceLogger('payment-service')
def process_payment(self, payment_data: Dict[str, Any], trace_id: str):
self.logger.log(
'info',
'Processing payment',
trace_id,
payment_id=payment_data['id'],
amount=payment_data['amount']
)
# Process payment...
self.logger.log(
'info',
'Payment processed successfully',
trace_id,
payment_id=payment_data['id']
)
Conclusion
Key Takeaways
- Foundation First: Start with proper basic configuration
- Set appropriate log levels
- Configure meaningful formats
- Choose suitable handlers
- Structured Approach: Use structured logging for better analysis
- Consistent log formats
- Contextual information
- Machine-parseable output
- Performance Matters: Optimize logging for production
- Implement log rotation
- Use async logging when needed
- Consider sampling strategies
-
Security Awareness: Protect sensitive information
- Filter sensitive data
- Implement proper access controls
- Follow compliance requirements
Implementation Checklist
def logging_implementation_checklist() -> Dict[str, bool]:
return {
# Basic Setup
"basic_configuration": True,
"log_levels_defined": True,
"handlers_configured": True,
# Security
"sensitive_data_filtered": True,
"access_controls_implemented": True,
"compliance_checked": True,
# Performance
"log_rotation_configured": True,
"async_logging_implemented": True,
"sampling_strategy_defined": True,
# Monitoring
"alerts_configured": True,
"metrics_collected": True,
"dashboards_created": True
}
Additional Resources
- Official Documentation:
- Tools and Libraries:
This guide covers the essential aspects of Python logging, from basic setup to advanced implementations. Remember that logging is an integral part of application observability and maintenance. Implement it thoughtfully and maintain it regularly for the best results.
Remember to periodically review and update your logging implementation as your application evolves and new requirements emerge.
Top comments (0)