Introduction
After setting up a local Kafka environment with Docker (covered in my previous post), I started converting Java Kafka producer examples to Python as part of my learning process. The Udemy course I'm following uses Java, but I want to practice the same Kafka concepts in Python.
This journal documents the practical challenges, solutions, and patterns I discovered while converting two producer implementations: a basic producer and a producer with key-based partitioning.
Note: For Kafka producer concepts (sticky partitioning, acknowledgments, etc.), see my separate article "Understanding Kafka Producer From Basics to Sticky Partitioning". This journal focuses on the Python implementation journey.
Challenge #1: Translating Java Error Handling to Python
Java Try-Catch-Finally Pattern
Python Translation
Converting this pattern to Python, I learned several important details:
from confluent_kafka import Producer

from kafka_logging import get_logger

log = get_logger(__name__)


# create_producer_config() is defined elsewhere in the project (not shown)
def main():
    producer = None  # ← Must initialize outside try!
    try:
        config = create_producer_config()
        producer = Producer(config)
        producer.produce(topic='demo_python', value='hello'.encode('utf-8'))
        producer.flush()
    except BufferError as e:
        log.error(f"Producer queue full: {e}", exc_info=True)
        raise  # ← Bare raise preserves traceback
    except Exception as e:
        log.error(f"Error: {e}", exc_info=True)
        raise
    finally:
        if producer is not None:
            try:
                producer.flush(timeout=5)
                log.info("Producer closed successfully")
            except Exception as e:
                log.warning(f"Cleanup error: {e}")  # ← Don't crash on cleanup
Key Differences from Java
| Aspect | Java | Python |
|---|---|---|
| Syntax | try-catch-finally | try-except-finally |
| Variable scope | Block-scoped: declare before try so finally can see it | Function-scoped, but unbound if the assignment raises: initialize to None before try |
| Specific exceptions | Multiple catch blocks | Multiple except blocks |
| Traceback logging | log.error("msg", e) | log.error("msg", exc_info=True) |
| Re-throw | throw e; | raise (bare, no argument) |
Python-Specific Gotchas I Learned
1. Why Initialize producer = None Outside Try?
# ❌ WRONG: Variable undefined if Producer() fails
try:
    producer = Producer(config)  # May fail here
    # ...
finally:
    producer.flush()  # NameError if Producer() failed!

# ✅ CORRECT: Always defined
producer = None
try:
    producer = Producer(config)
    # ...
finally:
    if producer is not None:  # Safe check
        producer.flush()
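In Java, a variable declared inside the try block isn't in scope in finally at all, so you declare it before try (usually initialized to null) and check it for null in finally. Python scopes variables to the whole function, so the name is visible in finally, but if the line that assigns it raises, the name is never bound and touching it raises NameError (UnboundLocalError inside a function).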
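A broker-free sketch of the same failure: BrokenProducer below is a hypothetical stand-in whose constructor raises, the way Producer(config) can with a bad config. Without the None guard, the finally block itself raises UnboundLocalError, and the real ValueError survives only as that error's __context__.

```python
class BrokenProducer:
    """Hypothetical stand-in: the constructor fails, like Producer(config) with a bad config."""

    def __init__(self, config):
        raise ValueError("invalid producer config")

    def flush(self, timeout=None):
        return 0


def without_guard():
    try:
        producer = BrokenProducer({})
    finally:
        producer.flush()  # 'producer' was never bound, so this line raises


def with_guard():
    producer = None  # always bound, even if the constructor fails
    try:
        producer = BrokenProducer({})
    finally:
        if producer is not None:
            producer.flush()


outcomes = {}
for func in (without_guard, with_guard):
    try:
        func()
    except Exception as exc:
        # Record which exception escaped and what it was raised "during"
        context = type(exc.__context__).__name__ if exc.__context__ else None
        outcomes[func.__name__] = (type(exc).__name__, context)

print(outcomes)
```

without_guard ends in UnboundLocalError with the ValueError hidden in its context, while with_guard surfaces the ValueError directly: the guard keeps the real error in front.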
2. Bare raise vs raise e
except BufferError as e:
    log.error(f"Queue full: {e}", exc_info=True)
    raise  # ← Nothing after raise!
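Three ways to raise:
- raise - Re-raises the exception currently being handled, traceback unchanged ✅
- raise e - Re-raises the same exception object; in Python 3 its original traceback is kept, but the raise e line is added on top, so the trace gets noisier ❌
- raise CustomError() - Raises a different exception; the original is attached as its __context__ (raise CustomError() from e marks it as the explicit __cause__) ⚠️
Why bare raise matters: the traceback points straight at the line where the error originally occurred, with no extra entry for the line where you re-raised it.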
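A small experiment makes the three forms concrete. It needs no Kafka at all: fail() stands in for any call that raises, and ProducerSetupError is a hypothetical wrapper exception. traceback.extract_tb() lists the functions recorded in each traceback, outermost first.

```python
import traceback


class ProducerSetupError(Exception):
    """Hypothetical wrapper exception, for illustration only."""


def fail():
    raise ValueError("boom")


def bare_reraise():
    try:
        fail()
    except ValueError:
        raise  # re-raise the handled exception; traceback left as-is


def named_reraise():
    try:
        fail()
    except ValueError as e:
        raise e  # same object, but this line is added to its traceback


def wrapped_reraise():
    try:
        fail()
    except ValueError as e:
        raise ProducerSetupError("setup failed") from e  # new exception, explicit cause


def traceback_functions(func):
    """Call func and return the function names in its traceback, outermost first."""
    try:
        func()
    except Exception as exc:
        return [frame.name for frame in traceback.extract_tb(exc.__traceback__)]


bare = traceback_functions(bare_reraise)
named = traceback_functions(named_reraise)
print(bare)
print(named)

try:
    wrapped_reraise()
except ProducerSetupError as exc:
    cause = exc.__cause__
print(type(cause).__name__)  # ValueError
```

Both lists end at fail, so the origin survives either way in Python 3; the difference is that named contains named_reraise twice (once for the fail() call, once for the raise e line), while bare contains bare_reraise only once.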
3. Nested Try-Except in Finally
finally:
    if producer is not None:
        try:  # ← Nested try for cleanup
            producer.flush(timeout=5)
        except Exception as e:
            log.warning(f"Cleanup error: {e}")  # Don't re-raise
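Cleanup can fail too! If flush() raised inside finally, that new exception would replace the one already propagating and hide the real failure, so catch it there. Use log.warning() instead of log.error() since we're already shutting down.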
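To see why the nested try matters, here is a sketch with a hypothetical FlakyProducer whose flush() always fails. In the unguarded version the cleanup error replaces the original one; the guarded version logs the cleanup problem and lets the original error propagate.

```python
import logging

log = logging.getLogger(__name__)


class FlakyProducer:
    """Hypothetical stand-in whose flush() always fails during cleanup."""

    def flush(self, timeout=None):
        raise RuntimeError("flush failed during cleanup")


def send_unguarded(producer):
    try:
        raise ValueError("original send error")
    finally:
        producer.flush(timeout=5)  # raising here replaces the ValueError


def send_guarded(producer):
    try:
        raise ValueError("original send error")
    finally:
        try:
            producer.flush(timeout=5)
        except Exception as e:
            log.warning(f"Cleanup error: {e}")  # logged, not re-raised


outcomes = {}
for func in (send_unguarded, send_guarded):
    try:
        func(FlakyProducer())
    except Exception as exc:
        outcomes[func.__name__] = type(exc).__name__

print(outcomes)  # {'send_unguarded': 'RuntimeError', 'send_guarded': 'ValueError'}
```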
Challenge #2: The Mysterious poll() Method
The Confusing Part
This was the most confusing difference between Java and Python Kafka clients:
Java (kafka-clients library):
producer.send(record, callback); // Callback executes automatically
Python (confluent-kafka library):
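producer.produce(topic='demo', value='msg', callback=callback)
# Callback WON'T execute yet!
producer.poll(0)  # ← Must explicitly call this
# poll() runs callbacks for messages the broker has already acknowledged;
# this message's callback fires in a later poll() or flush() once its ack arrives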
What I Learned
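The confluent-kafka library (Python) wraps librdkafka (C library). librdkafka sends messages and receives broker acknowledgments on its own background threads, but it never runs your Python callbacks on those threads. Unlike Java's client, delivery reports wait in a queue until you explicitly call poll() (or flush()), which:
- Serves the delivery reports that have arrived since the last call
- Executes the matching callbacks in your own thread
- Frees their slots in the local producer queue (unserved reports still count against it, so a producer that never polls can eventually hit BufferError)
producer.produce(...)  # Step 1: Queue in memory (fast); sent in the background
producer.poll(0)       # Step 2: Run callbacks for messages already acknowledged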
poll() Variations
producer.poll(0) # Non-blocking: process ready events, return immediately
producer.poll(1.0) # Blocking: wait up to 1 second for events
producer.flush() # Blocking: wait for ALL messages + callbacks
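The split between background delivery and caller-side callbacks is easier to see in a toy model. ToyProducer below is a hypothetical sketch of librdkafka's design, not the confluent-kafka API: a background thread "acknowledges" each message after a short delay and queues a delivery report, and callbacks run only inside poll() or flush(), on the caller's thread.

```python
import queue
import threading
import time


class ToyProducer:
    """Toy model of the librdkafka design (NOT the confluent-kafka API)."""

    def __init__(self, ack_delay=0.01):
        self._outgoing = queue.Queue()  # produced, not yet "acknowledged"
        self._reports = queue.Queue()   # acknowledged, callback not yet served
        self._pending = 0               # only touched by the caller's thread
        self._ack_delay = ack_delay
        threading.Thread(target=self._sender, daemon=True).start()

    def produce(self, value, callback):
        self._pending += 1
        self._outgoing.put((value, callback))  # returns immediately

    def _sender(self):
        while True:
            value, callback = self._outgoing.get()
            time.sleep(self._ack_delay)           # pretend network round trip
            self._reports.put((value, callback))  # ack arrived; callback NOT run here

    def poll(self, timeout=0.0):
        """Wait up to `timeout` s for a report, serve every ready one, return the count."""
        served = 0
        try:
            item = self._reports.get(timeout=timeout)
            while True:
                value, callback = item
                callback(None, value)  # runs on the caller's thread
                self._pending -= 1
                served += 1
                item = self._reports.get_nowait()
        except queue.Empty:
            return served

    def flush(self, timeout=5.0):
        """Poll until every produced message has had its callback served."""
        deadline = time.monotonic() + timeout
        while self._pending and time.monotonic() < deadline:
            self.poll(0.1)
        return self._pending  # messages still outstanding


caller_thread = threading.get_ident()
delivered = []


def on_delivery(err, value):
    # Record the payload and whether we are on the thread that called poll()/flush()
    delivered.append((value, threading.get_ident() == caller_thread))


producer = ToyProducer()
for i in range(5):
    producer.produce(f"hello world {i}", on_delivery)

time.sleep(0.2)                # every ack arrives in the background...
before_flush = len(delivered)  # ...but no callback has run yet
remaining = producer.flush()
print(before_flush, len(delivered), remaining)  # 0 5 0
```

The sleep gives every acknowledgment time to arrive, yet before_flush is still 0 because nothing serves the reports until flush() polls; afterwards all five callbacks have run on the calling thread.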
Visual Comparison
Without poll() ❌:
for i in range(10):
    producer.produce(...)  # Queue all 10 messages
    # No poll() - callbacks delayed
producer.flush()  # All 10 callbacks fire here at once
With poll(0) ✅:
for i in range(10):
    producer.produce(...)  # Queue message
    producer.poll(0)  # Fires callbacks for earlier messages whose acks have arrived
producer.flush()  # Fires the remaining callbacks
This gives near-real-time feedback about partition assignment (callbacks fire as acknowledgments arrive instead of all at the end), which is crucial for the key-partitioning demo!
Challenge #3: Callback Timing - Sync vs Async
The Unexpected Output
When I first ran producer_demo_keys.py, the output was confusing:
--- Round 1 ---
--- Round 2 ---
Key: id_0 | Partition: 1 ← Where's Round 1?
Key: id_3 | Partition: 1
...
All Round 1 callbacks fired during Round 2! This revealed an important decision point.
The Cause
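for round_num in range(2):
    for i in range(10):
        producer.produce(...)
        producer.poll(0)  # Non-blocking
    time.sleep(0.5)  # Gives acks time to arrive, but doesn't run callbacks
The ten produce() calls finish almost instantly, long before the broker (even on localhost) acknowledges anything, so every poll(0) in Round 1 found no delivery reports to serve. time.sleep() gave the acknowledgments time to arrive, but sleeping doesn't run callbacks; only poll() and flush() do. The Round 1 reports waited in the queue until the first poll(0) of Round 2, which runs right after "--- Round 2 ---" is logged.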
Two Valid Approaches
I realized both patterns have valid use cases, so I made it configurable:
Approach 1: Synchronized (Educational)
for round_num in range(2):
    for i in range(10):
        producer.produce(...)
        producer.poll(0)
    producer.flush()  # ← Block until all callbacks complete
✅ Clean output showing Round 1 complete before Round 2
✅ Perfect for demos and testing
❌ Lower throughput (blocking waits)
Approach 2: Async (Production)
for round_num in range(2):
    for i in range(10):
        producer.produce(...)
        producer.poll(0)
    time.sleep(0.5)  # ← Doesn't wait for delivery; later poll()/flush() calls serve the callbacks
✅ Higher throughput
✅ More realistic production behavior
⚠️ Callbacks may interleave
Making It Toggleable
Added configuration comments in the code:
# ============================================================
# TWO APPROACHES: Choose based on your use case
# ============================================================
# APPROACH 1: Synchronized Rounds (Educational/Testing)
producer.flush() # ← Active by default
# APPROACH 2: Async High-Throughput (Production)
# Comment out flush() above, uncomment below:
# time.sleep(0.5)
# ============================================================
Key insight: Either way, the key-to-partition mapping remains consistent - that's what matters!
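Why the mapping survives either approach: the partitioner only looks at the key bytes and the partition count, never at timing. A minimal sketch of hash-mod partitioning, assuming a 3-partition topic and using zlib.crc32 as the hash; it illustrates the idea and is not guaranteed to reproduce librdkafka's exact partition numbers.

```python
import zlib

NUM_PARTITIONS = 3  # assumption: demo_python was created with 3 partitions


def pick_partition(key: bytes, num_partitions: int) -> int:
    """Hash-mod partitioning: equal key bytes always map to the same partition."""
    return zlib.crc32(key) % num_partitions


rounds = []
for round_num in range(2):
    mapping = {}
    for i in range(10):
        key = f"id_{i}"
        mapping[key] = pick_partition(key.encode("utf-8"), NUM_PARTITIONS)
    rounds.append(mapping)

print(rounds[0] == rounds[1])  # True: timing never affects key -> partition

# librdkafka's default partitioner ("consistent_random") hashes keys with CRC32,
# while the Java client uses murmur2. This setting asks librdkafka for
# Java-compatible hashing:
java_compatible_config = {
    "bootstrap.servers": "localhost:9092",
    "partitioner": "murmur2_random",
}
```

The config dict at the end matters if Java and Python producers write to the same topic: with the default partitioner, the same key can land on a different partition than the Java producer would choose, and librdkafka documents murmur2_random as its Java-compatible option.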
Java to Python Conversion Patterns
Configuration: Properties → dict
Java:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", StringSerializer.class.getName());
Python:
config = {
'bootstrap.servers': 'localhost:9092',
# No serializer config needed for simple strings
# confluent-kafka handles it automatically
}
Producing Messages
Java:
ProducerRecord<String, String> record =
new ProducerRecord<>("topic", "key", "value");
producer.send(record);
Python:
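producer.produce(
    topic='topic',
    key='key'.encode('utf-8'),     # Explicit str → bytes
    value='value'.encode('utf-8')  # Explicit str → bytes
)
Key difference: there are no serializer classes in this setup. produce() sends raw bytes; a plain str is also accepted and encoded as UTF-8, but calling .encode('utf-8') yourself keeps the conversion explicit, and anything that isn't a string (numbers, dicts) has to be serialized to bytes by you.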
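If you want something closer to Java's serializer classes, a small helper can centralize the conversion. to_bytes() below is hypothetical (not part of confluent-kafka): str is encoded as UTF-8, matching StringSerializer's default, and other objects are sent as JSON text, which is an assumption about the payload format rather than anything Kafka requires.

```python
import json
from typing import Any, Optional


def to_bytes(obj: Any) -> Optional[bytes]:
    """Hypothetical helper standing in for Java's serializer classes."""
    if obj is None:
        return None                         # null key/value stays null
    if isinstance(obj, bytes):
        return obj                          # already serialized
    if isinstance(obj, str):
        return obj.encode("utf-8")          # StringSerializer's default encoding
    return json.dumps(obj).encode("utf-8")  # numbers, dicts, lists -> JSON text


print(to_bytes("id_0"))                  # b'id_0'
print(to_bytes({"id": 0, "msg": "hi"}))  # b'{"id": 0, "msg": "hi"}'
```

With a real producer, the call becomes producer.produce(topic, key=to_bytes(key), value=to_bytes(payload)).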
Callbacks
Java:
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e == null) {
            log.info("Partition: " + metadata.partition());
        } else {
            log.error("Error: " + e.getMessage());
        }
    }
});
Python:
def delivery_callback(err, msg):
    if err is not None:
        log.error(f"Delivery failed: {err}")
    else:
        log.info(f"Partition: {msg.partition()}")

producer.produce(topic='topic', value='msg', callback=delivery_callback)
producer.poll(0)  # ← Callbacks only run inside poll() or flush()
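The Java version can capture local variables in its anonymous Callback; the Python equivalent is a closure. A minimal sketch, assuming confluent-kafka's documented callback(err, msg) signature; make_delivery_callback and FakeMessage are hypothetical names, and FakeMessage only exists so the example runs without a broker.

```python
import logging

log = logging.getLogger(__name__)


def make_delivery_callback(label, results):
    """Build a per-message callback that remembers `label`, the way a Java
    anonymous Callback can capture an effectively final local variable."""

    def on_delivery(err, msg):
        if err is not None:
            log.error(f"[{label}] Delivery failed: {err}")
            results.append((label, None))
        else:
            results.append((label, msg.partition()))

    return on_delivery


class FakeMessage:
    """Hypothetical stand-in for a delivered message, so this runs without a broker."""

    def __init__(self, partition):
        self._partition = partition

    def partition(self):
        return self._partition


results = []
callback = make_delivery_callback("id_0", results)
callback(None, FakeMessage(1))        # simulated success
callback("broker unavailable", None)  # simulated failure
print(results)  # [('id_0', 1), ('id_0', None)]
```

With a real producer you would pass callback=make_delivery_callback(key, results) to each produce() call, and the list fills in as poll() or flush() serves the delivery reports.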
Resource Cleanup
Java:
producer.flush();
producer.close();
Python:
producer.flush()
# No explicit close() - flush() is sufficient
# Python handles cleanup via garbage collection
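Java's try-with-resources closes the producer automatically; a context manager can play a similar role in Python. flushing() below is a hypothetical helper, not part of confluent-kafka, and RecordingProducer is a stand-in so the sketch runs without a broker. It relies on flush(timeout) returning the number of messages still queued.

```python
import logging
from contextlib import contextmanager

log = logging.getLogger(__name__)


@contextmanager
def flushing(producer, timeout=5.0):
    """Hypothetical helper: flush on exit, like try-with-resources closing a producer."""
    try:
        yield producer
    finally:
        try:
            remaining = producer.flush(timeout)  # messages still queued
            if remaining:
                log.warning(f"{remaining} message(s) still queued after {timeout}s")
        except Exception as e:
            log.warning(f"Cleanup error: {e}")  # never mask the body's exception


class RecordingProducer:
    """Hypothetical stand-in that records calls instead of talking to Kafka."""

    def __init__(self):
        self.calls = []

    def produce(self, topic, value):
        self.calls.append(("produce", topic, value))

    def flush(self, timeout=None):
        self.calls.append(("flush", timeout))
        return 0


fake = RecordingProducer()
with flushing(fake) as producer:
    producer.produce("demo_python", b"hello")
print(fake.calls)  # [('produce', 'demo_python', b'hello'), ('flush', 5.0)]
```

With the real client this would read with flushing(Producer(config)) as producer: followed by the produce loop. If Producer(config) itself raises, the with block is never entered, so there is nothing to flush and no need for the producer = None guard.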
Final Implementations
producer_demo_keys.py (With Partitioning)
import time

from confluent_kafka import Producer, KafkaException

from kafka_logging import get_logger

log = get_logger(__name__)


def delivery_callback(err, msg):
    if err is not None:
        log.error(f"Message delivery failed: {err}")
    else:
        key_str = msg.key().decode('utf-8') if msg.key() else None
        log.info(f"Key: {key_str} | Partition: {msg.partition()}")


def main():
    producer = None
    try:
        config = {'bootstrap.servers': 'localhost:9092'}
        producer = Producer(config)
        topic = 'demo_python'

        for round_num in range(2):
            log.info(f"\n--- Round {round_num + 1} ---")
            for i in range(10):
                key = f"id_{i}"
                value = f"hello world {i}"
                producer.produce(
                    topic=topic,
                    key=key.encode('utf-8'),
                    value=value.encode('utf-8'),
                    callback=delivery_callback
                )
                producer.poll(0)  # Trigger callbacks
            # Choose approach:
            producer.flush()  # Synchronized
            # time.sleep(0.5)  # Async

        log.info("All messages sent!")
    except BufferError as e:
        log.error(f"Queue full: {e}", exc_info=True)
        raise
    except KafkaException as e:
        log.error(f"Kafka error: {e}", exc_info=True)
        raise
    except Exception as e:
        log.error(f"Error: {e}", exc_info=True)
        raise
    finally:
        if producer is not None:
            try:
                producer.flush(timeout=5)
                log.info("Producer closed")
            except Exception as e:
                log.warning(f"Cleanup error: {e}")


if __name__ == "__main__":
    main()
Testing
Running the Producers
# Basic producer
uv run python -m kafka_basics.producer_demo
# Producer with keys
uv run python -m kafka_basics.producer_demo_keys
Verifying with Console Consumer
docker exec -it broker kafka-console-consumer.sh \
--topic demo_python \
--from-beginning \
--bootstrap-server localhost:9092
Press Ctrl+C to exit.
Key Takeaways
Python-Specific Lessons
- Bytes Encoding: Call .encode('utf-8') on keys and values so the str → bytes conversion is explicit
- poll() is Required: Unlike Java, delivery callbacks only run when you call poll() or flush()
- Variable Scope: Initialize resources before the try block in Python
- Bare raise: Use raise without arguments to re-raise with the traceback unchanged
Error Handling Patterns
- Initialize resources to None before the try block
- Use bare raise to re-raise with the original traceback
- Nest try-except in the finally block for safe cleanup
- Use log.warning() for cleanup errors (not log.error())
- Pass timeouts to blocking cleanup calls, e.g. flush(timeout=5), so shutdown can't hang indefinitely
Conversion Strategy
| Java Concept | Python Equivalent | Notes |
|---|---|---|
| Properties | dict | Use dot-notation keys |
| try-catch-finally | try-except-finally | Different syntax, same pattern |
| Anonymous Callback | Function callback | Define as regular function |
| producer.send() | producer.produce() + poll() | Explicit callback trigger needed |
| producer.close() | producer.flush() | flush() is sufficient |
What I Learned About Python Development
- Resource Management: Python's finally block works like Java but with scope differences
- Library Quirks: confluent-kafka requires manual poll() unlike Java client
- Configuration Flexibility: Python's dict makes config more readable than Java Properties
- Async Patterns: Understanding when to use flush() vs poll() for different use cases
Resources
Session Date: January 2, 2026
Python 3.13, uv package manager, confluent-kafka, Kafka 4.0.1