Byron Hsieh

Learning Apache Kafka with Python - Part 1: Producers

Introduction

After setting up a local Kafka environment with Docker (covered in my previous post), I started converting Java Kafka producer examples to Python as part of my learning process. The Udemy course uses Java, but I want to practice Kafka concepts using Python.

This journal documents the practical challenges, solutions, and patterns I discovered while converting two producer implementations: a basic producer and a producer with key-based partitioning.

Note: For Kafka producer concepts (sticky partitioning, acknowledgments, etc.), see my separate article "Understanding Kafka Producer From Basics to Sticky Partitioning". This journal focuses on the Python implementation journey.

Challenge #1: Translating Java Error Handling to Python

Java Try-Catch-Finally Pattern

The Java version (not reproduced here) wraps the producer work in a try-catch-finally block: exceptions are logged with log.error("msg", e) and re-thrown, and producer.close() runs in the finally block (see the conversion table below).

Python Translation

Converting this pattern to Python, I learned several important details:

def main():
    producer = None  # ← Must initialize outside try!

    try:
        config = create_producer_config()
        producer = Producer(config)
        producer.produce(topic='demo_python', value='hello'.encode('utf-8'))
        producer.flush()

    except BufferError as e:
        log.error(f"Producer queue full: {e}", exc_info=True)
        raise  # ← Bare raise preserves traceback

    except Exception as e:
        log.error(f"Error: {e}", exc_info=True)
        raise

    finally:
        if producer is not None:
            try:
                producer.flush(timeout=5)
                log.info("Producer closed successfully")
            except Exception as e:
                log.warning(f"Cleanup error: {e}")  # ← Don't crash on cleanup

Key Differences from Java

Aspect | Java | Python
Syntax | try-catch-finally | try-except-finally
Variable scope | Declared before try (or try-with-resources) | Initialize to None before try
Specific exceptions | Multiple catch blocks | Multiple except blocks
Traceback logging | log.error("msg", e) | log.error("msg", exc_info=True)
Re-throw | throw e; | raise (bare, no argument)

Python-Specific Gotchas I Learned

1. Why Initialize producer = None Outside Try?

# ❌ WRONG: Variable undefined if Producer() fails
try:
    producer = Producer(config)  # May fail here
    # ...
finally:
    producer.flush()  # NameError if Producer() failed!

# ✅ CORRECT: Always defined
producer = None
try:
    producer = Producer(config)
    # ...
finally:
    if producer is not None:  # Safe check
        producer.flush()

In Java, the compiler forces you to declare the variable before the try block (or use try-with-resources) if you want to touch it in finally. In Python there is no such check: if the line that binds the variable raises an exception, the name is never created, and referencing it in finally raises a NameError.

2. Bare raise vs raise e

except BufferError as e:
    log.error(f"Queue full: {e}", exc_info=True)
    raise  # ← Nothing after raise!

Three ways to raise:

  • raise - Re-raises exact exception with full traceback
  • raise e - Re-raises, but the traceback now also includes the re-raise line instead of pointing only at the original failure
  • raise CustomError() - Raises different exception ⚠️

Why bare raise matters: It shows where the error originally occurred, not just where you re-raised it.

3. Nested Try-Except in Finally

finally:
    if producer is not None:
        try:  # ← Nested try for cleanup
            producer.flush(timeout=5)
        except Exception as e:
            log.warning(f"Cleanup error: {e}")  # Don't re-raise

Cleanup can fail too! Use log.warning() instead of log.error() since we're already shutting down.

Challenge #2: The Mysterious poll() Method

The Confusing Part

This was the most confusing difference between Java and Python Kafka clients:

Java (kafka-clients library):

producer.send(record, callback);  // Callback executes automatically

Python (confluent-kafka library):

producer.produce(topic='demo', value='msg', callback=callback)
# Callback WON'T execute yet!

producer.poll(0)  # ← Must explicitly call this
# NOW callback executes

What I Learned

The confluent-kafka library (Python) wraps librdkafka (C library). Unlike Java's client, you must explicitly call poll() to:

  1. Trigger network operations (send queued messages)
  2. Receive acknowledgments from the broker
  3. Execute callbacks

producer.produce(...)  # Step 1: Queue in memory (fast)
producer.poll(0)       # Step 2: Send + trigger callback

poll() Variations

producer.poll(0)      # Non-blocking: process ready events, return immediately
producer.poll(1.0)    # Blocking: wait up to 1 second for events
producer.flush()      # Blocking: wait for ALL messages + callbacks
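
One place these variations matter in practice is back-pressure: produce() raises BufferError when the local queue is full, and poll() is how you give librdkafka time to drain it. A minimal sketch of that pattern, reusing the localhost broker and demo_python topic from the examples above (the loop itself is just an illustration):

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

for i in range(100_000):
    payload = f"msg {i}".encode('utf-8')
    while True:
        try:
            producer.produce('demo_python', value=payload)
            break  # queued successfully
        except BufferError:
            # Local queue is full: block up to 1s so delivery reports
            # can be processed and space freed, then retry produce()
            producer.poll(1.0)
    producer.poll(0)  # serve delivery callbacks as we go

producer.flush()  # wait for everything still in flight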

Visual Comparison

Without poll() ❌:

for i in range(10):
    producer.produce(...)  # Queue all 10 messages
    # No poll() - callbacks delayed

producer.flush()  # All 10 callbacks fire here at once

With poll(0) ✅:

for i in range(10):
    producer.produce(...)  # Queue message
    producer.poll(0)       # Callback fires immediately

This gives real-time feedback about partition assignment, which is crucial for the key-partitioning demo!

Challenge #3: Callback Timing - Sync vs Async

The Unexpected Output

When I first ran producer_demo_keys.py, the output was confusing:

--- Round 1 ---

--- Round 2 ---
Key: id_0 | Partition: 1  ← Where's Round 1?
Key: id_3 | Partition: 1
...

All Round 1 callbacks fired during Round 2! This revealed an important decision point.

The Cause

for round_num in range(2):
    for i in range(10):
        producer.produce(...)
        producer.poll(0)  # Non-blocking

    time.sleep(0.5)  # Not long enough for callbacks to complete

Network latency (even on localhost) meant callbacks from Round 1 hadn't executed before Round 2 started.

Two Valid Approaches

I realized both patterns have valid use cases, so I made it configurable:

Approach 1: Synchronized (Educational)

for round_num in range(2):
    for i in range(10):
        producer.produce(...)
        producer.poll(0)

    producer.flush()  # ← Block until all callbacks complete

✅ Clean output showing Round 1 complete before Round 2

✅ Perfect for demos and testing

❌ Lower throughput (blocking waits)

Approach 2: Async (Production)

for round_num in range(2):
    for i in range(10):
        producer.produce(...)
        producer.poll(0)

    time.sleep(0.5)  # ← Doesn't wait for delivery to complete

✅ Higher throughput

✅ More realistic production behavior

⚠️ Callbacks may interleave

Making It Toggleable

Added configuration comments in the code:

# ============================================================
# TWO APPROACHES: Choose based on your use case
# ============================================================

# APPROACH 1: Synchronized Rounds (Educational/Testing)
producer.flush()  # ← Active by default

# APPROACH 2: Async High-Throughput (Production)
# Comment out flush() above, uncomment below:
# time.sleep(0.5)
# ============================================================

Key insight: Either way, the key-to-partition mapping remains consistent - that's what matters!
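
To convince myself of this, the delivery callback can record which partition each key lands on and flag any change across rounds. A small sketch (the key_partitions dict is my addition; log and everything else match the demo code later in this post):

key_partitions = {}  # key -> partition seen on first delivery

def delivery_callback(err, msg):
    if err is not None:
        log.error(f"Message delivery failed: {err}")
        return
    key = msg.key().decode('utf-8') if msg.key() else None
    first_seen = key_partitions.setdefault(key, msg.partition())
    if first_seen != msg.partition():
        log.warning(f"Key {key} moved: partition {first_seen} -> {msg.partition()}")
    else:
        log.info(f"Key: {key} | Partition: {msg.partition()}")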

Java to Python Conversion Patterns

Configuration: Properties → dict

Java:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", StringSerializer.class.getName());

Python:

config = {
    'bootstrap.servers': 'localhost:9092',
    # No serializer config needed for simple strings
    # confluent-kafka handles it automatically
}
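
The dict format also makes it easy to layer in the reliability settings from the concepts article later. A sketch using librdkafka property names (the specific values are illustrative assumptions, not what my demo uses):

config = {
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',                # wait for all in-sync replicas
    'enable.idempotence': True,   # safe retries without duplicates
    'linger.ms': 20,              # small batching delay for throughput
}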

Producing Messages

Java:

ProducerRecord<String, String> record = 
    new ProducerRecord<>("topic", "key", "value");
producer.send(record);

Python:

producer.produce(
    topic='topic',
    key='key'.encode('utf-8'),    # Must encode to bytes
    value='value'.encode('utf-8')  # Must encode to bytes
)

Key difference: Kafka keys and values travel as raw bytes, and there is no serializer configuration here, so with confluent-kafka I .encode('utf-8') keys and values explicitly.

Callbacks

Java:

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e == null) {
            log.info("Partition: " + metadata.partition());
        } else {
            log.error("Error: " + e.getMessage());
        }
    }
});

Python:

def delivery_callback(err, msg):
    if err is not None:
        log.error(f"Delivery failed: {err}")
    else:
        log.info(f"Partition: {msg.partition()}")

producer.produce(topic='topic', value='msg', callback=delivery_callback)
producer.poll(0)  # ← Must explicitly trigger!

Resource Cleanup

Java:

producer.flush();
producer.close();

Python:

producer.flush()
# No explicit close() - flush() is sufficient
# Python handles cleanup via garbage collection

Final Implementations

producer_demo_keys.py (With Partitioning)

import time
from confluent_kafka import Producer, KafkaException
from kafka_logging import get_logger

log = get_logger(__name__)

def delivery_callback(err, msg):
    if err is not None:
        log.error(f"Message delivery failed: {err}")
    else:
        key_str = msg.key().decode('utf-8') if msg.key() else None
        log.info(f"Key: {key_str} | Partition: {msg.partition()}")

def main():
    producer = None

    try:
        config = {'bootstrap.servers': 'localhost:9092'}
        producer = Producer(config)
        topic = 'demo_python'

        for round_num in range(2):
            log.info(f"\n--- Round {round_num + 1} ---")

            for i in range(10):
                key = f"id_{i}"
                value = f"hello world {i}"

                producer.produce(
                    topic=topic,
                    key=key.encode('utf-8'),
                    value=value.encode('utf-8'),
                    callback=delivery_callback
                )
                producer.poll(0)  # Trigger callbacks

            # Choose approach:
            producer.flush()   # Synchronized
            # time.sleep(0.5)  # Async

        log.info("All messages sent!")

    except BufferError as e:
        log.error(f"Queue full: {e}", exc_info=True)
        raise
    except KafkaException as e:
        log.error(f"Kafka error: {e}", exc_info=True)
        raise
    except Exception as e:
        log.error(f"Error: {e}", exc_info=True)
        raise
    finally:
        if producer is not None:
            try:
                producer.flush(timeout=5)
                log.info("Producer closed")
            except Exception as e:
                log.warning(f"Cleanup error: {e}")

if __name__ == "__main__":
    main()

Testing

Running the Producers

# Basic producer
uv run python -m kafka_basics.producer_demo

# Producer with keys
uv run python -m kafka_basics.producer_demo_keys

Verifying with Console Consumer

docker exec -it broker kafka-console-consumer.sh \
  --topic demo_python \
  --from-beginning \
  --bootstrap-server localhost:9092

Press Ctrl+C to exit.
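
To check the key-partitioning demo specifically, I believe the console consumer's formatter properties can also print each record's key and partition (assumed flags, available in recent Kafka versions):

docker exec -it broker kafka-console-consumer.sh \
  --topic demo_python \
  --from-beginning \
  --bootstrap-server localhost:9092 \
  --property print.key=true \
  --property print.partition=true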

Key Takeaways

Python-Specific Lessons

  1. Bytes Encoding: Always .encode('utf-8') for keys and values
  2. poll() is Required: Unlike Java, callbacks need explicit triggering
  3. Variable Scope: Initialize resources before try block in Python
  4. Bare raise: Use raise without arguments to preserve full traceback

Error Handling Patterns

  1. Initialize resources to None before try block
  2. Use bare raise to preserve tracebacks
  3. Nest try-except in finally block for safe cleanup
  4. Use log.warning() for cleanup errors (not log.error())
  5. Always add timeouts to prevent indefinite blocking (see the sketch below)
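
A minimal sketch of that last point: confluent-kafka's flush(timeout) returns the number of messages still queued, so you can log instead of hanging forever (the 5-second value is just an example):

remaining = producer.flush(timeout=5)
if remaining > 0:
    log.warning(f"{remaining} messages were not delivered before the timeout")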

Conversion Strategy

Java Concept | Python Equivalent | Notes
Properties | dict | Use dot-notation keys
try-catch-finally | try-except-finally | Different syntax, same pattern
Anonymous Callback | Function callback | Define as regular function
producer.send() | producer.produce() + poll() | Explicit callback trigger needed
producer.close() | producer.flush() | flush() is sufficient

What I Learned About Python Development

  1. Resource Management: Python's finally block works like Java but with scope differences
  2. Library Quirks: confluent-kafka requires manual poll() unlike Java client
  3. Configuration Flexibility: Python's dict makes config more readable than Java Properties
  4. Async Patterns: Understanding when to use flush() vs poll() for different use cases


Session Date: January 2, 2026
Python 3.13, uv package manager, confluent-kafka, Kafka 4.0.1
