Eric Katumo for LuxDevHQ

Building an Automated Weather Data Pipeline with Apache Kafka and Cassandra

In this article, we build an end-to-end pipeline that collects weather data for several African cities from the OpenWeatherMap API, streams it through Apache Kafka, and stores it in a Cassandra database for analysis. The infrastructure is hosted on Microsoft Azure.

Introduction

Weather data pipelines are essential components of modern environmental monitoring systems. They let organizations collect, process, and analyze meteorological information in near real time, which supports faster and better-informed decisions. This tutorial demonstrates how to build a simple weather data pipeline using:

  • Python for data fetching and processing
  • OpenWeatherMap API as our data source
  • Apache Kafka for handling real-time data streams
  • Apache Cassandra for scalable data storage
  • Microsoft Azure for cloud infrastructure

By the end of this article, you will understand how these technologies fit together into a data pipeline that can be extended to more complex use cases.

Architecture Overview

Our weather data pipeline consists of the following workflow:

  1. Data Extraction: Weather data is extracted for five African cities from the OpenWeatherMap API using the producer script
  2. Data Streaming: Weather data is streamed into a Kafka topic
  3. Data Consumption: A consumer script reads from the Kafka topic
  4. Data Storage: The data is processed and stored in a Cassandra database
  5. Data Query: We can query the stored data using CQL (Cassandra Query Language)
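
The five stages above can be pictured with a small, self-contained sketch. It is not the pipeline itself: a queue.Queue stands in for the Kafka topic, a plain list stands in for the Cassandra table, and fake_fetch is a hypothetical replacement for the OpenWeatherMap call that returns a made-up temperature.

```python
import json
import queue

CITIES = ["Nairobi", "Johannesburg", "Casablanca", "Lagos", "Kinshasa"]


def fake_fetch(city):
    """Hypothetical stand-in for the API call; the temperature is made up."""
    return {"extracted_city": city, "main": {"temp": 20.0}}


def produce(topic, cities):
    """Stages 1-2: extract one reading per city and stream it as JSON."""
    for city in cities:
        topic.put(json.dumps(fake_fetch(city)))


def consume(topic, table):
    """Stages 3-4: read every pending message and store the parsed fields."""
    while not topic.empty():
        record = json.loads(topic.get())
        table.append({"city_name": record["extracted_city"],
                      "temperature": record["main"]["temp"]})


def query(table, city):
    """Stage 5: look up the stored rows for one city."""
    return [row for row in table if row["city_name"] == city]


topic = queue.Queue()   # stand-in for the Kafka topic
table = []              # stand-in for the Cassandra table
produce(topic, CITIES)
consume(topic, table)
print(query(table, "Lagos"))
```

The real scripts later in this article follow the same shape, with the producer and consumer running as separate processes.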

Azure Infrastructure Setup

This project leverages Microsoft Azure for hosting our components:

  1. Azure Virtual Machine: Ubuntu 20.04 LTS server to run our Python scripts and Cassandra
  2. Network Security Group: Configured to allow necessary traffic for Kafka and Cassandra
  3. Azure Managed Disks: For persistent storage of Cassandra data

To set up an Azure VM for this project:

  1. Log in to the Azure Portal
  2. Create a new Virtual Machine with Ubuntu 20.04 LTS
  3. Select a size with at least 2 vCPUs and 8 GB of RAM
  4. Configure networking to allow inbound SSH (port 22) and Cassandra CQL (port 9042); the scripts reach the Confluent Cloud Kafka cluster over outbound connections on port 9092
  5. Create and download SSH keys for secure access

After provisioning, connect to your VM using SSH:

ssh -i /path/to/your/key.pem azureuser@your-vm-ip-address

Setting Up Confluent Cloud for Your Weather Data Pipeline

Before diving into the implementation, we'll set up a managed Kafka environment on Confluent Cloud.

Creating a Confluent Cloud Account

  1. Visit Confluent.io and click on "Get Started Free"
  2. Complete the registration process by providing your email and setting up a password
  3. Verify your email address to activate your account

Setting Up a Kafka Cluster

Once logged in to Confluent Cloud:

  1. Click on "Create cluster" in the dashboard
  2. Choose Azure as your cloud provider and select the region closest to your Azure VM
  3. Select the "Basic" cluster type for development purposes (you can upgrade later)
  4. Name your cluster (e.g., "weather-pipeline-cluster")
  5. Click "Launch cluster"

Creating a Kafka Topic

After your cluster is provisioned:

  1. Select your cluster from the dashboard
  2. Navigate to the "Topics" section in the left sidebar
  3. Click "Create topic"
  4. Enter "weather_data" as the topic name
  5. Set the number of partitions (start with 6 for this example)
  6. Leave the default retention settings (7 days)
  7. Click "Create with defaults" (or customize advanced settings if needed)

Creating API Keys for Authentication

To connect your application to the Confluent Cloud Kafka cluster:

  1. In your cluster dashboard, click on "API keys" in the left sidebar
  2. Click "Create key"
  3. Select "Global access" (or "Granular access" if you prefer more control)
  4. Provide a description (e.g., "Weather Pipeline API Key")
  5. Click "Create key"
  6. Important: Save both the API key and secret in a secure location as they will only be shown once

Testing the Connection

To verify your connection settings, create a simple test script on your Azure VM. It needs the Python packages and the .env file described under "Setting Up the Environment" below:

from confluent_kafka.admin import AdminClient
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Kafka configuration
kafka_config = {
    "bootstrap.servers": os.getenv('BOOTSTRAP_SERVERS'),
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": os.getenv('KAFKA_API_KEY'),
    "sasl.password": os.getenv('KAFKA_API_SECRET'),
}

# Create Admin client
admin_client = AdminClient(kafka_config)

# List topics
topics = admin_client.list_topics(timeout=10)
print("Available topics:", list(topics.topics.keys()))

Setting Up Cassandra on Azure VM

We'll install Apache Cassandra directly on our Azure VM:

# Install Java
sudo apt update
sudo apt install -y openjdk-8-jdk

# Add the Apache Cassandra repository
echo "deb https://downloads.apache.org/cassandra/debian 40x main" | sudo tee -a /etc/apt/sources.list.d/cassandra.list
curl https://downloads.apache.org/cassandra/KEYS | sudo apt-key add -
sudo apt update

# Install Cassandra
sudo apt install -y cassandra

# Start the service
sudo systemctl start cassandra
sudo systemctl enable cassandra

# Verify Cassandra is running
nodetool status

Configure Cassandra to accept remote connections by editing /etc/cassandra/cassandra.yaml:

sudo nano /etc/cassandra/cassandra.yaml

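Change the following values. The seeds entry sits under seed_provider, and broadcast_rpc_address must be set whenever rpc_address is 0.0.0.0, otherwise Cassandra will not start:

listen_address: <your-vm-ip-address>
rpc_address: 0.0.0.0
broadcast_rpc_address: <your-vm-ip-address>
seeds: "<your-vm-ip-address>"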

Restart Cassandra to apply changes:

sudo systemctl restart cassandra
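
After the restart, it can help to confirm that the CQL port is actually accepting connections before moving on. The following is a minimal sketch using only the standard library; port_is_open is a hypothetical helper name, and the host and port are the values assumed in this article (localhost and 9042).

```python
import socket


def port_is_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hosts
        return False


if __name__ == "__main__":
    # 9042 is the CQL port opened in the Network Security Group earlier
    print("Cassandra reachable:", port_is_open("localhost", 9042))
```

Cassandra can take a little while to start listening after a restart, so a False result right away does not necessarily mean the configuration is wrong.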

Setting Up the Environment

Now on your Azure VM, set up the development environment:

# Create a project directory
mkdir weather_data_pipeline
cd weather_data_pipeline

# Create and activate a virtual environment
python3 -m venv myvenv
source myvenv/bin/activate

Then install the required packages:

pip install confluent-kafka requests python-dotenv cassandra-driver

Create a .env file to store your environment variables:

WEATHER_API_KEY=your_openweathermap_api_key
BOOTSTRAP_SERVERS=your_confluent_bootstrap_servers
KAFKA_API_KEY=your_confluent_api_key
KAFKA_API_SECRET=your_confluent_api_secret
CASSANDRA_HOST=localhost
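
A missing or misspelled variable in this file usually surfaces later as a confusing connection error. As a hedged sketch, a small check like the one below could run at startup; missing_settings and REQUIRED_SETTINGS are hypothetical names, and in the real scripts load_dotenv() would need to be called first so the values are present in the environment.

```python
import os

# The variable names used in the .env file above
REQUIRED_SETTINGS = [
    "WEATHER_API_KEY",
    "BOOTSTRAP_SERVERS",
    "KAFKA_API_KEY",
    "KAFKA_API_SECRET",
    "CASSANDRA_HOST",
]


def missing_settings(env=None):
    """Return the names of required settings that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_SETTINGS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_settings()
    if missing:
        print("Missing settings:", ", ".join(missing))
    else:
        print("All settings present")
```

Passing a dictionary instead of relying on os.environ makes the check easy to exercise without touching the real environment.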

Creating the Producer Script

The producer script is responsible for fetching weather data from the OpenWeatherMap API and sending it to a Kafka topic. Let's break down the key components of our weather_producer.py script:

import json
import time
import os
import requests
from confluent_kafka import Producer
from dotenv import load_dotenv
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Load environment variables from .env file
load_dotenv()

# List of cities
cities = [
    "Nairobi",
    "Johannesburg",
    "Casablanca",
    "Lagos",
    "Kinshasa"
]

# OpenWeatherMap API setup
owm_api_key = os.getenv('WEATHER_API_KEY')
owm_base_url = "https://api.openweathermap.org/data/2.5/weather"

def fetch_weather_data(city):
    """Fetch weather data from OpenWeatherMap API using city name."""
    # Pass query parameters separately so requests URL-encodes the city name
    params = {"q": city, "appid": owm_api_key, "units": "metric"}
    try:
        # A timeout keeps one slow request from blocking the whole run
        response = requests.get(owm_base_url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
        data["extracted_city"] = city
        return data
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching data for {city}: {e}")
        return None

def delivery_report(err, msg):
    """Callback for Kafka message delivery status."""
    if err is not None:
        logger.error(f"Message delivery failed: {err}")
    else:
        logger.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

# Kafka configuration
kafka_config = {
    "bootstrap.servers": os.getenv('BOOTSTRAP_SERVERS'),
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": os.getenv('KAFKA_API_KEY'),
    "sasl.password": os.getenv('KAFKA_API_SECRET'),
    "broker.address.family": "v4",
    "message.send.max.retries": 5,
    "retry.backoff.ms": 500,
}

producer = Producer(kafka_config)
topic = "weather_data"

def produce_weather_data():
    """Fetch weather data for each city and produce to Kafka."""
    for city in cities:
        data = fetch_weather_data(city)
        if data:
            producer.produce(topic, key=city, value=json.dumps(data), callback=delivery_report)
            producer.poll(0)
        else:
            logger.error(f"Failed to fetch data for {city}")
    producer.flush()

if __name__ == "__main__":
    produce_weather_data()
    logger.info("Data extraction and production complete")

Notable Components of the Producer Script:

  1. Environment Setup: We import dotenv to load environment variables and initialize logging to track script execution.

  2. City List: We define a list of African cities for which we want to collect weather data.

  3. API Integration: The fetch_weather_data() function makes HTTP requests to the OpenWeatherMap API, requesting metric units so that temperatures are returned in degrees Celsius.

  4. Kafka Configuration: We set up a Kafka producer with security credentials and reliability properties.

  5. Data Production: The produce_weather_data() function fetches data for each city and produces it to the "weather_data" Kafka topic, using the city name as the message key.

  6. Delivery Reporting: The delivery_report() callback logs whether each message was delivered to Kafka successfully.
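
The producer script runs once and exits. One possible way to keep collecting readings is to wrap the work in a simple loop, sketched below. run_periodically is a hypothetical helper, the interval is an arbitrary assumption, and a cron job or systemd timer would be an equally valid choice.

```python
import time


def run_periodically(task, interval_seconds, max_runs=None, sleep=time.sleep):
    """Call task() repeatedly, pausing interval_seconds between runs.

    max_runs=None loops until interrupted; a number stops after that many runs.
    Returns the number of completed runs.
    """
    runs = 0
    while max_runs is None or runs < max_runs:
        task()
        runs += 1
        # Skip the pause after the final run
        if max_runs is None or runs < max_runs:
            sleep(interval_seconds)
    return runs


if __name__ == "__main__":
    # Placeholder task; in weather_producer.py this would be produce_weather_data
    run_periodically(lambda: print("collecting weather data"),
                     interval_seconds=1, max_runs=3)
```

The sleep function is injectable so the loop can be exercised without real delays.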

Creating the Consumer Script

Now, let's create the consumer script that will read data from the Kafka topic and store it in Cassandra. Here is our weather_consumer.py script:

import json
import uuid
import os
from confluent_kafka import Consumer, KafkaError
from cassandra.cluster import Cluster
from dotenv import load_dotenv
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# Kafka configuration
kafka_config = {
    "bootstrap.servers": os.getenv('BOOTSTRAP_SERVERS'),
    "group.id": "weather_consumer_group",
    "auto.offset.reset": "earliest",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": os.getenv('KAFKA_API_KEY'),
    "sasl.password": os.getenv('KAFKA_API_SECRET'),
    "broker.address.family": "v4",
}

# Cassandra configuration (fall back to localhost if CASSANDRA_HOST is not set)
cassandra_host = os.getenv('CASSANDRA_HOST', 'localhost')

# Connect to Cassandra
cluster = Cluster([cassandra_host])
session = None

def initialize_cassandra():
    """Initialize Cassandra connection and create keyspace/table if needed."""
    global session
    try:
        session = cluster.connect()

        # Create keyspace if it doesn't exist
        session.execute("""
            CREATE KEYSPACE IF NOT EXISTS weather_data
            WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}
            AND durable_writes = true;
        """)

        # Use the keyspace
        session.execute("USE weather_data")

        # Create table if it doesn't exist
        session.execute("""
            CREATE TABLE IF NOT EXISTS simple_weather (
                id uuid PRIMARY KEY,
                city_name text,
                temperature float,
                timestamp timestamp,
                weather_description text,
                weather_main text
            );
        """)
        logger.info("Cassandra table ready")
        return True
    except Exception as e:
        logger.error(f"Cassandra initialization error: {e}")
        return False

def insert_weather_data(weather_data):
    """Insert weather data into Cassandra."""
    try:
        # Extract interesting fields
        city = weather_data["extracted_city"]
        temp = weather_data["main"]["temp"]
        timestamp = weather_data["dt"]
        weather_desc = weather_data["weather"][0]["description"]
        weather_main = weather_data["weather"][0]["main"]

        # Insert data into Cassandra
        query = """
            INSERT INTO simple_weather (id, city_name, temperature, timestamp, weather_description, weather_main)
            VALUES (%s, %s, %s, toTimestamp(now()), %s, %s)
        """
        session.execute(query, (uuid.uuid4(), city, temp, weather_desc, weather_main))
        logger.info(f"Inserted weather for {city} at {timestamp}")
        return True
    except Exception as e:
        logger.error(f"Error inserting data: {e}")
        return False

def consume_weather_data():
    """Consume weather data from Kafka and store in Cassandra."""
    # Initialize Cassandra
    if not initialize_cassandra():
        return

    # Create Kafka consumer
    consumer = Consumer(kafka_config)
    consumer.subscribe(["weather_data"])

    logger.info("Subscribed to topic: weather_data")

    try:
        while True:
            msg = consumer.poll(1.0)

            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    logger.error(f"Consumer error: {msg.error()}")
                    break
            try:
                weather_data = json.loads(msg.value())
                insert_weather_data(weather_data)
            except Exception as e:
                logger.error(f"Error processing message: {e}")

    except KeyboardInterrupt:
        logger.info("Stopping consumer")
    finally:
        consumer.close()
        cluster.shutdown()

if __name__ == "__main__":
    consume_weather_data()

Key Components of the Consumer Script:

  1. Kafka Consumer Configuration: We set up a consumer to consume from the "weather_data" topic, with the appropriate security settings.

  2. Cassandra Connection: We connect to the Cassandra cluster directly without authentication credentials.

  3. Database Initialization: The initialize_cassandra() function creates the necessary keyspace and table if they haven't already been created.

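  4. Data Processing: The insert_weather_data() function extracts the relevant fields from each message and inserts them into Cassandra. The timestamp column records the insertion time via toTimestamp(now()); the API's dt value (a Unix timestamp in seconds) is used only in the log message.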

  5. Continuous Consumption: The consume_weather_data() function continuously polls the Kafka topic and processes any messages it finds.
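
The field extraction inside insert_weather_data() can also be written as a pure function, which makes it easy to check without Kafka or Cassandra running. The sketch below is one way to do that; extract_fields is a hypothetical name, the sample payload uses made-up values trimmed to the fields the consumer reads, and converting dt to a UTC datetime is an optional extra rather than something the original script does.

```python
from datetime import datetime, timezone


def extract_fields(payload):
    """Pull the columns stored in simple_weather out of one API payload.

    Raises KeyError or IndexError if an expected field is missing.
    """
    return {
        "city_name": payload["extracted_city"],
        "temperature": float(payload["main"]["temp"]),
        # "dt" is the observation time as Unix seconds (UTC)
        "observed_at": datetime.fromtimestamp(payload["dt"], tz=timezone.utc),
        "weather_description": payload["weather"][0]["description"],
        "weather_main": payload["weather"][0]["main"],
    }


# Hypothetical payload with made-up values, trimmed to the fields used here
sample = {
    "extracted_city": "Nairobi",
    "main": {"temp": 18.62},
    "dt": 1744005405,
    "weather": [{"main": "Clouds", "description": "broken clouds"}],
}
row = extract_fields(sample)
print(row["city_name"], row["observed_at"].isoformat())
```

If the observation time matters for analysis, observed_at could be passed as a bound parameter in the INSERT instead of relying on toTimestamp(now()).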

Running the Pipeline on Azure

Now that we've written both the producer and consumer scripts, let's run them on our Azure VM to see our data pipeline in action.

First, make sure your virtual environment is activated and all dependencies are installed. Then, run the producer script:

python3 weather_producer.py

As shown in the first screenshot, the producer is successfully retrieving weather data and producing it to Kafka:

2025-04-07 05:56:47,013 - __main__ - INFO - Message delivered to weather_data [4] at offset 8
2025-04-07 05:56:47,013 - __main__ - INFO - Message delivered to weather_data [4] at offset 9
2025-04-07 05:56:47,035 - __main__ - INFO - Message delivered to weather_data [3] at offset 8
2025-04-07 05:56:47,035 - __main__ - INFO - Message delivered to weather_data [3] at offset 9
2025-04-07 05:56:47,072 - __main__ - INFO - Message delivered to weather_data [2] at offset 4
2025-04-07 05:56:47,072 - __main__ - INFO - Data extraction and production complete

Next, run the consumer script in a separate terminal:

python3 weather_consumer.py

The consumer subscribes to the Kafka topic and begins processing messages, as shown in the second screenshot:

Subscribed to topic: weather_data
Cassandra table ready
Inserted weather for Johannesburg at 2025-04-07 05:56:45
Inserted weather for Lagos at 2025-04-07 05:55:29
Inserted weather for Nairobi at 2025-04-07 05:56:45
Inserted weather for Casablanca at 2025-04-07 05:53:05
Inserted weather for Kinshasa at 2025-04-07 05:56:46

Querying the Data

Now that the data is in Cassandra, we can query it using CQL. On your Azure VM, use the cqlsh tool:

cqlsh localhost

Then query to retrieve the weather data:

USE weather_data;
SELECT * FROM simple_weather;

As shown in the third screenshot, this returns all of the stored weather data for our cities:

 id                                   | city_name    | temperature | timestamp                       | weather_description | weather_main
--------------------------------------+--------------+-------------+---------------------------------+---------------------+--------------
 9781d8f2-e7d1-484f-a780-4df61ed7c7da | Johannesburg |       15.02 | 2025-04-07 06:22:41.000000+0000 |     overcast clouds |       Clouds
 f64ab11f-a732-4d47-84fb-f450d1e4a9bc |     Kinshasa |       21.21 | 2025-04-07 06:22:41.000000+0000 |          few clouds |       Clouds
 01840d74-0b10-461b-98b4-8eaf5fce2e0c |      Nairobi |       18.62 | 2025-04-07 06:22:41.000000+0000 |       broken clouds |       Clouds
 546d4df6-45b4-4956-94b1-b92b92bc1e84 |        Lagos |       26.57 | 2025-04-07 06:22:41.000000+0000 |     overcast clouds |       Clouds
 95d0dc97-a940-45a9-ab5d-28b9deba7b02 |   Casablanca |       14.07 | 2025-04-07 06:20:30.000000+0000 |    scattered clouds |       Clouds

(5 rows)

Extending the Pipeline

This basic pipeline can be extended as follows:

  1. Add More Cities: Expand the list of cities to get a broader geographic coverage.

  2. Collect More Data Points: Modify the scripts to collect additional weather parameters like humidity, wind speed, and barometric pressure.

  3. Data Aggregation: Add functionality to calculate averages, minimums, and maximums over time intervals of varying sizes.

  4. Visualizations: Connect a visualization tool such as Grafana or Azure Data Explorer to your Cassandra database to create dashboards.

  5. Alerts: Configure alerts for extreme weather conditions by adding thresholds for temperature, rainfall, etc.

  6. Scale with Azure Kubernetes Service: For larger deployments, consider containerizing your applications and deploying them on AKS.
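
As an illustration of the aggregation idea in item 3, the sketch below computes the minimum, maximum, and average temperature per city over a list of rows already read from the simple_weather table. summarize_by_city is a hypothetical helper and the readings are made up; grouping by time interval would follow the same pattern with a different grouping key.

```python
from collections import defaultdict


def summarize_by_city(rows):
    """Return {city: {"min": ..., "max": ..., "avg": ...}} for temperature."""
    grouped = defaultdict(list)
    for row in rows:
        grouped[row["city_name"]].append(row["temperature"])
    return {
        city: {"min": min(temps), "max": max(temps), "avg": sum(temps) / len(temps)}
        for city, temps in grouped.items()
    }


# Made-up readings for illustration only
rows = [
    {"city_name": "Lagos", "temperature": 26.0},
    {"city_name": "Lagos", "temperature": 28.0},
    {"city_name": "Nairobi", "temperature": 18.0},
]
summary = summarize_by_city(rows)
print(summary["Lagos"])
```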

Common Issues and Troubleshooting

  1. API Rate Limiting: The OpenWeatherMap API rate-limits free tier accounts. If you are fetching data for a large number of cities, you might hit these limits. Consider using a paid tier or throttling your requests.

  2. Kafka Connection Issues: Ensure that your Kafka credentials and bootstrap servers are correctly configured in the .env file.

  3. Cassandra Connectivity: Ensure that your Cassandra instance is accessible on your Azure VM and that the firewall rules allow connections.

  4. Azure VM Connectivity: Check that your Network Security Group allows the necessary inbound and outbound traffic.

  5. Disk Space: Monitor your Azure VM's disk space, especially if storing large amounts of historical weather data.
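
For the request throttling mentioned in item 1, a minimal sketch is a helper that enforces a minimum delay between consecutive API calls. Throttle is a hypothetical class, and the one-second interval is an assumption rather than a documented OpenWeatherMap limit; check your plan's actual quota before choosing a value.

```python
import time


class Throttle:
    """Enforce a minimum delay between consecutive calls to wait()."""

    def __init__(self, min_interval, clock=time.monotonic, sleep=time.sleep):
        self.min_interval = min_interval
        self._clock = clock
        self._sleep = sleep
        self._last_call = None

    def wait(self):
        """Sleep just long enough to keep calls min_interval seconds apart."""
        now = self._clock()
        if self._last_call is not None:
            remaining = self.min_interval - (now - self._last_call)
            if remaining > 0:
                self._sleep(remaining)
                now = self._clock()
        self._last_call = now


if __name__ == "__main__":
    throttle = Throttle(min_interval=1.0)  # assumed spacing: one request per second
    for city in ["Nairobi", "Lagos", "Kinshasa"]:
        throttle.wait()
        print("would fetch", city)
```

In the producer, calling throttle.wait() immediately before each fetch_weather_data(city) call would space the requests out.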
