<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Batrudin Jamaludin</title>
    <description>The latest articles on DEV Community by Batrudin Jamaludin (@batrudin_haji).</description>
    <link>https://dev.to/batrudin_haji</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F2833194%2F7a28cf3e-681c-4d88-a530-139dcf4bdb07.png</url>
      <title>DEV Community: Batrudin Jamaludin</title>
      <link>https://dev.to/batrudin_haji</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/batrudin_haji"/>
    <language>en</language>
    <item>
      <title>A Step-by-Step Guide to Streaming Live Weather Data Using Apache Kafka and Mongodb</title>
      <dc:creator>Batrudin Jamaludin</dc:creator>
      <pubDate>Mon, 07 Apr 2025 08:00:21 +0000</pubDate>
      <link>https://dev.to/batrudin_haji/a-step-by-step-guide-to-streaming-live-weather-data-using-apache-kafka-and-mongodb-306</link>
      <guid>https://dev.to/batrudin_haji/a-step-by-step-guide-to-streaming-live-weather-data-using-apache-kafka-and-mongodb-306</guid>
      <description>&lt;p&gt;In this guide, we will walk you through the process of building a real-time weather data pipeline using Apache Kafka for streaming and MongoDB for storage. The pipeline collects weather data from the OpenWeatherMap API, streams it via Kafka, and stores it in MongoDB for real-time analysis and querying. By the end of this tutorial, you’ll have a working solution capable of streaming live weather data and storing it for further analysis.&lt;/p&gt;

&lt;h2&gt;
  
  
  Overview
&lt;/h2&gt;

&lt;p&gt;This project demonstrates how to create a real-time data pipeline that extracts weather data from the OpenWeatherMap API, streams it through Apache Kafka, and stores it in MongoDB. The data will be available for querying and analysis in near real-time. Here's how each component works:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Weather Data Extraction: Using the OpenWeatherMap API, we fetch live weather data for a given city.&lt;/li&gt;
&lt;li&gt;Kafka Producer: The producer sends weather data to Kafka, which allows the data to be streamed to multiple consumers.&lt;/li&gt;
&lt;li&gt;Kafka Consumer: The consumer retrieves weather data from Kafka and stores it in MongoDB for persistence.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Prerequisites
&lt;/h2&gt;

&lt;p&gt;Before proceeding with the tutorial, ensure that you have the following installed:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Python 3.x&lt;/li&gt;
&lt;li&gt;Apache Kafka (with Zookeeper)&lt;/li&gt;
&lt;li&gt;MongoDB (or MongoDB Atlas if you prefer a cloud-based instance)&lt;/li&gt;
&lt;li&gt;The Confluent Kafka Python library&lt;/li&gt;
&lt;li&gt;Pandas for data transformation&lt;/li&gt;
&lt;li&gt;An OpenWeatherMap API key&lt;/li&gt;
&lt;/ul&gt;
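
&lt;p&gt;The Python dependencies can be installed with pip; this one-liner is a convenience sketch covering the libraries used in the scripts below (package names as published on PyPI):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;pip install confluent-kafka pymongo pandas python-dotenv requests
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;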

&lt;h2&gt;
  
  
  Setting Up Apache Kafka
&lt;/h2&gt;

&lt;p&gt;Apache Kafka is the backbone of our real-time streaming solution. It decouples the producer from the consumer, so weather data can keep streaming even if one side is temporarily slow or unavailable. Kafka organizes data into topics: producers write records to a topic, and consumers read them from it.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Start Zookeeper (Kafka depends on it)&lt;/strong&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/zookeeper-server-start.sh config/zookeeper.properties
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Start Kafka in another terminal:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/kafka-server-start.sh config/server.properties
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Create a Kafka Topic&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Kafka uses topics to organize data. For our weather data, we will create a topic called weather_topic.&lt;/p&gt;

&lt;p&gt;To create the topic, run:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/kafka-topics.sh --create --topic weather_topic --bootstrap-server localhost:9092 --partitions 6 --replication-factor 1
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
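

&lt;p&gt;To confirm the topic exists, you can describe it (an optional sanity check):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/kafka-topics.sh --describe --topic weather_topic --bootstrap-server localhost:9092
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;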



&lt;h2&gt;
  
  
  Setting Up MongoDB
&lt;/h2&gt;

&lt;p&gt;We will use MongoDB to store the weather data for real-time querying and analysis. In this tutorial we use MongoDB in the cloud (MongoDB Atlas), with a database called weather_db and a collection called weather_data. To keep the application secure, we store the MongoDB connection URI in a .env file.&lt;br&gt;
&lt;strong&gt;MongoDB&lt;/strong&gt;&lt;br&gt;
&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9azsp2yu29wmr2akhrpf.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9azsp2yu29wmr2akhrpf.png" alt="Mongodb set up" width="800" height="320"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
  Building the Weather Data Extraction Script
&lt;/h2&gt;

&lt;p&gt;We will now create the script to extract weather data from OpenWeatherMap. The script will fetch the current weather for a city and format the data into a JSON object suitable for Kafka streaming.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;requests&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;pandas&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;dotenv&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;load_dotenv&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;

&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pymongo.mongo_client&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;MongoClient&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pymongo.server_api&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;ServerApi&lt;/span&gt;

&lt;span class="nf"&gt;load_dotenv&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="c1"&gt;# Load env 
&lt;/span&gt;&lt;span class="n"&gt;uri&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;DB_STRING&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Create a new client and connect to the server
&lt;/span&gt;&lt;span class="n"&gt;client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;MongoClient&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;uri&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;server_api&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nc"&gt;ServerApi&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;1&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;


&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;extract_data&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;


    &lt;span class="n"&gt;city_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Nairobi&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;

    &lt;span class="c1"&gt;# get API KEY
&lt;/span&gt;    &lt;span class="n"&gt;KEY&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;WEATHER_KEY&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;#load weather data in Nairobi,KE from openweathermap
&lt;/span&gt;    &lt;span class="n"&gt;url&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;https://api.openweathermap.org/data/2.5/weather?q=&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;city_name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;&amp;amp;appid=&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;KEY&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;


    &lt;span class="n"&gt;weather_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;requests&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;url&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;url&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;df_weather&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;DataFrame&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;weather_data&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;json&lt;/span&gt;&lt;span class="p"&gt;()[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;weather&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;index&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
    &lt;span class="n"&gt;df_temp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;DataFrame&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;weather_data&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;json&lt;/span&gt;&lt;span class="p"&gt;()[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;main&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt; &lt;span class="n"&gt;index&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;

    &lt;span class="n"&gt;country&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;weather_data&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;json&lt;/span&gt;&lt;span class="p"&gt;()[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;sys&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;country&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;

    &lt;span class="n"&gt;df_loc&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;DataFrame&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;country&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;country&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;city&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;city_name&lt;/span&gt;
        &lt;span class="p"&gt;},&lt;/span&gt;
        &lt;span class="n"&gt;index&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;#merge data frame
&lt;/span&gt;    &lt;span class="n"&gt;merged_df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;merge&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;merge&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df_weather&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;df_temp&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;left_index&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;right_index&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;how&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;outer&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
                        &lt;span class="n"&gt;df_loc&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;left_index&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;right_index&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;how&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;outer&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
                        &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;merged_df&lt;/span&gt;
&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;transform_data&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;merged_df&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="c1"&gt;# drop columns 
&lt;/span&gt;    &lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;merged_df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;drop&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;columns&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;icon&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;

    &lt;span class="c1"&gt;#tranform temp from kelvin to celcius
&lt;/span&gt;    &lt;span class="n"&gt;temp_list&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;temp&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;feels_like&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;temp_min&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;temp_max&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;

    &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;temp_list&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;temp_list&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="mi"&gt;273&lt;/span&gt;

    &lt;span class="n"&gt;data_dict&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_dict&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;orient&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;records&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="c1"&gt;#Convert the DataFrame to a list of dictionaries
&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;data_dict&lt;/span&gt;


&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;load_data&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data_dict&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;

    &lt;span class="c1"&gt;#load to mongodb
&lt;/span&gt;
    &lt;span class="n"&gt;db&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;weather_db&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;  &lt;span class="c1"&gt;#create db 
&lt;/span&gt;    &lt;span class="n"&gt;collection&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;db&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;weather_data&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="c1"&gt;#creates collection
&lt;/span&gt;

    &lt;span class="c1"&gt;# data_dict = df.to_dict(orient="records") #Convert the DataFrame to a list of dictionaries
&lt;/span&gt;
    &lt;span class="c1"&gt;# print(data_dict)
&lt;/span&gt;
    &lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;insert_many&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data_dict&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="c1"&gt;# insert the entire DataFrame into the collection
&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
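

&lt;p&gt;If you want to test the ETL functions on their own, a quick manual run might look like this (assuming the script above is saved as app.py, which is what the producer's import below expects):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# assuming the script above is saved as app.py
from app import extract_data, transform_data, load_data

records = transform_data(extract_data())  # fetch one snapshot and clean it
load_data(records)                        # write it straight to MongoDB, bypassing Kafka
print(records)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;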



&lt;p&gt;&lt;strong&gt;Kafka Producer Implementation&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;confluent_kafka&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Producer&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;app&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;extract_data&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;transform_data&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;

&lt;span class="n"&gt;conf&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;bootstrap.servers&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;localhost:9092&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;client.id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;python-producer&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Producer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;conf&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;weather_topic&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;



&lt;span class="c1"&gt;# Delivery callback
&lt;/span&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;delivery_report&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="ow"&gt;is&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Delivery failed: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Message delivered to &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; [&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;partition&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;]&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;



&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="c1"&gt;#call functions
&lt;/span&gt;    &lt;span class="n"&gt;extracted_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;extract_data&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;transform_data&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;extracted_data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;produce&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;  &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;dumps&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;callback&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;delivery_report&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;poll&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="c1"&gt;# Trigger delivery callback
&lt;/span&gt;
    &lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;flush&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="c1"&gt;# make sure buffered messages are delivered before sleeping&lt;/span&gt;


    &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;180&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="c1"&gt;#after 3 minutes
&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fdsb2aoa0zcvgd4xecuf5.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fdsb2aoa0zcvgd4xecuf5.png" alt="kafka producer" width="574" height="190"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Kafka Consumer Implementation&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;confluent_kafka&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Consumer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;KafkaError&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="n"&gt;KafkaException&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;app&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;load_data&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;dotenv&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;load_dotenv&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pymongo.mongo_client&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;MongoClient&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pymongo.server_api&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;ServerApi&lt;/span&gt;

&lt;span class="nf"&gt;load_dotenv&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="c1"&gt;# Load env 
&lt;/span&gt;&lt;span class="n"&gt;uri&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;DB_STRING&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Create a new client and connect to the server
&lt;/span&gt;&lt;span class="n"&gt;client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;MongoClient&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;uri&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;server_api&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="nc"&gt;ServerApi&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;1&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;


&lt;span class="c1"&gt;# Kafka Consumer configuration
&lt;/span&gt;&lt;span class="n"&gt;conf&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;bootstrap.servers&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;localhost:9092&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;  &lt;span class="c1"&gt;# Kafka broker(s)
&lt;/span&gt;    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;group.id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;weather-consumer-group&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;  &lt;span class="c1"&gt;# Consumer group ID
&lt;/span&gt;    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;auto.offset.reset&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;earliest&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;  &lt;span class="c1"&gt;# Start consuming from the earliest message
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c1"&gt;# Create Consumer instance
&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Consumer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;conf&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Kafka topic to consume messages from
&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;weather_topic&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;

&lt;span class="c1"&gt;# Subscribe to the topic
&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;subscribe&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;

&lt;span class="c1"&gt;# Delivery callback to handle message processing
&lt;/span&gt;&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;delivery_report&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="ow"&gt;is&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Message delivery failed: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Message delivered to &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; [&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;partition&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;] at offset &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;offset&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;


&lt;span class="c1"&gt;# Main consumer loop
&lt;/span&gt;&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="c1"&gt;# Poll for a message (timeout )
&lt;/span&gt;        &lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;poll&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mf"&gt;180.0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;  &lt;span class="c1"&gt;# 3 minutes timeout
&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="ow"&gt;is&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;No message received within the timeout period.&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;elif&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
            &lt;span class="c1"&gt;# Handle errors from the consumer
&lt;/span&gt;            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;code&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="n"&gt;KafkaError&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_PARTITION_EOF&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;End of partition reached: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;partition&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; at offset &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;offset&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="k"&gt;raise&lt;/span&gt; &lt;span class="nc"&gt;KafkaException&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
        &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="c1"&gt;# Successfully received a message
&lt;/span&gt;            &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Received message: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;value&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;decode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;utf-8&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

            &lt;span class="c1"&gt;# Deserialize the message (assuming the message is in JSON format)
&lt;/span&gt;            &lt;span class="n"&gt;message_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;loads&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;value&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;decode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;utf-8&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

            &lt;span class="c1"&gt;# Load data in db
&lt;/span&gt;            &lt;span class="nf"&gt;load_data&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="n"&gt;message_data&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt; &lt;span class="c1"&gt;#list message data
&lt;/span&gt;
            &lt;span class="c1"&gt;# Optional: You can print or handle the processed data here
&lt;/span&gt;            &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Processed data: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;message_data&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

            &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;180&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; 


    &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;Exception&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;An error occurred: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="nf"&gt;str&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;break&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fqntzch5flcj4lyqbytwx.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fqntzch5flcj4lyqbytwx.png" alt="Kafka Consumer" width="586" height="441"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Streaming Data in the DB&lt;/strong&gt;&lt;br&gt;
&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fp7fybgkjc06mfb8mxcvd.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fp7fybgkjc06mfb8mxcvd.png" alt="Data streaming mongodb" width="800" height="293"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion and Recommendation
&lt;/h2&gt;

&lt;p&gt;In this guide, we've built a real-time weather data pipeline using Apache Kafka and MongoDB to fetch, stream, and store weather data. While this setup works well for basic use, there are several ways to improve and expand it.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Recommendations:&lt;/strong&gt;&lt;br&gt;
&lt;em&gt;Dashboard Visualization&lt;/em&gt;&lt;br&gt;
Integrate tools like Grafana to visualize real-time weather data with charts and graphs for better insights.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Alert Systems&lt;/em&gt;&lt;br&gt;
Set up automatic notifications via email or SMS when the weather conditions exceed a predefined threshold.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Analytics Integration&lt;/em&gt;&lt;br&gt;
Use machine learning or statistical models to analyse long-term weather trends and predict future conditions.&lt;/p&gt;

&lt;p&gt;For complete code and further details, visit the project on &lt;a href="https://github.com/batru/Real-Time-Streaming-Weather-Data" rel="noopener noreferrer"&gt;Github&lt;/a&gt;.&lt;/p&gt;

</description>
    </item>
    <item>
      <title>The Ultimate Guide to Apache Kafka: Basics, Architecture, and Core Concepts</title>
      <dc:creator>Batrudin Jamaludin</dc:creator>
      <pubDate>Mon, 10 Mar 2025 09:40:52 +0000</pubDate>
      <link>https://dev.to/batrudin_haji/the-ultimate-guide-to-apache-kafka-basics-architecture-and-core-concepts-1ji3</link>
      <guid>https://dev.to/batrudin_haji/the-ultimate-guide-to-apache-kafka-basics-architecture-and-core-concepts-1ji3</guid>
      <description>&lt;p&gt;Apache Kafka is a powerful open-source distributed streaming platform used to handle real-time data feeds. It was Originally developed by LinkedIn and later open-sourced in 2011, Kafka is now one of the most popular tools for building real-time data pipelines and streaming applications. Similar to other distributed systems, Kafka boasts a complex architecture, which may pose a challenge for new developers. Setting up Kafka involves navigating a formidable command line interface and configuring numerous settings. In this guide, I will provide insights into architectural concepts and essential commands frequently used by developers to initiate their journey with Kafka.&lt;/p&gt;

&lt;h2&gt;
  
  
  Understanding Apache Kafka Basics
&lt;/h2&gt;

&lt;p&gt;Kafka is different from traditional messaging systems because it allows data to be published, consumed, and stored across a distributed network of servers, enabling real-time data processing.&lt;/p&gt;

&lt;p&gt;Some of the key concepts of Kafka include clusters, topics, producers, consumers, partitions, and Connect.  &lt;/p&gt;

&lt;h2&gt;
  
  
  Kafka’s Architecture
&lt;/h2&gt;

&lt;p&gt;The architecture of Kafka is designed to be highly scalable, fault-tolerant, and efficient. It is built around a few core components:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Topic: A topic is a named stream of records into which Kafka organizes data; it is roughly the equivalent of a table in a relational database. A topic can have multiple partitions, allowing data to be distributed across different Kafka brokers, which improves scalability and fault tolerance.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Producer: Producers are applications that write data to Kafka topics, sending messages through Kafka’s client libraries. Examples include a microservice, a web application, or any other system that generates real-time data (see the sketch after this list).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Consumer: Consumers are applications that read data from Kafka topics. Kafka supports multiple consumers reading from the same topic independently, and consumers are often organized into consumer groups to enable parallel processing and fault tolerance.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Broker: A broker is a Kafka server instance that receives, stores, and serves messages. Multiple brokers work together to form a Kafka cluster, which lets the system scale horizontally; each broker manages a portion of the topics' data.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Partition: A partition is a subdivision of a topic that splits data into smaller, more manageable chunks. Partitions let Kafka parallelize the consumption of messages, which can significantly boost throughput. Each partition is an ordered, immutable sequence of messages.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Connect: Kafka Connect is a framework for streaming data between Kafka and external systems through connectors. The Connect framework runs and manages the tasks; a connector is only responsible for generating the set of tasks and indicating to the framework when they need to be updated.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
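
&lt;p&gt;To make the producer and consumer roles concrete, here is a minimal, illustrative sketch using the confluent-kafka Python client; the broker address, topic name, and group id are placeholders:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from confluent_kafka import Producer, Consumer

# Producer: write one message to a topic
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('mytopic', value=b'hello kafka')
producer.flush()  # block until the broker acknowledges delivery

# Consumer: read messages from the same topic
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'demo-group',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['mytopic'])
msg = consumer.poll(10.0)  # wait up to 10 seconds for a message
if msg is not None and msg.error() is None:
    print(msg.value().decode('utf-8'))
consumer.close()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;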

&lt;h2&gt;
  
  
  Commonly Used CLI Commands
&lt;/h2&gt;

&lt;p&gt;&lt;strong&gt;Start Zookeeper&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;zookeeper-server-start config\zookeeper.properties
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Start Kafka Server&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;kafka-server-start config\server.properties
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;List existing topics&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/kafka-topics.sh --zookeeper localhost:2181 --list
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Describe a topic&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mytopic
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Delete a topic&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic mytopic
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Consume messages&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic mytopic --from-beginning
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Start Kafka Producer&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Start Kafka Consumer&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Apache Kafka's real time processing capabilities make it a powerful tool for building data pipelines and stream-processing applications. Whether you are handling log data, monitoring system activity, or building complex event-driven applications, Kafka provides a reliable and efficient solution to stream and process large volumes of data. Understanding Kafka's architecture and core concepts, such as topics, producers, consumers, and partitions, is crucial for leveraging its full potential in modern data-driven applications.&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Mastering SQL for Data Engineering: Advanced Queries, Optimization, and Data Modelling Best Practices</title>
      <dc:creator>Batrudin Jamaludin</dc:creator>
      <pubDate>Mon, 10 Feb 2025 08:03:15 +0000</pubDate>
      <link>https://dev.to/batrudin_haji/mastering-sql-for-data-engineering-advanced-queries-optimization-and-data-modelling-best-14mc</link>
      <guid>https://dev.to/batrudin_haji/mastering-sql-for-data-engineering-advanced-queries-optimization-and-data-modelling-best-14mc</guid>
      <description>&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8bjtlzvnqt6ij4teu8sw.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F8bjtlzvnqt6ij4teu8sw.jpeg" alt="SQL For Data Engineers" width="800" height="450"&gt;&lt;/a&gt;Structured Query Language (SQL) is Key in the world of Data Engineering. From managing large amounts of data to working on complex data processing pipelines, understanding SQL is essential for extracting, transforming, and analyzing data. For data engineers, SQL is not only a tool for querying databases but the foundation for building efficient data pipelines, optimizing queries, and ensuring scalability. In this article, we'll explore advanced SQL techniques, query optimization strategies, and best practices in data modelling that every data engineer should know.&lt;/p&gt;

&lt;h2&gt;
  
  
  Core SQL Concepts for Data Engineering
&lt;/h2&gt;

&lt;p&gt;It’s important to have a solid understanding of the core SQL concepts that form the foundation of data engineering.&lt;br&gt;
SELECT, WHERE, JOIN, GROUP BY, and HAVING&lt;br&gt;
These are the building blocks of SQL and appear in almost every query. Here's a brief overview of how they work:&lt;br&gt;
• SELECT: Specifies the columns you want to retrieve from a table in a database.&lt;br&gt;
• WHERE: Filters rows based on a given condition.&lt;br&gt;
• JOIN: Combines data from multiple tables based on a related column.&lt;br&gt;
• GROUP BY: Aggregates rows that have the same values in specified columns.&lt;br&gt;
• HAVING: Filters aggregated results, similar to the WHERE clause but for grouped data.&lt;/p&gt;
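
&lt;p&gt;As a quick illustration, here is a sketch that combines several of these clauses against a hypothetical orders table (table and column names are made up for the example):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Total spend per customer in 2024, keeping only customers above 1000
SELECT customer_id, SUM(amount) AS total_spend
FROM orders
WHERE order_date &amp;gt;= '2024-01-01'
GROUP BY customer_id
HAVING SUM(amount) &amp;gt; 1000;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
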
&lt;h2&gt;
  
  
  Real-World Use Case: Data Pipelines and ETL Processes
&lt;/h2&gt;

&lt;p&gt;In a typical ETL (Extract, Transform, Load) pipeline, these concepts are frequently used to pull data from different sources, filter unnecessary data, and transform it into a structured format.&lt;/p&gt;

&lt;p&gt;Let's use an example to demonstrate this concept. Imagine you have three tables: employee_data, salary_data, and payroll_data. The task is to extract the data from employee_data and salary_data, clean it, and then load it into a structured table (payroll_data) that can be used for payroll reports.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Extract employee details from raw_employees
SELECT employee_id, employee_name, department
FROM employee_data;

-- Extract salary details from salary_data
SELECT employee_id, salary
FROM salary_data;

-- Transform data: Join employee and salary tables, and calculate salary after tax
SELECT 
    e.employee_id, 
    e.employee_name, 
    e.department, 
    s.salary, 
    (s.salary * 0.9) AS salary_after_tax
FROM 
    employee_data e
JOIN 
    salary_data s ON e.employee_id = s.employee_id;

-- Load transformed data into a new table (payroll_data)
CREATE TABLE payroll_data AS
SELECT 
    e.employee_id, 
    e.employee_name, 
    e.department, 
    s.salary, 
    (s.salary * 0.9) AS salary_after_tax
FROM 
    employee_data e
JOIN 
    salary_data s ON e.employee_id = s.employee_id;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Advanced SQL Techniques
&lt;/h2&gt;

&lt;p&gt;In this section we are going to dive into more complex queries that can improve your efficiency and problem-solving skills in data engineering.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Recursive Queries and Common Table Expressions (CTEs)
Recursive queries are useful when dealing with hierarchical data (e.g., organizational structures); a recursive example follows the generic CTE sketch below. &lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;A Common Table Expression (CTE) is a temporary result set defined within the execution of a single query; once the query finishes, the result no longer exists. CTEs make queries more readable and easier to manage by letting you define, for example, a subquery once and reference it multiple times within the main query. A CTE is defined using the WITH keyword, followed by the CTE name and the query that generates the result set.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;WITH cte_name AS (
    SELECT column1, column2
    FROM table_name
    WHERE condition
)
SELECT column1, column2
FROM cte_name;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
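

&lt;p&gt;Because the CTE above is not recursive, here is a sketch of a recursive CTE that walks a hypothetical employees(employee_id, employee_name, manager_id) hierarchy; the exact syntax varies slightly by database (PostgreSQL and MySQL 8+ use WITH RECURSIVE):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;WITH RECURSIVE org_chart AS (
    -- anchor member: top-level employees with no manager
    SELECT employee_id, employee_name, manager_id, 1 AS level
    FROM employees
    WHERE manager_id IS NULL

    UNION ALL

    -- recursive member: employees reporting to someone already in org_chart
    SELECT e.employee_id, e.employee_name, e.manager_id, oc.level + 1
    FROM employees e
    JOIN org_chart oc ON e.manager_id = oc.employee_id
)
SELECT employee_id, employee_name, level
FROM org_chart
ORDER BY level, employee_id;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;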



&lt;h2&gt;
  
  
  Window Functions for Running Totals, Ranking, and Partitioning
&lt;/h2&gt;

&lt;p&gt;Window functions are powerful tools for performing calculations across a set of rows related to the current row without collapsing the result set, unlike aggregate functions.&lt;br&gt;
Examples:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;ROW_NUMBER() for ranking.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;SUM() OVER() for running totals.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;RANK() for ranking data with ties.&lt;br&gt;
&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SELECT 
    employee_id,
    salary,
    SUM(salary) OVER (PARTITION BY department_id ORDER BY salary DESC) AS running_total
FROM employees;
-- This query computes a running total of salaries per department, ordered by salary in descending order.

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;h2&gt;
  
  
  Complex JOINs and Subqueries for Efficient Data Retrieval
&lt;/h2&gt;

&lt;p&gt;Complex joins like LEFT JOIN, RIGHT JOIN, and INNER JOIN are used to combine data across different tables in flexible ways.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SELECT e.employee_id, e.name, e.salary, d.department_name
FROM employees e
JOIN departments d ON e.department_id = d.department_id
WHERE e.salary &amp;gt; (
    SELECT AVG(salary) FROM employees WHERE department_id = e.department_id
);
-- This query returns employees who earn more than the average salary in their respective departments.
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Query Optimization and Performance Tuning
&lt;/h2&gt;

&lt;p&gt;As your database grows, query optimization becomes a critical skill. Without proper tuning, even the simplest queries can become slow and inefficient. We will look at two techniques: indexing and stored procedures.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Indexing
Indexes are essential for speeding up queries, especially when dealing with large datasets. By creating an index on columns that are frequently queried, you can reduce the time needed to retrieve data.
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE INDEX idx_order_date ON orders(order_date);
-- This index speeds up queries that filter by order_date.
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;Stored Procedures
A stored procedure is a precompiled collection of one or more SQL statements that can be executed as a single unit. Stored procedures are particularly useful for repetitive tasks, improving both performance and maintainability.
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;--- An example of stored procedure that selects records from a table
CREATE PROCEDURE GetOrdersByDate 
AS
BEGIN
    SELECT order_id, customer_id, order_date
    FROM orders
    WHERE order_date = @order_date;
END;
--- Execute the stored procedure
EXEC GetOrdersByDate;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Data Modeling Best Practices
&lt;/h2&gt;

&lt;p&gt;Effective data modeling ensures that your database schema is optimized for both storage and query performance.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Normalization vs. Denormalization&lt;br&gt;
• Normalization reduces data redundancy and ensures data integrity by breaking down data into smaller tables.&lt;br&gt;
• Denormalization involves merging tables to reduce joins, which can speed up read-heavy applications, at the cost of additional storage and potential update anomalies.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Designing Efficient Relational Schemas&lt;br&gt;
When designing a schema, focus on the following:&lt;br&gt;
• Minimize redundancy and maintain consistency.&lt;br&gt;
• Ensure foreign key relationships between tables to enforce integrity.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Star Schema vs. Snowflake Schema&lt;br&gt;
• Star Schema: A simplified schema with a central fact table surrounded by dimension tables. Ideal for analytical queries.&lt;br&gt;
• Snowflake Schema: A more complex schema where dimension tables are normalized. While it saves storage, it may lead to slower queries due to the need for more joins.&lt;br&gt;
&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Example of a star schema:
CREATE TABLE fact_sales (
    sale_id INT PRIMARY KEY,
    product_id INT,
    customer_id INT,
    sales_amount DECIMAL(10, 2),
    sale_date DATE
);

CREATE TABLE dim_product (
    product_id INT PRIMARY KEY,
    product_name VARCHAR(255),
    category VARCHAR(100)
);

CREATE TABLE dim_customer (
    customer_id INT PRIMARY KEY,
    customer_name VARCHAR(255)
);
In this example, the fact_sales table holds the sales data, while dim_product and dim_customer contain information on products and customers, respectively.

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
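
&lt;p&gt;For contrast with point 1 above, a denormalized variant of the same design inlines the dimension attributes directly into one table, trading extra storage and update risk for join-free reads. A hedged sketch:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Denormalized alternative: no joins needed at query time, but product
-- and customer attributes are repeated on every sale and must be kept
-- consistent by the application.
CREATE TABLE sales_denormalized (
    sale_id INT PRIMARY KEY,
    product_name VARCHAR(255),
    category VARCHAR(100),
    customer_name VARCHAR(255),
    sales_amount DECIMAL(10, 2),
    sale_date DATE
);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;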



&lt;h2&gt;
  
  
  Real-World Application &amp;amp; Case Study
&lt;/h2&gt;

&lt;p&gt;Consider the following query, which joins multiple tables but runs slowly because the join columns are not indexed:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SELECT orders.order_id, customers.customer_name, SUM(order_items.quantity * order_items.price) AS total
FROM orders
JOIN customers ON orders.customer_id = customers.customer_id
JOIN order_items ON orders.order_id = order_items.order_id
GROUP BY orders.order_id, customers.customer_name;

-- Adding indexes to the foreign key join columns (customer_id, order_id) can significantly improve performance; see the sketch after this block.
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
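
&lt;p&gt;A minimal sketch of those supporting indexes (the index names are illustrative):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Indexes on the join columns let the database look up matching rows
-- directly instead of scanning entire tables.
CREATE INDEX idx_orders_customer_id ON orders(customer_id);
CREATE INDEX idx_order_items_order_id ON order_items(order_id);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;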



&lt;p&gt;&lt;strong&gt;Case Study: Transforming Raw Data into Structured Reports&lt;/strong&gt;&lt;br&gt;
Imagine you are tasked with transforming raw transactional data into monthly sales reports. Using SQL, you can aggregate data, join it with dimension tables for more context, and present it in a clean, readable format.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SELECT 
    MONTH(order_date) AS month,
    SUM(order_items.quantity * order_items.price) AS total_sales
FROM orders
JOIN order_items ON orders.order_id = order_items.order_id
WHERE order_date BETWEEN '2024-01-01' AND '2024-12-31'
GROUP BY MONTH(order_date)
ORDER BY month;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
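
&lt;p&gt;Joining a dimension table adds the extra context mentioned above. As a sketch, assuming order_items carries a product_id and a dim_product table like the one from the star schema section exists (MONTH() is MySQL/SQL Server syntax):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- Monthly sales broken down by product category
SELECT 
    MONTH(o.order_date) AS month,
    p.category,
    SUM(oi.quantity * oi.price) AS total_sales
FROM orders o
JOIN order_items oi ON o.order_id = oi.order_id
JOIN dim_product p ON oi.product_id = p.product_id
WHERE o.order_date BETWEEN '2024-01-01' AND '2024-12-31'
GROUP BY MONTH(o.order_date), p.category
ORDER BY month, p.category;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;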



&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Mastering SQL is a vital skill for any data engineer. By understanding and applying advanced SQL techniques, optimizing queries, and designing efficient data models, you can significantly enhance your ability to work with large datasets and build scalable data pipelines. The key takeaways are:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt; Understand core SQL concepts, including joins, filtering, and grouping.&lt;/li&gt;
&lt;li&gt; Use advanced techniques like recursive queries, window functions, and CTEs to solve complex problems.&lt;/li&gt;
&lt;li&gt; Focus on query optimization by analyzing execution plans and leveraging indexing.&lt;/li&gt;
&lt;li&gt; Follow data modeling best practices to design scalable and efficient schemas.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;By applying these strategies to real-world projects, you'll be well on your way to mastering SQL and becoming a more effective data engineer.&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Building Scalable Data Pipelines with Python – A Complete Guide</title>
      <dc:creator>Batrudin Jamaludin</dc:creator>
      <pubDate>Sat, 08 Feb 2025 14:38:40 +0000</pubDate>
      <link>https://dev.to/batrudin_haji/building-scalable-data-pipelines-with-python-a-complete-guide-40a7</link>
      <guid>https://dev.to/batrudin_haji/building-scalable-data-pipelines-with-python-a-complete-guide-40a7</guid>
      <description>&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ft9rs6c2q43bhwr5nrmiq.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ft9rs6c2q43bhwr5nrmiq.png" alt="Image description" width="800" height="452"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  What is a Data Pipeline?
&lt;/h2&gt;

&lt;p&gt;Ever wondered how data moves from a source to its destination, or how messy data is transformed into a clean, analysis-ready form without manual effort? That is what a data pipeline does. It's a series of steps that automate the movement of data from various sources to a destination, ensuring that it is structured, clean, and ready for analysis.&lt;/p&gt;

&lt;h2&gt;
  
  
  Key Functions of a Data Pipeline
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Extract&lt;/strong&gt;: The process of retrieving raw data from multiple sources (databases, APIs, files, etc.).&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Transform&lt;/strong&gt;: Clean, filter, aggregate, or modify the data to make it useful.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Load&lt;/strong&gt;: Store the transformed data in the destination system (a data warehouse, database, or analytics tool).&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Hands-on Python ETL Implementation
&lt;/h2&gt;

&lt;p&gt;In this section, we will implement two example ETL pipelines in Python:&lt;/p&gt;

&lt;h3&gt;
  
  
  Reading from CSV and Writing to PostgreSQL
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Generate sales data&lt;/strong&gt; (I have used Mockaroo to generate dummy data).&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Create a PostgreSQL database&lt;/strong&gt;. I have used &lt;code&gt;aiven.io&lt;/code&gt; to store my transformed data and &lt;code&gt;DBeaver&lt;/code&gt; to connect to the database.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Implement the ETL process using Python&lt;/strong&gt;. Make sure to install the necessary libraries (&lt;code&gt;pandas&lt;/code&gt; and &lt;code&gt;sqlalchemy&lt;/code&gt;).
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;pandas&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;sqlalchemy&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;create_engine&lt;/span&gt;

&lt;span class="c1"&gt;# Database connection
&lt;/span&gt;&lt;span class="n"&gt;engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;create_engine&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;postgresql://username:password@localhost:5432/etl_db&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Extract
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;read_csv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;sales.csv&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Transform the data
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;rename&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;columns&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;sales_id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;})&lt;/span&gt;  &lt;span class="c1"&gt;# Rename columns
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;dropna&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;  &lt;span class="c1"&gt;# Drop null values
&lt;/span&gt;
&lt;span class="c1"&gt;# Load
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_sql&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;sales&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;engine&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;if_exists&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;append&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;index&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Data successfully loaded into PostgreSQL!&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Reading from API and Writing to Database
&lt;/h3&gt;

&lt;p&gt;We are going to fetch JSON data (staff data for a fictitious company) from a sample endpoint; the URL is shown in the code below.&lt;/p&gt;

&lt;p&gt;Python Code to Fetch and Load API Data:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;requests&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;pandas&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;sqlalchemy&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;create_engine&lt;/span&gt;

&lt;span class="c1"&gt;# Step 1: Fetch data from API
&lt;/span&gt;&lt;span class="n"&gt;url&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;https://raw.githubusercontent.com/LuxDevHQ/LuxDevHQDataEngineeringGuide/refs/heads/main/samplejson.json&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;span class="n"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;requests&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;url&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;json&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# Step 2: Extract - Convert JSON data to DataFrame
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;DataFrame&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Step 3: Transform - Clean and reshape the data
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;[[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;position&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;country&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]]&lt;/span&gt;  &lt;span class="c1"&gt;# Select relevant columns
&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;columns&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;full_name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;role&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;country&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;  &lt;span class="c1"&gt;# Rename columns for clarity
&lt;/span&gt;
&lt;span class="c1"&gt;# Step 4: Load - Insert the data into PostgreSQL
&lt;/span&gt;&lt;span class="n"&gt;engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;create_engine&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;postgresql://username:password@localhost:5432/etl_db&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_sql&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;api_staff&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;engine&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;if_exists&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;append&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;index&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;API Data Loaded Successfully!&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Congratulations! 🎉
&lt;/h2&gt;

&lt;p&gt;If you've followed each step correctly, you've successfully implemented an ETL data pipeline using Python!&lt;/p&gt;

&lt;p&gt;This hands-on example demonstrates how to automate the process of moving data from CSV files and APIs into a database, streamlining your data processing workflows and making them more efficient and scalable.&lt;/p&gt;

</description>
    </item>
  </channel>
</rss>
