GeraldM

Building a Real-Time Kafka + Cassandra Pipeline

Introduction

Apache Kafka and Apache Cassandra pair well because they complement each other's strengths: Kafka handles high-throughput, real-time event streaming and ingestion, while Cassandra provides scalable, fault-tolerant, low-latency persistent storage for the processed data.

Example: a movie streaming company may generate billions of events per day on its platform, including user viewing behavior, playback metrics and content recommendations. Kafka enables real-time streaming and processing of these events. The high-velocity streams are then consumed and persisted into Cassandra, which acts as a highly scalable, fault-tolerant database for time-series data and user activity logs. With this combination, the company can achieve massive write throughput, low-latency reads for its recommendation engines and reliable handling of global traffic. Netflix is a well-known example of a company that runs both Kafka and Cassandra at this kind of scale.

What is Apache Cassandra?

It is a free, open-source NoSQL database designed to handle large volumes of data across multiple nodes using a wide-column storage model. A node is a single server or machine within the Cassandra cluster that stores data and handles read and write requests. Every node supports both read and write operations, and data is replicated across nodes, which ensures high availability without a single point of failure.

Setting up Apache Cassandra

The following steps show how to download and start Cassandra:

  1. Make a folder for Cassandra.

    mkdir ~/cassandra
    cd ~/cassandra

  2. Download Cassandra using the following command.

    wget https://dlcdn.apache.org/cassandra/5.0.8/apache-cassandra-5.0.8-bin.tar.gz
    
  3. Extract the archive into the Cassandra folder.

    tar -xvf apache-cassandra-5.0.8-bin.tar.gz

  4. Set up environment variables.

    export CASSANDRA_HOME=~/cassandra/apache-cassandra-5.0.8
    export PATH=$PATH:$CASSANDRA_HOME/bin
    
  5. Start Cassandra: move into the extracted folder with cd apache-cassandra-5.0.8, then run bin/cassandra.

  6. Start the CQL shell by running bin/cqlsh in a second terminal.
     Note: Cassandra Query Language (CQL) is the primary query language for Apache Cassandra, designed to feel familiar to SQL users while working with Cassandra’s distributed wide-column data model. Unlike traditional SQL, CQL operates on keyspaces (the equivalent of databases) and tables (wide-column structures), and supports partition keys and clustering columns for data distribution and on-disk sorting. It allows you to create tables, insert data, run queries with SELECT, WHERE and ORDER BY, and use lightweight transactions.

  7. Create a keyspace (the Cassandra equivalent of a database) and switch to it:

    CREATE KEYSPACE weather_data2 WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1};
    USE weather_data2;
    


  8. Create a table in the keyspace and use a SELECT statement to confirm that it was created.
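
Steps 7 and 8 can also be scripted from Python. The sketch below is one possible way to do it, not necessarily the schema used for this article: the table and column names come from the consumer's INSERT statement later in the post, while the column types, the primary key (city as the partition key, last_update_time as a clustering column) and the helper names schema_statements, create_schema and main are assumptions made for illustration.

```python
# CQL templates. Column types and the primary key are assumptions; only the
# table and column names are taken from the consumer code in this article.
KEYSPACE_CQL = """
CREATE KEYSPACE IF NOT EXISTS {keyspace}
WITH REPLICATION = {{'class': 'SimpleStrategy', 'replication_factor': 1}}
"""

TABLE_CQL = """
CREATE TABLE IF NOT EXISTS {keyspace}.cities_weather_data (
    city text,
    country text,
    last_update_time timestamp,
    temperature double,
    feels_like double,
    humidity int,
    PRIMARY KEY ((city), last_update_time)
) WITH CLUSTERING ORDER BY (last_update_time DESC)
"""


def schema_statements(keyspace):
    """Return the CQL statements that create the keyspace and the table."""
    return [
        KEYSPACE_CQL.format(keyspace=keyspace).strip(),
        TABLE_CQL.format(keyspace=keyspace).strip(),
    ]


def create_schema(session, keyspace):
    """Run each statement through any object exposing execute(), e.g. a driver session."""
    for statement in schema_statements(keyspace):
        session.execute(statement)


def main():
    # Imported here so the helpers above can be used without the driver installed.
    from cassandra.cluster import Cluster

    cluster = Cluster(["localhost"])
    session = cluster.connect()
    create_schema(session, "kafka_data")  # keyspace name the consumer connects to
    cluster.shutdown()
```

Calling main() while Cassandra is running creates the keyspace and the table if they do not exist yet. With city as the partition key, all readings for one city live in the same partition, and the clustering order returns the newest reading first.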

Errors encountered:

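  • Cassandra process being killed during startup: When this happens, it is most likely because the machine does not have enough memory. This can be addressed by editing the jvm-clients.options file in the conf folder and adding heap-size flags (for example -Xms and -Xmx). If the change has no effect, check conf/jvm-server.options, which is where the server's own heap flags normally live.
     These flags set the amount of memory (RAM) you want Cassandra to use. By default, Cassandra derives this value from the total memory of your machine, which on a small machine can be as much as half of it.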

  • Not recommended to run Cassandra as root
     This is a warning rather than an error. Avoid running Cassandra as root, since doing so can cause problems.
     After a successful start with bin/cassandra, leave that terminal open and start another terminal for the next steps.
     To stop Cassandra, use: pkill -f cassandra

Kafka-Cassandra Pipeline

As an example, let's stream real-time weather data from the OpenWeather API through Kafka and store it in a Cassandra database.

1. Kafka Producer + OpenWeather API

from kafka import KafkaProducer
import requests
from dotenv import load_dotenv
import os
import json
import time  # needed for time.sleep() in the polling loop below

load_dotenv()

API_KEY = os.getenv('API_KEY')

def pull_weather_data():
    cities = ['New York', 'London', 'Johannesburg', 'Nairobi', 'Cairo', 'Doha', 'Tokyo', 'Sydney']
    cities_weather_data = []

    for city in cities:
        url = f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid={API_KEY}"
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # stop early on an HTTP error (bad key, unknown city)
        weather_data = response.json()

        cities_weather_data.append({
            'City': weather_data['name'],
            'Country': weather_data['sys']['country'],
            'Temparature': weather_data['main']['temp'],
            'Humidity': weather_data['main']['humidity'],
            'Feels_Like': weather_data['main']['feels_like'],
            'Last_update_time': weather_data['dt']
        })

    return cities_weather_data

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

topic = 'open_weather_api_cities_data'

while True:
    weather_data = pull_weather_data()
    producer.send(topic, weather_data)
    print(f'From openweather: {weather_data}')
    time.sleep(10)

The Python producer above calls the OpenWeather REST API to collect weather data for the specified cities and writes it to a Kafka topic every 10 seconds.
To learn more about Kafka and Kafka producers, check out the following article:
A Beginners guide to Real-time Data Streaming with Apache Kafka
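
To make the message format concrete, here is a small offline sketch of what the producer puts on the topic. The sample payload is made up and only contains the fields the producer reads; to_record, serialize and deserialize are hypothetical helper names that mirror the dictionary built in pull_weather_data() and the value_serializer / value_deserializer lambdas used in this article.

```python
import json


def to_record(weather_data):
    """Flatten one OpenWeather response into the record the producer sends."""
    return {
        "City": weather_data["name"],
        "Country": weather_data["sys"]["country"],
        "Temparature": weather_data["main"]["temp"],  # key spelled as in the article's code
        "Humidity": weather_data["main"]["humidity"],
        "Feels_Like": weather_data["main"]["feels_like"],
        "Last_update_time": weather_data["dt"],
    }


def serialize(value):
    """Same logic as the producer's value_serializer."""
    return json.dumps(value).encode("utf-8")


def deserialize(raw):
    """Same logic as the consumer's value_deserializer."""
    return json.loads(raw.decode("utf-8"))


# Made-up sample containing only the fields used above (not real API output).
sample_payload = {
    "name": "Nairobi",
    "sys": {"country": "KE"},
    "main": {"temp": 293.15, "humidity": 60, "feels_like": 292.8},
    "dt": 1725344440,
}

batch = [to_record(sample_payload)]  # the producer sends one list per polling cycle
raw = serialize(batch)               # bytes written to the Kafka topic
print(type(raw).__name__, deserialize(raw) == batch)
```

Each Kafka message therefore carries a JSON array with one object per city. Note that when the request URL has no units parameter, OpenWeather reports temperatures in Kelvin.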

  • Output you get after running the Python Producer code:

2. Kafka Consumer + Insert into Cassandra

from kafka import KafkaConsumer
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
from datetime import datetime, timezone
import json

# Connect to the local Cassandra node and select the keyspace
cluster = Cluster(['localhost'])
session = cluster.connect()
session.set_keyspace('kafka_data')

print('Connected to cassandra')

# Parameterized insert; the %s placeholders are filled in by session.execute()
insert_query = SimpleStatement(
    '''
    INSERT INTO cities_weather_data(
        city,
        country,
        last_update_time,
        temperature,
        feels_like,
        humidity
    )
    values (%s, %s, %s, %s, %s, %s)
    '''
)

consumer = KafkaConsumer(
    'open_weather_api_cities_data',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

print('Connected to Kafka and Consumer started...')

for message in consumer:
    # Each message holds a list with one dictionary per city
    weather_dict_data = message.value
    print(weather_dict_data)

    for data in weather_dict_data:
        # 'dt' from OpenWeather is a Unix timestamp; convert it to a UTC datetime
        timestamp = datetime.fromtimestamp(data["Last_update_time"], tz=timezone.utc)

        session.execute(
            insert_query,
            (
                data['City'],
                data['Country'],
                timestamp,
                data['Temparature'],
                data['Feels_Like'],
                data['Humidity']
            )
        )

        # Confirm each insert (inside the loop, so every city is reported)
        print(f"Inserted: {data['City']}")

  • Start by importing the dependencies: KafkaConsumer to connect to Kafka and consume messages from a topic, Cluster to connect Python to a Cassandra cluster, SimpleStatement to wrap a Cassandra Query Language (CQL) query string so it can be executed, datetime to convert Unix timestamps to Python datetime objects, and json to deserialize incoming Kafka JSON messages into Python objects.
  • Connect to Cassandra. Cluster(['localhost']) connects to a local instance of Cassandra, cluster.connect() creates a session used for communication with Cassandra, and session.set_keyspace('kafka_data') tells the session to use the "kafka_data" keyspace for all operations. The keyspace and the cities_weather_data table must already exist.
  • Define the insert query with SimpleStatement() and store it in a variable.
  • Connect to Kafka. KafkaConsumer() subscribes to the topic "open_weather_api_cities_data" and connects to the locally running Kafka instance through bootstrap_servers = 'localhost:9092'. auto_offset_reset = 'earliest' tells the consumer to start from the oldest available message when it has no committed offset, and value_deserializer turns each incoming message into Python objects (here, a list of dictionaries).
  • The for message in consumer loop does not end on its own: it reads every message on the topic and keeps the consumer open, waiting for new messages.
  • weather_dict_data = message.value extracts the weather data from the message and stores it in a variable.
  • The producer sends a list containing all the city weather records. The for data in weather_dict_data loop processes one city at a time and inserts it into Cassandra using session.execute(), which runs the insert query with that record's values bound to the %s placeholders.
  • The timestamp from OpenWeather is in Unix time. datetime.fromtimestamp(data["Last_update_time"]) converts it to a datetime, for example from 1725344440 to 2024-09-03 06:20:40 in UTC (without a tz argument, fromtimestamp() returns local time, so the hour shown depends on the machine's time zone). Why? For human readability.
  • Lastly, print(f"Inserted: {data['City']}") prints a confirmation message once the data has been inserted.
  • The output we get after running the Python consumer code:
  • Query the database to see the data written to Cassandra:
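
The per-record work inside the consumer loop can be isolated into a small function, which makes the timestamp conversion easy to check without Kafka or Cassandra running. This is only a sketch: to_row is a hypothetical helper, the sample record is made up, and the tuple order is assumed to follow the column order of the INSERT statement above. Passing tz=timezone.utc keeps the result independent of the machine's time zone.

```python
from datetime import datetime, timezone


def to_row(data):
    """Turn one city record from the Kafka message into INSERT parameters."""
    # The producer stores OpenWeather's 'dt' field, a Unix timestamp in seconds.
    timestamp = datetime.fromtimestamp(data["Last_update_time"], tz=timezone.utc)
    return (
        data["City"],
        data["Country"],
        timestamp,
        data["Temparature"],  # key spelled as in the producer
        data["Feels_Like"],
        data["Humidity"],
    )


# Made-up record in the shape the producer sends.
record = {
    "City": "Nairobi",
    "Country": "KE",
    "Temparature": 293.15,
    "Humidity": 60,
    "Feels_Like": 292.8,
    "Last_update_time": 1725344440,
}

row = to_row(record)
print(row[2].isoformat())  # 2024-09-03T06:20:40+00:00
```

Inside the consumer, the call would then read session.execute(insert_query, to_row(data)).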

Conclusion

By consuming weather data from a Kafka topic, transforming it into a structured format and writing it into a Cassandra keyspace, we have built a simple, scalable architecture capable of handling continuous high-throughput data streams. This pattern is a common foundation in modern data engineering systems, where Kafka acts as the collection and ingestion layer and Cassandra serves as a storage layer optimized for fast reads and writes. Together, these technologies enable reliable end-to-end pipelines used in applications such as monitoring systems, IoT platforms and real-time analytics engines. From here, the architecture can be extended with processing layers such as Spark or Flink, or by routing data into search and observability tools like Elasticsearch.
