<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Eric Katumo</title>
    <description>The latest articles on DEV Community by Eric Katumo (@katumo).</description>
    <link>https://dev.to/katumo</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1038281%2F46257f3b-d9da-4aa7-b8a1-60df2aeb9914.png</url>
      <title>DEV Community: Eric Katumo</title>
      <link>https://dev.to/katumo</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/katumo"/>
    <language>en</language>
    <item>
      <title>Building a YouTube Channel Analytics Dashboard with Airflow, Spark, and Grafana</title>
      <dc:creator>Eric Katumo</dc:creator>
      <pubDate>Fri, 25 Apr 2025 13:18:54 +0000</pubDate>
      <link>https://dev.to/katumo/building-a-youtube-channel-analytics-dashboard-with-airflow-spark-and-grafana-1734</link>
      <guid>https://dev.to/katumo/building-a-youtube-channel-analytics-dashboard-with-airflow-spark-and-grafana-1734</guid>
      <description>&lt;h2&gt;Introduction&lt;/h2&gt;

&lt;p&gt;In today's creator economy, YouTube content creators rely heavily on performance metrics to guide their content strategies. While YouTube's native analytics dashboard provides basic insights, it often lacks the flexibility, depth, and customization options that serious creators need. To address this limitation, I built an automated data engineering pipeline that extracts data from the YouTube API, processes it with Apache Spark, stores it in PostgreSQL, and visualizes it with Grafana.&lt;/p&gt;

&lt;p&gt;This article details my implementation of this YouTube Channel Analytics Dashboard, developed as part of the Data Engineering Easter Holiday Challenge. The solution provides content creators with deeper insights into their channel performance, audience engagement, and optimal content strategies.&lt;/p&gt;

&lt;h2&gt;Problem Statement&lt;/h2&gt;

&lt;p&gt;YouTube content creators face several challenges when trying to analyze their channel performance:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;YouTube's native dashboard offers limited flexibility and customization&lt;/li&gt;
&lt;li&gt;Manual data collection is time-consuming, error-prone, and not scalable&lt;/li&gt;
&lt;li&gt;Creators lack a centralized platform for tracking performance metrics over time&lt;/li&gt;
&lt;li&gt;Identifying optimal publishing schedules and content types requires complex analysis&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;To solve these problems, creators need an &lt;strong&gt;automated analytics data pipeline&lt;/strong&gt; that:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Periodically pulls video-level performance data from the YouTube API&lt;/li&gt;
&lt;li&gt;Cleans, aggregates, and stores that data in a structured format&lt;/li&gt;
&lt;li&gt;Delivers actionable insights through intuitive visualizations&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;Solution Architecture&lt;/h2&gt;

&lt;p&gt;The solution is built as a complete ETL (Extract, Transform, Load) pipeline with visualization capabilities:&lt;/p&gt;

&lt;p&gt;The pipeline consists of the following components:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Extract&lt;/strong&gt;: A Python script that pulls channel and video data from the YouTube API v3&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Transform&lt;/strong&gt;: Apache Spark processes the extracted data and adds derived metrics&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Load&lt;/strong&gt;: Processed data is stored in a PostgreSQL database&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Orchestrate&lt;/strong&gt;: Apache Airflow schedules and manages the pipeline execution&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Visualize&lt;/strong&gt;: Grafana dashboards provide interactive visualizations of the data&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;All components were deployed on an Azure VM, making this a cost-effective solution for individual creators or small agencies.&lt;/p&gt;

&lt;h2&gt;Implementation Details&lt;/h2&gt;

&lt;h3&gt;Data Extraction&lt;/h3&gt;

&lt;p&gt;The extraction component uses the YouTube Data API v3 to collect channel and video data. The script handles API authentication, retrieves the channel's uploaded videos playlist, and extracts detailed metadata for each video.&lt;/p&gt;

&lt;p&gt;Here's the implementation of the extraction script:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;googleapiclient.discovery&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;build&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;sys&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;dotenv&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;load_dotenv&lt;/span&gt;

&lt;span class="c1"&gt;# Load variables from .env
&lt;/span&gt;&lt;span class="nf"&gt;load_dotenv&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# --- Configuration ---
&lt;/span&gt;&lt;span class="n"&gt;YOUTUBE_API_KEY&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;YOUTUBE_API_KEY&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;CHANNEL_ID&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;YOUTUBE_CHANNEL_ID&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;OUTPUT_FILENAME&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;OUTPUT_FILENAME&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;validate_config&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="n"&gt;YOUTUBE_API_KEY&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Error: YOUTUBE_API_KEY environment variable not set.&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;file&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;sys&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stderr&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;sys&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;exit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="n"&gt;CHANNEL_ID&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Error: YOUTUBE_CHANNEL_ID environment variable not set.&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;file&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;sys&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stderr&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;sys&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;exit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;build_youtube_client&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nf"&gt;build&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;youtube&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;v3&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;developerKey&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;YOUTUBE_API_KEY&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;get_playlist_id&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;youtube&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;channel_id&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;request&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;youtube&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;channels&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;list&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;part&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;contentDetails&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;channel_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;request&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;execute&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;items&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;contentDetails&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;relatedPlaylists&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;uploads&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;items&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
    &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;Exception&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Error fetching channel details: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;file&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;sys&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stderr&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;get_video_ids&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;youtube&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;playlist_id&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;video_ids&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;next_page_token&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[],&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;
    &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;request&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;youtube&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;playlistItems&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;list&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                &lt;span class="n"&gt;part&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;contentDetails&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
                &lt;span class="n"&gt;playlistId&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;playlist_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
                &lt;span class="n"&gt;maxResults&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
                &lt;span class="n"&gt;pageToken&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;next_page_token&lt;/span&gt;
            &lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;request&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;execute&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
            &lt;span class="n"&gt;video_ids&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;extend&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="n"&gt;item&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;contentDetails&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;videoId&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;item&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;items&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]])&lt;/span&gt;
            &lt;span class="n"&gt;next_page_token&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;nextPageToken&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="n"&gt;next_page_token&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="k"&gt;break&lt;/span&gt;
        &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;Exception&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Error fetching playlist items: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;file&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;sys&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stderr&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;break&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;video_ids&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;get_video_details&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;youtube&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;video_ids&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;all_video_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="nf"&gt;range&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nf"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;video_ids&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
        &lt;span class="n"&gt;video_chunk&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;,&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;join&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;video_ids&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="o"&gt;+&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
        &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;request&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;youtube&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;videos&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;list&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;part&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;snippet,statistics&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;video_chunk&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;request&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;execute&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
            &lt;span class="n"&gt;timestamp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;utcnow&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;isoformat&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Z&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
            &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;item&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;items&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]:&lt;/span&gt;
                &lt;span class="n"&gt;video_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;videoId&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;item&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
                    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;title&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;item&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;snippet&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;title&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
                    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;publishedAt&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;item&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;snippet&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;publishedAt&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
                    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;viewCount&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;item&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;statistics&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;viewCount&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
                    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;likeCount&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;item&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;statistics&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;likeCount&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
                    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;commentCount&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;item&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;statistics&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;commentCount&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
                    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;extractionTimestamp&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;timestamp&lt;/span&gt;
                &lt;span class="p"&gt;}&lt;/span&gt;
                &lt;span class="n"&gt;all_video_data&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;append&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;video_data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;Exception&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Error fetching video details: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;file&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;sys&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stderr&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;all_video_data&lt;/span&gt;

&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;__name__&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;__main__&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Starting YouTube data extraction...&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Step 1: Validate configuration
&lt;/span&gt;    &lt;span class="nf"&gt;validate_config&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="c1"&gt;# Step 2: Build YouTube client
&lt;/span&gt;    &lt;span class="n"&gt;youtube&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;build_youtube_client&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="c1"&gt;# Step 3: Get the uploads playlist ID
&lt;/span&gt;    &lt;span class="n"&gt;playlist_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;get_playlist_id&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;youtube&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CHANNEL_ID&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="n"&gt;playlist_id&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Could not retrieve playlist ID for channel &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;CHANNEL_ID&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;.&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;file&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;sys&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stderr&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;sys&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;exit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Step 4: Fetch video IDs from the playlist
&lt;/span&gt;    &lt;span class="n"&gt;video_ids&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;get_video_ids&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;youtube&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;playlist_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="n"&gt;video_ids&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;No videos found for channel &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;CHANNEL_ID&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;.&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;file&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;sys&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stderr&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;sys&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;exit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Step 5: Get video details
&lt;/span&gt;    &lt;span class="n"&gt;video_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;get_video_details&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;youtube&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;video_ids&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="n"&gt;video_data&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;No video data fetched.&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;file&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;sys&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stderr&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;sys&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;exit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Step 6: Save video data to file
&lt;/span&gt;    &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="nf"&gt;open&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;OUTPUT_FILENAME&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;w&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;dump&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;video_data&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;indent&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Data saved to &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;OUTPUT_FILENAME&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;IOError&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Error writing data to &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;OUTPUT_FILENAME&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;file&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;sys&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stderr&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;sys&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;exit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Extraction process finished.&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;sys&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;exit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The script follows these key steps:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Validates the required environment variables (API key, channel ID, and output filename)&lt;/li&gt;
&lt;li&gt;Retrieves the channel's uploads playlist ID&lt;/li&gt;
&lt;li&gt;Extracts all video IDs from the playlist, handling pagination&lt;/li&gt;
&lt;li&gt;Fetches detailed metadata for each video (in batches of 50 to respect API limits)&lt;/li&gt;
&lt;li&gt;Saves the raw data to a JSON file for further processing (a sample record is shown after this list)&lt;/li&gt;
&lt;/ol&gt;
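
&lt;p&gt;For reference, each record written to the raw JSON file has the shape below (the values are illustrative, not real channel data). Note that the YouTube API returns the statistics fields as strings, which is why the Spark step later casts them to integers:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;# Illustrative record from the extraction output; field names match the
# script above, values here are made up for demonstration.
{
    "videoId": "Abc123Xyz45",
    "title": "Example video title",
    "publishedAt": "2025-04-20T15:00:00Z",
    "viewCount": "1532",
    "likeCount": "87",
    "commentCount": "12",
    "extractionTimestamp": "2025-04-25T06:00:00.000000Z"
}
&lt;/code&gt;&lt;/pre&gt;
&lt;/div&gt;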

&lt;h3&gt;Data Transformation and Enrichment&lt;/h3&gt;

&lt;p&gt;The transformation component uses Apache Spark to process the raw JSON data and enrich it with additional metrics. This step converts date strings to timestamps, adds temporal dimensions (year, month, day of month, day of week, hour, and week of year), and classifies each video's performance based on view count thresholds.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pyspark.sql&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;SparkSession&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;pyspark.sql.functions&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;col&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;dotenv&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;load_dotenv&lt;/span&gt;

&lt;span class="nf"&gt;load_dotenv&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="n"&gt;POSTGRES_USER&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;POSTGRES_USER&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;POSTGRES_PASSWORD&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;POSTGRES_PASSWORD&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;POSTGRES_HOST&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;POSTGRES_HOST&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;POSTGRES_PORT&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;POSTGRES_PORT&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;POSTGRES_DB&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;POSTGRES_DB&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Initialize Spark Session with PostgreSQL JDBC driver
&lt;/span&gt;&lt;span class="n"&gt;spark&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;SparkSession&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;builder&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;appName&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;YouTube Data Transformation&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;config&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;spark.jars&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;/path/to/postgresql-driver.jar&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getOrCreate&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# Read raw JSON data
&lt;/span&gt;&lt;span class="n"&gt;df_raw&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;read&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;option&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;multiline&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;true&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;json&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;/path/to/youtube_videos_raw.json&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;df_raw&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;createOrReplaceTempView&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;youtube_videos_raw&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Basic transformation - type conversion and filtering
&lt;/span&gt;&lt;span class="n"&gt;sql_query&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
    SELECT
        videoId,
        title,
        publishedAt,
        CAST(viewCount AS INT) AS viewCount,
        CAST(likeCount AS INT) AS likeCount,
        CAST(commentCount AS INT) AS commentCount,
        extractionTimestamp
    FROM youtube_videos_raw
    WHERE viewCount IS NOT NULL
    ORDER BY viewCount DESC
&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
&lt;span class="n"&gt;df_transformed&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;sql&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sql_query&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Register transformed DataFrame for enrichment
&lt;/span&gt;&lt;span class="n"&gt;df_transformed&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;createOrReplaceTempView&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;youtube_transformed&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Enhanced enrichment - adding temporal dimensions and performance classification
&lt;/span&gt;&lt;span class="n"&gt;enriched_query&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
    SELECT
        *,
        YEAR(TO_TIMESTAMP(publishedAt)) AS year,
        MONTH(TO_TIMESTAMP(publishedAt)) AS month,
        DAY(TO_TIMESTAMP(publishedAt)) AS day_of_month,
        DATE_FORMAT(TO_TIMESTAMP(publishedAt), &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;EEEE&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;) AS day_of_week,
        HOUR(TO_TIMESTAMP(publishedAt)) AS hour,
        WEEKOFYEAR(TO_TIMESTAMP(publishedAt)) AS week,
        CASE
            WHEN viewCount &amp;gt; 100000 THEN &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;viral&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;
            WHEN viewCount &amp;gt; 5000 THEN &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;high&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;
            ELSE &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;normal&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;
        END AS performance_class
    FROM youtube_transformed
&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
&lt;span class="n"&gt;df_enriched&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;sql&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;enriched_query&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;df_enriched&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;show&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# Configure PostgreSQL connection
&lt;/span&gt;&lt;span class="n"&gt;jdbc_url&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;jdbc:postgresql://&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;POSTGRES_HOST&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;:&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;POSTGRES_PORT&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;/&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;POSTGRES_DB&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;span class="n"&gt;connection_properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;user&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;POSTGRES_USER&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;password&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;POSTGRES_PASSWORD&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;driver&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;org.postgresql.Driver&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c1"&gt;# Save enriched data to PostgreSQL
&lt;/span&gt;&lt;span class="n"&gt;df_enriched&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;write&lt;/span&gt; \
    &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;jdbc&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;url&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;jdbc_url&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;table&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;dataengineering.youtube_videos_enriched&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;mode&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;append&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;connection_properties&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;✅ Enriched data successfully appended to PostgreSQL.&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;spark&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;stop&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The transformation script:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Reads the raw JSON data extracted from the YouTube API&lt;/li&gt;
&lt;li&gt;Performs basic transformations (type casting, filtering null values)&lt;/li&gt;
&lt;li&gt;Enriches the data with additional dimensions:

&lt;ul&gt;
&lt;li&gt;Temporal breakdowns (year, month, day, hour, week)&lt;/li&gt;
&lt;li&gt;Day of week for publishing analysis&lt;/li&gt;
&lt;li&gt;Performance classification based on view count thresholds (see the sketch after this list)&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;Loads the enriched data into a PostgreSQL database&lt;/li&gt;
&lt;/ol&gt;
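
&lt;p&gt;The same enrichment can also be expressed with the DataFrame API instead of Spark SQL. Here's a minimal sketch of the temporal columns and the performance classification, assuming the &lt;code&gt;df_transformed&lt;/code&gt; DataFrame from the script above; it's an equivalent formulation, not what the pipeline itself runs:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from pyspark.sql import functions as F

# Sketch: DataFrame-API equivalent of the enrichment SQL above
ts = F.to_timestamp(F.col("publishedAt"))

df_enriched_alt = (
    df_transformed
    .withColumn("year", F.year(ts))
    .withColumn("month", F.month(ts))
    .withColumn("day_of_month", F.dayofmonth(ts))
    .withColumn("day_of_week", F.date_format(ts, "EEEE"))
    .withColumn("hour", F.hour(ts))
    .withColumn("week", F.weekofyear(ts))
    .withColumn(
        "performance_class",
        F.when(F.col("viewCount") &amp;gt; 100000, "viral")
         .when(F.col("viewCount") &amp;gt; 5000, "high")
         .otherwise("normal"),
    )
)
&lt;/code&gt;&lt;/pre&gt;
&lt;/div&gt;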

&lt;h3&gt;Workflow Orchestration with Airflow&lt;/h3&gt;

&lt;p&gt;To ensure the pipeline runs consistently and reliably, I used Apache Airflow to orchestrate the workflow. The DAG (Directed Acyclic Graph) defines the sequence of tasks and their dependencies:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;timedelta&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;DAG&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.operators.bash&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;BashOperator&lt;/span&gt;

&lt;span class="c1"&gt;# Default arguments
&lt;/span&gt;&lt;span class="n"&gt;default_args&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;owner&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;[USER_NAME]&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;depends_on_past&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;start_date&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nf"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2025&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;4&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;23&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;email&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;[EMAIL_ADDRESS]&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;email_on_failure&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;email_on_retry&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;retries&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;retry_delay&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nf"&gt;timedelta&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;minutes&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c1"&gt;# DAG definition
&lt;/span&gt;&lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="nc"&gt;DAG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;youtube_analytics_pipeline&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;default_args&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;default_args&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;schedule_interval&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;@daily&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;catchup&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;tags&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;YouTube&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Analytics&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Spark&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;run_scraper&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;BashOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;run_youtube_scraper&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;bash_command&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
        source /path/to/virtualenv/bin/activate &amp;amp;&amp;amp; &lt;/span&gt;&lt;span class="se"&gt;\
&lt;/span&gt;&lt;span class="s"&gt;        python3 /path/to/youtube_scraper.py
        &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;run_transform&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;BashOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;run_spark_transform&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;bash_command&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
        source /path/to/virtualenv/bin/activate &amp;amp;&amp;amp; &lt;/span&gt;&lt;span class="se"&gt;\
&lt;/span&gt;&lt;span class="s"&gt;        python3 /path/to/spark_transform.py
        &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;run_scraper&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;run_transform&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The Airflow DAG:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Runs daily, with configurable retry logic&lt;/li&gt;
&lt;li&gt;First executes the YouTube data extraction script&lt;/li&gt;
&lt;li&gt;Once extraction is complete, triggers the Spark transformation and loading process&lt;/li&gt;
&lt;li&gt;Provides error handling and notification capabilities&lt;/li&gt;
&lt;li&gt;Can be extended to include additional tasks, like data validation or report generation (see the sketch below)&lt;/li&gt;
&lt;/ol&gt;
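
&lt;p&gt;As a sketch of that extensibility, a data-validation task could be appended after the transform step. The script name below (&lt;code&gt;check_data_quality.py&lt;/code&gt;) is hypothetical, not part of the pipeline as built:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;    # Hypothetical extra task inside the `with DAG(...) as dag:` block above;
    # check_data_quality.py is an assumed script name.
    run_validation = BashOperator(
        task_id="validate_loaded_data",
        bash_command="""
        source /path/to/virtualenv/bin/activate &amp;amp;&amp;amp; \
        python3 /path/to/check_data_quality.py
        """,
    )

    # Extends the dependency chain (replaces the original last line of the DAG)
    run_scraper &amp;gt;&amp;gt; run_transform &amp;gt;&amp;gt; run_validation
&lt;/code&gt;&lt;/pre&gt;
&lt;/div&gt;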

&lt;h3&gt;Data Visualization with Grafana&lt;/h3&gt;

&lt;p&gt;After setting up the data pipeline, I created a comprehensive Grafana dashboard to visualize the YouTube channel analytics. The dashboard addresses the key questions content creators need answered:&lt;/p&gt;

&lt;h4&gt;Key Visualizations&lt;/h4&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Channel Growth Overview&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Time series graph of views, likes, and comments over time&lt;/li&gt;
&lt;li&gt;Cumulative view count by month&lt;/li&gt;
&lt;li&gt;Weekly growth rate comparison&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Top Performing Videos&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Table of highest viewed videos with title, view count, likes, and comments&lt;/li&gt;
&lt;li&gt;Engagement rate calculation ((likes + comments) / views; see the sketch after this list)&lt;/li&gt;
&lt;li&gt;Performance classification distribution&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Publishing Time Analysis&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Heatmap of views by day of week and hour&lt;/li&gt;
&lt;li&gt;Bar chart of average view count by day of week&lt;/li&gt;
&lt;li&gt;Pie chart of views by time of day (morning, afternoon, evening, night)&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Audience Engagement Patterns&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Line graph tracking engagement metrics over time&lt;/li&gt;
&lt;li&gt;Scatter plot of video length vs. engagement rate&lt;/li&gt;
&lt;li&gt;Correlation between publishing frequency and view count&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F17j4j5ecyis28twuthsi.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F17j4j5ecyis28twuthsi.png" alt="Grafana dashboard for the YouTube channel analytics" width="800" height="205"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Here's a sample query used for the day of week performance visualization:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; 
  &lt;span class="n"&gt;day_of_week&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
  &lt;span class="k"&gt;AVG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;viewCount&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;avg_views&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="k"&gt;AVG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;likeCount&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;avg_likes&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="k"&gt;AVG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;commentCount&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;avg_comments&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; 
  &lt;span class="n"&gt;dataengineering&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;youtube_videos_enriched&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; 
  &lt;span class="n"&gt;day_of_week&lt;/span&gt;
&lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; 
  &lt;span class="k"&gt;CASE&lt;/span&gt;
    &lt;span class="k"&gt;WHEN&lt;/span&gt; &lt;span class="n"&gt;day_of_week&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'Monday'&lt;/span&gt; &lt;span class="k"&gt;THEN&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;
    &lt;span class="k"&gt;WHEN&lt;/span&gt; &lt;span class="n"&gt;day_of_week&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'Tuesday'&lt;/span&gt; &lt;span class="k"&gt;THEN&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;
    &lt;span class="k"&gt;WHEN&lt;/span&gt; &lt;span class="n"&gt;day_of_week&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'Wednesday'&lt;/span&gt; &lt;span class="k"&gt;THEN&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;
    &lt;span class="k"&gt;WHEN&lt;/span&gt; &lt;span class="n"&gt;day_of_week&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'Thursday'&lt;/span&gt; &lt;span class="k"&gt;THEN&lt;/span&gt; &lt;span class="mi"&gt;4&lt;/span&gt;
    &lt;span class="k"&gt;WHEN&lt;/span&gt; &lt;span class="n"&gt;day_of_week&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'Friday'&lt;/span&gt; &lt;span class="k"&gt;THEN&lt;/span&gt; &lt;span class="mi"&gt;5&lt;/span&gt;
    &lt;span class="k"&gt;WHEN&lt;/span&gt; &lt;span class="n"&gt;day_of_week&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'Saturday'&lt;/span&gt; &lt;span class="k"&gt;THEN&lt;/span&gt; &lt;span class="mi"&gt;6&lt;/span&gt;
    &lt;span class="k"&gt;WHEN&lt;/span&gt; &lt;span class="n"&gt;day_of_week&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'Sunday'&lt;/span&gt; &lt;span class="k"&gt;THEN&lt;/span&gt; &lt;span class="mi"&gt;7&lt;/span&gt;
  &lt;span class="k"&gt;END&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Data Analysis and Insights
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Views by Time of Day
&lt;/h3&gt;

&lt;p&gt;The pie chart reveals clear viewing patterns throughout the day:&lt;/p&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;Time Period&lt;/th&gt;
&lt;th&gt;Audience Behavior&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;Morning (5am-11am)&lt;/td&gt;
&lt;td&gt;
&lt;strong&gt;Highest engagement&lt;/strong&gt; - nearly half of all views&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Night (9pm-4am)&lt;/td&gt;
&lt;td&gt;Strong secondary viewing window&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Afternoon (12pm-4pm)&lt;/td&gt;
&lt;td&gt;Moderate engagement&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Evening (5pm-8pm)&lt;/td&gt;
&lt;td&gt;Lowest engagement period&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;
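
&lt;p&gt;For context, the period buckets in this chart map publish hours as follows. This is a hypothetical helper that mirrors the table above; the actual Spark transformation may derive the period differently:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;def time_of_day(hour: int) -&amp;gt; str:
    """Map a 24-hour publish hour to the dashboard's period buckets."""
    if 5 &amp;lt;= hour &amp;lt;= 11:
        return "Morning"
    if 12 &amp;lt;= hour &amp;lt;= 16:
        return "Afternoon"
    if 17 &amp;lt;= hour &amp;lt;= 20:
        return "Evening"
    return "Night"  # 9pm-4am wraps around midnight
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
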

&lt;h3&gt;
  
  
  Day of Week Performance
&lt;/h3&gt;

&lt;p&gt;The bar chart highlights significant day-to-day variations:&lt;/p&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;Day&lt;/th&gt;
&lt;th&gt;Avg. Views&lt;/th&gt;
&lt;th&gt;Performance&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;Sunday&lt;/td&gt;
&lt;td&gt;569&lt;/td&gt;
&lt;td&gt;Peak performance (100%)&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Thursday&lt;/td&gt;
&lt;td&gt;507&lt;/td&gt;
&lt;td&gt;Strong performer (89% of peak)&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Wednesday&lt;/td&gt;
&lt;td&gt;428&lt;/td&gt;
&lt;td&gt;Good performer (75% of peak)&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Friday&lt;/td&gt;
&lt;td&gt;322&lt;/td&gt;
&lt;td&gt;Moderate performer (57% of peak)&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Monday&lt;/td&gt;
&lt;td&gt;313&lt;/td&gt;
&lt;td&gt;Moderate performer (55% of peak)&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Saturday&lt;/td&gt;
&lt;td&gt;310&lt;/td&gt;
&lt;td&gt;Moderate performer (54% of peak)&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;Tuesday&lt;/td&gt;
&lt;td&gt;183&lt;/td&gt;
&lt;td&gt;Lowest performer (32% of peak)&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;h3&gt;
  
  
  Content Strategy Recommendations
&lt;/h3&gt;

&lt;p&gt;Based on the analytics, here are data-driven recommendations:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Optimal Publishing Schedule&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Primary&lt;/strong&gt;: Sunday mornings (5am-11am)&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Secondary&lt;/strong&gt;: Thursday mornings (5am-11am)&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Avoid&lt;/strong&gt;: Tuesday releases entirely&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Content Distribution Strategy&lt;/strong&gt;:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Release major content on Sundays to maximize initial viewership&lt;/li&gt;
&lt;li&gt;Schedule promotional content on Thursdays to build weekend momentum&lt;/li&gt;
&lt;li&gt;Use Wednesday slots for mid-week engagement maintenance&lt;/li&gt;
&lt;li&gt;Consider Monday content to start the week strong&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Challenges and Solutions
&lt;/h2&gt;

&lt;p&gt;During the implementation of this project, I encountered several challenges:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;API Rate Limiting&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Challenge&lt;/strong&gt;: YouTube API has strict quotas (10,000 units per day)&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Solution&lt;/strong&gt;: Implemented batch processing and minimized the number of API calls to stay within the daily quota&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Data Quality Issues&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Challenge&lt;/strong&gt;: Missing statistics fields for some videos&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Solution&lt;/strong&gt;: Added robust error handling and null value management in the Spark transformation (see the sketch after this list)&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ol&gt;
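
&lt;p&gt;To illustrate the second fix, here is a minimal sketch of the kind of null handling applied in the Spark step. The input path is illustrative and the exact code differs, but the column names match the statistics fields used elsewhere in this article:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;# Hypothetical sketch: coerce missing YouTube statistics to zero so that
# downstream aggregations (AVG, SUM) stay well-defined.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("youtube_transform").getOrCreate()

df = spark.read.json("/path/to/raw_videos.json")  # illustrative path

df_clean = (
    df.withColumn("viewCount", F.coalesce(F.col("viewCount").cast("long"), F.lit(0)))
      .withColumn("likeCount", F.coalesce(F.col("likeCount").cast("long"), F.lit(0)))
      .withColumn("commentCount", F.coalesce(F.col("commentCount").cast("long"), F.lit(0)))
)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
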

&lt;h2&gt;
  
  
  Future Enhancements
&lt;/h2&gt;

&lt;p&gt;While the current implementation provides valuable insights, several enhancements could further improve the analytics platform:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Sentiment Analysis&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Integrate comment analysis using NLP to measure audience sentiment&lt;/li&gt;
&lt;li&gt;Track sentiment changes over time and correlation with content topics&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Competitor Analysis&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Add capability to benchmark performance against similar channels&lt;/li&gt;
&lt;li&gt;Identify content gaps and opportunities&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Recommendation Engine&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Develop a recommendation system for optimal publishing times&lt;/li&gt;
&lt;li&gt;Provide content suggestions based on historical performance&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Automated Reporting&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Implement automatic weekly/monthly reports via email&lt;/li&gt;
&lt;li&gt;Add anomaly detection to alert on unusual performance changes&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Cross-Platform Integration&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Connect with other social media APIs (Twitter, Instagram, TikTok)&lt;/li&gt;
&lt;li&gt;Provide holistic view of content performance across platforms&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  References
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;&lt;a href="https://developers.google.com/youtube/v3/docs" rel="noopener noreferrer"&gt;YouTube Data API v3 Documentation&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://spark.apache.org/docs/latest/" rel="noopener noreferrer"&gt;Apache Spark Documentation&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://airflow.apache.org/docs/" rel="noopener noreferrer"&gt;Apache Airflow Documentation&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://grafana.com/docs/" rel="noopener noreferrer"&gt;Grafana Documentation&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://www.postgresql.org/docs/" rel="noopener noreferrer"&gt;PostgreSQL Documentation&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://github.com/HarunMbaabu/DataEngineeringEasterHolidayChallenge" rel="noopener noreferrer"&gt;Data Engineering Easter Holiday Challenge&lt;/a&gt;&lt;/li&gt;
&lt;/ol&gt;

</description>
      <category>postgres</category>
      <category>spark</category>
      <category>airflow</category>
    </item>
    <item>
      <title>Building an Automated Weather Data Pipeline with Apache Kafka and Cassandra</title>
      <dc:creator>Eric Katumo</dc:creator>
      <pubDate>Mon, 07 Apr 2025 10:02:16 +0000</pubDate>
      <link>https://dev.to/luxdevhq/building-an-automated-weather-data-pipeline-with-apache-kafka-and-cassandra-23m</link>
      <guid>https://dev.to/luxdevhq/building-an-automated-weather-data-pipeline-with-apache-kafka-and-cassandra-23m</guid>
      <description>&lt;p&gt;This article walks through building an end-to-end weather data pipeline that collects weather data from multiple African cities, processes it through Apache Kafka, and stores it in a Cassandra database for analysis. The infrastructure is hosted on Microsoft Azure.&lt;/p&gt;

&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Weather data pipelines are essential components of modern environmental monitoring systems. They enable organizations to collect, process, and analyze meteorological information in real-time, facilitating better decision-making. This tutorial demonstrates how to build a simple yet robust weather data pipeline using:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Python&lt;/strong&gt; for data fetching and processing&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;OpenWeatherMap API&lt;/strong&gt; as our data source&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Apache Kafka&lt;/strong&gt; for handling real-time data streams&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Apache Cassandra&lt;/strong&gt; for scalable data storage&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Microsoft Azure&lt;/strong&gt; for cloud infrastructure&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;By the end of this article, you will understand how these technologies are combined to construct a data pipeline that can be scaled to more complex use cases.&lt;/p&gt;

&lt;h2&gt;
  
  
  Architecture Overview
&lt;/h2&gt;

&lt;p&gt;Our weather data pipeline consists of the following workflow:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Data Extraction&lt;/strong&gt;: Weather data is extracted for five African cities from the OpenWeatherMap API using the producer script&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Data Streaming&lt;/strong&gt;: Weather data is streamed into a Kafka topic&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Data Consumption&lt;/strong&gt;: A consumer script reads from the Kafka topic&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Data Storage&lt;/strong&gt;: The data is processed and stored in a Cassandra database&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Data Query&lt;/strong&gt;: We can query the stored data using CQL (Cassandra Query Language)&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Azure Infrastructure Setup
&lt;/h2&gt;

&lt;p&gt;This project leverages Microsoft Azure for hosting our components:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Azure Virtual Machine&lt;/strong&gt;: Ubuntu 20.04 LTS server to run our Python scripts&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Network Security Group&lt;/strong&gt;: Configured to allow necessary traffic for Kafka and Cassandra&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Azure Managed Disks&lt;/strong&gt;: For persistent storage of Cassandra data&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;To set up an Azure VM for this project:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Log in to the &lt;a href="https://portal.azure.com" rel="noopener noreferrer"&gt;Azure Portal&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;Create a new Virtual Machine with Ubuntu 20.04 LTS&lt;/li&gt;
&lt;li&gt;Select at least 2 vCPUs and 8GB RAM for optimal performance&lt;/li&gt;
&lt;li&gt;Configure networking to allow inbound SSH (port 22), Kafka (port 9092), and Cassandra (port 9042)&lt;/li&gt;
&lt;li&gt;Create and download SSH keys for secure access&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;After provisioning, connect to your VM using SSH:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;ssh &lt;span class="nt"&gt;-i&lt;/span&gt; /path/to/your/key.pem azureuser@your-vm-ip-address
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Setting Up Confluent Cloud for Your Weather Data Pipeline
&lt;/h2&gt;

&lt;p&gt;Before diving into the implementation, we'll set up a managed Kafka environment on Confluent Cloud.&lt;/p&gt;

&lt;h3&gt;
  
  
  Creating a Confluent Cloud Account
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;Visit &lt;a href="https://www.confluent.io/" rel="noopener noreferrer"&gt;Confluent.io&lt;/a&gt; and click on "Get Started Free" &lt;/li&gt;
&lt;li&gt;Complete the registration process by providing your email and setting up a password&lt;/li&gt;
&lt;li&gt;Verify your email address to activate your account&lt;/li&gt;
&lt;/ol&gt;

&lt;h3&gt;
  
  
  Setting Up a Kafka Cluster
&lt;/h3&gt;

&lt;p&gt;Once logged in to Confluent Cloud:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Click on "Create cluster" in the dashboard&lt;/li&gt;
&lt;li&gt;Choose Azure as your cloud provider and select the region closest to your Azure VM&lt;/li&gt;
&lt;li&gt;Select the "Basic" cluster type for development purposes (you can upgrade later)&lt;/li&gt;
&lt;li&gt;Name your cluster (e.g., "weather-pipeline-cluster")&lt;/li&gt;
&lt;li&gt;Click "Launch cluster"&lt;/li&gt;
&lt;/ol&gt;

&lt;h3&gt;
  
  
  Creating a Kafka Topic
&lt;/h3&gt;

&lt;p&gt;After your cluster is provisioned:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Select your cluster from the dashboard&lt;/li&gt;
&lt;li&gt;Navigate to the "Topics" section in the left sidebar&lt;/li&gt;
&lt;li&gt;Click "Create topic"&lt;/li&gt;
&lt;li&gt;Enter "weather_data" as the topic name&lt;/li&gt;
&lt;li&gt;Set the number of partitions (start with 6 for this example)&lt;/li&gt;
&lt;li&gt;Leave the default retention settings (7 days)&lt;/li&gt;
&lt;li&gt;Click "Create with defaults" (or customize advanced settings if needed)&lt;/li&gt;
&lt;/ol&gt;
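
&lt;p&gt;If you prefer to create the topic from code rather than the web UI, the &lt;code&gt;AdminClient&lt;/code&gt; used in the connection test below can do it. A minimal sketch; &lt;code&gt;kafka_config&lt;/code&gt; is the same SASL_SSL configuration shown in that test script, and the replication factor of 3 assumes Confluent Cloud's default:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;# Sketch: create the weather_data topic programmatically.
from confluent_kafka.admin import AdminClient, NewTopic

admin_client = AdminClient(kafka_config)  # same config as the test script below

new_topic = NewTopic("weather_data", num_partitions=6, replication_factor=3)
futures = admin_client.create_topics([new_topic])

# create_topics() is asynchronous; wait for each topic's result.
for topic, future in futures.items():
    try:
        future.result()  # raises on failure
        print(f"Topic {topic} created")
    except Exception as e:
        print(f"Failed to create topic {topic}: {e}")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
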

&lt;h3&gt;
  
  
  Creating API Keys for Authentication
&lt;/h3&gt;

&lt;p&gt;To connect your application to the Confluent Cloud Kafka cluster:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;In your cluster dashboard, click on "API keys" in the left sidebar&lt;/li&gt;
&lt;li&gt;Click "Create key"&lt;/li&gt;
&lt;li&gt;Select "Global access" (or "Granular access" if you prefer more control)&lt;/li&gt;
&lt;li&gt;Provide a description (e.g., "Weather Pipeline API Key")&lt;/li&gt;
&lt;li&gt;Click "Create key"&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Important&lt;/strong&gt;: Save both the API key and secret in a secure location as they will only be shown once&lt;/li&gt;
&lt;/ol&gt;

&lt;h3&gt;
  
  
  Testing the Connection
&lt;/h3&gt;

&lt;p&gt;To verify your connection settings, create a simple test script on your Azure VM:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;confluent_kafka.admin&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;AdminClient&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;dotenv&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;load_dotenv&lt;/span&gt;

&lt;span class="c1"&gt;# Load environment variables
&lt;/span&gt;&lt;span class="nf"&gt;load_dotenv&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# Kafka configuration
&lt;/span&gt;&lt;span class="n"&gt;kafka_config&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;bootstrap.servers&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;BOOTSTRAP_SERVERS&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;security.protocol&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;SASL_SSL&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;sasl.mechanisms&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;PLAIN&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;sasl.username&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;KAFKA_API_KEY&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;sasl.password&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;KAFKA_API_SECRET&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c1"&gt;# Create Admin client
&lt;/span&gt;&lt;span class="n"&gt;admin_client&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;AdminClient&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;kafka_config&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# List topics
&lt;/span&gt;&lt;span class="n"&gt;topics&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;admin_client&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;list_topics&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;timeout&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Available topics:&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nf"&gt;list&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topics&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;topics&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;keys&lt;/span&gt;&lt;span class="p"&gt;()))&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
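
&lt;p&gt;Save the script under any name (say &lt;code&gt;test_connection.py&lt;/code&gt;) and run it with &lt;code&gt;python3 test_connection.py&lt;/code&gt; once the &lt;code&gt;.env&lt;/code&gt; file described below is in place. If the credentials are correct, the printed list should include the &lt;code&gt;weather_data&lt;/code&gt; topic.&lt;/p&gt;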



&lt;h2&gt;
  
  
  Setting Up Cassandra on Azure VM
&lt;/h2&gt;

&lt;p&gt;We'll install Apache Cassandra directly on our Azure VM:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Install Java&lt;/span&gt;
&lt;span class="nb"&gt;sudo &lt;/span&gt;apt update
&lt;span class="nb"&gt;sudo &lt;/span&gt;apt &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="nt"&gt;-y&lt;/span&gt; openjdk-8-jdk

&lt;span class="c"&gt;# Add the Apache Cassandra repository&lt;/span&gt;
&lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"deb https://downloads.apache.org/cassandra/debian 40x main"&lt;/span&gt; | &lt;span class="nb"&gt;sudo tee&lt;/span&gt; &lt;span class="nt"&gt;-a&lt;/span&gt; /etc/apt/sources.list.d/cassandra.list
curl https://downloads.apache.org/cassandra/KEYS | &lt;span class="nb"&gt;sudo &lt;/span&gt;apt-key add -
&lt;span class="nb"&gt;sudo &lt;/span&gt;apt update

&lt;span class="c"&gt;# Install Cassandra&lt;/span&gt;
&lt;span class="nb"&gt;sudo &lt;/span&gt;apt &lt;span class="nb"&gt;install&lt;/span&gt; &lt;span class="nt"&gt;-y&lt;/span&gt; cassandra

&lt;span class="c"&gt;# Start the service&lt;/span&gt;
&lt;span class="nb"&gt;sudo &lt;/span&gt;systemctl start cassandra
&lt;span class="nb"&gt;sudo &lt;/span&gt;systemctl &lt;span class="nb"&gt;enable &lt;/span&gt;cassandra

&lt;span class="c"&gt;# Verify Cassandra is running&lt;/span&gt;
nodetool status
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Configure Cassandra to accept remote connections by editing &lt;code&gt;/etc/cassandra/cassandra.yaml&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;sudo &lt;/span&gt;nano /etc/cassandra/cassandra.yaml
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Change the following values (note that in the actual &lt;code&gt;cassandra.yaml&lt;/code&gt;, &lt;code&gt;seeds&lt;/code&gt; is nested under the &lt;code&gt;seed_provider&lt;/code&gt; section's &lt;code&gt;parameters&lt;/code&gt; block rather than sitting at the top level):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;listen_address&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;&amp;lt;your-vm-ip-address&amp;gt;&lt;/span&gt;
&lt;span class="na"&gt;rpc_address&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;0.0.0.0&lt;/span&gt;
&lt;span class="na"&gt;seeds&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;&amp;lt;your-vm-ip-address&amp;gt;"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Restart Cassandra to apply changes:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;sudo &lt;/span&gt;systemctl restart cassandra
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
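
&lt;p&gt;Once the &lt;code&gt;cassandra-driver&lt;/code&gt; package is installed (next section), you can confirm that remote access works with a quick Python check:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;# Quick connectivity check; replace the address with your VM's IP.
from cassandra.cluster import Cluster

cluster = Cluster(["your-vm-ip-address"], port=9042)
session = cluster.connect()
print(session.execute("SELECT release_version FROM system.local").one())
cluster.shutdown()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
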



&lt;h2&gt;
  
  
  Setting Up the Environment
&lt;/h2&gt;

&lt;p&gt;Now on your Azure VM, set up the development environment:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Create a project directory&lt;/span&gt;
&lt;span class="nb"&gt;mkdir &lt;/span&gt;weather_data_pipeline
&lt;span class="nb"&gt;cd &lt;/span&gt;weather_data_pipeline

&lt;span class="c"&gt;# Create and activate a virtual environment&lt;/span&gt;
python3 &lt;span class="nt"&gt;-m&lt;/span&gt; venv myvenv
&lt;span class="nb"&gt;source &lt;/span&gt;myvenv/bin/activate
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then install the required packages:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;pip &lt;span class="nb"&gt;install &lt;/span&gt;confluent-kafka requests python-dotenv cassandra-driver
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Create a &lt;code&gt;.env&lt;/code&gt; file to store your environment variables:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;WEATHER_API_KEY=your_openweathermap_api_key
BOOTSTRAP_SERVERS=your_confluent_bootstrap_servers
KAFKA_API_KEY=your_confluent_api_key
KAFKA_API_SECRET=your_confluent_api_secret
CASSANDRA_HOST=localhost
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Creating the Producer Script
&lt;/h2&gt;

&lt;p&gt;The producer script is responsible for fetching weather data from the OpenWeatherMap API and sending it to a Kafka topic. Let's break down the key components of our &lt;code&gt;weather_producer.py&lt;/code&gt; script:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;requests&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;confluent_kafka&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Producer&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;dotenv&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;load_dotenv&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;logging&lt;/span&gt;

&lt;span class="c1"&gt;# Configure logging
&lt;/span&gt;&lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;basicConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;level&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;INFO&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;format&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;%(asctime)s - %(name)s - %(levelname)s - %(message)s&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;logger&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getLogger&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;__name__&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Load environment variables from .env file
&lt;/span&gt;&lt;span class="nf"&gt;load_dotenv&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# List of cities
&lt;/span&gt;&lt;span class="n"&gt;cities&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Nairobi&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Johannesburg&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Casablanca&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Lagos&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Kinshasa&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
&lt;span class="p"&gt;]&lt;/span&gt;

&lt;span class="c1"&gt;# OpenWeatherMap API setup
&lt;/span&gt;&lt;span class="n"&gt;owm_api_key&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;WEATHER_API_KEY&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;owm_base_url&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;https://api.openweathermap.org/data/2.5/weather&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;fetch_weather_data&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;city&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Fetch weather data from OpenWeatherMap API using city name.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="n"&gt;url&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;owm_base_url&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;?q=&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;city&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;&amp;amp;appid=&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;owm_api_key&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;&amp;amp;units=metric&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;
    &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;requests&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;url&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;raise_for_status&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;json&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;extracted_city&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;city&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;
    &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="n"&gt;requests&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;exceptions&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;RequestException&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Error fetching data for &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;city&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;delivery_report&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Callback for Kafka message delivery status.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="ow"&gt;is&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Message delivery failed: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Message delivered to &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; [&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;partition&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;] at offset &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;offset&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Kafka configuration
&lt;/span&gt;&lt;span class="n"&gt;kafka_config&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;bootstrap.servers&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;BOOTSTRAP_SERVERS&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;security.protocol&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;SASL_SSL&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;sasl.mechanisms&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;PLAIN&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;sasl.username&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;KAFKA_API_KEY&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;sasl.password&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;KAFKA_API_SECRET&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;broker.address.family&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;v4&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;message.send.max.retries&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;retry.backoff.ms&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;500&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="n"&gt;producer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Producer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;kafka_config&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;weather_data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;produce_weather_data&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Fetch weather data for each city and produce to Kafka.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;city&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;cities&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;fetch_weather_data&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;city&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;produce&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;city&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;value&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;dumps&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;callback&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;delivery_report&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;poll&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Failed to fetch data for &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;city&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;flush&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;__name__&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;__main__&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="nf"&gt;produce_weather_data&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Data extraction and production complete&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
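
&lt;p&gt;Run the producer with &lt;code&gt;python3 weather_producer.py&lt;/code&gt;. Each successful send is reported by the delivery callback, so you should see one "Message delivered" log line per city.&lt;/p&gt;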



&lt;h3&gt;
  
  
  Notable Components of the Producer Script:
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Environment Setup&lt;/strong&gt;: We import &lt;code&gt;dotenv&lt;/code&gt; to load environment variables and initialize logging to track script execution.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;City List&lt;/strong&gt;: We define a list of African cities for which we want to collect weather data.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;API Integration&lt;/strong&gt;: The function &lt;code&gt;fetch_weather_data()&lt;/code&gt; makes HTTP requests to the OpenWeatherMap API, requesting metric units so temperatures are returned in Celsius.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Kafka Configuration&lt;/strong&gt;: We set up a Kafka producer with security credentials and reliability properties.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Data Production&lt;/strong&gt;: The &lt;code&gt;produce_weather_data()&lt;/code&gt; function fetches data for each city and produces it to the "weather_data" Kafka topic, keyed by city name.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Delivery Reporting&lt;/strong&gt;: The &lt;code&gt;delivery_report()&lt;/code&gt; callback logs whether each message was delivered to Kafka successfully.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
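
&lt;p&gt;As written, the script makes a single pass over the city list and exits. If you want it to poll continuously instead, a hypothetical variant that replaces the &lt;code&gt;__main__&lt;/code&gt; block (and puts the otherwise-idle &lt;code&gt;time&lt;/code&gt; import to work) could look like this, with an arbitrarily chosen interval:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;# Hypothetical continuous-run variant (not part of the article's script):
# re-fetch and produce every 10 minutes.
POLL_INTERVAL_SECONDS = 600  # arbitrary choice

if __name__ == "__main__":
    while True:
        produce_weather_data()
        logger.info("Cycle complete; sleeping %d seconds", POLL_INTERVAL_SECONDS)
        time.sleep(POLL_INTERVAL_SECONDS)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
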

&lt;h2&gt;
  
  
  Creating the Consumer Script
&lt;/h2&gt;

&lt;p&gt;Now, let's create the consumer script that will read data from the Kafka topic and store it in Cassandra. Here is our &lt;code&gt;weather_consumer.py&lt;/code&gt; script:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;uuid&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;confluent_kafka&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Consumer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;KafkaError&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;cassandra.cluster&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;Cluster&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;dotenv&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;load_dotenv&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;logging&lt;/span&gt;

&lt;span class="c1"&gt;# Configure logging
&lt;/span&gt;&lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;basicConfig&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;level&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;INFO&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="nb"&gt;format&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;%(asctime)s - %(name)s - %(levelname)s - %(message)s&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;logger&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;logging&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getLogger&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;__name__&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Load environment variables
&lt;/span&gt;&lt;span class="nf"&gt;load_dotenv&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# Kafka configuration
&lt;/span&gt;&lt;span class="n"&gt;kafka_config&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;bootstrap.servers&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;BOOTSTRAP_SERVERS&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;group.id&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;weather_consumer_group&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;auto.offset.reset&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;earliest&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;security.protocol&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;SASL_SSL&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;sasl.mechanisms&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;PLAIN&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;sasl.username&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;KAFKA_API_KEY&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;sasl.password&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;KAFKA_API_SECRET&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;broker.address.family&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;v4&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c1"&gt;# Cassandra configuration
&lt;/span&gt;&lt;span class="n"&gt;cassandra_host&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;CASSANDRA_HOST&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Connect to Cassandra
&lt;/span&gt;&lt;span class="n"&gt;cluster&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Cluster&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="n"&gt;cassandra_host&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
&lt;span class="n"&gt;session&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;initialize_cassandra&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Initialize Cassandra connection and create keyspace/table if needed.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="k"&gt;global&lt;/span&gt; &lt;span class="n"&gt;session&lt;/span&gt;
    &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;session&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;cluster&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;connect&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

        &lt;span class="c1"&gt;# Create keyspace if it doesn't exist
&lt;/span&gt;        &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
            CREATE KEYSPACE IF NOT EXISTS weather_data
            WITH replication = {&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;class&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;: &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;SimpleStrategy&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;, &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;replication_factor&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;: &lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;1&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;}
            AND durable_writes = true;
        &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="c1"&gt;# Use the keyspace
&lt;/span&gt;        &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;USE weather_data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="c1"&gt;# Create table if it doesn't exist
&lt;/span&gt;        &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
            CREATE TABLE IF NOT EXISTS simple_weather (
                id uuid PRIMARY KEY,
                city_name text,
                temperature float,
                timestamp timestamp,
                weather_description text,
                weather_main text
            );
        &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Cassandra table ready&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;
    &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;Exception&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Cassandra initialization error: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;insert_weather_data&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;weather_data&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Insert weather data into Cassandra.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="c1"&gt;# Extract interesting fields
&lt;/span&gt;        &lt;span class="n"&gt;city&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;weather_data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;extracted_city&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
        &lt;span class="n"&gt;temp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;weather_data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;main&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;temp&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
        &lt;span class="n"&gt;timestamp&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;weather_data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;dt&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
        &lt;span class="n"&gt;weather_desc&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;weather_data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;weather&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;description&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
        &lt;span class="n"&gt;weather_main&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;weather_data&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;weather&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;main&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;

        &lt;span class="c1"&gt;# Insert data into Cassandra
&lt;/span&gt;        &lt;span class="n"&gt;query&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;
            INSERT INTO simple_weather (id, city_name, temperature, timestamp, weather_description, weather_main)
            VALUES (%s, %s, %s, toTimestamp(now()), %s, %s)
        &lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
        &lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;uuid&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;uuid4&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;city&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;temp&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;weather_desc&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;weather_main&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Inserted weather for &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;city&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; at &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;timestamp&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;
    &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;Exception&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Error inserting data: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;consume_weather_data&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
    &lt;span class="sh"&gt;"""&lt;/span&gt;&lt;span class="s"&gt;Consume weather data from Kafka and store in Cassandra.&lt;/span&gt;&lt;span class="sh"&gt;"""&lt;/span&gt;
    &lt;span class="c1"&gt;# Initialize Cassandra
&lt;/span&gt;    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="ow"&gt;not&lt;/span&gt; &lt;span class="nf"&gt;initialize_cassandra&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt;

    &lt;span class="c1"&gt;# Create Kafka consumer
&lt;/span&gt;    &lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;Consumer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;kafka_config&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;subscribe&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;weather_data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;

    &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Subscribed to topic: weather_data&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;poll&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mf"&gt;1.0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="ow"&gt;is&lt;/span&gt; &lt;span class="bp"&gt;None&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="k"&gt;continue&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
                &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;().&lt;/span&gt;&lt;span class="nf"&gt;code&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="n"&gt;KafkaError&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;_PARTITION_EOF&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                    &lt;span class="k"&gt;continue&lt;/span&gt;
                &lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                    &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Consumer error: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                    &lt;span class="k"&gt;break&lt;/span&gt;
            &lt;span class="k"&gt;try&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;weather_data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;loads&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;value&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
                &lt;span class="nf"&gt;insert_weather_data&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;weather_data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;Exception&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
                &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Error processing message: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;except&lt;/span&gt; &lt;span class="nb"&gt;KeyboardInterrupt&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Stopping consumer&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;finally&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;cluster&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;shutdown&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;__name__&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;__main__&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="nf"&gt;consume_weather_data&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Key Components of the Consumer Script:
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Kafka Consumer Configuration&lt;/strong&gt;: We configure a consumer to read from the "weather_data" topic with the appropriate security settings (see the sketch after this list).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Cassandra Connection&lt;/strong&gt;: We connect to the Cassandra cluster directly without authentication credentials.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Database Initialization&lt;/strong&gt;: The &lt;code&gt;initialize_cassandra()&lt;/code&gt; function creates the necessary keyspace and table if they haven't already been created.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Data Processing&lt;/strong&gt;: The &lt;code&gt;insert_weather_data()&lt;/code&gt; function extracts pertinent fields from the weather data and inserts them into Cassandra.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Continuous Consumption&lt;/strong&gt;: The &lt;code&gt;consume_weather_data()&lt;/code&gt; function continuously polls the Kafka topic and processes any messages it finds.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
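
&lt;p&gt;For reference, the consumer expects a &lt;code&gt;kafka_config&lt;/code&gt; dictionary defined near the top of the script. For a Confluent Cloud cluster it typically looks like the sketch below; the environment variable names here are placeholders, so match them to the entries in your own &lt;code&gt;.env&lt;/code&gt; file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import os

# Typical confluent-kafka consumer settings for a SASL_SSL (Confluent Cloud) cluster
kafka_config = {
    "bootstrap.servers": os.getenv("BOOTSTRAP_SERVERS"),
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": os.getenv("KAFKA_API_KEY"),
    "sasl.password": os.getenv("KAFKA_API_SECRET"),
    "group.id": "weather_consumer_group",
    "auto.offset.reset": "earliest",
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

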

&lt;h2&gt;
  
  
  Running the Pipeline on Azure
&lt;/h2&gt;

&lt;p&gt;Now that we've written both the producer and consumer scripts, let's run them on our Azure VM to see our data pipeline in action.&lt;/p&gt;

&lt;p&gt;First, make sure your virtual environment is activated and all dependencies are installed.&lt;/p&gt;
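
&lt;p&gt;If you're starting from a fresh shell, that looks something like this (a sketch: the exact package list depends on your scripts, but the producer and consumer shown here rely on &lt;code&gt;confluent-kafka&lt;/code&gt;, &lt;code&gt;cassandra-driver&lt;/code&gt;, &lt;code&gt;requests&lt;/code&gt;, and &lt;code&gt;python-dotenv&lt;/code&gt;):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;source venv/bin/activate
pip install confluent-kafka cassandra-driver requests python-dotenv
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then, run the producer script:&lt;br&gt;
&lt;/p&gt;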

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;python3 weather_producer.py
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As shown in the first screenshot, the producer is successfully retrieving weather data and producing it to Kafka:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;2025-04-07 05:56:47,013 - __main__ - INFO - Message delivered to weather_data [4] at offset 8
2025-04-07 05:56:47,013 - __main__ - INFO - Message delivered to weather_data [4] at offset 9
2025-04-07 05:56:47,035 - __main__ - INFO - Message delivered to weather_data [3] at offset 8
2025-04-07 05:56:47,035 - __main__ - INFO - Message delivered to weather_data [3] at offset 9
2025-04-07 05:56:47,072 - __main__ - INFO - Message delivered to weather_data [2] at offset 4
2025-04-07 05:56:47,072 - __main__ - INFO - Data extraction and production complete
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Next, run the consumer script in a separate terminal:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;python3 weather_consumer.py
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The consumer subscribes to the Kafka topic and begins processing messages, as shown in the second screenshot:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Subscribed to topic: weather_data
Cassandra table ready
Inserted weather for Johannesburg at 2025-04-07 05:56:45
Inserted weather for Lagos at 2025-04-07 05:55:29
Inserted weather for Nairobi at 2025-04-07 05:56:45
Inserted weather for Casablanca at 2025-04-07 05:53:05
Inserted weather for Kinshasa at 2025-04-07 05:56:46
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Querying the Data
&lt;/h2&gt;

&lt;p&gt;Now that the data is in Cassandra, we can query it using CQL. On your Azure VM, use the &lt;code&gt;cqlsh&lt;/code&gt; tool:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;cqlsh localhost
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then query to retrieve the weather data:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="n"&gt;USE&lt;/span&gt; &lt;span class="n"&gt;weather_data&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;simple_weather&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As shown in the third screenshot, this pulls back all of the weather data for our cities:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;id                                   | city_name     | temperature | timestamp                       | weather_description | weather_main
--------------------------------------+--------------+-------------+--------------------------------+---------------------+-------------
9781d8f2-e7d1-484f-a780-4df61ed7c7da | Johannesburg |       15.02 | 2025-04-07 06:22:41.000000+0000 |    overcast clouds  |      Clouds
f64ab11f-a732-4d47-84fb-f450d1e4a9bc |     Kinshasa |       21.21 | 2025-04-07 06:22:41.000000+0000 |         few clouds  |      Clouds
01840d74-0b10-461b-98b4-8eaf5fce2e0c |      Nairobi |       18.62 | 2025-04-07 06:22:41.000000+0000 |     broken clouds   |      Clouds
546d4df6-45b4-4956-94b1-b92b92bc1e84 |        Lagos |       26.57 | 2025-04-07 06:22:41.000000+0000 |    overcast clouds  |      Clouds
95d0dc97-a940-45a9-ab5d-28b9deba7b02 |   Casablanca |       14.07 | 2025-04-07 06:20:30.000000+0000 |   scattered clouds  |      Clouds
(5 rows)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
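
&lt;p&gt;One modelling note: because &lt;code&gt;id&lt;/code&gt; is the only primary key column, filtering on any other column requires &lt;code&gt;ALLOW FILTERING&lt;/code&gt;. That is fine for ad-hoc exploration of a small table, but for production you would typically make &lt;code&gt;city_name&lt;/code&gt; part of the primary key instead:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;SELECT * FROM simple_weather WHERE city_name = 'Nairobi' ALLOW FILTERING;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

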



&lt;h2&gt;
  
  
  Extending the Pipeline
&lt;/h2&gt;

&lt;p&gt;This basic pipeline can be extended as follows:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Add More Cities&lt;/strong&gt;: Expand the list of cities to get a broader geographic coverage.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Collect More Data Points&lt;/strong&gt;: Modify the scripts to collect additional weather parameters like humidity, wind speed, and barometric pressure (see the sketch after this list).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Data Aggregation&lt;/strong&gt;: Add functionality to calculate averages, minimums, and maximums over time intervals of varying sizes.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Visualizations&lt;/strong&gt;: Connect a visualization tool like Grafana or Azure Data Explorer to your Cassandra database to create dashboards.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Alerts&lt;/strong&gt;: Configure alerts for extreme weather conditions by adding thresholds for temperature, rainfall, etc.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Scale with Azure Kubernetes Service&lt;/strong&gt;: For larger deployments, consider containerizing your applications and deploying them on AKS.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
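
&lt;p&gt;As an example of point 2, collecting more data points is largely a matter of pulling extra fields out of the same payload and adding matching columns to the table. A minimal sketch, assuming the standard OpenWeatherMap current-weather response already used by &lt;code&gt;insert_weather_data()&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;# Additional fields available in the same weather_data payload
humidity = weather_data["main"]["humidity"]    # relative humidity, %
pressure = weather_data["main"]["pressure"]    # barometric pressure, hPa
wind_speed = weather_data["wind"]["speed"]     # wind speed (m/s with default units)

# The Cassandra table needs matching columns, e.g.:
# ALTER TABLE simple_weather ADD (humidity int, pressure int, wind_speed float);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

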

&lt;h2&gt;
  
  
  Common Issues and Troubleshooting
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;API Rate Limiting&lt;/strong&gt;: The OpenWeatherMap API rate-limits free-tier accounts. If you're fetching data for a large number of cities, you might hit these limits. Consider using a paid tier or throttling requests (see the sketch after this list).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Kafka Connection Issues&lt;/strong&gt;: Ensure that your Kafka credentials and bootstrap servers are correctly configured in the &lt;code&gt;.env&lt;/code&gt; file.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Cassandra Connectivity&lt;/strong&gt;: Ensure that your Cassandra instance is accessible on your Azure VM and that the firewall rules allow connections.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Azure VM Connectivity&lt;/strong&gt;: Check that your Network Security Group allows the necessary inbound and outbound traffic.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Disk Space&lt;/strong&gt;: Monitor your Azure VM's disk space, especially if storing large amounts of historical weather data.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
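
&lt;p&gt;For the rate-limiting point, even a crude delay between calls goes a long way. A sketch, assuming the producer keeps its city list in a &lt;code&gt;cities&lt;/code&gt; list and wraps the per-city API call and Kafka produce in a hypothetical &lt;code&gt;fetch_and_produce()&lt;/code&gt; helper:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import time

for city in cities:
    fetch_and_produce(city)  # hypothetical helper: API call + produce to Kafka
    time.sleep(1.1)          # stays under the free tier's roughly 60 calls/minute
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

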

&lt;h2&gt;
  
  
  Resources
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://kafka.apache.org/documentation/" rel="noopener noreferrer"&gt;Apache Kafka Documentation&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://cassandra.apache.org/doc/latest/" rel="noopener noreferrer"&gt;Apache Cassandra Documentation&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html" rel="noopener noreferrer"&gt;Confluent Kafka Python Client&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://docs.confluent.io/cloud/current/overview.html" rel="noopener noreferrer"&gt;Confluent Cloud Documentation&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://openweathermap.org/api" rel="noopener noreferrer"&gt;OpenWeatherMap API Documentation&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>dataengineering</category>
      <category>data</category>
      <category>apachekafka</category>
    </item>
    <item>
      <title>Building an Automated Bitcoin Price ETL Pipeline with Airflow and PostgreSQL</title>
      <dc:creator>Eric Katumo</dc:creator>
      <pubDate>Mon, 31 Mar 2025 19:07:56 +0000</pubDate>
      <link>https://dev.to/luxdevhq/building-an-automated-bitcoin-price-etl-pipeline-with-airflow-and-postgresql-1ok7</link>
      <guid>https://dev.to/luxdevhq/building-an-automated-bitcoin-price-etl-pipeline-with-airflow-and-postgresql-1ok7</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;This article details creating an automated ETL (Extract, Transform, Load) pipeline that retrieves daily Bitcoin price data from the Polygon.io API, performs necessary transformations, and loads the data into a PostgreSQL database. The workflow is orchestrated using Apache Airflow, ensuring reliable daily execution.&lt;/p&gt;

&lt;p&gt;This project demonstrates several key data engineering concepts:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;API data extraction&lt;/li&gt;
&lt;li&gt;Data transformation using pandas&lt;/li&gt;
&lt;li&gt;Database integration with PostgreSQL&lt;/li&gt;
&lt;li&gt;Workflow orchestration with Apache Airflow&lt;/li&gt;
&lt;li&gt;Deployment to a cloud environment&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  System Architecture
&lt;/h2&gt;

&lt;p&gt;The pipeline consists of the following components:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Data Source&lt;/strong&gt;: Polygon.io API providing cryptocurrency price data&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;ETL Script&lt;/strong&gt;: Python script that handles extraction, transformation, and loading&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Database&lt;/strong&gt;: PostgreSQL for data storage&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Orchestration&lt;/strong&gt;: Apache Airflow for scheduling and monitoring&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Infrastructure&lt;/strong&gt;: Cloud VM for hosting the pipeline&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The system flows in a linear fashion: Airflow triggers the ETL script daily, which extracts the latest BTC prices, transforms the data into a suitable format, and loads it into the PostgreSQL database.&lt;/p&gt;

&lt;h2&gt;
  
  
  Detailed Implementation
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Step 1: Creating the ETL Script
&lt;/h3&gt;

&lt;p&gt;The first component is &lt;code&gt;btc_prices.py&lt;/code&gt;, which handles the core ETL functionality:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;requests&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;sqlalchemy&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;create_engine&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;pandas&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;dotenv&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;load_dotenv&lt;/span&gt;

&lt;span class="c1"&gt;# Define API endpoint
&lt;/span&gt;&lt;span class="n"&gt;url&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;https://api.polygon.io/v1/open-close/crypto/BTC/USD/2025-03-31?adjusted=true&amp;amp;apiKey=YOUR_API_KEY&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
&lt;span class="n"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;requests&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;url&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;status_code&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="mi"&gt;200&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;json&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;open_price&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;open&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;close_price&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;close&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;date&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;day&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;symbol&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;symbol&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;else&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Failed to retrieve data: &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;status&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="nf"&gt;exit&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# Prepare data for insertion
&lt;/span&gt;&lt;span class="n"&gt;data_df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;symbol&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;symbol&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;open_price&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;open_price&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;close_price&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;close_price&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;date&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;date&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;DataFrame&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data_df&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;index&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;date&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_datetime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;date&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]).&lt;/span&gt;&lt;span class="n"&gt;dt&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;strftime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;%Y-%m-%d&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Load environment variables
&lt;/span&gt;&lt;span class="nf"&gt;load_dotenv&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="n"&gt;dbname&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;dbname&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;user&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;user&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;password&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;password&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;host&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;host&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;port&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;getenv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;port&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Create database connection
&lt;/span&gt;&lt;span class="n"&gt;engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;create_engine&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;postgresql://&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;user&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;:&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;password&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;@&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;host&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;:&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;port&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;/&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;dbname&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_sql&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;crypto_prices&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;con&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;engine&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;if_exists&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;append&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;index&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;schema&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;dataengineering&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Successfully loaded crypto data for &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;date&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;][&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This script:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Extracts Bitcoin price data from the Polygon.io API&lt;/li&gt;
&lt;li&gt;Transforms and structures the data using pandas&lt;/li&gt;
&lt;li&gt;Loads the data into PostgreSQL&lt;/li&gt;
&lt;li&gt;Uses environment variables for secure database connection management&lt;/li&gt;
&lt;/ul&gt;
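
&lt;p&gt;One caveat: the URL above hardcodes both the date and the API key, even though the script already imports &lt;code&gt;datetime&lt;/code&gt; and loads a &lt;code&gt;.env&lt;/code&gt; file. Since the DAG runs daily, a more robust variant builds the URL at runtime. A sketch, assuming a &lt;code&gt;POLYGON_API_KEY&lt;/code&gt; entry added to the same &lt;code&gt;.env&lt;/code&gt; file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import os
from datetime import datetime, timedelta
from dotenv import load_dotenv

load_dotenv()
api_key = os.getenv('POLYGON_API_KEY')  # assumed .env entry, not in the original script

# Use yesterday's date: daily OHLC figures are only final once the day has closed
target_date = (datetime.utcnow() - timedelta(days=1)).strftime('%Y-%m-%d')
url = f'https://api.polygon.io/v1/open-close/crypto/BTC/USD/{target_date}?adjusted=true&amp;amp;apiKey={api_key}'
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

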

&lt;h3&gt;
  
  
  Step 2: Creating the Airflow DAG
&lt;/h3&gt;

&lt;p&gt;Next, &lt;code&gt;btc_dag.py&lt;/code&gt; defines the Airflow DAG (Directed Acyclic Graph) that orchestrates the workflow:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;timedelta&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;DAG&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.operators.bash_operator&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;BashOperator&lt;/span&gt;

&lt;span class="c1"&gt;# DAG default arguments
&lt;/span&gt;&lt;span class="n"&gt;default_args&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;owner&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;data_engineer&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;depends_on_past&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;start_date&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nf"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2025&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;31&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;email_on_failure&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;email_on_retry&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;retries&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;retry_delay&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nf"&gt;timedelta&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;minutes&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="nc"&gt;DAG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;polygon_btc_data&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;default_args&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;default_args&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;schedule_interval&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;@daily&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;

    &lt;span class="n"&gt;activate_venv&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;BashOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;activate_virtual_env&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;bash_command&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;source /home/user/project/venv/bin/activate&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;execute_file&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;BashOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;execute_python_file&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;bash_command&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;python /home/user/project/btc_prices.py&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;activate_venv&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;execute_file&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This DAG:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Defines the execution schedule&lt;/li&gt;
&lt;li&gt;Activates the virtual environment&lt;/li&gt;
&lt;li&gt;Executes the ETL script&lt;/li&gt;
&lt;/ul&gt;
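
&lt;p&gt;One caveat worth knowing: each &lt;code&gt;BashOperator&lt;/code&gt; task runs in its own shell, so sourcing the virtual environment in &lt;code&gt;activate_virtual_env&lt;/code&gt; does not carry over to &lt;code&gt;execute_python_file&lt;/code&gt;. The simplest fix is to skip activation and call the virtual environment's interpreter directly; a sketch, keeping the paths from the DAG above:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;    run_etl = BashOperator(
        task_id='run_btc_etl',
        # The venv's interpreter already resolves the venv's packages; no activation needed
        bash_command='/home/user/project/venv/bin/python /home/user/project/btc_prices.py',
    )
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

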

&lt;h3&gt;
  
  
  Step 3: Setting Up the Environment
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Creating a Virtual Environment&lt;/strong&gt;:
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;   python &lt;span class="nt"&gt;-m&lt;/span&gt; venv venv
   &lt;span class="nb"&gt;source &lt;/span&gt;venv/bin/activate
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Installing Dependencies&lt;/strong&gt;:
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;   pip &lt;span class="nb"&gt;install &lt;/span&gt;requests pandas sqlalchemy python-dotenv psycopg2-binary apache-airflow
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Setting Up Environment Variables&lt;/strong&gt;:
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;   &lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"dbname=your_database_name"&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; .env
   &lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"user=your_database_user"&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; .env
   &lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"password=your_database_password"&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; .env
   &lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"host=your_database_host"&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; .env
   &lt;span class="nb"&gt;echo&lt;/span&gt; &lt;span class="s2"&gt;"port=your_database_port"&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; .env
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Step 4: Server Deployment
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;SSH into the cloud VM&lt;/strong&gt;:
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;   ssh user@your_server_ip
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Create necessary directories&lt;/strong&gt;:
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;   &lt;span class="nb"&gt;mkdir&lt;/span&gt; &lt;span class="nt"&gt;-p&lt;/span&gt; ~/crypto_price
   &lt;span class="nb"&gt;mkdir&lt;/span&gt; &lt;span class="nt"&gt;-p&lt;/span&gt; ~/airflow/dags
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Transfer scripts to the server&lt;/strong&gt;:
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;   scp btc_prices.py user@your_server_ip:~/crypto_price/
   scp btc_dag.py user@your_server_ip:~/airflow/dags/
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Step 5: PostgreSQL Configuration
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Creating Database Schema&lt;/strong&gt;:
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;   &lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;SCHEMA&lt;/span&gt; &lt;span class="n"&gt;IF&lt;/span&gt; &lt;span class="k"&gt;NOT&lt;/span&gt; &lt;span class="k"&gt;EXISTS&lt;/span&gt; &lt;span class="n"&gt;dataengineering&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

   &lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;IF&lt;/span&gt; &lt;span class="k"&gt;NOT&lt;/span&gt; &lt;span class="k"&gt;EXISTS&lt;/span&gt; &lt;span class="n"&gt;dataengineering&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;crypto_prices&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
       &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="nb"&gt;SERIAL&lt;/span&gt; &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="n"&gt;symbol&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;NOT&lt;/span&gt; &lt;span class="k"&gt;NULL&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="n"&gt;open_price&lt;/span&gt; &lt;span class="nb"&gt;NUMERIC&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;8&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;NOT&lt;/span&gt; &lt;span class="k"&gt;NULL&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="n"&gt;close_price&lt;/span&gt; &lt;span class="nb"&gt;NUMERIC&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;20&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;8&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;NOT&lt;/span&gt; &lt;span class="k"&gt;NULL&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="nb"&gt;date&lt;/span&gt; &lt;span class="nb"&gt;DATE&lt;/span&gt; &lt;span class="k"&gt;NOT&lt;/span&gt; &lt;span class="k"&gt;NULL&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
       &lt;span class="n"&gt;created_at&lt;/span&gt; &lt;span class="nb"&gt;TIMESTAMP&lt;/span&gt; &lt;span class="k"&gt;DEFAULT&lt;/span&gt; &lt;span class="k"&gt;CURRENT_TIMESTAMP&lt;/span&gt;
   &lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
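
&lt;p&gt;After the first DAG run, a quick query against the new table confirms the load:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;SELECT * FROM dataengineering.crypto_prices ORDER BY date DESC LIMIT 5;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

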



&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;The architecture follows best practices for data engineering:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Separation of extraction, transformation, and loading concerns&lt;/li&gt;
&lt;li&gt;Secure credential management&lt;/li&gt;
&lt;li&gt;Robust error handling&lt;/li&gt;
&lt;li&gt;Automated scheduling&lt;/li&gt;
&lt;li&gt;Cloud-based deployment&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The combination of Python, Airflow, and PostgreSQL provides a powerful foundation for financial data analysis, enabling timely insights into cryptocurrency market trends.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://github.com/Katumo-Zzetu/BTC-Pipeline.git" rel="noopener noreferrer"&gt;Github &lt;/a&gt;&lt;/p&gt;

</description>
      <category>dataengineering</category>
      <category>data</category>
      <category>apacheairflow</category>
    </item>
    <item>
      <title>Apache Airflow for Data Engineering: Best Practices and Real-World Examples</title>
      <dc:creator>Eric Katumo</dc:creator>
      <pubDate>Sun, 30 Mar 2025 17:44:57 +0000</pubDate>
      <link>https://dev.to/katumo/apache-airflow-for-data-engineering-best-practices-and-real-world-examples-ooo</link>
      <guid>https://dev.to/katumo/apache-airflow-for-data-engineering-best-practices-and-real-world-examples-ooo</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Apache Airflow, or simply Airflow, is an open-source tool and framework for running data pipelines in production environments. As an industry-leading workflow management platform, Airflow empowers data practitioners to define their data pipelines as code using Python, providing significant advantages over traditional scheduling tools by enabling version control, testing, and maintenance of complex workflows.&lt;/p&gt;

&lt;p&gt;Airflow enhances data pipeline management by offering robust scheduling capabilities, comprehensive monitoring, and detailed observability of pipeline performance. It serves as a centralized hub for all data workflows, whether you're preparing training data for machine learning models, transforming data for analytics, or persisting information in a data lake, with features designed for reliability and scalability.&lt;/p&gt;

&lt;p&gt;The core strength of Airflow lies in its ability to represent workflows as directed acyclic graphs (DAGs), explicitly defining dependencies between tasks and providing a clear visual representation of complex processes. This paradigm, combined with Airflow's extensible architecture, makes it uniquely suited to orchestrate the increasingly complex data needs of modern organizations.&lt;/p&gt;

&lt;h2&gt;
  
  
  Key Airflow Concepts and Terminology
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Core Concepts
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;DAG (Directed Acyclic Graph)&lt;/strong&gt;: A DAG is the backbone of Airflow, structuring a workflow as a set of tasks with defined dependencies, ensuring no cycles exist. Written as Python code, it’s stored in the DAGs folder and dictates the sequence and relationships of operations.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Operator&lt;/strong&gt;: Operators are the building blocks of tasks, providing predefined templates for specific actions like running a script or sending an email. Popular examples include:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;BashOperator&lt;/strong&gt;: Runs shell commands or scripts, perfect for quick command-line operations, though it depends on exit codes to determine success.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;PythonOperator&lt;/strong&gt;: Executes Python functions, offering flexibility for custom logic like processing data or querying APIs, with access to runtime context.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;EmailOperator&lt;/strong&gt;: Sends email notifications through an SMTP server, useful for alerting teams, though limited to basic messaging features.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Task&lt;/strong&gt;: A task is a single unit of work in a DAG, created by applying an operator with specific settings, such as running a bash command or a Python function. It represents an actionable step within the broader workflow.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Task Instance&lt;/strong&gt;: A task instance is a specific execution of a task, tied to a particular DAG run and timestamp, reflecting its state (e.g., running, succeeded, failed) at that moment.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;h3&gt;
  
  
  Scheduling and Execution Concepts
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Schedule Interval&lt;/strong&gt;: This determines how frequently a DAG is triggered, set using cron-like expressions (e.g., &lt;code&gt;0 0 * * *&lt;/code&gt; for daily at midnight) or predefined intervals. It governs the timing of automated runs.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Execution Date&lt;/strong&gt;: The execution date marks the logical start time of a DAG run, often aligned with the schedule interval, though the actual run may occur later due to processing delays.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;DAG Run&lt;/strong&gt;: A DAG run is a single instance of a DAG’s execution, initiated manually or by the scheduler, tied to a specific execution date and encompassing all task instances for that run.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Backfill&lt;/strong&gt;: Backfilling involves running a DAG for past periods, using historical execution dates to catch up on missed or retroactive data processing (see the command after this list).&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
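
&lt;p&gt;Backfills are typically kicked off from the CLI. For example, with the Airflow 2 CLI, to re-run a DAG over the first week of January (&lt;code&gt;your_dag_id&lt;/code&gt; is a placeholder):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;airflow dags backfill -s 2024-01-01 -e 2024-01-07 your_dag_id
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

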

&lt;h3&gt;
  
  
  Airflow Architecture Components
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Scheduler&lt;/strong&gt;: The scheduler is the engine that monitors DAGs, triggers runs based on their schedule intervals, and assigns tasks to executors for processing. It ensures the timely and orderly execution of workflows.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Executor&lt;/strong&gt;: Executors manage how tasks are run, ranging from simple single-threaded execution to distributed setups. Common types include the following (the choice is a global setting; see the configuration note after this list):&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;SequentialExecutor&lt;/strong&gt;: Runs tasks one-by-one, suitable for testing but not production.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;LocalExecutor&lt;/strong&gt;: Handles multiple tasks concurrently on one machine, balancing simplicity and efficiency.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;CeleryExecutor&lt;/strong&gt;: Distributes tasks across worker nodes for scalability, leveraging a message queue.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;KubernetesExecutor&lt;/strong&gt;: Launches tasks as individual Kubernetes pods, ideal for dynamic, cloud-native environments.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Web Server&lt;/strong&gt;: The web server powers Airflow’s user interface, offering a visual dashboard to monitor DAG runs, task statuses, and logs. It’s the primary tool for interacting with and overseeing workflows.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Metadata Database&lt;/strong&gt;: This database stores all operational data, including DAG definitions, task states, and run histories, serving as the central repository for Airflow’s state management.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Worker&lt;/strong&gt;: Workers are processes that execute tasks, typically used with distributed executors like CeleryExecutor, pulling tasks from a queue to process them.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
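
&lt;p&gt;The executor is a deployment-wide setting rather than something you pick per DAG. It is configured in &lt;code&gt;airflow.cfg&lt;/code&gt; or through the corresponding environment variable:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# airflow.cfg
[core]
executor = LocalExecutor

# or, equivalently, as an environment variable:
AIRFLOW__CORE__EXECUTOR=LocalExecutor
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

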

&lt;h3&gt;
  
  
  Advanced Concepts
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;XComs (Cross-Communications)&lt;/strong&gt;: XComs enable tasks to share small pieces of data, like a file path or a result, by pushing and pulling values during execution. They’re handy for lightweight data exchange but not suited for large datasets (see the sketch after this list).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Sensor&lt;/strong&gt;: Sensors are specialized operators that pause execution until a condition is met, such as a file appearing or an external process completing. They’re useful for waiting on dependencies outside the DAG’s control.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
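
&lt;p&gt;A minimal sketch of XComs in practice: an upstream task pushes a file path that a downstream task pulls. Task names here are illustrative, and the operators would live inside a &lt;code&gt;with DAG(...)&lt;/code&gt; block:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from airflow.operators.python import PythonOperator

def push_path(ti):
    # Push a small value (a file path, not the data itself) for downstream tasks
    ti.xcom_push(key="raw_path", value="/tmp/extracted_data.csv")

def pull_path(ti):
    # Pull the value pushed by the upstream task
    path = ti.xcom_pull(task_ids="extract_users", key="raw_path")
    print(f"Processing {path}")

extract_users = PythonOperator(task_id="extract_users", python_callable=push_path)
process_users = PythonOperator(task_id="process_users", python_callable=pull_path)
extract_users &amp;gt;&amp;gt; process_users
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

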

&lt;h2&gt;
  
  
  Writing DAG Definitions
&lt;/h2&gt;

&lt;p&gt;Let’s explore how to define a DAG in Airflow with a practical example suitable for beginners transitioning to more advanced concepts. This ETL pipeline fetches user data from an API, processes it, and loads it into a database, all written as Python code in a file placed in Airflow’s DAGs folder:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;timedelta&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;DAG&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;airflow.operators.python&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;PythonOperator&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;pandas&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;requests&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;sqlalchemy&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;create_engine&lt;/span&gt;

&lt;span class="c1"&gt;# Set basic settings for the DAG
&lt;/span&gt;&lt;span class="n"&gt;default_args&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;owner&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;airflow&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;depends_on_past&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;start_date&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nf"&gt;datetime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;2024&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;retries&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;retry_delay&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="nf"&gt;timedelta&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;minutes&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;5&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c1"&gt;# Define the DAG (workflow)
&lt;/span&gt;&lt;span class="k"&gt;with&lt;/span&gt; &lt;span class="nc"&gt;DAG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;advanced_etl_pipeline&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;default_args&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;default_args&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;schedule_interval&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;@daily&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;  &lt;span class="c1"&gt;# Runs once a day
&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;dag&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;

    &lt;span class="c1"&gt;# Step 1: Extract data from an API
&lt;/span&gt;    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;extract&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
        &lt;span class="n"&gt;url&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;https://jsonplaceholder.typicode.com/users&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;  &lt;span class="c1"&gt;# Sample API
&lt;/span&gt;        &lt;span class="n"&gt;response&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;requests&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;url&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;response&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;json&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;DataFrame&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_csv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;/tmp/extracted_data.csv&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;index&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Step 2: Transform the data
&lt;/span&gt;    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;transform&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
        &lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;read_csv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;/tmp/extracted_data.csv&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;[[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;email&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]]&lt;/span&gt;  &lt;span class="c1"&gt;# Keep only these columns
&lt;/span&gt;        &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;columns&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;user_id&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;full_name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;email_address&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;  &lt;span class="c1"&gt;# Rename columns
&lt;/span&gt;        &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_csv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;/tmp/transformed_data.csv&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;index&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Step 3: Load data into a database
&lt;/span&gt;    &lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;load&lt;/span&gt;&lt;span class="p"&gt;():&lt;/span&gt;
        &lt;span class="n"&gt;engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;create_engine&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;postgresql://username:password@localhost:5432/etl_db&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;read_csv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;/tmp/transformed_data.csv&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_sql&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;customers&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;engine&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;if_exists&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;append&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;index&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Define tasks
&lt;/span&gt;    &lt;span class="n"&gt;extract_task&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;PythonOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;extract&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;python_callable&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;extract&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;transform_task&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;PythonOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;transform&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;python_callable&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;transform&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;load_task&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nc"&gt;PythonOperator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;task_id&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;load&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;python_callable&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;load&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Set the order: extract -&amp;gt; transform -&amp;gt; load
&lt;/span&gt;    &lt;span class="n"&gt;extract_task&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;transform_task&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;load_task&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This example demonstrates a realistic ETL process:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Extract&lt;/strong&gt;: Fetches user data from a public API and saves it as a CSV.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Transform&lt;/strong&gt;: Filters and renames columns, then saves the result.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Load&lt;/strong&gt;: Writes the data to a PostgreSQL database (update the connection string with your credentials).&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Here’s how it’s built:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Imports&lt;/strong&gt;: Includes necessary libraries (&lt;code&gt;pandas&lt;/code&gt;, &lt;code&gt;requests&lt;/code&gt;, &lt;code&gt;sqlalchemy&lt;/code&gt;) alongside Airflow components.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Default Arguments&lt;/strong&gt;: Sets basic retry logic and a start date, keeping it minimal but functional.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;DAG Definition&lt;/strong&gt;: Uses a &lt;code&gt;with&lt;/code&gt; statement for cleaner syntax, scheduling it daily.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Tasks&lt;/strong&gt;: Three &lt;code&gt;PythonOperator&lt;/code&gt; tasks call the defined functions, linked sequentially with &lt;code&gt;&amp;gt;&amp;gt;&lt;/code&gt;.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;For beginners, start by running this in Airflow, replacing the database connection with your own, and observing the flow in the UI.&lt;/p&gt;

&lt;h2&gt;
  
  
  Best Practices for Apache Airflow DAGs
&lt;/h2&gt;

&lt;h3&gt;
  
  
  1. Creating DAGs
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Make Tasks Reusable and Modular&lt;/strong&gt;: Design tasks as standalone units with clear inputs and outputs, avoiding tight coupling to enable reuse across DAGs. Encapsulate logic using Python functions or external scripts, keeping DAG files focused on workflow definition.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Use Meaningful IDs and Descriptions&lt;/strong&gt;: Assign descriptive &lt;code&gt;dag_id&lt;/code&gt; and &lt;code&gt;task_id&lt;/code&gt; values (e.g., &lt;code&gt;extract_data&lt;/code&gt; instead of &lt;code&gt;task1&lt;/code&gt;) and add a &lt;code&gt;description&lt;/code&gt; to clarify purpose, improving readability and maintenance.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Avoid Top-Level Code Execution&lt;/strong&gt;: Place logic inside tasks or functions, not at the DAG file’s top level, to prevent unnecessary execution during parsing by the scheduler. For example, avoid API calls outside task definitions to reduce overhead (contrasted in the sketch after this list).&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
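
&lt;p&gt;The last point is easiest to see side by side. This sketch reuses the sample API from the earlier example and contrasts code that would run on every scheduler parse with code that runs only when the task executes:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from datetime import datetime

import requests
from airflow import DAG
from airflow.operators.python import PythonOperator

# Anti-pattern (left commented out): this line would run every time the
# scheduler parses the file, hammering the API and slowing down parsing.
# users = requests.get("https://jsonplaceholder.typicode.com/users").json()

def fetch_users():
    # Correct place: the API call runs only when the task itself executes.
    return requests.get("https://jsonplaceholder.typicode.com/users").json()

with DAG("parse_friendly_dag", start_date=datetime(2024, 1, 1), schedule_interval=None, catchup=False) as dag:
    fetch_task = PythonOperator(task_id="fetch_users", python_callable=fetch_users)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;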

&lt;h3&gt;
  
  
  2. Testing and Validation
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Test DAG Loading&lt;/strong&gt;: Regularly validate DAG syntax and structure by running &lt;code&gt;airflow dags list&lt;/code&gt; or &lt;code&gt;python your_dag_file.py&lt;/code&gt; to catch errors early. This ensures the DAG can be parsed without runtime issues.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Unit Test Task Logic&lt;/strong&gt;: Write unit tests for Python functions used in &lt;code&gt;PythonOperator&lt;/code&gt; tasks using frameworks like &lt;code&gt;pytest&lt;/code&gt;, isolating business logic from Airflow execution. Mock external dependencies to focus on code behavior (a sketch follows this list).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Simulate DAG Runs&lt;/strong&gt;: Use &lt;code&gt;airflow tasks test &amp;lt;dag_id&amp;gt; &amp;lt;task_id&amp;gt; &amp;lt;execution_date&amp;gt;&lt;/code&gt; to dry-run tasks and verify behavior without affecting the metadata database. This helps debug task execution locally.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
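
&lt;p&gt;As a sketch of unit testing task logic, assume the transform step from the earlier pipeline has been factored into a pure function that operates on a DataFrame, so &lt;code&gt;pytest&lt;/code&gt; can exercise it without Airflow or the filesystem:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import pandas as pd

def transform_users(df: pd.DataFrame) -&gt; pd.DataFrame:
    # Same logic as the transform task, factored out for testability.
    df = df[["id", "name", "email"]]
    df.columns = ["user_id", "full_name", "email_address"]
    return df

def test_transform_users_filters_and_renames_columns():
    raw = pd.DataFrame([
        {"id": 1, "name": "Ada", "email": "ada@example.com", "phone": "555-0100"}
    ])
    result = transform_users(raw)
    # Only the three renamed columns should survive the transform.
    assert list(result.columns) == ["user_id", "full_name", "email_address"]
    assert result.loc[0, "full_name"] == "Ada"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;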

&lt;h3&gt;
  
  
  3. Workflow Design
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Ensure Idempotency&lt;/strong&gt;: Design tasks to produce consistent results regardless of how often they’re run with the same inputs, simplifying retries and backfills. For example, use &lt;code&gt;INSERT ... ON CONFLICT&lt;/code&gt; in SQL tasks to handle duplicates (sketched after this list).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Minimize Task Dependencies&lt;/strong&gt;: Define only necessary dependencies to maximize parallelism and reduce complexity, using &lt;code&gt;&amp;gt;&amp;gt;&lt;/code&gt; or &lt;code&gt;set_upstream&lt;/code&gt; explicitly. Avoid over-sequencing tasks that can run independently.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Leverage SubDAGs or TaskGroups&lt;/strong&gt;: Organize complex workflows into SubDAGs (for reusable segments) or TaskGroups (for visual clarity), but prefer TaskGroups: SubDAGs can complicate scheduling and are deprecated in newer Airflow releases.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Pass Data Efficiently&lt;/strong&gt;: Use XComs for small data exchanges between tasks, but store larger datasets in external systems (e.g., S3, databases) and pass references (e.g., file paths) instead. This prevents memory overload in the metadata database.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
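
&lt;p&gt;Here is a sketch of an idempotent load step. It assumes the &lt;code&gt;customers&lt;/code&gt; table has a primary key (or unique constraint) on &lt;code&gt;user_id&lt;/code&gt;, and it reuses the illustrative connection string from the earlier example:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import pandas as pd
from sqlalchemy import create_engine, text

def load_idempotent():
    # The upsert makes retries and backfills safe: loading the same rows
    # twice updates them in place instead of creating duplicates.
    engine = create_engine("postgresql://username:password@localhost:5432/etl_db")
    df = pd.read_csv("/tmp/transformed_data.csv")
    upsert = text("""
        INSERT INTO customers (user_id, full_name, email_address)
        VALUES (:user_id, :full_name, :email_address)
        ON CONFLICT (user_id) DO UPDATE
        SET full_name = EXCLUDED.full_name,
            email_address = EXCLUDED.email_address
    """)
    with engine.begin() as conn:
        # Passing a list of dicts executes the statement once per row.
        conn.execute(upsert, df.to_dict(orient="records"))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;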

&lt;h3&gt;
  
  
  4. Operational Considerations
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Set Appropriate Retries&lt;/strong&gt;: Configure &lt;code&gt;retries&lt;/code&gt; and &lt;code&gt;retry_delay&lt;/code&gt; in &lt;code&gt;default_args&lt;/code&gt; to handle transient failures (e.g., network issues), balancing resilience with resource use. For example, &lt;code&gt;retries=3&lt;/code&gt; with &lt;code&gt;retry_delay=timedelta(minutes=5)&lt;/code&gt; is a common starting point (several of these settings appear together in the sketch after this list).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Use Pools for Resource Limits&lt;/strong&gt;: Define pools to cap concurrent task execution for resource-intensive operations (e.g., database queries), preventing system overload. Set this via the Airflow UI or programmatically with &lt;code&gt;pool='my_pool'&lt;/code&gt;.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Secure Sensitive Data&lt;/strong&gt;: Store credentials and secrets in Airflow Connections or Variables (with encryption enabled) rather than hardcoding them in DAG files. Access them using &lt;code&gt;conn_id&lt;/code&gt; or &lt;code&gt;Variable.get()&lt;/code&gt; for security and flexibility.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Control Backfills with Catchup&lt;/strong&gt;: Set &lt;code&gt;catchup=False&lt;/code&gt; unless backfilling is intentional, and choose a logical &lt;code&gt;start_date&lt;/code&gt; to avoid unexpected historical runs when deploying new DAGs. This prevents scheduler overload on first deployment.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Monitor and Log&lt;/strong&gt;: Enable task logging and set up alerts (e.g., via &lt;code&gt;EmailOperator&lt;/code&gt; or custom callbacks) to track failures and performance, using the Airflow UI to inspect logs. Add custom logging in tasks to aid debugging.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
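
&lt;p&gt;Several of these operational settings can be combined in one small sketch. The DAG id, pool name, and Variable name are illustrative, and the &lt;code&gt;api_pool&lt;/code&gt; pool and &lt;code&gt;my_api_key&lt;/code&gt; Variable are assumed to already exist in your Airflow instance:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator

default_args = {
    "owner": "airflow",
    "retries": 3,                         # absorb transient failures
    "retry_delay": timedelta(minutes=5),
}

def call_api():
    # Read the secret at runtime instead of hardcoding it in the DAG file.
    api_key = Variable.get("my_api_key")  # assumed to exist in Airflow
    print("Calling API with a key of length", len(api_key))

with DAG(
    "well_behaved_dag",
    default_args=default_args,
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,                        # no surprise historical runs
) as dag:
    api_task = PythonOperator(
        task_id="call_api",
        python_callable=call_api,
        pool="api_pool",                  # caps concurrency; assumed to exist
    )
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;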

&lt;h2&gt;
  
  
  Real-World Examples
&lt;/h2&gt;

&lt;p&gt;Apache Airflow’s versatility makes it a valuable tool across industries, orchestrating complex data workflows to enhance efficiency and decision-making. Below are examples of its application in financial services, retail and e-commerce, and healthcare.&lt;/p&gt;

&lt;h3&gt;
  
  
  Financial Services
&lt;/h3&gt;

&lt;p&gt;In financial services, Apache Airflow streamlines critical operations by automating data workflows with precision and scalability. It supports regulatory reporting to comply with laws like Dodd-Frank and GDPR, enhances fraud detection by curating data for machine learning models to identify anomalies, and aggregates data from multiple sources for a comprehensive customer view, while also aiding risk management and portfolio optimization.&lt;/p&gt;

&lt;h3&gt;
  
  
  Retail and E-Commerce
&lt;/h3&gt;

&lt;p&gt;In retail and e-commerce, Airflow boosts operational efficiency and customer satisfaction by managing data pipelines that integrate diverse sources. It automates inventory optimization, powers personalized marketing through customer behavior analysis, and improves supply chain coordination by syncing data across vendors, logistics, and sales for accurate demand forecasting and timely deliveries.&lt;/p&gt;

&lt;h3&gt;
  
  
  Healthcare
&lt;/h3&gt;

&lt;p&gt;In healthcare, Apache Airflow enhances patient care and operational efficiency by orchestrating complex medical data workflows. It automates the processing of electronic health record (EHR) data with compliance tracking, supports diagnostic processes by managing machine learning model training for medical imagery, and gathers actionable insights from patient data to improve outcomes and reduce manual effort.&lt;/p&gt;

</description>
      <category>dataengineering</category>
      <category>data</category>
      <category>apacheairflow</category>
    </item>
    <item>
      <title>Amazon Redshift Data Warehousing (Sample Project)</title>
      <dc:creator>Eric Katumo</dc:creator>
      <pubDate>Wed, 26 Mar 2025 15:32:33 +0000</pubDate>
      <link>https://dev.to/luxdevhq/amazon-redshift-data-warehousing-sample-project-3h86</link>
      <guid>https://dev.to/luxdevhq/amazon-redshift-data-warehousing-sample-project-3h86</guid>
      <description>&lt;h2&gt;
  
  
  Overview
&lt;/h2&gt;

&lt;p&gt;This project focuses on analyzing two key business processes using Amazon Redshift:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Identifying the most common &lt;strong&gt;frequency of purchases&lt;/strong&gt; per season.&lt;/li&gt;
&lt;li&gt;Finding &lt;strong&gt;products with high review ratings&lt;/strong&gt;.&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Step 1: Setting Up Amazon Redshift
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Create a Redshift Cluster&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Navigate to AWS Redshift Console.&lt;/li&gt;
&lt;li&gt;Click &lt;strong&gt;Create Cluster&lt;/strong&gt;.&lt;/li&gt;
&lt;li&gt;Choose a &lt;strong&gt;single-node&lt;/strong&gt; cluster.&lt;/li&gt;
&lt;li&gt;Select an appropriate &lt;strong&gt;node type&lt;/strong&gt; (e.g., &lt;code&gt;dc2.large&lt;/code&gt;).&lt;/li&gt;
&lt;li&gt;Set the &lt;strong&gt;database name, username, and password&lt;/strong&gt;.&lt;/li&gt;
&lt;li&gt;Click &lt;strong&gt;Create cluster&lt;/strong&gt;.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Create an IAM Role&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Go to the &lt;strong&gt;IAM Console&lt;/strong&gt;.&lt;/li&gt;
&lt;li&gt;Create a new role with &lt;strong&gt;AmazonS3ReadOnlyAccess&lt;/strong&gt;.&lt;/li&gt;
&lt;li&gt;Attach the role to your Redshift cluster.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Create an S3 Bucket and Upload Data&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Navigate to &lt;strong&gt;Amazon S3&lt;/strong&gt;.&lt;/li&gt;
&lt;li&gt;Create a new bucket (e.g., &lt;code&gt;zzetu&lt;/code&gt;).&lt;/li&gt;
&lt;li&gt;Upload &lt;code&gt;shopping_data.csv&lt;/code&gt; to this bucket (these console steps are also scripted in the sketch after this list).&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ol&gt;
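
&lt;p&gt;If you prefer to script these console steps, here is a minimal &lt;code&gt;boto3&lt;/code&gt; sketch. The cluster identifier, database name, credentials, and region are placeholders, and the role ARN is the same placeholder used in the COPY command below:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import boto3

# Create a single-node Redshift cluster (all values are placeholders).
redshift = boto3.client("redshift", region_name="us-east-1")
redshift.create_cluster(
    ClusterIdentifier="shopping-analytics",
    ClusterType="single-node",
    NodeType="dc2.large",
    DBName="shopping_db",
    MasterUsername="awsuser",
    MasterUserPassword="ChangeMe123!",
    IamRoles=["arn:aws:iam::your-account-id:role/your-redshift-role"],
)

# Create the bucket and upload the dataset so Redshift can COPY it.
s3 = boto3.client("s3")
s3.create_bucket(Bucket="zzetu")
s3.upload_file("shopping_data.csv", "zzetu", "shopping_data.csv")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;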

&lt;h2&gt;
  
  
  Step 2: Loading Data into Redshift
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Use the Amazon Redshift Query Editor&lt;/strong&gt; to run SQL commands.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Create a table to store raw data&lt;/strong&gt;:
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;shopping&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;CustomerID&lt;/span&gt; &lt;span class="nb"&gt;INTEGER&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;Age&lt;/span&gt; &lt;span class="nb"&gt;INTEGER&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;Gender&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;Category&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="k"&gt;Location&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;Season&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;ReviewRating&lt;/span&gt; &lt;span class="nb"&gt;REAL&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;SubscriptionStatus&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;PaymentMethod&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;ShippingType&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;DiscountApplied&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;PromoCodeUsed&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;PreviousPurchases&lt;/span&gt; &lt;span class="nb"&gt;INTEGER&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;PreferredPaymentMethod&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;FrequencyOfPurchases&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ol start="3"&gt;
&lt;li&gt;
&lt;strong&gt;Copy data from S3 into Redshift&lt;/strong&gt;:
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;COPY&lt;/span&gt; &lt;span class="n"&gt;shopping&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="s1"&gt;'s3://zzetu/shopping_data.csv'&lt;/span&gt;
&lt;span class="n"&gt;IAM_ROLE&lt;/span&gt; &lt;span class="s1"&gt;'arn:aws:iam::your-account-id:role/your-redshift-role'&lt;/span&gt;
&lt;span class="n"&gt;FORMAT&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;CSV&lt;/span&gt;
&lt;span class="n"&gt;IGNOREHEADER&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Step 3: Schema Design
&lt;/h2&gt;

&lt;p&gt;We are using a star schema with 1 fact table and 3 dimension tables:&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;1. Dimension Tables&lt;/strong&gt;
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;dimCustomer&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;CustomerID&lt;/span&gt; &lt;span class="nb"&gt;INTEGER&lt;/span&gt; &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;Age&lt;/span&gt; &lt;span class="nb"&gt;INTEGER&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;Gender&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="k"&gt;Location&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;SubscriptionStatus&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;PreferredPaymentMethod&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;dimProduct&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;Category&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;DiscountApplied&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;PromoCodeUsed&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;dimTransaction&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;TransactionID&lt;/span&gt; &lt;span class="nb"&gt;INTEGER&lt;/span&gt; &lt;span class="k"&gt;IDENTITY&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;CustomerID&lt;/span&gt; &lt;span class="nb"&gt;INTEGER&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;PaymentMethod&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;ShippingType&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;Season&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;FrequencyOfPurchases&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;50&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="k"&gt;FOREIGN&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CustomerID&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;REFERENCES&lt;/span&gt; &lt;span class="n"&gt;dimCustomer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CustomerID&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  &lt;strong&gt;2. Fact Table&lt;/strong&gt;
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;factPurchases&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;CustomerID&lt;/span&gt; &lt;span class="nb"&gt;INTEGER&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;Age&lt;/span&gt; &lt;span class="nb"&gt;INTEGER&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;ReviewRating&lt;/span&gt; &lt;span class="nb"&gt;REAL&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;PreviousPurchases&lt;/span&gt; &lt;span class="nb"&gt;INTEGER&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="k"&gt;FOREIGN&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CustomerID&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;REFERENCES&lt;/span&gt; &lt;span class="n"&gt;dimCustomer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CustomerID&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Step 4: Data Insertion
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;-- Insert data into dimCustomer&lt;/span&gt;
&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;dimCustomer&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CustomerID&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Age&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Gender&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;Location&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;SubscriptionStatus&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;PreferredPaymentMethod&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="k"&gt;DISTINCT&lt;/span&gt; &lt;span class="n"&gt;CustomerID&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Age&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Gender&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;Location&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;SubscriptionStatus&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;PreferredPaymentMethod&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;shopping&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="c1"&gt;-- Insert data into dimProduct&lt;/span&gt;
&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;dimProduct&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;Category&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;DiscountApplied&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;PromoCodeUsed&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="k"&gt;DISTINCT&lt;/span&gt; &lt;span class="n"&gt;Category&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;DiscountApplied&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;PromoCodeUsed&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;shopping&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="c1"&gt;-- Insert data into dimTransaction&lt;/span&gt;
&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;dimTransaction&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CustomerID&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;PaymentMethod&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ShippingType&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Season&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;FrequencyOfPurchases&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;CustomerID&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;PaymentMethod&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ShippingType&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Season&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;FrequencyOfPurchases&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;shopping&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="c1"&gt;-- Insert data into factPurchases&lt;/span&gt;
&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;factPurchases&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CustomerID&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Age&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ReviewRating&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;PreviousPurchases&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;CustomerID&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Age&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ReviewRating&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;PreviousPurchases&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;shopping&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Step 5: Business Queries
&lt;/h2&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;1. Most Used Frequency of Purchase per Season&lt;/strong&gt;
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Season&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;FrequencyOfPurchases&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;COUNT&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;PurchaseCount&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;factPurchases&lt;/span&gt; &lt;span class="n"&gt;f&lt;/span&gt;
&lt;span class="k"&gt;JOIN&lt;/span&gt; &lt;span class="n"&gt;dimTransaction&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt; &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CustomerID&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CustomerID&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Season&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;FrequencyOfPurchases&lt;/span&gt;
&lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Season&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;PurchaseCount&lt;/span&gt; &lt;span class="k"&gt;DESC&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  &lt;strong&gt;2. Products with High Review Ratings&lt;/strong&gt;
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;Category&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;AVG&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ReviewRating&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;AvgRating&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;fact_Purchases&lt;/span&gt; &lt;span class="n"&gt;f&lt;/span&gt;
&lt;span class="k"&gt;JOIN&lt;/span&gt; &lt;span class="n"&gt;shopping&lt;/span&gt; &lt;span class="n"&gt;s&lt;/span&gt; &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CustomerID&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;s&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CustomerID&lt;/span&gt;
&lt;span class="k"&gt;GROUP&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;Category&lt;/span&gt;
&lt;span class="k"&gt;ORDER&lt;/span&gt; &lt;span class="k"&gt;BY&lt;/span&gt; &lt;span class="n"&gt;AvgRating&lt;/span&gt; &lt;span class="k"&gt;DESC&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
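
&lt;p&gt;These queries can also be run programmatically. Below is a sketch using &lt;code&gt;psycopg2&lt;/code&gt;, which works because Redshift speaks the PostgreSQL wire protocol; the endpoint and credentials are placeholders for your own cluster:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import psycopg2

# Redshift listens on port 5439 by default; host and credentials are
# placeholders to replace with your cluster's endpoint and login.
conn = psycopg2.connect(
    host="your-cluster.abc123.us-east-1.redshift.amazonaws.com",
    port=5439,
    dbname="shopping_db",
    user="awsuser",
    password="ChangeMe123!",
)

# The second business query from above, with the Category column qualified.
query = """
    SELECT s.Category, AVG(f.ReviewRating) AS AvgRating
    FROM factPurchases f
    JOIN shopping s ON f.CustomerID = s.CustomerID
    GROUP BY s.Category
    ORDER BY AvgRating DESC;
"""

with conn, conn.cursor() as cur:
    cur.execute(query)
    for category, avg_rating in cur.fetchall():
        print(f"{category}: {avg_rating:.2f}")
conn.close()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;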



&lt;h2&gt;
  
  
  Setup Guide
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Create Redshift Cluster&lt;/strong&gt; and configure IAM roles.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Load Data from S3&lt;/strong&gt; into Redshift tables.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Run the SQL scripts&lt;/strong&gt; for creating tables and inserting data.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Execute business queries&lt;/strong&gt; to generate insights.&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Repository Link
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://github.com/Katumo-Zzetu/Redshift-DE.git" rel="noopener noreferrer"&gt;GitHub Repository&lt;/a&gt;&lt;/p&gt;

</description>
      <category>database</category>
      <category>dataengineering</category>
      <category>data</category>
    </item>
    <item>
      <title>The Ultimate Guide to Apache Kafka: Basics, Architecture, and Core Concepts</title>
      <dc:creator>Eric Katumo</dc:creator>
      <pubDate>Mon, 10 Mar 2025 12:16:48 +0000</pubDate>
      <link>https://dev.to/luxdevhq/the-ultimate-guide-to-apache-kafka-31ce</link>
      <guid>https://dev.to/luxdevhq/the-ultimate-guide-to-apache-kafka-31ce</guid>
      <description>&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F60pq9ixqqlfqpagvtc0j.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F60pq9ixqqlfqpagvtc0j.png" alt="Image description" width="800" height="593"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;Introduction&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;Apache Kafka is a widely used open-source platform for distributed event streaming, supporting high-performance data pipelines, streaming analytics, data integration, and mission-critical applications across thousands of companies (&lt;a href="https://kafka.apache.org/" rel="noopener noreferrer"&gt;https://kafka.apache.org/&lt;/a&gt;).&lt;/p&gt;

&lt;p&gt;Originally developed by LinkedIn, Kafka is renowned for its high throughput, scalability and durability. It enables real-time data processing and is a key component in modern event-driven architectures.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;Kafka Architecture&lt;/strong&gt;
&lt;/h3&gt;

&lt;h4&gt;
  
  
  &lt;strong&gt;Brokers&lt;/strong&gt;
&lt;/h4&gt;

&lt;p&gt;A single Kafka server is called a Kafka Broker. Each broker operates as an independent process on a distinct machine, communicating with other brokers via a reliable and high-speed network.&lt;/p&gt;

&lt;p&gt;A cluster can contain any number of brokers, but three is the typical minimum: that allows one broker to be taken down for maintenance while another fails, with the cluster still able to serve requests.&lt;/p&gt;

&lt;h4&gt;
  
  
  &lt;strong&gt;Producers&lt;/strong&gt;
&lt;/h4&gt;

&lt;p&gt;A Producer is a client application that publishes (writes) events to a Kafka cluster. The Kafka producer is responsible for creating messages of the appropriate structure and sending them using the Kafka protocol. It has several configuration options to control message creation and delivery.&lt;/p&gt;

&lt;h4&gt;
  
  
  &lt;strong&gt;Consumers&lt;/strong&gt;
&lt;/h4&gt;

&lt;p&gt;The Kafka consumer works by issuing “fetch” requests to the brokers leading the partitions it wants to consume. The consumer receives back a chunk of the log containing all of the messages from the requested offset position onward.&lt;/p&gt;

&lt;p&gt;A consumer group is a set of consumers that cooperate to consume data from one or more topics.&lt;/p&gt;

&lt;h4&gt;
  
  
  &lt;strong&gt;Topic&lt;/strong&gt;
&lt;/h4&gt;

&lt;p&gt;Kafka topics are the categories used to organize messages. Kafka handles each topic as an independent queue, so a consumer can subscribe to a specific topic and receive only the messages published to it.&lt;/p&gt;

&lt;p&gt;Topics are partitioned and replicated across the brokers of a cluster; “broker” refers to each individual node in that cluster.&lt;/p&gt;

&lt;h4&gt;
  
  
  &lt;strong&gt;Cluster&lt;/strong&gt;
&lt;/h4&gt;

&lt;p&gt;Kafka clusters are a group of interconnected Kafka brokers that work together to manage the data streams entering and leaving a Kafka system. As user activity increases, so does the need for additional Kafka brokers to cope with the volume and velocity of the incoming data streams.&lt;/p&gt;

&lt;p&gt;Kafka clusters enable the replication of data partitions across multiple brokers, ensuring high availability even in the case of node failures. Your data pipeline remains robust and responsive to fluctuating demand.&lt;/p&gt;

&lt;h4&gt;
  
  
  &lt;strong&gt;Partitions&lt;/strong&gt;
&lt;/h4&gt;

&lt;p&gt;Partitions are essential components within Kafka's distributed architecture that enable Kafka to scale horizontally, allowing for efficient parallel data processing. They are the building blocks for organizing and distributing data across the Kafka cluster.&lt;/p&gt;

&lt;p&gt;Each partition can have multiple replicas spread across different brokers, guaranteeing fault tolerance and data redundancy.&lt;/p&gt;

&lt;h4&gt;
  
  
  &lt;strong&gt;KRaft&lt;/strong&gt;
&lt;/h4&gt;

&lt;p&gt;KRaft is the consensus protocol introduced in KIP-500 to remove Apache Kafka’s dependency on ZooKeeper for metadata management. It leverages the Raft consensus algorithm to manage metadata and handle leader election natively within Kafka.&lt;/p&gt;

&lt;p&gt;This eliminates the dependency on an external coordination system, allowing Kafka to function as a self-contained system.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;Core Concepts&lt;/strong&gt;
&lt;/h3&gt;

&lt;h4&gt;
  
  
  &lt;strong&gt;Events&lt;/strong&gt;
&lt;/h4&gt;

&lt;p&gt;Kafka works like a digital logbook that keeps track of everything important happening in your system. This logbook is filled with “events” – a record of each significant action or change. In Kafka, these events are the core of how data is stored and shared.&lt;/p&gt;

&lt;p&gt;Think of an event as a single entry in this logbook, with a few key pieces of information, put to use in the producer sketch after this list:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Key:&lt;/strong&gt; A unique identifier for the event. This helps you categorize or group related events.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Value:&lt;/strong&gt; The actual details of what happened.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Timestamp:&lt;/strong&gt; When the event occurred.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Headers (optional):&lt;/strong&gt; Extra bits of information about the event.&lt;/li&gt;
&lt;/ul&gt;
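
&lt;p&gt;Here is a minimal producer sketch that writes one such logbook entry using the &lt;code&gt;kafka-python&lt;/code&gt; client; the broker address and topic name are illustrative:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import json
from datetime import datetime, timezone

from kafka import KafkaProducer

# Connect to a local broker (address is illustrative).
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# One event: the key groups related events, the value holds the details,
# and headers carry optional metadata; Kafka records the timestamp.
producer.send(
    "user-activity",
    key="user-42",
    value={"action": "login", "at": datetime.now(timezone.utc).isoformat()},
    headers=[("source", b"web-app")],
)
producer.flush()  # block until the event is actually delivered
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;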

&lt;h4&gt;
  
  
  &lt;strong&gt;Replication&lt;/strong&gt;
&lt;/h4&gt;

&lt;p&gt;Replication is the process of keeping multiple copies of the data so that it stays available when one of the brokers goes down and cannot serve requests. Copies of each partition are maintained at multiple broker instances using the partition’s write-ahead log.&lt;/p&gt;

&lt;p&gt;The write-ahead log is where all the messages for that partition are stored in order. Each message is identified by its unique offset.&lt;/p&gt;

&lt;h4&gt;
  
  
  &lt;strong&gt;Offsets&lt;/strong&gt;
&lt;/h4&gt;

&lt;p&gt;The consumer offset is a way of tracking the position a consumer has reached in the ordered sequence of messages within a Kafka topic’s partitions. Keeping track of the offset, or position, is important for nearly all Kafka use cases and can be an absolute necessity in certain instances, such as financial services.&lt;/p&gt;

&lt;p&gt;The Kafka consumer offset allows processing to continue from where it last left off if the stream application is turned off or if there is an unexpected failure. In other words, by having the offsets persist in a data store, data continuity is retained even when the stream application shuts down or fails.&lt;/p&gt;

&lt;h4&gt;
  
  
  &lt;strong&gt;Consumer Groups&lt;/strong&gt;
&lt;/h4&gt;

&lt;p&gt;Consumer groups allow Kafka consumers to work together and process events from a topic in parallel. Each topic consists of one or more partitions. When a new consumer is started it will join a consumer group (this happens under the hood) and Kafka will then ensure that each partition is consumed by only one consumer from that group.&lt;/p&gt;

&lt;p&gt;So, if you have a topic with two partitions and only one consumer in a group, that consumer would consume records from both partitions. Once a second consumer joins the same group, each consumer would consume only one of the partitions.&lt;/p&gt;
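
&lt;p&gt;A minimal consumer sketch with &lt;code&gt;kafka-python&lt;/code&gt; shows this in practice: start two copies of the script below with the same &lt;code&gt;group_id&lt;/code&gt; and Kafka will split the topic’s partitions between them (broker address, topic, and group names are illustrative):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "user-activity",
    bootstrap_servers="localhost:9092",
    group_id="activity-processors",    # joining this group splits partitions
    auto_offset_reset="earliest",      # start from the oldest retained offset
    enable_auto_commit=True,           # periodically persist consumed offsets
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    print(f"partition={message.partition} offset={message.offset} value={message.value}")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;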

&lt;h4&gt;
  
  
  &lt;strong&gt;Retention&lt;/strong&gt;
&lt;/h4&gt;

&lt;p&gt;Kafka retention controls the size of topic logs so they do not outgrow the available disk space. Retention can be configured based on the size of the logs (size-based retention) or based on a configured duration (time-based retention).&lt;/p&gt;

&lt;p&gt;The same retention can be applied across all Kafka topics, or it can be configured per topic so that each topic’s retention matches the nature of its data.&lt;/p&gt;

&lt;h3&gt;
  
  
  &lt;strong&gt;Conclusion&lt;/strong&gt;
&lt;/h3&gt;

&lt;p&gt;Apache Kafka is a powerful distributed streaming platform that combines messaging and storage capabilities. Its architecture, featuring brokers, topics, and partitions, delivers scalability and fault tolerance. Kafka's core concepts, such as consumer groups and offsets, enable efficient and reliable stream processing.&lt;/p&gt;

&lt;p&gt;With its ability to handle high-volume data streams and support real-time applications, Kafka is a crucial component of modern data architectures. It empowers developers to build robust, data-driven applications that address the challenges of today's data-intensive world. To get started with Apache Kafka, visit &lt;a href="https://kafka.apache.org/quickstart" rel="noopener noreferrer"&gt;https://kafka.apache.org/quickstart&lt;/a&gt;.&lt;/p&gt;

</description>
      <category>dataengineering</category>
      <category>datascience</category>
      <category>data</category>
    </item>
  </channel>
</rss>
