Introduction
In modern data engineering, handling high-velocity, real-time streams requires decoupled architectures that can scale seamlessly. A simple script fetching data from an API and writing it straight to a database creates a tight coupling; if the database goes down or the API is hit by a traffic spike, the entire system breaks.
This project implements a resilient Event-Driven Architecture (EDA). It extracts live global weather metrics from an API, streams them into an Apache Kafka topic managed via Docker, and processes them through an ETL (Extract, Transform, Load) consumer engine that flattens and persists the data into a PostgreSQL database.
Project System Architecture & Directory Layout
The project decouples data sourcing from data transformation and consumption using a publish-subscribe model. Docker isolates the streaming platform infrastructure, while Python applications drive the data operations.
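At a high level, data flows one way through the system:

RapidAPI weather endpoint
        |  HTTP GET (producer.py)
        v
Kafka topic: weather_raw   (broker running in Docker)
        |  consume, flatten, load (consumer.py)
        v
PostgreSQL table: weather_kafka

The repository layout reflects this split: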
openweather-kafka_confluent-project/
├── docker-compose.yml # Orchestrates Zookeeper, Kafka Broker, & Control Center
├── producer.py # Multi-city RapidAPI ingestion pipeline (Extract)
├── consumer.py # Pandas + SQLAlchemy Postgres pipeline (Transform & Load)
├── test.ipynb # Jupyter Notebook for interactive validation & debugging
└── .env # Local container and API credential configurations
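The .env file referenced above centralizes the credentials and connection strings both scripts read via python-dotenv. A minimal sketch, with placeholder values:

API_KEY=your_rapidapi_key
API_HOST=your-rapidapi-weather-host.p.rapidapi.com
LANG=en
POSTGRES_URI=postgresql://user:password@localhost:5432/weather_db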
Infrastructure Layer: Docker Compose & Commands
Instead of dealing with local, environment-specific installations of Kafka, the entire messaging backbone is containerized. The docker-compose.yml provisions a Confluent Platform stack, exposing the broker to the host machine on port 9092 via the PLAINTEXT_HOST listener, while other containers on the Docker network reach it internally at broker:29092.
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:7.4.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

# The Control Center service and the remaining Confluent settings are omitted from this excerpt for brevity.
Docker CLI Commands to Spin Up Infrastructure
To start the background message broker infrastructure, navigate to your project directory containing the configuration file and run:
# Start Kafka and Zookeeper services in detached mode
docker compose up -d
# Verify that your containers are running normally
docker ps
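Optionally, you can sanity-check the broker and manage topics with the kafka-topics CLI that ships inside the Confluent broker image. This is a sketch; with Confluent's default auto-topic-creation enabled, the producer below will also create weather_raw implicitly on its first send:

# List topics registered on the broker
docker exec broker kafka-topics --bootstrap-server broker:29092 --list

# Or pre-create the topic explicitly
docker exec broker kafka-topics --bootstrap-server broker:29092 \
  --create --topic weather_raw --partitions 1 --replication-factor 1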
Data Ingestion: The Producer Layer (producer.py)
The producer.py script handles data ingestion. It loops continuously through a list of nine international cities (Nairobi, Accra, Cape Town, Riga, Brussels, Moscow, Seoul, London, and Sucre), requests each city's real-time weather via RapidAPI, and dispatches the payload to the weather_raw Kafka topic.
A standout feature here is the automatic inline JSON serialization using a lambda function passed directly into the KafkaProducer constructor.
from kafka import KafkaProducer
import time
import json
import requests
import os
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv("API_KEY")
API_HOST = os.getenv("API_HOST")
# Note: LANG is also a standard shell variable; load_dotenv() does not
# override an existing environment value by default.
LANG = os.getenv("LANG")

topic = "weather_raw"

# Initialize Kafka Producer with integrated JSON byte-serializer
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

cities = ['Nairobi', 'Accra', 'Cape Town', 'Riga', 'Brussels',
          'Moscow', 'Seoul', 'London', 'Sucre']

while True:
    for city in cities:
        url = f"https://{API_HOST}/city?city={city}&lang={LANG}"
        headers = {
            'x-rapidapi-host': API_HOST,
            'x-rapidapi-key': API_KEY
        }
        try:
            response = requests.get(url, headers=headers)
            if response.status_code == 200:
                weather_data = response.json()
                producer.send(topic, value=weather_data)
                print(f"Sent weather data for {city}")
            else:
                print(f"Failed for {city}: {response.status_code}")
        except Exception as e:
            print(f"Error for {city}: {e}")
    # Ensure buffered messages are delivered before the next polling cycle
    producer.flush()
    time.sleep(1)
To begin streaming live API payloads into your Kafka cluster, run the producer script in a terminal:
python3 producer.py
Output: each successful request logs a confirmation line, for example:
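Sent weather data for Nairobi
Sent weather data for Accra
Sent weather data for Cape Town
...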
Storage & Transformation Tier: The ETL Consumer (consumer.py)
The consumer layer implements a true ETL pattern. Instead of just printing raw bytes, it targets the weather_raw topic, decodes the stream, flattens the highly nested API structures into a uniform format using Pandas, and loads the records into a PostgreSQL database instance using SQLAlchemy.
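For context, each message on weather_raw is a nested JSON document shaped roughly like the sketch below (abridged, with illustrative values; the field names are exactly the ones the consumer's defensive .get() chains expect):

{
    "name": "Nairobi",
    "main": {"temp": 21.5, "humidity": 68, "pressure": 1016},
    "weather": [{"main": "Clouds", "description": "scattered clouds"}],
    "wind": {"speed": 3.6}
}

The consumer flattens these fields into one relational row per message: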
from kafka import KafkaConsumer
from sqlalchemy import create_engine
from dotenv import load_dotenv
import os
import json
import pandas as pd

load_dotenv()

postgres_uri = os.getenv("POSTGRES_URI")
engine = create_engine(postgres_uri)

# Initialize Kafka Consumer with native byte-decoding
consumer = KafkaConsumer(
    'weather_raw',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

print("Consumer started listening ...")

for message in consumer:
    try:
        data = message.value
        # EXTRACT & TRANSFORM: defensive parsing handles nested JSON safely
        transformed_data = {
            "city": data.get("name"),
            "temperature": data.get("main", {}).get("temp"),
            "humidity": data.get("main", {}).get("humidity"),
            "pressure": data.get("main", {}).get("pressure"),
            "weather": data.get("weather", [{}])[0].get("main"),
            "description": data.get("weather", [{}])[0].get("description"),
            "wind_speed": data.get("wind", {}).get("speed")
        }

        # Structure as a single-row Pandas DataFrame
        df = pd.DataFrame([transformed_data])
        print("\nTransformed weather data")

        # LOAD: persist metrics into the PostgreSQL destination table
        df.to_sql("weather_kafka", con=engine, if_exists="append", index=False)
        print(f"Loaded weather data for {transformed_data['city']}")
    except Exception as e:
        print(f"Consumer error: {e}")
Open a separate terminal pane and launch the consumer to begin populating your relational database in real time:
python3 consumer.py
Output: the consumer announces that it is listening, then prints a transform-and-load confirmation for every message it processes, for example:
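Consumer started listening ...

Transformed weather data
Loaded weather data for Nairobi

Transformed weather data
Loaded weather data for Accra
...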
Pipeline Execution & Data Verification
To verify the integration end to end, we can watch the execution logs of both Python workflows and query the destination table to confirm the data is being persisted.
Dual-Terminal Execution Log Comparison
Running both applications side by side in a split terminal makes the message flow visible: each "Sent weather data for <city>" line in the producer pane is followed shortly by a matching "Loaded weather data for <city>" line in the consumer pane.
Verifying Records in the PostgreSQL Database
Because the consumer script calls df.to_sql(..., if_exists="append"), every processed message appends a new row, and pandas creates the weather_kafka table automatically on the first insert.
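For reference, the auto-generated table is roughly equivalent to the following DDL (a sketch; pandas infers column types from the DataFrame, so the exact types may differ):

CREATE TABLE weather_kafka (
    city        TEXT,
    temperature DOUBLE PRECISION,
    humidity    BIGINT,
    pressure    BIGINT,
    weather     TEXT,
    description TEXT,
    wind_speed  DOUBLE PRECISION
);

Opening a connection tool or the psql CLI against your PostgreSQL instance and querying the table confirms the transformed records are arriving: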
select * from weather_kafka;
The result set mirrors the flattened schema produced by the consumer. An illustrative view (example values, not a real capture):
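  city   | temperature | humidity | pressure | weather |   description    | wind_speed
---------+-------------+----------+----------+---------+------------------+------------
 Nairobi |        21.5 |       68 |     1016 | Clouds  | scattered clouds |        3.6
 Accra   |        28.1 |       79 |     1011 | Rain    | light rain       |        4.1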
Conclusion
This project establishes a practical blueprint for real-time streaming data pipelines. By combining Docker container isolation with Kafka's durable, decoupled message log, the system can keep ingesting data even when the loading layer is slow or temporarily unavailable. Python, Pandas, and SQLAlchemy turn nested API payloads into structured relational records, resulting in an automated, robust data engine ready for downstream business intelligence dashboards.



