Introduction
Apache Kafka and Apache Cassandra pair effectively because they complement each other's strengths: Kafka handles high-throughput, real-time event streaming and ingestion, while Cassandra provides scalable, fault-tolerant and low-latency persistent storage for processed data.
Example: A movie streaming company may generate billions of events per day on its platform, including user viewing behavior, playback metrics and content recommendations. Kafka enables real-time streaming and processing of these events. These high-velocity streams are then consumed and persisted into Cassandra, which acts as a highly scalable, fault-tolerant database for storing time-series data and user activity logs. With this combination, the company can achieve massive write throughput, low-latency reads for recommendation engines and reliable handling of global traffic. Netflix is a well-known user of this combination.
What is Apache Cassandra?
It is a free, open-source NoSQL database designed to handle large volumes of data across multiple nodes using a wide-column storage model. Every node (a single server or machine within the Cassandra cluster that stores data and handles read and write requests) can serve both reads and writes, and data is replicated across nodes, which provides high availability without a single point of failure.
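To make the replication idea concrete, here is a toy sketch (not Cassandra's actual implementation) of how a partition key can be mapped to a set of replica nodes on a token ring. It assumes one token per node and uses MD5 from the standard library as a stand-in hash; Cassandra's default partitioner is Murmur3, and real clusters typically assign many tokens to each node. The node names and function names are hypothetical.

```python
import hashlib
from bisect import bisect_left


def token(value: str) -> int:
    """Toy token: the first 8 bytes of an MD5 digest as an integer.

    Assumption for illustration only; Cassandra's default partitioner
    is Murmur3, which is not in the Python standard library.
    """
    digest = hashlib.md5(value.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


def replicas_for(partition_key: str, nodes: list, replication_factor: int) -> list:
    """Return the nodes that would hold copies of the given partition key.

    The first replica is the first node whose token is >= the key's token
    (wrapping around the ring); further replicas are the next nodes
    clockwise, similar in spirit to SimpleStrategy.
    """
    ring = sorted((token(node), node) for node in nodes)
    tokens = [t for t, _ in ring]
    start = bisect_left(tokens, token(partition_key)) % len(ring)
    count = min(replication_factor, len(ring))
    return [ring[(start + i) % len(ring)][1] for i in range(count)]


if __name__ == "__main__":
    cluster_nodes = ["node-a", "node-b", "node-c", "node-d"]  # hypothetical names
    print(replicas_for("Nairobi", cluster_nodes, replication_factor=3))
```

Because every key maps to several nodes, any of those nodes can answer a request for it, which is why losing a single node does not make the data unavailable.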
Setting up Apache Cassandra
The following steps show how to download and start Cassandra:
- Make a folder for Cassandra and move into it.
  mkdir cassandra
  cd cassandra
- Download Cassandra using the following command.
  wget https://dlcdn.apache.org/cassandra/5.0.8/apache-cassandra-5.0.8-bin.tar.gz
- Extract the archive into the cassandra folder.
  tar -xvf apache-cassandra-5.0.8-bin.tar.gz
- Set up environment variables.
  export CASSANDRA_HOME=~/cassandra/apache-cassandra-5.0.8
  export PATH=$PATH:$CASSANDRA_HOME/bin
- Start Cassandra by navigating into the folder created by the extraction and running bin/cassandra.
  cd apache-cassandra-5.0.8
  bin/cassandra
- Start the CQL shell (cqlsh).
Note: Cassandra Query Language (CQL) is the primary query language for Apache Cassandra, designed to feel familiar to SQL users while working with Cassandra's distributed wide-column data model. Unlike traditional SQL, CQL operates on keyspaces (the equivalent of databases) and tables (wide-column structures), and it uses partition keys and clustering columns to control data distribution and on-disk sorting. It allows you to create tables, insert data, query with SELECT, WHERE and ORDER BY, and use lightweight transactions.
- Create a keyspace (database) and set it as the keyspace in use. The name kafka_data matches the keyspace the consumer connects to later in this article.
  CREATE KEYSPACE kafka_data WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1};
  USE kafka_data;
- Create a table in the keyspace (database) and use a SELECT statement to confirm table creation.
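The table definition itself is not shown above, so here is a minimal sketch of what it could look like. The table and column names are taken from the INSERT statement the consumer uses later in this article; the column types, the choice of city as partition key and last_update_time as clustering column, and the helper names are assumptions for illustration.

```python
def schema_statements(keyspace="kafka_data", table="cities_weather_data"):
    """Return CQL statements that create the keyspace and table if missing.

    Column types and the primary key are assumptions; adjust them to your data.
    """
    create_keyspace = (
        f"CREATE KEYSPACE IF NOT EXISTS {keyspace} "
        "WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}"
    )
    create_table = (
        f"CREATE TABLE IF NOT EXISTS {keyspace}.{table} ("
        "city text, "
        "country text, "
        "last_update_time timestamp, "
        "temperature double, "
        "feels_like double, "
        "humidity int, "
        # city is the partition key (decides which nodes store the row);
        # last_update_time is a clustering column (sorts rows within a city).
        "PRIMARY KEY ((city), last_update_time)"
        ") WITH CLUSTERING ORDER BY (last_update_time DESC)"
    )
    return [create_keyspace, create_table]


def apply_schema(session, keyspace="kafka_data", table="cities_weather_data"):
    """Run the statements on an open cassandra-driver session."""
    for statement in schema_statements(keyspace, table):
        session.execute(statement)


if __name__ == "__main__":
    # Print the statements so they can also be pasted into cqlsh.
    for statement in schema_statements():
        print(statement + ";")
```

With this layout, all readings for one city live in the same partition and are stored newest first, which suits queries such as "latest readings for Nairobi".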

Errors encountered:
- Cassandra process being killed during startup: When this happens, it is likely because the machine does not have enough memory. This can be addressed by editing the conf/jvm-server.options file and setting the JVM heap size explicitly with the -Xms and -Xmx options. This sets the amount of memory (RAM) you want Cassandra to use. By default, Cassandra calculates these values itself from the total memory of the machine.
- Not recommended to run Cassandra as root: This is not an error but a recommendation. Avoid running Cassandra as root, as there is a possibility of running into errors.
When started with bin/cassandra, Cassandra prints its startup log output to the terminal. Leave that terminal open and start another terminal to proceed with the next steps.
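Before opening the CQL shell, it can help to confirm that the node is accepting client connections. The sketch below simply tries to open a TCP connection; it assumes Cassandra is listening on its default native transport port, 9042, on the local machine, and the function name is hypothetical.

```python
import socket


def cassandra_is_listening(host="127.0.0.1", port=9042, timeout=2.0):
    """Return True if something accepts TCP connections on host:port.

    This only checks that the port is open; it does not verify that
    the process behind it is a healthy Cassandra node.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    if cassandra_is_listening():
        print("Port 9042 is open; cqlsh should be able to connect.")
    else:
        print("Nothing is listening on port 9042 yet.")
```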
To stop Cassandra, use:
pkill -f cassandra
Kafka-Cassandra Pipeline
As an example, let's stream real-time weather data from the OpenWeather API through Kafka and store it in a Cassandra database.

1. Kafka Producer + OpenWeather API
from kafka import KafkaProducer
import requests
from dotenv import load_dotenv
import os
import json
import time

# Load the OpenWeather API key from a .env file
load_dotenv()
API_KEY = os.getenv('API_KEY')

def pull_weather_data():
    """Fetch the current weather for each city and return a list of records."""
    cities = ['New York', 'London', 'Johannesburg', 'Nairobi', 'Cairo', 'Doha', 'Tokyo', 'Sydney']
    cities_weather_data = []
    for city in cities:
        url = f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid={API_KEY}"
        response = requests.get(url)
        weather_data = response.json()
        cities_weather_data.append({
            'City': weather_data['name'],
            'Country': weather_data['sys']['country'],
            'Temparature': weather_data['main']['temp'],
            'Humidity': weather_data['main']['humidity'],
            'Feels_Like': weather_data['main']['feels_like'],
            'Last_update_time': weather_data['dt']
        })
    return cities_weather_data

# Serialize every message value as UTF-8 encoded JSON
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
topic = 'open_weather_api_cities_data'

# Publish a fresh batch of readings every 10 seconds
while True:
    weather_data = pull_weather_data()
    producer.send(topic, weather_data)
    print(f'From openweather: {weather_data}')
    time.sleep(10)
The Python code above (the producer) uses a REST API to collect weather data for the specified cities from OpenWeather and writes the data to a Kafka topic.
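The request and field mapping in the producer can also be factored into two small helpers, which makes them easier to test without calling the API. This is a sketch under a few assumptions: the helper names are hypothetical, the units=metric parameter is an addition (without it OpenWeather returns temperatures in Kelvin), and a response that lacks the expected fields is skipped rather than raising an error. The record keys, including the spelling of 'Temparature', are kept exactly as the producer and consumer in this article use them.

```python
from urllib.parse import urlencode

BASE_URL = "https://api.openweathermap.org/data/2.5/weather"


def build_url(city, api_key, units="metric"):
    """Build the request URL, URL-encoding city names such as 'New York'."""
    query = urlencode({"q": city, "appid": api_key, "units": units})
    return f"{BASE_URL}?{query}"


def extract_record(payload):
    """Map an OpenWeather response to the record shape used in this article.

    Returns None when the payload lacks the expected fields, for example
    when the API answered with an error message instead of weather data.
    """
    try:
        return {
            "City": payload["name"],
            "Country": payload["sys"]["country"],
            "Temparature": payload["main"]["temp"],  # key spelled as in the article
            "Humidity": payload["main"]["humidity"],
            "Feels_Like": payload["main"]["feels_like"],
            "Last_update_time": payload["dt"],
        }
    except (KeyError, TypeError):
        return None


if __name__ == "__main__":
    print(build_url("New York", "YOUR_API_KEY"))
```

Inside pull_weather_data(), the loop could then call requests.get(build_url(city, API_KEY)) and append extract_record(response.json()) only when it is not None.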
To learn more about Kafka and Kafka producers, check out the following article:
A Beginners guide to Real-time Data Streaming with Apache Kafka
- Running the Python producer prints each batch it sends to the topic, prefixed with "From openweather:".
2. Kafka Consumer + Insert into Cassandra
from kafka import KafkaConsumer
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
from datetime import datetime
import json

# Connect to the local Cassandra node and select the keyspace
cluster = Cluster(['localhost'])
session = cluster.connect()
session.set_keyspace('kafka_data')
print('Connected to cassandra')

# Insert statement; each %s is a placeholder for one value
insert_query = SimpleStatement(
    '''
    INSERT INTO cities_weather_data(
        city,
        country,
        last_update_time,
        temperature,
        feels_like,
        humidity
    )
    values (%s, %s, %s, %s, %s, %s)
    '''
)

# Subscribe to the topic and decode each message from JSON
consumer = KafkaConsumer(
    'open_weather_api_cities_data',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
print('Connected to Kafka and Consumer started...')

# Each message holds a list of city records; insert them one by one
for message in consumer:
    weather_dict_data = message.value
    print(weather_dict_data)
    for data in weather_dict_data:
        # Convert the Unix timestamp to a Python datetime
        timestamp = datetime.fromtimestamp(data["Last_update_time"])
        session.execute(
            insert_query,
            (
                data['City'],
                data['Country'],
                timestamp,
                data['Temparature'],
                data['Feels_Like'],
                data['Humidity']
            )
        )
        print(f"Inserted: {data['City']}")
- Start by importing the dependencies: KafkaConsumer connects to Kafka and consumes messages from a topic, Cluster connects Python to a Cassandra database cluster, SimpleStatement wraps a Cassandra Query Language (CQL) statement so it can be executed, datetime converts timestamps to Python datetime objects and json deserializes incoming Kafka JSON messages into Python objects.
- Connect to Cassandra. Cluster(['localhost']) connects to a local instance of Cassandra, cluster.connect() creates a session used for communication with Cassandra and session.set_keyspace('kafka_data') instructs Cassandra to use the "kafka_data" keyspace for all operations.
- Build the insert query with SimpleStatement() and store it in a variable.
- Connect to Kafka. KafkaConsumer() subscribes to the topic "open_weather_api_cities_data" and connects to the locally running Kafka instance through bootstrap_servers='localhost:9092'. auto_offset_reset='earliest' instructs the consumer to start reading from the first message in the topic, and value_deserializer converts each incoming message from JSON into Python objects.
- The for loop over the consumer reads every message on the topic and then keeps the consumer open, waiting for new incoming messages.
- weather_dict_data = message.value extracts the weather data from each message and stores it in a variable.
- The producer sends a list of all the city weather records, so the for data in weather_dict_data loop processes one city at a time and inserts it into Cassandra using session.execute(), which runs the insert query with the record's values filled into its placeholders.
- The timestamp from OpenWeather is in Unix time, so timestamp = datetime.fromtimestamp(data["Last_update_time"]) converts it to a datetime, for example from 1725344440 to 2024-09-03 10:20:40 (the exact hour depends on the local time zone of the machine running the code). Why? For human readability.
- Lastly, print(f"Inserted: {data['City']}") prints a message confirming each successful insert.
- Running the Python consumer prints each batch it receives, followed by one "Inserted:" line per city.
- Query the database to see the data written to Cassandra.
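As a variation on the insert step above, the statement can be prepared once with session.prepare() and then executed with bound values, which avoids re-parsing the query on every insert. The sketch below also converts the Unix timestamp to an explicit UTC datetime instead of local time, since, as far as I know, the driver interprets naive datetimes as UTC. The helper names are hypothetical; the record keys and table columns are the ones used in this article.

```python
from datetime import datetime, timezone

# Prepared statements use ? placeholders instead of %s
INSERT_CQL = """
INSERT INTO cities_weather_data (
    city, country, last_update_time, temperature, feels_like, humidity
) VALUES (?, ?, ?, ?, ?, ?)
"""


def to_row(record):
    """Convert one weather record into the tuple of values to bind."""
    return (
        record["City"],
        record["Country"],
        # Explicit UTC avoids depending on the machine's local time zone
        datetime.fromtimestamp(record["Last_update_time"], tz=timezone.utc),
        record["Temparature"],  # key spelled as in the producer
        record["Feels_Like"],
        record["Humidity"],
    )


def insert_batch(session, prepared, records):
    """Insert every record of one Kafka message; return how many were written."""
    for record in records:
        session.execute(prepared, to_row(record))
    return len(records)


# Usage with the session and consumer from the code above (not run here):
#   prepared = session.prepare(INSERT_CQL)
#   for message in consumer:
#       insert_batch(session, prepared, message.value)
```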
Conclusion
By consuming weather data from a Kafka topic, transforming it into a structured format and writing it into a Cassandra keyspace (database), we have built a simple, scalable architecture capable of handling continuous high-throughput data streams. This pattern is a common foundation in modern data engineering systems, where Kafka acts as the collection/ingestion layer and Cassandra serves as a storage layer optimized for fast reads and writes. Together, these technologies enable reliable end-to-end pipelines used in applications such as monitoring systems, IoT platforms and real-time analytics engines. From here, the architecture can be extended to include processing layers such as Spark or Flink, or to route data into search and observability tools like Elasticsearch.
