In my last article, we broke down the core concepts of Apache Kafka — moving away from scheduled batch jobs and shifting toward real-time event streaming.
This article focuses on a practical Kafka project to help you understand the architecture better. The goal is simple: fetch live weather metrics from the OpenWeather API, stream those events instantly through an Apache Kafka topic, and consume them downstream to be stored in Apache Cassandra for time-based analysis.
To make this happen, we will be writing two standalone Python scripts: a custom weather_producer.py script to fetch and stream the data, and a separate weather_consumer.py script to read the stream and save it to our database.
Upgrading from Kafka 3.9.2 to Kafka 4.0 (KRaft Native)
Before writing our scripts, we have to address our Kafka environment. In my earlier practice sessions, I was running Apache Kafka version 3.9.2, which still relied heavily on an external service called Apache ZooKeeper to manage cluster metadata.
However, with the release of Apache Kafka 4.x, ZooKeeper support has been completely removed. Kafka now runs entirely in KRaft (Kafka Raft) mode, managing its own internal metadata log natively. This simplifies things dramatically, meaning we only have to run a single Kafka process instead of managing two entirely different software setups on our machine.
To get ready for this project, I had to completely clean out my old local environment. If you are following along on WSL or Linux, here is exactly how to uninstall version 3.9.2 and upgraded to Kafka 4.0:
i). Removing Kafka 3.9.2
First, stop any running instances of ZooKeeper and Kafka, then wipe out the old binaries and the local data directories to prevent any legacy configuration conflicts:
# Delete the old installation directory
rm -rf ~/kafka_2.13-3.9.2
# Clear out local temporary data/log directories used by the old version
rm -rf /tmp/zookeeper
rm -rf /tmp/kafka-logs
ii). Installing and Formatting Kafka 4.0
KRaft requires you to explicitly generate a cluster ID and format your storage directory before launching the broker:
# Download and extract Kafka 4.0
wget https://archive.apache.org/dist/kafka/4.0.0/kafka_2.13-4.0.0.tgz
tar -xzf kafka_2.13-4.0.0.tgz
cd kafka_2.13-4.0.0
# Generate a unique cluster ID for KRaft
KRAFT_CLUSTER_ID=$(bin/kafka-storage.sh random-uuid)
# Format your storage log directory using that ID
bin/kafka-storage.sh format -t $KRAFT_CLUSTER_ID -c config/kraft/server.properties
Now, starting Kafka is as simple as running one single command targeting our KRaft configuration file:
bin/kafka-server-start.sh config/kraft/server.properties
Step 1: Creating the Weather Topic
Before our scripts can send or read data, our new 4.0 broker needs a destination ready. Using the Kafka CLI tools, create a dedicated topic named weather_updates with 3 partitions to allow for parallel consumption down the line:
bin/kafka-topics.sh --create --topic weather_updates --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
weather_updates topic
Step 2: Writing the Python Producer (weather_producer.py)
Our first script is a standalone Python script called weather_producer.py. Its sole job is to talk to the OpenWeather API, fetch the raw metrics, and hand the payload over to Kafka. We use the kafka-python library to handle the client connection.
Notice the continuous while True loop. Unlike Airflow DAGs that run and stop, streaming application scripts run indefinitely in your terminal:
# weather_producer.py
import time
import json
import requests
from kafka import KafkaProducer
API_KEY = "YOUR_OPENWEATHER_API_KEY"
CITY = "Nairobi"
URL = f"https://api.openweathermap.org/data/2.5/weather?q={CITY}&appid={API_KEY}"
# Initialize Kafka Producer targeting our KRaft Broker port
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
print("--- Starting Weather Producer Loop ---")
while True:
try:
response = requests.get(URL)
if response.status_code == 200:
weather_data = response.json()
# Send payload to our Kafka topic
producer.send('weather_updates', value=weather_data)
print(f"Sent event to Kafka: {weather_data['name']} - {weather_data['main']['temp']}K")
except Exception as e:
print(f"Error fetching/sending data: {e}")
# Poll the API every 10 seconds to create a continuous stream
time.sleep(10)
Step 3: Setting Up Apache Cassandra For the First Time
If you have never used Apache Cassandra before, it is a distributed NoSQL database built specifically for lightning-fast write speeds and handling massive time-series data.
Because it doesn't come pre-installed on standard Linux distributions, we will need to install it first - binary version. Here is the step-by-step process to set it up on my WSL environment for the first time:
# 1. Navigate to your home directory
cd ~
sudo apt update
sudo apt install openjdk-11-jdk -y
It is important to note that Cassandra 3.x and 4.x strictly require Java 8 or Java 11.
Since we updated to Kafka 4.0 which requires Java 17 or Java 21, Cassandra will instantly crash on startup.
Therefore, run the above command to download and install Java 11 side-by-side with your existing Java 17.
# 2. Download the official Apache Cassandra binary tarball
wget https://archive.apache.org/dist/cassandra/4.1.4/apache-cassandra-4.1.4-bin.tar.gz
# 3. Extract the tarball
tar -xzf apache-cassandra-4.1.4-bin.tar.gz
# 4. Rename the folder to just 'cassandra' for cleaner navigation
mv apache-cassandra-4.1.4 cassandra
# 5. Clean up the downloaded tar.gz file to save space
rm apache-cassandra-4.1.4-bin.tar.gz
Now, everything for Cassandra lives explicitly inside ~/cassandra
Start Cassandra
Ubuntu allows you to have multiple versions of Java installed at the same time. You can pick which one is active by running:
sudo update-alternatives --config java
Look for the row that mentions java-11-openjdk. Type that selection number and hit Enter.
To make sure your terminal is officially using the older version, run java -version.
It should now say openjdk version "11.0.x"
Because Cassandra here is a binary version, to start it up, we will execute the startup script directly from the folder:
cd ~/cassandra
bin/cassandra -f
A large stream of startup logs will begin scrolling down your screen. The specific line INFO [main] ... Startup completed near the bottom confirms Cassandra started successfully.
Once the logs stop moving and Cassandra stays open safely:
Leave that terminal window completely alone (let it keep running).
Open a brand-new terminal tab/window in WSL.
Navigate to your folder and run your cluster status tool:
cd ~/cassandra
bin/nodetool status
You should see your clean grid return with UN 127.0.0.1

Step 4: Creating the Storage Schema
Now that Cassandra is running, we can log into the Cassandra Query Language shell (cqlsh) right from our command line to create our schema.
We will create a Keyspace (Cassandra’s version of a database schema) and a table structured specifically for historical analysis, sorting rows chronologically using a clustered timestamp:
cqlsh
Inside the interactive shell, run the following commands:
CREATE KEYSPACE weather_analytics
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
USE weather_analytics;
CREATE TABLE city_weather (
city_name text,
timestamp timestamp,
temperature float,
humidity int,
wind_speed float,
PRIMARY KEY (city_name, timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC);
Step 5: Writing the Python Consumer (weather_consumer.py)
Our second standalone script is weather_consumer.py. It runs completely independently from the producer, continuously listening to the weather_updates topic, parsing the incoming JSON data using the cassandra-driver client, and appending the records directly into Cassandra.
# weather_consumer.py
from kafka import KafkaConsumer
from cassandra.cluster import Cluster
import json
# Connect to local Cassandra instance
cassandra_cluster = Cluster(['127.0.0.1'])
session = cassandra_cluster.connect('weather_analytics')
# Subscribe to Kafka Topic
consumer = KafkaConsumer(
'weather_updates',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
print("--- Weather Consumer Listening for Events ---")
for message in consumer:
event = message.value
# Extract specific nested fields from OpenWeather JSON payload
city = event['name']
temp = event['main']['temp']
humid = event['main']['humidity']
wind = event['wind']['speed']
# Insert statement into Cassandra NoSQL table
insert_query = """
INSERT INTO city_weather (city_name, timestamp, temperature, humidity, wind_speed)
VALUES (%s, toTimestamp(now()), %s, %s, %s);
"""
session.execute(insert_query, (city, temp, humid, wind))
print(f"Successfully streamed and stored record for: {city}")
Verifying the Streaming Pipeline Data Flow
When both independent python scripts are active at the same time, the pipeline functions as a living unit. To confirm that our real-time records are landing correctly inside our NoSQL storage layer, we can log into the Cassandra shell (cqlsh) and run a quick verification query:
SELECT * FROM weather_analytics.city_weather LIMIT 5;
Major Lessons from Entering the Streaming Space
- Upgrading simplifies infrastructure: Dropping ZooKeeper and upgrading from 3.9.2 to 4.x made managing the local environment much smoother.
- Decoupling creates stability: If the Cassandra database goes down for maintenance, the Producer script doesn't care. It keeps pulling weather data and sending it to Kafka. Kafka will safely hold onto those messages until Cassandra recovers and the Consumer script turns back on. This completely prevents data loss!
What real-time data sources are you planning to stream in your next project? Let's brainstorm ideas in the comments below!




Top comments (0)