Introduction
Fraud detection is a critical component of modern financial systems. Real-time identification of suspicious activities can prevent financial losses and protect user trust. In this tutorial, we demonstrate how to build a real-time fraud detection system using Apache Kafka for data streaming and a pre-trained machine learning model for anomaly detection.
This proof-of-concept (POC) application demonstrates a real-time fraud detection system built using Kafka, Python, and a basic machine learning model. It simulates continuous transaction data streams, processes them with a Kafka consumer, classifies them with a trained ML model, and routes the messages to respective Kafka topics — either for alerts or for archiving. The goal is to show how event-streaming infrastructure and machine learning can be integrated to build scalable and responsive systems for real-time insights.
Architecture Overview
The system architecture consists of three main components:
- Kafka Producer: Generates simulated transactions.
- Kafka Consumer: Consumes transactions and classifies them using rules and a machine learning model.
- Kafka Topics: Used to route messages (transactions, alerts).
The underlying model in this POC is an Isolation Forest — a lightweight unsupervised anomaly detection algorithm. While it works well for outlier detection in smaller datasets, production-grade systems may leverage deep learning models (e.g., LSTM for temporal behavior), graph-based models (e.g., Graph Neural Networks for fraud rings), ensemble methods, or hybrid approaches that blend rules and ML predictions. These models are typically trained on large volumes of labeled data and are regularly retrained as fraud patterns evolve. Ultimately, this project serves as a foundational blueprint. The pipeline, architecture, and stream processing techniques demonstrated here can be extended to accommodate more sophisticated features and detection mechanisms. The flexibility of Kafka and Python allows developers and data scientists to quickly prototype and deploy real-time analytics and decision-making workflows in environments ranging from fintech and banking to e-commerce and telecom.
Setting Up Kafka and Redpanda
Redpanda Console provides the Kafka-compatible monitoring UI in this setup, while the broker itself is Apache Kafka (with ZooKeeper), as defined in the Docker Compose configuration. Start the services using Docker Compose and ensure the broker is running and accessible at kafka:9092. You can view topics and streams using the Redpanda Console UI.
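A typical way to bring the stack up (assuming the Docker Compose file shown later in this post, with service names kafka and redpanda-console) is:

```shell
# Build images and start ZooKeeper, Kafka, the producer/consumer, and the console
docker compose up -d --build

# (Optional) create the topic explicitly; the wurstmeister Kafka image usually
# auto-creates topics on first use, so this is only a precaution
docker compose exec kafka kafka-topics.sh --create \
  --topic transactions --partitions 1 --replication-factor 1 \
  --bootstrap-server kafka:9092

# The Redpanda Console UI is then reachable at http://localhost:8080
```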
Transaction Producer Script
This script generates realistic transaction data with fields such as user, amount, location, merchant, and device type. It sends a transaction every 0.5 seconds to the transactions Kafka topic.
from kafka import KafkaProducer
import json, random, time
from datetime import datetime

producer = KafkaProducer(
    bootstrap_servers='kafka:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

users = ['John', 'Peter', 'Sam']
locations = ['US', 'IN', 'FR', 'BR', 'JP']
devices = ['mobile', 'desktop', 'tablet']
merchants = ['Amazon', 'Walmart', 'Apple', 'Nike', 'XYZ_Merchant']
txn_types = ['purchase', 'transfer', 'withdrawal', 'deposit']

while True:
    data = {
        "user": random.choice(users),
        "amount": round(random.uniform(1.0, 500.0), 2),
        "location": random.choice(locations),
        "device": random.choice(devices),
        "merchant": random.choice(merchants),
        "txn_type": random.choice(txn_types),
        "timestamp": datetime.utcnow().isoformat()
    }
    producer.send('transactions', data)
    time.sleep(0.5)
Fraud Detection Consumer Script
The consumer listens to the transactions topic and classifies each transaction. It applies rule-based checks and uses a pre-trained ML model loaded from fraud_model.pkl to detect anomalies.
from kafka import KafkaConsumer, KafkaProducer
import json
import pandas as pd
import joblib

model = joblib.load("fraud_model.pkl")

consumer = KafkaConsumer(
    'transactions',
    bootstrap_servers='kafka:9092',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='fraud-detector'
)

producer = KafkaProducer(
    bootstrap_servers='kafka:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

print("Fraud detection consumer running...")

user_last_location = {}

for message in consumer:
    txn = message.value

    # RULE-BASED DETECTIONS
    suspicious = False
    reasons = []

    # 1. High amount
    if txn["amount"] > 400:
        suspicious = True
        reasons.append("High amount")

    # 2. Location mismatch in short time
    user = txn["user"]
    if user in user_last_location and user_last_location[user] != txn["location"]:
        suspicious = True
        reasons.append("Location change from {} to {}".format(user_last_location[user], txn["location"]))
    user_last_location[user] = txn["location"]

    # 3. Suspicious merchant
    if txn["merchant"] in ["XYZ_Merchant", "UnknownShop"]:
        suspicious = True
        reasons.append(f"Merchant {txn['merchant']} flagged")

    # ML ANOMALY DETECTION
    data_point = pd.DataFrame([{
        "amount": txn["amount"],
        "txn_per_hour": 3,  # later we can track actual frequency
        # Additional numeric fields can be added for ML
    }])
    ml_result = model.predict(data_point)[0]
    if ml_result == -1:
        suspicious = True
        reasons.append("ML anomaly detection")

    txn["fraudulent"] = suspicious
    txn["reason"] = reasons

    if suspicious:
        print(f"Fraud Alert: {txn}")
        producer.send('fraud_alerts', txn)
    else:
        print(f"Legit Txn: {txn}")
        producer.send('classified_transactions', txn)
Fraud Detection Logic Explained
Fraud detection is performed using a combination of rule-based heuristics and a machine learning anomaly detector. The detection logic includes:
- High transaction amount: Flags transactions above 400 units.
- Location change: Detects user transactions from different countries in quick succession.
- Merchant verification: Certain merchants (e.g., XYZ_Merchant, UnknownShop) are flagged as high risk.
- ML Model: A trained Isolation Forest model predicts anomalies based on transaction patterns.
Each suspicious transaction includes a fraudulent flag and a reason field detailing which checks triggered the alert.
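The Dockerfile shown later in this post runs train_model.py at build time, but the article does not include that script. A minimal sketch of what it might contain — assuming a two-feature model that matches the consumer's data_point columns (amount, txn_per_hour) and synthetic "normal" traffic as training data — could look like this:

```python
# train_model.py — illustrative sketch; feature ranges and hyperparameters
# here are assumptions, not the author's actual training code.
import numpy as np
import pandas as pd
import joblib
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(42)

# Simulate "normal" transaction behavior: modest amounts, low hourly frequency.
normal = pd.DataFrame({
    "amount": rng.uniform(1.0, 300.0, 1000),
    "txn_per_hour": rng.integers(1, 5, 1000),
})

# Isolation Forest is unsupervised: it learns the shape of the training data
# and labels points that are easy to isolate as anomalies (-1) vs inliers (1).
model = IsolationForest(n_estimators=100, contamination=0.01, random_state=42)
model.fit(normal)

# Persist the model so the consumer can load it with joblib.load()
joblib.dump(model, "fraud_model.pkl")
```

Because the consumer calls model.predict on a DataFrame with the same column names, keeping the feature set identical between training and inference is essential.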
Sample Output and Monitoring
Fraudulent transactions are sent to the fraud_alerts topic; legitimate ones go to the classified_transactions topic.
You can monitor these in the Redpanda Console or via custom UIs built for fraud review teams.
Docker Configs:
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper
  producer:
    build: .
    command: python producer.py
    depends_on:
      - kafka
  consumer:
    build: .
    command: python consumer.py
    depends_on:
      - kafka
  redpanda-console:
    image: redpandadata/console:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_BROKERS: kafka:9092
    depends_on:
      - kafka
Dockerfile:
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Train the model during build
RUN python train_model.py
CMD ["python", "consumer.py"]
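The requirements.txt copied in the Dockerfile is not listed in the post; based on the imports used by the producer, consumer, and training scripts, a minimal version would need at least:

```
kafka-python
pandas
scikit-learn
joblib
numpy
```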
Future Enhancements
It is important to emphasise that this implementation is intentionally simplified for educational and prototyping purposes. In real-world environments, fraud detection systems are significantly more complex and tailored to the organization’s specific threat landscape and user behavior. Attributes such as user session duration, transaction location history, IP address reputation, device fingerprinting, login velocity, spending behavior patterns, merchant credibility, and historical fraud signals are often used to detect anomalies.
Potential improvements include:
- Adding temporal tracking of user behavior (transactions per minute/hour).
- Including geolocation data and device fingerprinting.
- Real-time alert dashboards using WebSocket or gRPC APIs.
- Model retraining based on analyst feedback.
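As a sketch of the first enhancement, the hard-coded txn_per_hour feature in the consumer could be replaced by a per-user sliding-window counter. The class name and window size below are illustrative, not part of the original codebase:

```python
import time
from collections import defaultdict, deque

class TxnRateTracker:
    """Tracks per-user transaction timestamps in a sliding window so that
    txn_per_hour can be fed to the model as a real feature instead of a
    hard-coded placeholder."""

    def __init__(self, window_seconds=3600):
        self.window = window_seconds
        self.events = defaultdict(deque)

    def record(self, user, ts=None):
        """Record one transaction for `user` and return the number of
        transactions seen within the window (including this one)."""
        ts = time.time() if ts is None else ts
        q = self.events[user]
        q.append(ts)
        # Drop events that have aged out of the window.
        while q and ts - q[0] > self.window:
            q.popleft()
        return len(q)
```

In the consumer loop, `tracker.record(txn["user"])` would then supply the txn_per_hour value for the DataFrame passed to the model.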
CodeBase: Github
Linkedin: Linkedin
Website: https://karthikpr.vercel.app/