<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: James Kimoune</title>
    <description>The latest articles on DEV Community by James Kimoune (@james_kimoune_a3474b030cb).</description>
    <link>https://dev.to/james_kimoune_a3474b030cb</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1462790%2F07696368-c999-40ec-ab84-218f61f209f5.jpeg</url>
      <title>DEV Community: James Kimoune</title>
      <link>https://dev.to/james_kimoune_a3474b030cb</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/james_kimoune_a3474b030cb"/>
    <language>en</language>
    <item>
      <title>Troubleshooting Kafka Connectivity with spark streaming</title>
      <dc:creator>James Kimoune</dc:creator>
      <pubDate>Thu, 02 May 2024 04:29:09 +0000</pubDate>
      <link>https://dev.to/james_kimoune_a3474b030cb/troubleshooting-kafka-connectivity-with-spark-streaming-ga2</link>
      <guid>https://dev.to/james_kimoune_a3474b030cb/troubleshooting-kafka-connectivity-with-spark-streaming-ga2</guid>
      <description>&lt;p&gt;I am currently developing a mini project that integrates Jupyter and Kafka with Spark Streaming for data processing, and uses Cassandra for storage. For visualization and alert management, I am using Grafana. The project also includes a specific feature to display the temperature from various regions, which is managed randomly. However, I am encountering an issue with the connection between the producer and consumer initially. I would appreciate some help, please. I have deployed this on Docker&lt;/p&gt;

&lt;p&gt;producer&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Installation de la bibliothèque kafka-python si nécessaire
!pip install kafka-python

import json
import random
import time
from kafka import KafkaProducer
import threading
from IPython.display import display
import ipywidgets as widgets

# Configuration du producteur Kafka
producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],  # Ajustez l'adresse si nécessaire
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Fonction pour générer des données simulées
def generate_sensor_data():
    return {
        "sensor_id": random.randint(1, 100),
        "timestamp": int(time.time() * 1000),
        "temperature": round(random.uniform(20.0, 35.0), 2),
        "humidity": round(random.uniform(40.0, 60.0), 2),
        "pressure": round(random.uniform(970.0, 1030.0), 2)
    }

# Fonction pour envoyer les données
def send_sensor_data(stop_event):
    while not stop_event.is_set():
        data = generate_sensor_data()
        producer.send('temperature', value=data)
        print(f"Data sent: {data}")
        time.sleep(1)

stop_event = threading.Event()

# Widgets pour contrôler l'envoi des données
start_button = widgets.Button(description="Start Sending Data")
stop_button = widgets.Button(description="Stop Sending Data")
output = widgets.Output()

display(start_button, stop_button, output)

def start_sending_data(b):
    with output:
        global thread
        if not stop_event.is_set():
            stop_event.clear()
            thread = threading.Thread(target=send_sensor_data, args=(stop_event,))
            thread.start()
            print("Started sending data...")

def stop_sending_data(b):
    with output:
        if not stop_event.is_set():
            stop_event.set()
            thread.join()
            print("Stopped sending data.")

start_button.on_click(start_sending_data)
stop_button.on_click(stop_sending_data)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;et consommateur&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;`from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, to_timestamp, from_unixtime, window, avg, min, max
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

# Initialisation de SparkSession avec l'intégration de Cassandra
spark = SparkSession \
    .builder \
    .appName("Weather Data Streaming") \
    .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions") \
    .config("spark.cassandra.connection.host", "localhost") \
    .getOrCreate()

# Lecture des messages en streaming depuis Kafka
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "temperature") \
    .option("startingOffsets", "earliest") \
    .load()

# Schéma des données JSON reçues de Kafka
schema = StructType([
    StructField("sensor_id", StringType()),
    StructField("timestamp", LongType()),  # temps en millisecondes depuis l'époque UNIX
    StructField("temperature", DoubleType()),
    StructField("humidity", DoubleType()),
    StructField("pressure", DoubleType())
])

# Transformation des données brutes en DataFrame structuré
weather_data = df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select(
    "data.sensor_id",
    to_timestamp(from_unixtime(col("data.timestamp") / 1000)).alias("timestamp"),
    "data.temperature",
    "data.humidity",
    "data.pressure"
)

# Calcul des statistiques sur les températures
weather_stats = weather_data \
    .groupBy(
        window(col("timestamp"), "1 hour"),
        col("sensor_id")
    ) \
    .agg(
        avg("temperature").alias("avg_temp"),
        min("temperature").alias("min_temp"),
        max("temperature").alias("max_temp")
    )

# Fonction pour écrire les résultats dans Cassandra
def write_to_cassandra(batch_df, batch_id):
    batch_df.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode("append") \
        .options(table="weather", keyspace="weatherSensors") \
        .save()

# Configuration du Stream pour écrire dans Cassandra
query = weather_stats \
    .writeStream \
    .outputMode("complete") \
    .foreachBatch(write_to_cassandra) \
    .start()

query.awaitTermination()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



</description>
      <category>pyspark</category>
      <category>kafka</category>
      <category>spark</category>
    </item>
  </channel>
</rss>
