Introduction
Ever wondered how banks detect and stop fraud in real time? This is how they do it.
Banks process thousands of transactions every second, and each transaction is recorded in backend systems as it happens. Instead of relying on traditional batch processing, banks use Kafka to stream these new events as they are recorded. The events are then consumed by processing services that apply fraud detection rules, analyze patterns and flag suspicious activity. Within seconds, they can determine whether a transaction is fraudulent, and the result is stored in downstream systems for immediate action.
What is event streaming?
Event streaming is the practice of capturing data in real-time from sources like databases, sensors, cloud services and software applications in the form of streams of events, storing these streams durably for later retrieval and routing the event streams to different destination technologies as needed.
What is Apache Kafka?
Kafka is a distributed event streaming platform consisting of servers and clients that communicate via a high performance binary protocol built on TCP.
Key Components of Apache Kafka
The following are the key components that make up Kafka.
1. Servers (brokers)
Store and retrieve data.
Kafka is run as a cluster of one or more servers that can span multiple datacenters or cloud regions. Some of the servers form the storage layer, called the brokers, and others run Kafka Connect to continuously import and export data as event streams from existing systems such as databases and other Kafka clusters. Kafka is fault tolerant: if one of its servers fails, the other servers take over its work.
2. Clients (producers and consumers)
Publish and read events.
Clients allow you to write distributed applications and microservices that read, write and process streams of events in parallel, at scale and in a fault-tolerant manner.
3. Producers
Producers publish (write) data records (messages) to Kafka topics. They are responsible for creating new data and sending it to the Kafka cluster. For example, a banking backend service sends transaction data as events. The producer decides which topic and partition each record goes to.
4. Consumers
These are client applications that subscribe to (read) data records from Kafka topics. They process the data streams published by producers. For example, a fraud detection system reads payment events.
5. Topics and Partitions
Topics are categories or feeds to which records are published. They are logical channels for organizing data streams. For example, transaction events streamed from a bank's backend systems can be stored in a topic named 'Transactions'.
Topics are further divided into partitions. Each partition is an ordered, immutable sequence of records, and splitting a topic into several partitions is what enables parallel processing.
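To make the link between partitions, ordering and parallelism more concrete, here is a minimal sketch in plain Python that runs without Kafka. It assumes every record carries a key and uses zlib.crc32 as a stand-in hash; Kafka's own default partitioner uses a different hash function, and the names choose_partition and NUM_PARTITIONS are made up for illustration.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 3  # hypothetical partition count for the 'Transactions' topic


def choose_partition(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a record key to a partition with a stable hash (stand-in for Kafka's partitioner)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Each partition is modelled as an append-only list, so records keep arrival order.
partitions = defaultdict(list)

# Made-up events: (key, value) pairs.
events = [
    ("account-1", "deposit 100"),
    ("account-2", "withdraw 20"),
    ("account-1", "withdraw 40"),
]

for key, value in events:
    partitions[choose_partition(key)].append((key, value))

for number, records in sorted(partitions.items()):
    print(number, records)
```

Because the same key always hashes to the same partition, all events for account-1 stay in their original order, while different partitions can be read by different consumers at the same time.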
6. ZooKeeper (KRaft in newer versions)
ZooKeeper or KRaft manages the cluster state and metadata and coordinates leader election among the servers in the cluster.
Historically, Kafka relied on an external Apache ZooKeeper ensemble to manage the cluster's metadata. Newer versions of Kafka introduce KRaft, which removes the dependency on ZooKeeper and simplifies the architecture.
Streaming Weather Data using Kafka
Let's build an example project: a Kafka producer fetches weather data from the OpenWeather API and writes it to a Kafka topic, and a Kafka consumer reads it back from the topic.
Stage 1: Create a Producer
Using Python, let's create a producer that gets data from OpenWeather through its REST API.
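from kafka import KafkaProducer
import requests
from dotenv import load_dotenv
import os
import json
import time  # used later for time.sleep() in the send loop

load_dotenv()  # read variables from the .env file into the environment
API_KEY = os.getenv('API_KEY')  # OpenWeather API key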
Import the dependencies the producer needs: KafkaProducer for connecting to Kafka and sending the collected data to a topic, requests to perform an HTTP GET request against the API, load_dotenv to load our stored credentials (the API key) from the .env file, and json to serialize the data, since Kafka sends data as bytes.
We then retrieve our API key using os.getenv().
def pull_weather_data():
    cities = ['New York', 'London', 'Johannesburg', 'Nairobi', 'Cairo', 'Doha', 'Tokyo', 'Sydney']
    cities_weather_data = []
    for city in cities:
        # One request per city: the endpoint returns a single city per call
        url = f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid={API_KEY}"
        response = requests.get(url)
        weather_data = response.json()  # parse the JSON body into a dict
        cities_weather_data.append({
            'City': weather_data['name'],
            'Country': weather_data['sys']['country'],
            'Temperature': weather_data['main']['temp'],
            'Humidity': weather_data['main']['humidity'],
            'Feels_Like': weather_data['main']['feels_like'],
            'Last_update_time': weather_data['dt']
        })
    return cities_weather_data
- We use a Python function, pull_weather_data(), to fetch and store the data.
- Inside the function we define a list of the cities we want weather data for, and create an empty list to hold the collected data. Each API request fetches data for one city at a time, so we use a for loop to go through the list of cities.
- To perform the HTTP request we use requests.get(). The API returns data as JSON, and .json() converts the response to a Python dictionary. From the dictionary we pick the fields that we want and use .append() to add them to the list we created.
- The function then returns the populated list as its output.
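The field-picking step described above can also be written as a small function of its own, which makes it easy to try without calling the API. The sketch below is one possible shape, not the article's code: extract_fields and kelvin_to_celsius are illustrative names, and the sample dictionary is hand-written with made-up values that only imitate the keys read by pull_weather_data(). The conversion is included because OpenWeather reports temperatures in Kelvin unless a units parameter is added to the request.

```python
def kelvin_to_celsius(kelvin: float) -> float:
    """Convert Kelvin to degrees Celsius, rounded to two decimals."""
    return round(kelvin - 273.15, 2)


def extract_fields(weather_data: dict) -> dict:
    """Pick the fields used in this article from one decoded API response."""
    main = weather_data["main"]
    return {
        "City": weather_data["name"],
        "Country": weather_data["sys"]["country"],
        "Temperature_C": kelvin_to_celsius(main["temp"]),
        "Humidity": main["humidity"],
        "Feels_Like_C": kelvin_to_celsius(main["feels_like"]),
        "Last_update_time": weather_data["dt"],
    }


# Hand-written sample with made-up values, shaped like the keys read above.
sample = {
    "name": "Nairobi",
    "sys": {"country": "KE"},
    "main": {"temp": 293.15, "humidity": 60, "feels_like": 292.15},
    "dt": 1700000000,
}

record = extract_fields(sample)
print(record)
```

Keeping the extraction separate from the HTTP call also gives a single place to handle a missing key, for example when a city name is not recognized.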
producer = KafkaProducer(
    bootstrap_servers = 'localhost:9092',
    value_serializer = lambda v: json.dumps(v).encode('utf-8')
)
- Using KafkaProducer, we connect to the running Kafka instance on localhost, port 9092. Kafka sends data as bytes, not as Python objects, so the value_serializer takes each value v passed to producer.send(), converts it to a JSON string using json.dumps(v), and then to bytes using .encode('utf-8').
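The serialization step can be tried on its own, without a broker. This is a minimal sketch using only the standard library: serialize mirrors the value_serializer lambda above, deserialize is the inverse step a consumer would apply, and both function names and the sample record are made up for illustration.

```python
import json


def serialize(value) -> bytes:
    """Same steps as the value_serializer lambda: object -> JSON text -> UTF-8 bytes."""
    return json.dumps(value).encode("utf-8")


def deserialize(raw: bytes):
    """Inverse step on the consumer side: UTF-8 bytes -> JSON text -> Python object."""
    return json.loads(raw.decode("utf-8"))


# Made-up record in the same shape the producer builds.
record = {"City": "Doha", "Humidity": 40}

payload = serialize(record)
print(type(payload).__name__)          # bytes
print(deserialize(payload) == record)  # True
```

The round trip only works for values that JSON can represent, such as dictionaries, lists, strings and numbers.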
- Create the topic in Kafka before starting the producer:
$ bin/kafka-topics.sh --create --topic open_weather_api_cities_data --bootstrap-server localhost:9092
topic = 'open_weather_api_cities_data'
while True:
    weather_data = pull_weather_data()
    producer.send(topic, weather_data)
    print(f'From openweather: {weather_data}')
    time.sleep(10)
- Store the name of the topic that the producer will write to in a variable.
- Use a while True loop, which runs until interrupted, to repeatedly call pull_weather_data() and send the result to the Kafka topic using producer.send().
- For visualization and debugging, you can print the data sent to the Kafka topic.
- Lastly, time.sleep(10) makes the loop wait 10 seconds before running again.
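The loop above sends the whole list returned by pull_weather_data() as a single message. One alternative, sketched here as an assumption rather than the approach used in this article, is to send one message per city and use the city name as the message key. The helper publish_per_city is hypothetical; it accepts any object with a send(topic, value=..., key=...) method, which matches the keyword arguments of KafkaProducer.send in kafka-python. FakeProducer only records calls so the sketch runs without a broker.

```python
class FakeProducer:
    """Stand-in that records calls instead of talking to a broker (illustration only)."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value=None, key=None):
        self.sent.append((topic, key, value))


def publish_per_city(producer, topic: str, records: list) -> int:
    """Send one message per city, keyed by city name, and return how many were sent."""
    for record in records:
        # Keys must be bytes unless the producer is configured with a key_serializer.
        producer.send(topic, value=record, key=record["City"].encode("utf-8"))
    return len(records)


# Made-up records in the shape returned by pull_weather_data().
records = [{"City": "Tokyo", "Humidity": 55}, {"City": "Cairo", "Humidity": 30}]

fake = FakeProducer()
count = publish_per_city(fake, "open_weather_api_cities_data", records)
print(count, [key for _, key, _ in fake.sent])
```

With a real producer, records that share a key are routed to the same partition by the default partitioner, so updates for one city keep their order.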
To run the producer, we save our Python code in a producer.py file and run it.
The output we get after running the producer.
Stage 2: Create a Consumer
from kafka import KafkaConsumer
import json
- Import the dependencies the consumer needs: KafkaConsumer for connecting to Kafka and reading messages from the topic, and json for deserializing the data back into a Python object.
consumer = KafkaConsumer(
    'open_weather_api_cities_data',
    bootstrap_servers = 'localhost:9092',
    auto_offset_reset = 'earliest',
    value_deserializer = lambda m: json.loads(m.decode('utf-8'))
)
In this block we create a Kafka consumer using KafkaConsumer() and tell it how to read messages. This includes the name of the topic it will listen on and the location of Kafka, which in our case is localhost at port 9092. Kafka stores messages in order and identifies each one by an offset, and auto_offset_reset determines where the consumer starts reading when it has no saved position. In our case it is earliest, meaning start reading from the first message in the topic. When writing to the topic we converted the data into bytes, so a deserializer converts it back to a Python object using json.loads(m.decode('utf-8')).
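The effect of the offset setting can be illustrated with a simplified in-memory model. This is a sketch under stated assumptions, not how Kafka is implemented: a partition is modelled as a Python list whose index plays the role of the offset, and start_offset is a hypothetical helper. It also reflects that the reset policy only matters when the consumer has no committed position.

```python
from typing import Optional


def start_offset(log_length: int, committed: Optional[int], auto_offset_reset: str) -> int:
    """Return the offset a consumer would start from in this simplified model."""
    if committed is not None:
        return committed  # a stored position wins over the reset policy
    if auto_offset_reset == "earliest":
        return 0  # oldest record in the partition (always 0 in this model)
    if auto_offset_reset == "latest":
        return log_length  # only records appended from now on
    raise ValueError(f"unsupported policy: {auto_offset_reset}")


# A partition modelled as a list; the list index plays the role of the offset.
partition = ["msg-0", "msg-1", "msg-2"]

print(partition[start_offset(len(partition), None, "earliest"):])  # ['msg-0', 'msg-1', 'msg-2']
print(partition[start_offset(len(partition), None, "latest"):])    # []
print(partition[start_offset(len(partition), 2, "earliest"):])     # ['msg-2']
```

With earliest, a consumer that starts after the producer still sees the messages already in the topic, which is convenient for a tutorial like this one.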
for message in consumer:
    print(f'Received:{message.value}')
- Use a continuous loop to constantly wait for new messages and then print them for visualization.
The output of the Consumer after reading from the open_weather_api_cities_data topic.
Conclusion
In this article, we built a complete streaming workflow where a Python producer collected live weather data from the OpenWeather API and published it into a Kafka topic, while a consumer continuously read and processed the incoming messages. This producer-consumer architecture is widely used in real-world systems such as financial transaction monitoring, log aggregation, IoT telemetry, fraud detection, real-time analytics, and event-driven microservices because it allows applications to exchange data reliably and asynchronously at scale.
