Pizofreude

Study Notes 6.13-14: Kafka Streaming with Python & PySpark Structured Streaming with Kafka

1. Overview of Kafka Streaming with Python

Purpose & Context:

  • This session demonstrates how to implement Kafka streaming applications using Python, mirroring the producer–consumer examples shown in Java.
  • Instead of relying on Confluent Cloud, the examples run Kafka services locally via Docker Compose.
  • The tutorial covers setting up the Kafka ecosystem (broker, schema registry, Zookeeper, Control Center, and Kafka REST API), creating producers and consumers in Python, and handling serialization/deserialization.

Key Takeaways:

  • Understanding Kafka’s configuration differences (listeners vs. advertised.listeners) in a containerized environment.
  • How to produce and consume messages in Python using the Confluent Kafka Python library.
  • Differences between JSON and Avro serialization in Python and the role of Schema Registry.
  • Best practices for maintaining consistency and compatibility when streaming data.

2. Docker Environment for Kafka Streaming

Kafka Ecosystem Setup:

  • Services in Docker Compose:
    • Broker, Zookeeper, Schema Registry, Control Center: Essential for running Kafka locally.
    • Kafka REST Proxy: Optional; useful for debugging and interacting with Kafka via REST.
  • Network Configuration:
    • Create a dedicated Docker network (e.g., kafka-spark-network) to facilitate inter-service communication.
    • Broker configuration uses different listener settings:
      • Inter-broker listener (internal): Used for communication among Kafka nodes.
      • External listener (advertised.listeners): Used for accessing the broker from outside the Docker network (e.g., using localhost and a designated port).

Importance for Python Streaming:

  • Running Kafka locally allows for easier testing and prototyping without needing a managed cloud setup.
  • Ensure that your Python code connects to the correct advertised listener (e.g., localhost with the designated port).
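
Before starting a producer or consumer, a plain TCP check can confirm that the external listener is reachable from the host. The sketch below is illustrative only: the helper names `parse_bootstrap` and `is_reachable` are hypothetical, and an address such as `localhost:9092` assumes the broker's external listener is published on Kafka's default port.

```python
import socket


def parse_bootstrap(server):
    """Split a 'host:port' bootstrap string into (host, port)."""
    host, _, port = server.rpartition(":")
    return host, int(port)


def is_reachable(server, timeout=2.0):
    """Return True if a TCP connection to 'host:port' can be opened."""
    host, port = parse_bootstrap(server)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hosts
        return False


# Example call (assumed address): is_reachable("localhost:9092")
```

An open port only shows that something is listening. If advertised.listeners points at an address the client cannot resolve, the client can still fail after the initial bootstrap, because it reconnects to whatever address the broker advertises.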

3. Python Producer – Key Concepts

Producer Implementation:

  • Reading Data:
    • The producer reads records from a local CSV file located in the resources folder.
    • Each CSV row is parsed and converted into a dictionary or an object (referred to as a “ride” record in the example).
  • Serialization:
    • JSON Example:
      • The producer converts the dictionary to a JSON string and then encodes it into binary format, as Kafka expects byte data.
      • Keys (often numerical) are first converted to strings and encoded before sending.
    • Avro/Schema Registry Example:
      • When using Avro, the producer is configured with the Kafka Avro serializer and Schema Registry settings.
      • Schema Registry ensures that each message adheres to a defined schema, enforcing compatibility rules.
  • Configuration Parameters:
    • Bootstrap Servers: Set to the advertised listener (e.g., localhost with a specified port from Docker).
    • Key and Value Serializers:
      • For JSON, custom serialization converts keys and values appropriately.
      • For Avro, use the Confluent Kafka Avro serializer with proper schema registry URL and authentication.
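
The reading and JSON serialization steps above can be sketched without a broker. This is a minimal illustration, not the course's exact code: the two-row sample, the column names, and the helper `rows_to_messages` are assumptions made for the example.

```python
import csv
import io
import json

# Hypothetical two-row sample; the real file has more columns
SAMPLE_CSV = "pickup_location_id,passenger_count\n132,1\n48,2\n"


def rows_to_messages(csv_text, key_field):
    """Yield (key_bytes, value_bytes) pairs, one per CSV data row."""
    reader = csv.DictReader(io.StringIO(csv_text))  # header row names the fields
    for record in reader:
        key = str(record[key_field]).encode("utf-8")  # key -> str -> bytes
        value = json.dumps(record).encode("utf-8")    # dict -> JSON -> bytes
        yield key, value


messages = list(rows_to_messages(SAMPLE_CSV, "pickup_location_id"))
print(messages[0])
```

Note that the csv module reads every field as a string, so numeric columns arrive in the JSON payload as strings unless they are cast before serialization.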

Example Code Structure:

import csv
import json

from confluent_kafka import Producer

# Producer configuration. confluent_kafka.Producer has no serializer
# settings, so keys and values are encoded to bytes by hand below.
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
}


def delivery_report(err, msg):
    """Log failed deliveries; called once per message."""
    if err is not None:
        print("Delivery failed for key {}: {}".format(msg.key(), err))


# Read the CSV, turn each row into a dict, then produce one message per row
def produce_records(csv_file, topic):
    producer = Producer(producer_conf)
    with open(csv_file, 'r', newline='') as f:
        for record in csv.DictReader(f):  # header row supplies the dict keys
            key = str(record.get('pickup_location_id')).encode('utf-8')
            value = json.dumps(record).encode('utf-8')
            producer.produce(topic, key=key, value=value,
                             on_delivery=delivery_report)
            producer.poll(0)  # serve pending delivery callbacks
    producer.flush()  # block until every queued message is delivered


produce_records('resources/rides.csv', 'rides-topic')


4. Python Consumer – Key Concepts

Consumer Implementation:

  • Subscription:
    • The consumer subscribes to the same Kafka topic that the producer writes to.
    • It uses the Confluent Kafka Python consumer to poll messages.
  • Deserialization:
    • JSON Example:
      • The consumer decodes the binary message back to a JSON string and then loads it as a dictionary.
      • Keys are decoded from bytes to string and converted back to the expected data type if necessary.
    • Avro Example:
      • For Avro, the consumer is configured with the Avro deserializer and connects to Schema Registry to fetch the appropriate schema.
  • Configuration Parameters:
    • Bootstrap Servers: Same as producer.
    • Auto Offset Reset: Typically set to 'earliest' or 'latest' based on use-case.
    • Key and Value Deserializers:
      • Custom logic to decode and parse the message.
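
As a rough sketch of that custom logic, the helpers below decode a key and a JSON value and restore the key's original type. The function names and sample bytes are made up for illustration; the `cast=int` default assumes numeric keys, as in the producer notes above.

```python
import json


def deserialize_key(raw, cast=int):
    """Decode a key from bytes and cast it back; None keys stay None."""
    if raw is None:
        return None
    return cast(raw.decode("utf-8"))


def deserialize_value(raw):
    """Decode a UTF-8 JSON payload into a dict."""
    return json.loads(raw.decode("utf-8"))


# Bytes shaped like what a JSON producer would have sent (made-up values)
key = deserialize_key(b"132")
value = deserialize_value(b'{"pickup_location_id": 132, "passenger_count": 1}')
print(key, value)
```

Kafka messages may carry no key at all, which is why the key helper passes None through instead of calling decode on it.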

Example Code Structure:

from confluent_kafka import Consumer
import json

# Consumer configuration
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'python-consumer-group',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(consumer_conf)
consumer.subscribe(['rides-topic'])

try:
    while True:
        msg = consumer.poll(1.0)  # wait up to 1 second for a message
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        # Keys are optional in Kafka, so guard against None before decoding
        key = msg.key().decode('utf-8') if msg.key() is not None else None
        value = json.loads(msg.value().decode('utf-8'))
        print("Received message: Key={}, Value={}".format(key, value))
except KeyboardInterrupt:
    pass  # Ctrl+C stops the loop
finally:
    consumer.close()  # leave the consumer group cleanly


5. Schema Registry and Serialization

Why Schema Registry Matters:

  • Data Consistency: Ensures that both producers and consumers use the same schema contract, preventing errors due to mismatches.
  • Schema Evolution: Allows changes to data formats over time without breaking consumers. For example, a producer may add a new field with a default value, and older consumers will still work.
  • Supported Formats: While JSON is common, Avro is preferred for its compact binary encoding and robust support for schema evolution.
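
The schema evolution point can be illustrated with plain dictionaries. The sketch below is a simplified stand-in for Avro schema resolution, not the real algorithm and not the Schema Registry API: the `Ride` schema, its fields, and the `project` helper are all hypothetical.

```python
# Version 1 of a hypothetical ride schema, written as Avro-style dicts
SCHEMA_V1 = {
    "type": "record",
    "name": "Ride",
    "fields": [
        {"name": "pickup_location_id", "type": "int"},
        {"name": "passenger_count", "type": "int"},
    ],
}

# Version 2 adds one field and gives it a default
SCHEMA_V2 = {
    "type": "record",
    "name": "Ride",
    "fields": SCHEMA_V1["fields"] + [
        {"name": "tip_amount", "type": "double", "default": 0.0},
    ],
}


def project(record, reader_schema):
    """Shape a record to the reader's fields, filling defaults where needed."""
    out = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in record:
            out[name] = record[name]
        elif "default" in field:
            out[name] = field["default"]
        else:
            raise ValueError("no value or default for field: " + name)
    return out


old_record = {"pickup_location_id": 132, "passenger_count": 1}
new_record = {"pickup_location_id": 48, "passenger_count": 2, "tip_amount": 3.5}

print(project(old_record, SCHEMA_V2))  # old data, new reader: default filled in
print(project(new_record, SCHEMA_V1))  # new data, old reader: extra field dropped
```

The default value is what makes the change safe in both directions: an older consumer ignores the field it does not know, and a newer consumer can still read records written before the field existed.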

Python Integration:

  • When using Avro with Python, configure the producer and consumer to use Confluent’s Avro serializer/deserializer.
  • Set the Schema Registry URL and credentials in the configuration.
  • Ensure that schema compatibility settings (e.g., FULL_TRANSITIVE, which checks a new schema against all previously registered versions in both directions) are properly defined to control evolution.

6. Best Practices for Kafka Streaming with Python

  • Consistent Configuration: Verify that the Kafka broker’s advertised listeners and the Python client configuration align correctly.
  • Serialization/Deserialization: Ensure that data is converted to and from the correct formats. For JSON, maintain proper encoding and decoding. For Avro, properly integrate with Schema Registry.
  • Error Handling: Implement robust error handling in both producers and consumers to catch serialization errors or schema incompatibility issues.
  • Testing: Use unit tests to simulate producer and consumer behavior. Validate that messages are produced, serialized, sent, received, and deserialized as expected.
  • Monitoring and Logging: Log delivery reports and consumer offsets to track the status of message production and consumption.
  • Schema Evolution: Carefully manage changes to your data schema to ensure backward and forward compatibility. Use Schema Registry’s compatibility checks to prevent accidental breaks in production.
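
One way to follow the testing advice without a running broker is to pass the producer in as an argument and substitute a fake in tests. The sketch below assumes that design; `FakeProducer` and `send_record` are hypothetical names, and the fake only mimics the `produce` and `flush` methods used here.

```python
import json
import unittest


class FakeProducer:
    """Hypothetical stand-in that records calls instead of contacting a broker."""

    def __init__(self):
        self.sent = []

    def produce(self, topic, key=None, value=None):
        self.sent.append((topic, key, value))

    def flush(self):
        return 0  # nothing left in the queue


def send_record(producer, topic, record, key_field):
    """Serialize one record as JSON and hand it to the producer."""
    key = str(record[key_field]).encode("utf-8")
    value = json.dumps(record).encode("utf-8")
    producer.produce(topic, key=key, value=value)


class SendRecordTest(unittest.TestCase):
    def test_round_trip(self):
        producer = FakeProducer()
        record = {"pickup_location_id": 132, "passenger_count": 1}
        send_record(producer, "rides-topic", record, "pickup_location_id")

        topic, key, value = producer.sent[0]
        self.assertEqual(topic, "rides-topic")
        self.assertEqual(key, b"132")
        # Decoding the payload must give back the original record
        self.assertEqual(json.loads(value.decode("utf-8")), record)


# Run the test case explicitly so the snippet works inside any script
suite = unittest.defaultTestLoader.loadTestsFromTestCase(SendRecordTest)
result = unittest.TextTestRunner().run(suite)
```

A test like this covers serialization and the call made to the producer. It does not replace an integration test against a real broker, which is where listener and Schema Registry misconfigurations show up.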

1. Overview of PySpark Structured Streaming with Kafka

  • Objective:

    Learn how to integrate Apache Kafka with PySpark Structured Streaming for real-time data processing. The session covers setting up a local Kafka environment with Docker, reading from and writing to Kafka topics using Spark, and processing streaming data in micro-batches.

  • Key Takeaways:

    • How to configure and run both Kafka and Spark services in a containerized environment.
    • Reading Kafka streams into Spark as a structured DataFrame.
    • Transforming, parsing, and processing data (e.g., decoding binary to strings, splitting CSV columns).
    • Output modes and trigger configurations for streaming queries.
    • Best practices for checkpointing, managing offsets, and handling late data.

2. Environment Setup

a. Docker-Based Kafka and Spark Setup

  • Docker Compose Services:
    • Kafka Broker, Zookeeper, Schema Registry, Control Center: Essential components for running Kafka locally.
    • Kafka REST Proxy (optional): Allows debugging and interaction with Kafka via REST.
    • Spark Cluster Components: Spark Master and multiple Spark Workers.
  • Networking and Volumes:
    • Create a dedicated Docker network (e.g., a bridge network such as kafka-spark-network) to enable communication between Kafka and Spark.
    • Configure volumes to persist data (e.g., HDFS replication for Spark).
  • Broker Configuration:
    • Use separate listener configurations: one for internal cluster communication and one for external access (advertised listeners).
    • Example:
      • Internal listener: used by Kafka brokers to talk to each other.
      • External listener: typically set to localhost:9092 so that external clients (like Spark) can connect.

3. PySpark Structured Streaming with Kafka

a. Reading from Kafka in PySpark

  • Spark Session Setup:

    • Create a Spark session with necessary configurations and include required JARs (e.g., for Kafka integration and serialization libraries).
    • Example:

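      from pyspark.sql import SparkSession
      
      # The Kafka source is not bundled with Spark. The package version below
      # is an assumption: match it to your Spark and Scala versions.
      spark = SparkSession.builder \
          .appName("KafkaStructuredStreaming") \
          .config("spark.jars.packages",
                  "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
          .getOrCreate()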
      
      
  • ReadStream Configuration:

    • Use spark.readStream.format("kafka") with options such as:
      • kafka.bootstrap.servers: the Kafka broker (e.g., "localhost:9092").
      • subscribe: the topic to consume (e.g., "rides-topic").
      • startingOffsets: typically "earliest" for full history or "latest" for new data.
    • The data is read as a DataFrame with binary key and value columns.
    • Example:

      df = spark.readStream.format("kafka") \
          .option("kafka.bootstrap.servers", "localhost:9092") \
          .option("subscribe", "rides-topic") \
          .option("startingOffsets", "earliest") \
          .load()
      
      

b. Data Transformation

  • Casting and Parsing:

    • Convert binary columns (key and value) to strings:

      from pyspark.sql.functions import col
      
      df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      
      
    • Split CSV-formatted strings into multiple columns using functions like split and withColumn.

    • Example:

      from pyspark.sql.functions import col, split
      
      df = df.withColumn("columns", split(col("value"), ","))
      # Then select and cast individual columns as needed,
      # e.g. col("columns").getItem(0).cast("int")
      
      

c. Writing Streaming Data

  • Output Modes and Triggers:

    • Output Modes:
      • Append: Only new rows are written.
      • Complete: Writes the full result table on every trigger.
      • Update: Writes only the rows that have changed.
    • Triggers:

      • Define how often the streaming query is executed (e.g., every 5 seconds using micro-batch mode).
      • Example:

        query = df.writeStream \
            .outputMode("append") \
            .format("console") \
            .trigger(processingTime="5 seconds") \
            .option("checkpointLocation", "checkpoint_dir") \
            .start()
        
        # Block the driver so a standalone script keeps running
        query.awaitTermination()
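
To make the three output modes listed above concrete, the following pure-Python sketch mimics what each mode would emit per micro-batch. It is a toy model, not Spark code: `simulate` is a hypothetical helper, "append" is shown for a query without aggregation (new rows pass straight through), and "update" and "complete" are shown for a running count by key.

```python
from collections import Counter


def simulate(batches):
    """Yield, per micro-batch, what each output mode would emit."""
    totals = Counter()  # the full result table, kept across batches
    for batch in batches:
        changed = Counter(batch)  # keys touched by this batch
        totals.update(changed)
        yield {
            "append": list(batch),                       # only the new rows
            "update": {k: totals[k] for k in changed},   # only rows that changed
            "complete": dict(totals),                    # whole result table
        }


# Two made-up micro-batches of keys
outputs = list(simulate([["a", "b", "a"], ["b", "c"]]))
for out in outputs:
    print(out)
```

In the second batch the key "a" is untouched, so it appears in the complete output but not in the update output.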
        
        

d. Checkpointing and Fault Tolerance

  • Checkpoint Location:
    • Specify a checkpoint directory to track offsets and state across streaming batches.
  • Stateful Operations:
    • When performing aggregations or window functions, Spark uses state that is recovered from checkpoints in case of failures.
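
The idea behind offset checkpointing can be shown with a small file-based model. This is only a sketch of the concept: Spark's checkpoint directory has its own layout and write-ahead log, and the helper names and JSON format below are invented for the example.

```python
import json
import os
import tempfile


def load_offset(path):
    """Return the next offset to read, or 0 when no checkpoint exists."""
    if not os.path.exists(path):
        return 0
    with open(path) as f:
        return json.load(f)["next_offset"]


def save_offset(path, next_offset):
    """Write the checkpoint via a temp file so it is never half-written."""
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump({"next_offset": next_offset}, f)
    os.replace(tmp, path)


def process(log, path, batch_size):
    """Handle one micro-batch starting at the checkpointed offset."""
    start = load_offset(path)
    batch = log[start:start + batch_size]
    save_offset(path, start + len(batch))  # record progress after the batch
    return batch


log = ["r0", "r1", "r2", "r3", "r4"]  # stand-in for one topic partition
checkpoint = os.path.join(tempfile.mkdtemp(), "offsets.json")

first = process(log, checkpoint, batch_size=2)
# Simulated restart: nothing is kept in memory, only the checkpoint file
second = process(log, checkpoint, batch_size=2)
print(first, second)
```

Because the position lives on disk rather than in memory, the second call resumes where the first one stopped, which is the behaviour a checkpoint location gives a restarted streaming query.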

4. Streaming Query Patterns

a. Windowed Aggregations and Grouping

  • Grouping and Windowing:

    • Group data by key and apply window functions to aggregate data over fixed time intervals.
    • Example:

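      from pyspark.sql.functions import col, window
      
      # Assumes df still carries the Kafka source's "timestamp" column,
      # i.e. it was kept in the earlier select alongside key and value
      agg_df = df.groupBy(
          window(col("timestamp"), "10 seconds"),
          col("key")
      ).count()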
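
What a fixed (tumbling) window does to each event can be sketched in plain Python: every timestamp is floored to the start of its interval, and counts are kept per (window, key) pair. The helper names and sample events below are hypothetical, and the sketch ignores late data and watermarks.

```python
from collections import Counter


def window_start(ts, width_seconds):
    """Floor an epoch timestamp (seconds) to the start of its tumbling window."""
    return ts - (ts % width_seconds)


def count_by_window(events, width_seconds=10):
    """Count events per (window_start, key); events are (epoch_seconds, key)."""
    counts = Counter()
    for ts, key in events:
        counts[(window_start(ts, width_seconds), key)] += 1
    return dict(counts)


# Made-up events: (epoch seconds, key)
events = [(100, "132"), (104, "132"), (109, "48"), (110, "132")]
for (start, key), n in sorted(count_by_window(events).items()):
    print(start, key, n)
```

The event at second 110 falls into a new window even though it arrives one second after the event at 109, because window boundaries are fixed rather than relative to the previous event.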
      
      

b. Real-Time Dashboards

  • Output Options:
    • Write streaming data to the console for debugging.
    • Write to Kafka topics, file systems, or memory tables for real-time dashboards.
  • Integration with Other Systems:
    • Spark Structured Streaming can feed data into BI tools or data warehouses for live monitoring.

5. Best Practices for PySpark Structured Streaming

  • Environment Consistency:
    • Ensure Kafka broker configurations (listeners, advertised addresses) match between Docker and your Spark client.
  • Resource Management:
    • Monitor micro-batch processing delays and adjust the trigger interval as needed.
  • Fault Tolerance:
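    • Always set a checkpoint location so that a restarted query resumes from its saved offsets and state. End-to-end exactly-once results also depend on the sink being idempotent or transactional; the Kafka sink, for example, provides at-least-once delivery.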
  • Schema Management:
    • For production applications, consider schema evolution and enforce schemas when converting binary data.
  • Testing and Debugging:
    • Use Spark's local mode for development and testing. Print streaming output to console or use memory sinks for validation.
