James Kimoune

Troubleshooting Kafka Connectivity with Spark Streaming

I am currently developing a mini project that integrates Jupyter and Kafka with Spark Streaming for data processing, and uses Cassandra for storage. For visualization and alert management I am using Grafana. The project also displays temperatures from various regions, which are generated randomly for now. Everything is deployed on Docker. However, the producer and the consumer do not seem to connect to each other, and I would appreciate some help.
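As a first sanity check, here is a minimal sketch I can run from the Jupyter container to see whether the broker is reachable at all. It assumes the broker is exposed to the other containers as kafka:9092, the same address the producer below uses (these names are my assumption, not a fixed setup):

from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable

try:
    # Assumes the broker container is reachable as "kafka" on port 9092,
    # matching the bootstrap_servers used by the producer below.
    consumer = KafkaConsumer(
        bootstrap_servers=["kafka:9092"],
        consumer_timeout_ms=5000,  # stop waiting after 5 seconds instead of blocking
    )
    print("Broker reachable, topics:", consumer.topics())  # should include 'temperature' once data has been sent
    consumer.close()
except NoBrokersAvailable:
    print("Could not reach kafka:9092 from this container")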

producer

# Install the kafka-python library if needed
!pip install kafka-python

import json
import random
import time
from kafka import KafkaProducer
import threading
from IPython.display import display
import ipywidgets as widgets

# Kafka producer configuration
producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],  # Adjust the address if needed
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Generate simulated sensor data
def generate_sensor_data():
    return {
        "sensor_id": random.randint(1, 100),
        "timestamp": int(time.time() * 1000),
        "temperature": round(random.uniform(20.0, 35.0), 2),
        "humidity": round(random.uniform(40.0, 60.0), 2),
        "pressure": round(random.uniform(970.0, 1030.0), 2)
    }

# Send data continuously until the stop event is set
def send_sensor_data(stop_event):
    while not stop_event.is_set():
        data = generate_sensor_data()
        producer.send('temperature', value=data)
        print(f"Data sent: {data}")
        time.sleep(1)

stop_event = threading.Event()

# Widgets to control data sending
start_button = widgets.Button(description="Start Sending Data")
stop_button = widgets.Button(description="Stop Sending Data")
output = widgets.Output()

display(start_button, stop_button, output)

thread = None

def start_sending_data(b):
    global thread
    with output:
        # Only start if no sending thread is currently running
        if thread is None or not thread.is_alive():
            stop_event.clear()
            thread = threading.Thread(target=send_sensor_data, args=(stop_event,))
            thread.start()
            print("Started sending data...")

def stop_sending_data(b):
    with output:
        if thread is not None and thread.is_alive():
            stop_event.set()
            thread.join()
            print("Stopped sending data.")

start_button.on_click(start_sending_data)
stop_button.on_click(stop_sending_data)
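Since producer.send() is asynchronous, the print above does not prove the message actually reached the broker. A small debugging sketch I can add in the same notebook, reusing the producer and generate_sensor_data defined above, is to block on the send future and print the broker's acknowledgement (this is only a check, not part of the pipeline):

# Minimal delivery check: send one record and wait for the broker's ack.
future = producer.send('temperature', value=generate_sensor_data())
try:
    metadata = future.get(timeout=10)  # raises if the broker never acknowledges
    print(f"Delivered to {metadata.topic} partition {metadata.partition} at offset {metadata.offset}")
except Exception as exc:
    print(f"Delivery failed: {exc}")
producer.flush()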

consumer

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, to_timestamp, from_unixtime, window, avg, min, max
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

# Initialize SparkSession with Cassandra integration
spark = SparkSession \
    .builder \
    .appName("Weather Data Streaming") \
    .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions") \
    .config("spark.cassandra.connection.host", "localhost") \
    .getOrCreate()

# Read streaming messages from Kafka
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "temperature") \
    .option("startingOffsets", "earliest") \
    .load()

# Schema of the JSON data received from Kafka
schema = StructType([
    StructField("sensor_id", StringType()),
    StructField("timestamp", LongType()),  # temps en millisecondes depuis l'époque UNIX
    StructField("temperature", DoubleType()),
    StructField("humidity", DoubleType()),
    StructField("pressure", DoubleType())
])

# Transform the raw data into a structured DataFrame
weather_data = df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select(
    "data.sensor_id",
    to_timestamp(from_unixtime(col("data.timestamp") / 1000)).alias("timestamp"),
    "data.temperature",
    "data.humidity",
    "data.pressure"
)

# Compute temperature statistics per sensor over hourly windows
weather_stats = weather_data \
    .groupBy(
        window(col("timestamp"), "1 hour"),
        col("sensor_id")
    ) \
    .agg(
        avg("temperature").alias("avg_temp"),
        min("temperature").alias("min_temp"),
        max("temperature").alias("max_temp")
    )

# Write each micro-batch of results to Cassandra
def write_to_cassandra(batch_df, batch_id):
    batch_df.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode("append") \
        .options(table="weather", keyspace="weatherSensors") \
        .save()

# Configure the stream to write to Cassandra
query = weather_stats \
    .writeStream \
    .outputMode("complete") \
    .foreachBatch(write_to_cassandra) \
    .start()

query.awaitTermination()
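To rule out the Cassandra sink on the consumer side, a debugging sketch I can run instead of the Cassandra query is to write the parsed rows to the console sink and see whether anything arrives from the temperature topic at all (assuming the cells above have already been executed):

# Debugging sketch: print the parsed rows to the console sink instead of Cassandra,
# to confirm the stream actually receives data from the 'temperature' topic.
debug_query = weather_data \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

debug_query.awaitTermination(60)  # run for about a minute, then check the driver output
debug_query.stop()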
