
James Kimoune

Troubleshooting Kafka Connectivity with Spark Streaming

I am currently developing a mini project that integrates Jupyter and Kafka with Spark Streaming for data processing, and uses Cassandra for storage. For visualization and alert management, I am using Grafana. The project also includes a feature that displays the temperature from various regions, with values generated randomly. However, I am running into an issue with the connection between the producer and the consumer, and I would appreciate some help. Everything is deployed with Docker.
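
Since everything runs in Docker, the first thing worth checking is whether the broker is reachable at all from the Jupyter container. A minimal sketch of such a check with kafka-python, assuming the broker is exposed to the notebook as kafka:9092 (the same address the producer uses):

# Quick sanity check: can the notebook container reach the Kafka broker?
from kafka import KafkaAdminClient
from kafka.errors import NoBrokersAvailable

try:
    admin = KafkaAdminClient(bootstrap_servers=['kafka:9092'])
    print("Broker reachable, existing topics:", admin.list_topics())
    admin.close()
except NoBrokersAvailable:
    print("No broker reachable at kafka:9092 - check the service name and advertised listeners")

If this fails, the cause is usually the broker's advertised listeners or the Docker network setup rather than the producer or consumer code.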

producer

# Install the kafka-python library if needed
!pip install kafka-python

import json
import random
import time
from kafka import KafkaProducer
import threading
from IPython.display import display
import ipywidgets as widgets

# Kafka producer configuration
producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],  # adjust the address if needed
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Generate simulated sensor data
def generate_sensor_data():
    return {
        "sensor_id": random.randint(1, 100),
        "timestamp": int(time.time() * 1000),
        "temperature": round(random.uniform(20.0, 35.0), 2),
        "humidity": round(random.uniform(40.0, 60.0), 2),
        "pressure": round(random.uniform(970.0, 1030.0), 2)
    }

# Send data until the stop event is set
def send_sensor_data(stop_event):
    while not stop_event.is_set():
        data = generate_sensor_data()
        producer.send('temperature', value=data)
        print(f"Data sent: {data}")
        time.sleep(1)

stop_event = threading.Event()
thread = None

# Widgets to control data sending
start_button = widgets.Button(description="Start Sending Data")
stop_button = widgets.Button(description="Stop Sending Data")
output = widgets.Output()

display(start_button, stop_button, output)

def start_sending_data(b):
    with output:
        global thread
        if thread is None or not thread.is_alive():
            stop_event.clear()
            thread = threading.Thread(target=send_sensor_data, args=(stop_event,))
            thread.start()
            print("Started sending data...")

def stop_sending_data(b):
    with output:
        if thread is not None and thread.is_alive():
            stop_event.set()
            thread.join()
            print("Stopped sending data.")

start_button.on_click(start_sending_data)
stop_button.on_click(stop_sending_data)
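To rule out Spark while debugging, the messages can also be read back with a plain kafka-python consumer; if this sketch prints data, the producer-to-broker side works and the problem is on the Spark side:

# Standalone check: read back messages from the 'temperature' topic without Spark
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'temperature',
    bootstrap_servers=['kafka:9092'],
    auto_offset_reset='earliest',
    consumer_timeout_ms=10000,  # give up after 10 s without new messages
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

for message in consumer:
    print(message.value)

consumer.close()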

and consumer

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, to_timestamp, from_unixtime, window, avg, min, max
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

# Initialize the SparkSession with the Cassandra integration
spark = SparkSession \
    .builder \
    .appName("Weather Data Streaming") \
    .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions") \
    .config("spark.cassandra.connection.host", "localhost") \
    .getOrCreate()

# Read streaming messages from Kafka
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "temperature") \
    .option("startingOffsets", "earliest") \
    .load()

# Schema of the JSON data received from Kafka
schema = StructType([
    StructField("sensor_id", StringType()),
    StructField("timestamp", LongType()),  # milliseconds since the UNIX epoch
    StructField("temperature", DoubleType()),
    StructField("humidity", DoubleType()),
    StructField("pressure", DoubleType())
])

# Transform the raw messages into a structured DataFrame
weather_data = df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select(
    "data.sensor_id",
    to_timestamp(from_unixtime(col("data.timestamp") / 1000)).alias("timestamp"),
    "data.temperature",
    "data.humidity",
    "data.pressure"
)

# Compute temperature statistics
weather_stats = weather_data \
    .groupBy(
        window(col("timestamp"), "1 hour"),
        col("sensor_id")
    ) \
    .agg(
        avg("temperature").alias("avg_temp"),
        min("temperature").alias("min_temp"),
        max("temperature").alias("max_temp")
    )

# Write each micro-batch of results to Cassandra
def write_to_cassandra(batch_df, batch_id):
    batch_df.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode("append") \
        .options(table="weather", keyspace="weatherSensors") \
        .save()

# Configure the stream to write to Cassandra
query = weather_stats \
    .writeStream \
    .outputMode("complete") \
    .foreachBatch(write_to_cassandra) \
    .start()

query.awaitTermination()
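One thing that often breaks the Spark side of this setup is a missing Kafka connector on the classpath, since the kafka source used by readStream is not bundled with Spark itself. Below is a sketch of the same SparkSession with the connector packages pulled in via spark.jars.packages; the version strings and the cassandra host name are assumptions and have to match the actual Spark/Scala build and the Docker service names:

# Sketch only: package versions and the Cassandra host name are assumptions
# and must be adapted to the actual Spark build and docker-compose services.
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Weather Data Streaming") \
    .config("spark.jars.packages",
            # assumed: Spark 3.5.x with Scala 2.12 and a matching Cassandra connector
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,"
            "com.datastax.spark:spark-cassandra-connector_2.12:3.5.0") \
    .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions") \
    .config("spark.cassandra.connection.host", "cassandra") \
    .getOrCreate()

With the connector missing, readStream.format("kafka") typically fails with a "Failed to find data source: kafka" error, which is worth checking in the Spark logs.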


