DEV Community

Cover image for A Beginners guide to Real-time Data Streaming with Apache Kafka
GeraldM
GeraldM

Posted on

A Beginners guide to Real-time Data Streaming with Apache Kafka

Introduction

Ever wondered how banks are able to detect and stop fraud in real-time? This is how they do it.
Banks process thousands of transactions every second, end each transaction is recorded in backend systems as it happens. Instead of relying on traditional batch processing, With Kafka they are able to stream/read all these new events that are being recorded in backend systems. These events are then consumed by processing services that apply fraud detection rules, analyze patterns and flag suspicious activities. With this, in a matter of seconds, they are able to determine if your transaction is fraudulent or not and r the result stored on downstream systems for immediate action.

what is event streaming?

Event streaming is the practice of capturing data in real-time from sources like databases, sensors, cloud services and software applications in the form of streams of events, storing these streams durably for later retrieval and routing the event streams to different destination technologies as needed.

What is Apache Kafka?

Kafka is a distributed event streaming platform consisting of servers and clients that communicate via a high performance binary protocol built on TCP.

Key Components of Apache Kafka

The following are the key components that make up kafka.

1. Servers(brokers)
store and retrieve data
Kafka is run as a cluster of one or more servers that can span multiple datacenters or cloud regions. Some of the servers form the storage layer called the brokers and others run Kafka Connect to enable continuous import and export of data as event streams from existing systems such as databases and other kafka clusters. Kafka is fault tolerant such that if one of it's servers fails, the other servers will take over their work.

2. Clients(producers and consumers)
publish and read events
They allow you to write distributed applications and micro-services that read, write and process streams of events in parallel, at scale and in a fault-tolerant manner.

3. Producers
They publish (write) data records (messages) to Kafka topics. They are responsible for creating new data and sending it to the Kafka cluster. For example, a banking backend service sending transaction data as events. Kafka topics decide which topic and partition the data goes to.

4. Consumers
These are client applications that subscribe to (read) data records from Kafka topics. They process the data streams published by producers. For example a fraud detection system reading payment events.

5. Topics and Partitions
Topics are categories or feeds to which records are published. They are logical channels for organizing data streams. For example; transactions events streamed from banks backend systems can be stored in a topic named 'Transactions'.
Topics are further divided into partitions that enable parallel processing, which are ordered and immutable sequences of records.

6. Zookeeper (Kraft in newer versions)
Zookeeper/Kraft manages the cluster state/metadata and is in charge of electing a leader from the cluster of servers.
Historically, Kafka relied on an external Apache ZooKeeper for managing the cluster's metadata.In newer versions of Kafka, Kraft has been introduced to remove the dependency on ZooKeeper, simplifying the architecture.

Streaming Weather Data using Kafka

Let's do an example project where we get weather data using an API from OpenWeather website using a Kafka producer, write into a Kafka topic and read from the topic using a Kafka consumer.

Stage 1: Create a Producer

Using python, lets create a producer to get data from Open Weather using a rest API.

from kafka import KafkaProducer
import requests
from dotenv import load_dotenv
import os
import json

load_dotenv()

API_KEY = os.getenv('API_KEY')
Enter fullscreen mode Exit fullscreen mode
  • Import dependencies that we require for the producer to work. Which include KafkaProducer for connecting to Kafka and sending the collected data to the consumer, requests to perform a HTTP GET request to fetch data using the API, load_dotenv for getting our stored credentials (API key) from the dotenv file and json for serialization of data as Kafka sends data as bytes.

  • We then retrieve our API key using os.getenv().

def pull_weather_data():

    cities = ['New York', 'London', 'Johannesburg', 'Nairobi', 'Cairo', 'Doha', 'Tokyo', 'Sydney']

    cities_weather_data = []

    for city in cities:

        url = f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid={API_KEY}"

        response = requests.get(url)

        weather_data = response.json()

        cities_weather_data.append({
            'City' : weather_data['name'],
            'Country' : weather_data['sys']['country'],
            'Temparature' : weather_data['main']['temp'],
            'Humidity' : weather_data['main']['humidity'],
            'Feels_Like' : weather_data['main']['feels_like'],
            'Last_update_time' : weather_data['dt']
        })

    return cities_weather_data
Enter fullscreen mode Exit fullscreen mode
  • We use a python function to pull_weather_data() to fetch and store the data.
  • Inside the function we define the cities from which we want to get weather data of and store them in a list. Create an empty list that we will use to store the collected data. The API request can only fetch data for each city at a time. To pass our list of cities to the query, we use a for loop to loop through the list.
  • To perform the HTTP request we use requests.get(). The API request returns data in json. We utilize .json() to convert the response to a python dictionary. From the dictionary we pick the data that we want and using .append() add into the empty list that we created.
  • The function then returns our newly populated list as the output.
producer = KafkaProducer( 
    bootstrap_servers = 'localhost:9092',     
    value_serializer = lambda v: json.dumps(v).encode('utf-8')
)
Enter fullscreen mode Exit fullscreen mode
  • Using KafkaProducer, we connect to the running instance of Kafka on port 9092. Kafka does not send data as dictionaries but as bytes, we pick each value as v from the python dictionary in our list, convert it to a json using json.dumps(v) string then to bytes using .encode('utf-8').
$ bin/kafka-topics.sh --create --topic open_weather_api_cities_data --bootstrap-server localhost:9092
Enter fullscreen mode Exit fullscreen mode
  • Create a topic in Kafka
topic = 'open_weather_api_cities_data'

while True:
    weather_data = pull_weather_data()
    producer.send(topic, weather_data) 
    print(f'From openweather: {weather_data}') 
    time.sleep(10)
Enter fullscreen mode Exit fullscreen mode
  • Provide the name of the topic that Kafka will write the data into and store it in a variable.
  • Use a while True condition to create an infinite loop that only stops when interrupted and continuously runs the pull_weather_data() function and send it into the Kafka topic using producer.send().
  • For visualization and debugging, you can print the data sent to the Kafka topic.
  • Lastly, using time.sleep(10), the loop waits for 10 seconds and then runs again.

To run the Producer, we save our python code in a producer.py file and run it.

 The output we get after running the producer.

Stage 2: Create a Consumer

from kafka import KafkaConsumer
import json
Enter fullscreen mode Exit fullscreen mode
  • Import dependencies that we require for the Consumer. Which include KafkaConsumer for connecting to Kafka and reading the messages from the topics and json for de-serialization of data back to a python dictionary.
consumer = KafkaConsumer(
    'open_weather_api_cities_data',
    bootstrap_servers = 'localhost:9092', 
    auto_offset_reset = 'earliest', 
    value_deserializer = lambda m: json.loads(m.decode('utf-8'))
)
Enter fullscreen mode Exit fullscreen mode
  • On this block we create a kafka Consumer using KafkaConsumer() and tell it how to read messages. This includes the message of the topic it will listen on and the location of Kafka which in our case is localhost at port 9092.

  • Kafka stores messages in order using offsets, which determine where the consumer starts reading from. In our case earliest, meaning start reading from the first message in the topic.

  • During writing into the topic, we converted the data into bytes, using a deserializer, we convert the data back to python using json.loads(m.decode('utf-8').

for message in consumer:
    print(f'Received:{message.value}')
Enter fullscreen mode Exit fullscreen mode
  • Use a continuous loop to constantly wait for new messages and then print them for visualization.

 The output of the Consumer after reading from the open_weather_api_cities_data topic.

Conclusion

In this article, we built a complete streaming workflow where a Python producer collected live weather data from the OpenWeather API and published it into a Kafka topic, while a consumer continuously read and processed the incoming messages. This producer-consumer architecture is widely used in real-world systems such as financial transaction monitoring, log aggregation, IoT telemetry, fraud detection, real-time analytics, and event-driven microservices because it allows applications to exchange data reliably and asynchronously at scale.

Top comments (0)