Understanding Kafka Lag
What is Kafka Lag?
Kafka lag is the difference between the latest message offset in a topic partition and the current offset of a consumer. It represents the number of messages produced but not yet consumed—essentially, how far behind your consumer is from real-time.
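To make the definition concrete, here is a minimal sketch that computes the lag for a single partition with kafka-python; the broker address and group name are illustrative assumptions:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=["localhost:9094"],    # assumed broker address
    group_id="crypto-consumer-group",        # hypothetical group name
    enable_auto_commit=False,
)
tp = TopicPartition("btc", 0)
latest = consumer.end_offsets([tp])[tp]      # newest offset the producer has written
committed = consumer.committed(tp) or 0      # last offset this group has committed
print(f"lag = {latest - committed} messages")

If the latest offset is 1,500 and the group has committed up to 1,200, the consumer is 300 messages behind real-time.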
Real-World Example: Cryptocurrency Price Streaming System
Let's examine a practical system that streams cryptocurrency prices:
Architecture:
- Producer: Fetches BTC, ETH, and SOL prices from Binance API every 3 seconds
- Kafka Topics: Three topics (btc, eth, sol) receiving price updates
- Consumer: Reads from all topics and persists to MongoDB Atlas
- Infrastructure: Docker-based Kafka cluster with Kafdrop monitoring
Current Implementation:
# producer.py - Sends 3 messages every 3 seconds (1 msg/sec)
while True:
    for r in btc():
        producer.send("btc", r)
    for r in eth():
        producer.send("eth", r)
    for r in sol():
        producer.send("sol", r)
    time.sleep(3)
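The btc(), eth(), and sol() helpers are not shown in the original snippet. A plausible sketch, assuming they call Binance's public /api/v3/ticker/price endpoint and return a single-record list, might look like this (the helper name and record shape are hypothetical):

import time
import requests

def fetch_price(symbol):
    # Hypothetical helper: one HTTP call to Binance's public ticker endpoint
    resp = requests.get(
        "https://api.binance.com/api/v3/ticker/price",
        params={"symbol": symbol},
        timeout=5,
    )
    resp.raise_for_status()
    data = resp.json()   # e.g. {"symbol": "BTCUSDT", "price": "67123.45"}
    return [{"symbol": data["symbol"], "price": float(data["price"]), "ts": time.time()}]

def btc(): return fetch_price("BTCUSDT")
def eth(): return fetch_price("ETHUSDT")
def sol(): return fetch_price("SOLUSDT")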
# consumer.py - Processes messages sequentially
for msg in consumer:
    topic = msg.topic
    value = msg.value
    if topic == 'btc':
        col.btc.insert_one(value)  # Blocking MongoDB write
    elif topic == 'eth':
        col.eth.insert_one(value)
    elif topic == 'sol':
        col.sol.insert_one(value)
Root Causes of Lag in This System
1. MongoDB Network Latency (Primary Bottleneck)
The consumer writes to MongoDB Atlas (cloud database) for every single message:
- Each insert_one() makes a network round-trip (~50-200 ms)
- SSL/TLS handshake overhead
- Geographic distance between the consumer and the MongoDB cluster
- Impact: the consumer spends most of its time waiting on network I/O
Calculation:
- Producer rate: 1 message/second
- Consumer processing: 50-200ms/message (network bound)
- Result: the round-trips alone cap the consumer at 5-20 msg/sec, and in practice the synchronous, one-message-at-a-time loop sustains only about 3 msg/sec (a timing sketch follows this list)
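A quick way to verify these numbers against your own cluster is to time a handful of single-document inserts directly; this sketch uses a placeholder Atlas URI and a hypothetical scratch collection:

import time
from pymongo import MongoClient

mongo = MongoClient("mongodb+srv://...")   # your Atlas URI
col = mongo.fx.latency_test                # hypothetical scratch collection

samples = []
for i in range(20):
    start = time.perf_counter()
    col.insert_one({"i": i, "ts": time.time()})
    samples.append((time.perf_counter() - start) * 1000)

avg_ms = sum(samples) / len(samples)
print(f"avg insert_one round-trip: {avg_ms:.1f} ms")
print(f"implied ceiling: {1000 / avg_ms:.1f} msg/sec")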
2. Synchronous Blocking Processing
for msg in consumer:
    col.btc.insert_one(value)  # Consumer blocks here for 50-200ms
    # Next message can't be processed until this completes
The consumer cannot fetch or process the next message while waiting for MongoDB to acknowledge the write.
3. No Error Handling
# What happens when MongoDB is unreachable?
col.btc.insert_one(value) # Exception crashes consumer
# Lag accumulates until manual restart
Network failures, MongoDB timeouts, or connection errors crash the consumer immediately. During the downtime:
- The producer keeps writing to Kafka
- The consumer is offline
- Lag grows unbounded until someone notices and restarts it (a minimal keep-alive sketch follows this list)
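The simplest stopgap is to keep the consumer process alive through transient failures so lag can start draining as soon as the dependency recovers. A minimal sketch, where process_message is a hypothetical stand-in for the MongoDB write and consumer is the KafkaConsumer from the original consumer.py (Method 3 below develops this into proper retry logic):

import logging
import time

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("consumer")

while True:
    try:
        for msg in consumer:
            process_message(msg)   # hypothetical: deserialize and write to MongoDB
    except Exception as exc:
        log.error("Consumer loop failed: %s; retrying in 5s", exc)
        time.sleep(5)              # back off, then resume from the last committed offset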
4. Auto-Commit with Blocking I/O
consumer = KafkaConsumer(
    enable_auto_commit=True,  # Commits offsets every 5 seconds by default
    ...
)
The problem:
- Offsets are committed on a timer (every 5 seconds by default), independent of whether processing succeeded
- If the offset is committed before the corresponding MongoDB write completes and the consumer then crashes
- Those messages are lost on restart, because consumption resumes from the already-advanced offset (a safer ordering is sketched below)
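A safer ordering, sketched below with the placeholder URI used elsewhere in this article, is to disable auto-commit and commit only after the write has succeeded; the worst case then becomes reprocessing a few messages, never silently dropping them:

import json

from kafka import KafkaConsumer
from pymongo import MongoClient

consumer = KafkaConsumer(
    "btc", "eth", "sol",
    bootstrap_servers=['localhost:9094'],
    value_deserializer=lambda m: json.loads(m.decode()),
    enable_auto_commit=False,                      # no timer-based commits
)
col = MongoClient("mongodb+srv://...").fx          # placeholder URI

for msg in consumer:
    getattr(col, msg.topic).insert_one(msg.value)  # persist first
    consumer.commit()                              # then advance the offset

Committing per message like this is slow; Method 1 below combines the same ordering with batching.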
5. Single Consumer, Multiple Topics
consumer = KafkaConsumer("btc", "eth", "sol", ...)  # One consumer for all topics
One consumer processes three topics sequentially:
- No parallelism across topics
- BTC processing blocks ETH and SOL processing
- Can't scale individual topics independently
Methods for Reducing and Eliminating Lag
Method 1: Batch Inserts
Problem: Individual insert_one() calls make 1 network round-trip per message.
Solution: Accumulate messages and use insert_many() for bulk writes.
from collections import defaultdict
import json
import time

from kafka import KafkaConsumer
from pymongo import MongoClient

consumer = KafkaConsumer(
    "btc", "eth", "sol",
    bootstrap_servers=['localhost:9094'],
    value_deserializer=lambda m: json.loads(m.decode()),
    enable_auto_commit=False,   # Manual control
    max_poll_records=100,       # Fetch more messages per poll
    fetch_min_bytes=1024,
    fetch_max_wait_ms=500
)

mongo = MongoClient(
    "mongodb+srv://kimtryx_db_user:3100@cluster0.nds95yl.mongodb.net/",
    maxPoolSize=50,             # Connection pooling
    retryWrites=True
)
col = mongo.fx

BATCH_SIZE = 50
BATCH_TIMEOUT = 5

batches = defaultdict(list)
last_commit = time.time()

for msg in consumer:
    batches[msg.topic].append(msg.value)

    # Flush when a batch is full or the timeout is reached
    batch_ready = any(len(b) >= BATCH_SIZE for b in batches.values())
    timeout_reached = (time.time() - last_commit) >= BATCH_TIMEOUT

    if batch_ready or timeout_reached:
        for topic, batch in batches.items():
            if batch:
                getattr(col, topic).insert_many(batch)
                print(f"Inserted {len(batch)} documents into {topic}")
        consumer.commit()   # Commit only after successful writes
        batches.clear()
        last_commit = time.time()
Performance Improvement:
- Before: 1 network call per message = ~3 msg/sec
- After: 1 network call per 50 messages = ~150 msg/sec
- Speedup: 50x throughput increase
Why it works:
- MongoDB processes bulk inserts orders of magnitude faster
- Network overhead amortized across multiple documents
- Manual commits after successful writes give at-least-once delivery: no data loss, though a failed run may reprocess and re-insert a batch
Method 2: Asynchronous Processing
Problem: Consumer blocks on MongoDB writes, can't fetch new messages.
Solution: Decouple Kafka consumption from MongoDB persistence using queues and worker threads.
# Reuses consumer and col from the batching example above
from collections import defaultdict
from queue import Queue, Empty
import threading
import time

message_queue = Queue(maxsize=1000)
shutdown_flag = threading.Event()

def batch_writer_worker():
    """Background thread writes to MongoDB"""
    batches = defaultdict(list)
    BATCH_SIZE = 50
    BATCH_TIMEOUT = 3
    last_write = time.time()

    while not shutdown_flag.is_set():
        # Pull one message (or time out so the flush check below still runs)
        try:
            msg = message_queue.get(timeout=1)
            if msg is None:   # Shutdown signal
                break
            batches[msg['topic']].append(msg['value'])
        except Empty:
            pass

        # Write when a batch is full or the timeout is reached
        batch_ready = any(len(b) >= BATCH_SIZE for b in batches.values())
        timeout_reached = (time.time() - last_write) >= BATCH_TIMEOUT

        if batch_ready or timeout_reached:
            try:
                for topic, batch in batches.items():
                    if batch:
                        getattr(col, topic).insert_many(batch)
                        print(f"Worker: Inserted {len(batch)} into {topic}")
                batches.clear()
                last_write = time.time()
            except Exception as e:
                print(f"Worker error: {e}")   # keep the batch and retry next cycle

    # Final flush so a clean shutdown does not strand buffered documents
    for topic, batch in batches.items():
        if batch:
            getattr(col, topic).insert_many(batch)

# Start background worker
worker = threading.Thread(target=batch_writer_worker, daemon=True)
worker.start()

# Main consumer loop
COMMIT_INTERVAL = 100
batch_count = 0

for msg in consumer:
    # Non-blocking: add to the queue and continue
    message_queue.put({'topic': msg.topic, 'value': msg.value})
    batch_count += 1

    if batch_count >= COMMIT_INTERVAL:
        consumer.commit()
        batch_count = 0
        print(f"Queue size: {message_queue.qsize()}")
Performance Improvement:
- Before: Consumer blocked 50-200ms per message
- After: Consumer runs at full network speed, worker handles writes in parallel
- Benefit: Queue buffers traffic bursts, eliminates blocking
Trade-offs:
- Increased memory usage (in-memory queue)
- More complex error handling and shutdown (a shutdown sketch follows this list)
- Slight increase in end-to-end latency (acceptable for most use cases)
- Offsets are committed before the worker has persisted the corresponding batch, so a crash can drop queued messages unless commits are tied to the worker's progress
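A minimal shutdown sketch for the pipeline above (it reuses consumer, message_queue, and worker from the previous snippet; the commit logic is omitted for brevity):

try:
    for msg in consumer:
        message_queue.put({'topic': msg.topic, 'value': msg.value})
except KeyboardInterrupt:
    pass
finally:
    message_queue.put(None)   # sentinel: the worker exits its loop and flushes buffered batches
    worker.join(timeout=30)   # wait for the final MongoDB writes
    consumer.close()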
Method 3: Error Handling and Resilience
Problem: Any error crashes the consumer, causing lag accumulation.
Solution: Implement retry logic, graceful degradation, and automatic recovery.
# Reuses the consumer, BATCH_SIZE, and BATCH_TIMEOUT from the batching example above
from collections import defaultdict
from pymongo.errors import BulkWriteError, ConnectionFailure
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def create_mongo_client():
    """Create MongoDB client with retry logic"""
    while True:
        try:
            client = MongoClient(
                "mongodb+srv://...",
                maxPoolSize=50,
                retryWrites=True,
                serverSelectionTimeoutMS=5000
            )
            client.admin.command('ping')   # Test connection
            logger.info("✓ Connected to MongoDB")
            return client
        except Exception as e:
            logger.error(f"MongoDB connection failed: {e}, retrying in 5s...")
            time.sleep(5)

def insert_batch_with_retry(collection, batch, topic_name):
    """Insert with exponential backoff"""
    MAX_RETRIES = 3
    for attempt in range(MAX_RETRIES):
        try:
            collection.insert_many(batch, ordered=False)
            logger.info(f"✓ Inserted {len(batch)} docs into {topic_name}")
            return True
        except BulkWriteError as e:
            # Partial success - some docs inserted
            logger.warning(f"Partial insert: {e.details}")
            return True
        except ConnectionFailure:
            wait = 2 ** attempt
            logger.error(f"Connection failed (attempt {attempt+1}), retry in {wait}s")
            time.sleep(wait)
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            return False
    logger.error(f"Failed after {MAX_RETRIES} attempts")
    return False

mongo = create_mongo_client()
col = mongo.fx

batches = defaultdict(list)
last_commit = time.time()

try:
    for msg in consumer:
        batches[msg.topic].append(msg.value)

        # Same flush condition as in the batching example
        batch_ready = any(len(b) >= BATCH_SIZE for b in batches.values())
        timeout_reached = (time.time() - last_commit) >= BATCH_TIMEOUT

        if batch_ready or timeout_reached:
            all_success = True
            for topic, batch in batches.items():
                if batch:
                    success = insert_batch_with_retry(
                        getattr(col, topic), batch, topic
                    )
                    if not success:
                        all_success = False
            if all_success:
                consumer.commit()
            else:
                logger.warning("Skipping commit due to failures")
            batches.clear()
            last_commit = time.time()
except KeyboardInterrupt:
    logger.info("Graceful shutdown...")
    # Flush remaining batches
    for topic, batch in batches.items():
        if batch:
            insert_batch_with_retry(getattr(col, topic), batch, topic)
    consumer.commit()
finally:
    consumer.close()
    mongo.close()
Benefits:
- Automatic recovery from transient network failures
- No data loss from crashes
- Exponential backoff prevents overwhelming failed services
- Graceful shutdown ensures no message loss
Method 4: Horizontal Scaling with Multiple Consumers
Problem: Single consumer can't parallelize across topics.
Solution: Deploy separate consumer instances for each topic.
# consumer_btc.py
import json

from kafka import KafkaConsumer
from pymongo import MongoClient

consumer = KafkaConsumer(
    "btc",
    bootstrap_servers=['localhost:9094'],
    value_deserializer=lambda m: json.loads(m.decode()),
    enable_auto_commit=False,
    group_id="crypto-consumer-group"
)

mongo = MongoClient("mongodb+srv://...")
col = mongo.fx.btc

BATCH_SIZE = 50
batch = []

for msg in consumer:
    batch.append(msg.value)

    if len(batch) >= BATCH_SIZE:
        col.insert_many(batch)
        consumer.commit()
        print(f"BTC: Inserted {len(batch)} documents")
        batch.clear()
Deployment:
# Run three separate processes
python consumer_btc.py &
python consumer_eth.py &
python consumer_sol.py &
Benefits:
- True parallel processing across topics
- Each consumer optimized for its specific topic
- Fault isolation (BTC consumer crash doesn't affect ETH/SOL)
- Scale individual topics based on their load
- Simplified code per consumer
Scaling further:
- Increase partitions per topic (e.g., btc-0, btc-1, btc-2); a sketch of this step follows the list
- Run multiple consumers per topic (Kafka auto-balances partitions)
- Deploy in Kubernetes with HPA based on lag metrics
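Here is that sketch: kafka-python's admin client can grow each topic from one to three partitions so multiple consumers in the same group can share the work (the partition count and broker address are illustrative):

from kafka.admin import KafkaAdminClient, NewPartitions

admin = KafkaAdminClient(bootstrap_servers=["localhost:9094"])
admin.create_partitions({
    topic: NewPartitions(total_count=3) for topic in ("btc", "eth", "sol")
})
admin.close()

Keep in mind that Kafka only guarantees ordering within a partition, so if per-symbol ordering matters, produce with a consistent key so related records land on the same partition.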
Method 5: Consumer Configuration Optimization
Problem: Default Kafka consumer settings aren't optimized for high throughput.
Solution: Tune configuration parameters for your workload.
consumer = KafkaConsumer(
    "btc", "eth", "sol",
    bootstrap_servers=['localhost:9094'],
    value_deserializer=lambda m: json.loads(m.decode()),
    enable_auto_commit=False,
    auto_offset_reset="earliest",

    # Fetch optimization
    max_poll_records=500,           # Fetch more messages per poll
    fetch_min_bytes=10240,          # Wait for 10KB before returning
    fetch_max_wait_ms=500,          # Or wait max 500ms

    # Prevent rebalancing
    session_timeout_ms=30000,       # 30s before considered dead
    heartbeat_interval_ms=3000,     # Send heartbeat every 3s
    max_poll_interval_ms=300000,    # 5 min max processing time

    # Connection management
    connections_max_idle_ms=540000,
    request_timeout_ms=30000
)
Parameter explanations:
| Parameter | Purpose | Impact |
|---|---|---|
| max_poll_records | Messages fetched per poll() | Higher = fewer network calls |
| fetch_min_bytes | Minimum data before return | Reduces small fetches |
| fetch_max_wait_ms | Max wait for fetch_min_bytes | Balances latency vs throughput |
| session_timeout_ms | Timeout before rebalance | Prevents unnecessary rebalances |
| max_poll_interval_ms | Max processing time | Allows longer batch processing |
Tuning strategy:
- High throughput: increase max_poll_records, fetch_min_bytes
- Low latency: decrease fetch_max_wait_ms
- Long processing: increase max_poll_interval_ms
Method 6: Monitoring and Observability
Problem: Can't detect lag until it causes visible issues.
Solution: Implement proactive monitoring and alerting.
from kafka import TopicPartition

def check_consumer_lag(consumer, topics):
    """Calculate and report lag for all partitions"""
    lag_data = {}

    for topic in topics:
        partitions = consumer.partitions_for_topic(topic)
        for partition in partitions:
            tp = TopicPartition(topic, partition)

            # Current consumer position
            position = consumer.position(tp)

            # Latest available offset
            end_offsets = consumer.end_offsets([tp])
            end_offset = end_offsets[tp]

            # Calculate lag
            current_lag = end_offset - position
            lag_data[f"{topic}-{partition}"] = {
                'position': position,
                'end_offset': end_offset,
                'lag': current_lag
            }

            # Alert on high lag
            if current_lag > 1000:
                logger.warning(f"⚠️ HIGH LAG: {topic}[{partition}] = {current_lag}")
            else:
                logger.info(f"✓ {topic}[{partition}] lag: {current_lag}")

    return lag_data

# Monitor periodically
import threading

def monitoring_loop():
    while True:
        lag_data = check_consumer_lag(consumer, ["btc", "eth", "sol"])
        time.sleep(30)   # Check every 30 seconds

# Note: kafka-python consumers are not thread-safe, so in production run this
# check from the consuming thread or from a separate monitoring consumer.
monitor_thread = threading.Thread(target=monitoring_loop, daemon=True)
monitor_thread.start()
Metrics to track (an export sketch follows this list):
- Consumer lag: Messages behind per partition
- Consumption rate: Messages processed per second
- Processing time: Time spent per message
- Error rate: Failed processing attempts
- Queue depth: For async processing implementations
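A small sketch of exporting these metrics with the prometheus_client library so the Prometheus setup below has something to scrape (the metric and label names are illustrative):

from prometheus_client import Counter, Gauge, start_http_server

consumer_lag = Gauge("kafka_consumer_lag", "Messages behind, per partition",
                     ["topic", "partition"])
messages_consumed = Counter("messages_consumed_total", "Messages processed",
                            ["topic"])
insert_errors = Counter("mongodb_insert_errors_total", "Failed batch inserts",
                        ["topic"])

start_http_server(8001)   # metrics served at http://localhost:8001/metrics

# Inside check_consumer_lag() / the consumer loop:
#   consumer_lag.labels(topic=topic, partition=str(partition)).set(current_lag)
#   messages_consumed.labels(topic=msg.topic).inc()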
Visualization with Kafka's JMX metrics (exposed to Prometheus via an exporter such as the Prometheus JMX exporter or kafka-exporter):
# prometheus.yml - scrape Kafka metrics
scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka:9092']

# Alert rules
groups:
  - name: kafka_lag
    rules:
      - alert: HighConsumerLag
        expr: kafka_consumer_lag > 1000
        for: 5m
        annotations:
          summary: "Consumer lag exceeds 1000 messages"
Performance Comparison
| Implementation | Throughput | Latency | Complexity | Reliability |
|---|---|---|---|---|
| Original (insert_one) | 3 msg/sec | 50-200ms | Low | Low (crashes) |
| Batch inserts | 150 msg/sec | 5s | Low | Medium |
| Async + batching | 500+ msg/sec | 3-5s | Medium | High |
| Multiple consumers | 1500+ msg/sec | 3-5s | Medium | Very High |
| All optimizations | 5000+ msg/sec | 3-5s | High | Very High |
Implementation Strategy
Step 1: Batch Inserts (Immediate Impact)
Start with batch inserts—low complexity, massive improvement:
batches[topic].append(value)

if len(batches[topic]) >= BATCH_SIZE:
    collection.insert_many(batches[topic])
    consumer.commit()
    batches[topic].clear()
Expected: 50x throughput increase
Step 2: Error Handling (Stability)
Add retry logic and graceful shutdown:
def insert_with_retry(collection, batch):
    for attempt in range(3):
        try:
            collection.insert_many(batch)
            return True
        except ConnectionFailure:
            time.sleep(2 ** attempt)
    return False
Expected: Eliminate crash-related lag
Step 3: Configuration Tuning (Easy Wins)
Optimize consumer settings:
max_poll_records=500,
fetch_min_bytes=10240,
session_timeout_ms=30000
Expected: 2-3x additional throughput
Step 4: Async Processing (Advanced)
When Steps 1-3 aren't enough:
message_queue.put(msg) # Non-blocking
# Worker thread handles MongoDB
Expected: Handle traffic bursts, eliminate blocking
Step 5: Horizontal Scaling (Production-Grade)
Deploy multiple consumer instances:
docker-compose up -d --scale consumer=3
Expected: Linear scaling with consumer count
Step 6: Monitoring (Operational Excellence)
Implement lag tracking and alerts:
if lag > THRESHOLD:
    alert_ops_team()
    auto_scale_consumers()
Expected: Proactive lag management
Key Takeaways
- Network I/O is typically the bottleneck in Kafka consumers—batch operations amortize this cost
- Manual offset commits after successful processing give at-least-once delivery and prevent data loss; duplicates are possible on replay, so keep writes idempotent or deduplicate downstream
- Error handling and retries are essential for production systems—transient failures are inevitable
- Asynchronous processing decouples consumption from persistence, enabling higher throughput
- Horizontal scaling provides linear performance improvements and fault tolerance
- Monitoring enables proactive management—detect and resolve lag before it impacts SLAs
- Configuration tuning can double or triple throughput with minimal code changes
The cryptocurrency price streaming system demonstrates how common patterns—synchronous processing, individual writes, poor error handling—create lag. By applying these methods systematically, you can transform a 3 msg/sec system struggling with lag into a 1000+ msg/sec production-grade pipeline that handles real-time data reliably.
