This article walks through building an end-to-end weather data pipeline that collects weather data from multiple African cities, processes it through Apache Kafka, and stores it in a Cassandra database for analysis. The infrastructure is hosted on Microsoft Azure.
Introduction
Weather data pipelines are essential components of modern environmental monitoring systems. They enable organizations to collect, process, and analyze meteorological information in real-time, facilitating better decision-making. This tutorial demonstrates how to build a simple yet robust weather data pipeline using:
- Python for data fetching and processing
- OpenWeatherMap API as our data source
- Apache Kafka for handling real-time data streams
- Apache Cassandra for scalable data storage
- Microsoft Azure for cloud infrastructure
By the end of this article, you will understand how these technologies are combined to construct a data pipeline that can be scaled to more complex use cases.
Architecture Overview
Our weather data pipeline consists of the following workflow:
- Data Extraction: Weather data is extracted for five African cities from the OpenWeatherMap API using the producer script
- Data Streaming: Weather data is streamed into a Kafka topic
- Data Consumption: A consumer script reads from the Kafka topic
- Data Storage: The data is processed and stored in a Cassandra database
- Data Query: We can query the stored data using CQL (Cassandra Query Language)
Azure Infrastructure Setup
This project leverages Microsoft Azure for hosting our components:
- Azure Virtual Machine: Ubuntu 20.04 LTS server to run our Python scripts
- Network Security Group: Configured to allow necessary traffic for Kafka and Cassandra
- Azure Managed Disks: For persistent storage of Cassandra data
To set up an Azure VM for this project:
- Log in to the Azure Portal
- Create a new Virtual Machine with Ubuntu 20.04 LTS
- Select at least 2 vCPUs and 8GB RAM for optimal performance
- Configure networking to allow inbound SSH (port 22), Kafka (port 9092), and Cassandra (port 9042)
- Create and download SSH keys for secure access
After provisioning, connect to your VM using SSH:
ssh -i /path/to/your/key.pem azureuser@your-vm-ip-address
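If you prefer the command line, the same VM can be provisioned with the Azure CLI. The following is a minimal sketch rather than a definitive setup: the resource group, VM name, and region are placeholders, and you should confirm the Ubuntu 20.04 image URN and the size availability in your subscription.
# Assumed names and region, adjust to your environment
az group create --name weather-pipeline-rg --location eastus
# Standard_D2s_v3 provides 2 vCPUs / 8 GB RAM, matching the sizing above
az vm create \
  --resource-group weather-pipeline-rg \
  --name weather-pipeline-vm \
  --image Canonical:0001-com-ubuntu-server-focal:20_04-lts-gen2:latest \
  --size Standard_D2s_v3 \
  --admin-username azureuser \
  --generate-ssh-keys
# Open the Kafka and Cassandra ports (SSH is typically allowed by default for Linux VMs)
az vm open-port --resource-group weather-pipeline-rg --name weather-pipeline-vm --port 9092 --priority 1010
az vm open-port --resource-group weather-pipeline-rg --name weather-pipeline-vm --port 9042 --priority 1020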
Setting Up Confluent Cloud for Your Weather Data Pipeline
Before diving into the implementation, we'll set up a managed Kafka environment on Confluent Cloud.
Creating a Confluent Cloud Account
- Visit Confluent.io and click on "Get Started Free"
- Complete the registration process by providing your email and setting up a password
- Verify your email address to activate your account
Setting Up a Kafka Cluster
Once logged in to Confluent Cloud:
- Click on "Create cluster" in the dashboard
- Choose Azure as your cloud provider and select the region closest to your Azure VM
- Select the "Basic" cluster type for development purposes (you can upgrade later)
- Name your cluster (e.g., "weather-pipeline-cluster")
- Click "Launch cluster"
Creating a Kafka Topic
After your cluster is provisioned:
- Select your cluster from the dashboard
- Navigate to the "Topics" section in the left sidebar
- Click "Create topic"
- Enter "weather_data" as the topic name
- Set the number of partitions (start with 6 for this example)
- Leave the default retention settings (7 days)
- Click "Create with defaults" (or customize advanced settings if needed)
Creating API Keys for Authentication
To connect your application to the Confluent Cloud Kafka cluster:
- In your cluster dashboard, click on "API keys" in the left sidebar
- Click "Create key"
- Select "Global access" (or "Granular access" if you prefer more control)
- Provide a description (e.g., "Weather Pipeline API Key")
- Click "Create key"
- Important: Save both the API key and secret in a secure location as they will only be shown once
Testing the Connection
To verify your connection settings, create a simple test script on your Azure VM:
from confluent_kafka.admin import AdminClient
import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Kafka configuration
kafka_config = {
    "bootstrap.servers": os.getenv('BOOTSTRAP_SERVERS'),
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": os.getenv('KAFKA_API_KEY'),
    "sasl.password": os.getenv('KAFKA_API_SECRET'),
}
# Create Admin client
admin_client = AdminClient(kafka_config)
# List topics
topics = admin_client.list_topics(timeout=10)
print("Available topics:", list(topics.topics.keys()))
Setting Up Cassandra on Azure VM
We'll install Apache Cassandra directly on our Azure VM:
# Install Java
sudo apt update
sudo apt install -y openjdk-8-jdk
# Add the Apache Cassandra repository
echo "deb https://downloads.apache.org/cassandra/debian 40x main" | sudo tee -a /etc/apt/sources.list.d/cassandra.list
curl https://downloads.apache.org/cassandra/KEYS | sudo apt-key add -
sudo apt update
# Install Cassandra
sudo apt install -y cassandra
# Start the service
sudo systemctl start cassandra
sudo systemctl enable cassandra
# Verify Cassandra is running
nodetool status
Configure Cassandra to accept remote connections by editing /etc/cassandra/cassandra.yaml:
sudo nano /etc/cassandra/cassandra.yaml
Change the following values (note that in the file, the seeds entry is nested under the seed_provider parameters):
listen_address: <your-vm-ip-address>
rpc_address: 0.0.0.0
seeds: "<your-vm-ip-address>"
Restart Cassandra to apply changes:
sudo systemctl restart cassandra
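To confirm that Cassandra is reachable on the address you just configured, a quick cqlsh query works as a sanity check (replace the address with your VM's IP):
cqlsh <your-vm-ip-address> 9042 -e "DESCRIBE KEYSPACES"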
Setting Up the Environment
Now on your Azure VM, set up the development environment:
# Create a project directory
mkdir weather_data_pipeline
cd weather_data_pipeline
# Create and activate a virtual environment
python3 -m venv myvenv
source myvenv/bin/activate
Then install the required packages:
pip install confluent-kafka requests python-dotenv cassandra-driver
Create a .env file to store your environment variables:
WEATHER_API_KEY=your_openweathermap_api_key
BOOTSTRAP_SERVERS=your_confluent_bootstrap_servers
KAFKA_API_KEY=your_confluent_api_key
KAFKA_API_SECRET=your_confluent_api_secret
CASSANDRA_HOST=localhost
Creating the Producer Script
The producer script is responsible for fetching weather data from the OpenWeatherMap API and sending it to a Kafka topic. Let's break down the key components of our weather_producer.py script:
import json
import time
import os
import requests
from confluent_kafka import Producer
from dotenv import load_dotenv
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Load environment variables from .env file
load_dotenv()
# List of cities
cities = [
    "Nairobi",
    "Johannesburg",
    "Casablanca",
    "Lagos",
    "Kinshasa"
]
# OpenWeatherMap API setup
owm_api_key = os.getenv('WEATHER_API_KEY')
owm_base_url = "https://api.openweathermap.org/data/2.5/weather"
def fetch_weather_data(city):
    """Fetch weather data from OpenWeatherMap API using city name."""
    url = f"{owm_base_url}?q={city}&appid={owm_api_key}&units=metric"
    try:
        response = requests.get(url)
        response.raise_for_status()
        data = response.json()
        data["extracted_city"] = city
        return data
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching data for {city}: {e}")
        return None

def delivery_report(err, msg):
    """Callback for Kafka message delivery status."""
    if err is not None:
        logger.error(f"Message delivery failed: {err}")
    else:
        logger.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
# Kafka configuration
kafka_config = {
    "bootstrap.servers": os.getenv('BOOTSTRAP_SERVERS'),
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": os.getenv('KAFKA_API_KEY'),
    "sasl.password": os.getenv('KAFKA_API_SECRET'),
    "broker.address.family": "v4",
    "message.send.max.retries": 5,
    "retry.backoff.ms": 500,
}
producer = Producer(kafka_config)
topic = "weather_data"
def produce_weather_data():
    """Fetch weather data for each city and produce to Kafka."""
    for city in cities:
        data = fetch_weather_data(city)
        if data:
            producer.produce(topic, key=city, value=json.dumps(data), callback=delivery_report)
            producer.poll(0)
        else:
            logger.error(f"Failed to fetch data for {city}")
    producer.flush()

if __name__ == "__main__":
    produce_weather_data()
    logger.info("Data extraction and production complete")
Notable Components of the Producer Script:
Environment Setup: We import dotenv to load environment variables and initialize logging to track script execution.
City List: We define a list of African cities for which we want to collect weather data.
API Integration: The fetch_weather_data() function makes HTTP requests to the OpenWeatherMap API, asking for temperatures in metric units.
Kafka Configuration: We set up a Kafka producer with security credentials and reliability properties (retries with backoff).
Data Production: The produce_weather_data() function fetches data for each city and produces it to the "weather_data" Kafka topic.
Delivery Reporting: The delivery_report() callback logs whether each message was delivered to Kafka successfully.
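As written, the script performs a single collection run and exits (which is why the time import is not actually used yet). If you want continuous collection, one simple option is to wrap the run in a polling loop; the interval below is an arbitrary assumption, and a cron job on the VM would work just as well:
# Hypothetical replacement for the __main__ block: collect every 10 minutes
POLL_INTERVAL_SECONDS = 600

if __name__ == "__main__":
    while True:
        produce_weather_data()
        logger.info("Cycle complete; sleeping for %d seconds", POLL_INTERVAL_SECONDS)
        time.sleep(POLL_INTERVAL_SECONDS)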
Creating the Consumer Script
Now, let's create the consumer script that will read data from the Kafka topic and store it in Cassandra. Here is our weather_consumer.py script:
import json
import uuid
import os
from datetime import datetime
from confluent_kafka import Consumer, KafkaError
from cassandra.cluster import Cluster
from dotenv import load_dotenv
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
# Kafka configuration
kafka_config = {
    "bootstrap.servers": os.getenv('BOOTSTRAP_SERVERS'),
    "group.id": "weather_consumer_group",
    "auto.offset.reset": "earliest",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": os.getenv('KAFKA_API_KEY'),
    "sasl.password": os.getenv('KAFKA_API_SECRET'),
    "broker.address.family": "v4",
}
# Cassandra configuration
cassandra_host = os.getenv('CASSANDRA_HOST')
# Connect to Cassandra
cluster = Cluster([cassandra_host])
session = None
def initialize_cassandra():
    """Initialize Cassandra connection and create keyspace/table if needed."""
    global session
    try:
        session = cluster.connect()
        # Create keyspace if it doesn't exist
        session.execute("""
            CREATE KEYSPACE IF NOT EXISTS weather_data
            WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}
            AND durable_writes = true;
        """)
        # Use the keyspace
        session.execute("USE weather_data")
        # Create table if it doesn't exist
        session.execute("""
            CREATE TABLE IF NOT EXISTS simple_weather (
                id uuid PRIMARY KEY,
                city_name text,
                temperature float,
                timestamp timestamp,
                weather_description text,
                weather_main text
            );
        """)
        logger.info("Cassandra table ready")
        return True
    except Exception as e:
        logger.error(f"Cassandra initialization error: {e}")
        return False
def insert_weather_data(weather_data):
    """Insert weather data into Cassandra."""
    try:
        # Extract interesting fields
        city = weather_data["extracted_city"]
        temp = weather_data["main"]["temp"]
        # The API reports the observation time as a Unix epoch (UTC)
        timestamp = datetime.utcfromtimestamp(weather_data["dt"])
        weather_desc = weather_data["weather"][0]["description"]
        weather_main = weather_data["weather"][0]["main"]
        # Insert data into Cassandra
        query = """
            INSERT INTO simple_weather (id, city_name, temperature, timestamp, weather_description, weather_main)
            VALUES (%s, %s, %s, toTimestamp(now()), %s, %s)
        """
        session.execute(query, (uuid.uuid4(), city, temp, weather_desc, weather_main))
        logger.info(f"Inserted weather for {city} at {timestamp}")
        return True
    except Exception as e:
        logger.error(f"Error inserting data: {e}")
        return False
def consume_weather_data():
    """Consume weather data from Kafka and store in Cassandra."""
    # Initialize Cassandra
    if not initialize_cassandra():
        return
    # Create Kafka consumer
    consumer = Consumer(kafka_config)
    consumer.subscribe(["weather_data"])
    logger.info("Subscribed to topic: weather_data")
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    logger.error(f"Consumer error: {msg.error()}")
                    break
            try:
                weather_data = json.loads(msg.value())
                insert_weather_data(weather_data)
            except Exception as e:
                logger.error(f"Error processing message: {e}")
    except KeyboardInterrupt:
        logger.info("Stopping consumer")
    finally:
        consumer.close()
        cluster.shutdown()

if __name__ == "__main__":
    consume_weather_data()
Key Components of the Consumer Script:
Kafka Consumer Configuration: We set up a consumer to consume from the "weather_data" topic, with the appropriate security settings.
Cassandra Connection: We connect to the Cassandra cluster directly without authentication credentials.
Database Initialization: The initialize_cassandra() function creates the necessary keyspace and table if they haven't already been created.
Data Processing: The insert_weather_data() function extracts the pertinent fields from the weather data and inserts them into Cassandra.
Continuous Consumption: The consume_weather_data() function continuously polls the Kafka topic and processes any messages it finds.
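One optional refinement: because the consumer executes the same INSERT for every message, a prepared statement avoids re-parsing the query on each insert. The sketch below is an alternative version of insert_weather_data() under that assumption; it also binds the observation time from the API instead of using toTimestamp(now()), which is a small behavioral change.
# Prepare the INSERT once (for example inside initialize_cassandra(), after the table is created)
insert_stmt = session.prepare("""
    INSERT INTO simple_weather (id, city_name, temperature, timestamp, weather_description, weather_main)
    VALUES (?, ?, ?, ?, ?, ?)
""")

def insert_weather_data_prepared(weather_data):
    """Variant of insert_weather_data() that reuses a prepared statement."""
    session.execute(insert_stmt, (
        uuid.uuid4(),
        weather_data["extracted_city"],
        weather_data["main"]["temp"],
        datetime.utcfromtimestamp(weather_data["dt"]),  # observation time reported by the API
        weather_data["weather"][0]["description"],
        weather_data["weather"][0]["main"],
    ))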
Running the Pipeline on Azure
Now that we've written both the producer and consumer scripts, let's run them on our Azure VM to see our data pipeline in action.
First, make sure your virtual environment is activated and all dependencies are installed. Then, run the producer script:
python3 weather_producer.py
The producer successfully retrieves weather data and produces it to Kafka, as the log output shows:
2025-04-07 05:56:47,013 - __main__ - INFO - Message delivered to weather_data [4] at offset 8
2025-04-07 05:56:47,013 - __main__ - INFO - Message delivered to weather_data [4] at offset 9
2025-04-07 05:56:47,035 - __main__ - INFO - Message delivered to weather_data [3] at offset 8
2025-04-07 05:56:47,035 - __main__ - INFO - Message delivered to weather_data [3] at offset 9
2025-04-07 05:56:47,072 - __main__ - INFO - Message delivered to weather_data [2] at offset 4
2025-04-07 05:56:47,072 - __main__ - INFO - Data extraction and production complete
Next, run the consumer script in a separate terminal:
python3 weather_consumer.py
The consumer subscribes to the Kafka topic and begins processing messages:
Subscribed to topic: weather_data
Cassandra table ready
Inserted weather for Johannesburg at 2025-04-07 05:56:45
Inserted weather for Lagos at 2025-04-07 05:55:29
Inserted weather for Nairobi at 2025-04-07 05:56:45
Inserted weather for Casablanca at 2025-04-07 05:53:05
Inserted weather for Kinshasa at 2025-04-07 05:56:46
Querying the Data
Now that the data is in Cassandra, we can query it using CQL. On your Azure VM, use the cqlsh tool:
cqlsh localhost
Then query to retrieve the weather data:
USE weather_data;
SELECT * FROM simple_weather;
This returns all of the stored weather data for our cities:
id | city_name | temperature | timestamp | weather_description | weather_main
--------------------------------------+--------------+-------------+--------------------------------+---------------------+-------------
9781d8f2-e7d1-484f-a780-4df61ed7c7da | Johannesburg | 15.02 | 2025-04-07 06:22:41.000000+0000 | overcast clouds | Clouds
f64ab11f-a732-4d47-84fb-f450d1e4a9bc | Kinshasa | 21.21 | 2025-04-07 06:22:41.000000+0000 | few clouds | Clouds
01840d74-0b10-461b-98b4-8eaf5fce2e0c | Nairobi | 18.62 | 2025-04-07 06:22:41.000000+0000 | broken clouds | Clouds
546d4df6-45b4-4956-94b1-b92b92bc1e84 | Lagos | 26.57 | 2025-04-07 06:22:41.000000+0000 | overcast clouds | Clouds
95d0dc97-a940-45a9-ab5d-28b9deba7b02 | Casablanca | 14.07 | 2025-04-07 06:20:30.000000+0000 | scattered clouds | Clouds
(5 rows)
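Because id is the only primary key column, filtering by other columns needs a little extra care. For example, a per-city query requires either ALLOW FILTERING (fine for small development tables) or a secondary index; both statements below are optional additions:
-- Per-city query; ALLOW FILTERING is needed because city_name is not part of the primary key
SELECT city_name, temperature, timestamp FROM simple_weather WHERE city_name = 'Nairobi' ALLOW FILTERING;

-- Alternatively, add a secondary index so the filter no longer needs ALLOW FILTERING
CREATE INDEX IF NOT EXISTS ON simple_weather (city_name);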
Extending the Pipeline
This basic pipeline can be extended as follows:
Add More Cities: Expand the list of cities to get a broader geographic coverage.
Collect More Data Points: Modify the scripts to collect additional weather parameters like humidity, wind speed, and barometric pressure.
Data Aggregation: Add functionality to calculate averages, minimums, and maximums over time intervals of varying sizes (a minimal averaging sketch follows this list).
Visualizations: Connect a visualization software like Grafana or Azure Data Explorer to your Cassandra database to create dashboards.
Alerts: Configure alerts for extreme weather conditions by adding thresholds for temperature, rainfall, etc.
Scale with Azure Kubernetes Service: For larger deployments, consider containerizing your applications and deploying them on AKS.
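For the data aggregation idea above, here is a minimal sketch that averages the stored temperatures per city using the same cassandra-driver the consumer relies on; for large tables you would aggregate incrementally or hand the job to a dedicated analytics tool instead:
from collections import defaultdict
from cassandra.cluster import Cluster

# Connect to the keyspace created by the consumer
cluster = Cluster(["localhost"])
session = cluster.connect("weather_data")

# Sum and count temperatures per city, then report the averages
totals = defaultdict(lambda: [0.0, 0])  # city -> [sum, count]
for row in session.execute("SELECT city_name, temperature FROM simple_weather"):
    totals[row.city_name][0] += row.temperature
    totals[row.city_name][1] += 1

for city, (total, count) in sorted(totals.items()):
    print(f"{city}: {total / count:.2f} °C average over {count} readings")

cluster.shutdown()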
Common Issues and Troubleshooting
API Rate Limiting: The OpenWeatherMap API rate limits free-tier accounts. If you're collecting data for a large number of cities, you might hit these limits. Consider using a paid tier or throttling your requests (see the sketch after this list).
Kafka Connection Issues: Ensure that your Kafka credentials and bootstrap servers are correctly configured in the .env file.
Cassandra Connectivity: Ensure that your Cassandra instance is accessible on your Azure VM and that the firewall rules allow connections.
Azure VM Connectivity: Check that your Network Security Group allows the necessary inbound and outbound traffic.
Disk Space: Monitor your Azure VM's disk space, especially if storing large amounts of historical weather data.
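For the rate-limiting point above, a light-handed fix is to pause between API calls in the producer. The sketch below shows how produce_weather_data() could be adapted; the delay value is an assumption based on the free tier's documented per-minute limit at the time of writing:
# Pause between API calls to stay within the free-tier limit
REQUEST_DELAY_SECONDS = 1.1

def produce_weather_data_throttled():
    """Variant of produce_weather_data() that spaces out API requests."""
    for city in cities:
        data = fetch_weather_data(city)
        if data:
            producer.produce(topic, key=city, value=json.dumps(data), callback=delivery_report)
            producer.poll(0)
        else:
            logger.error(f"Failed to fetch data for {city}")
        time.sleep(REQUEST_DELAY_SECONDS)
    producer.flush()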