Karthik Parameswaran


Real-Time Fraud Detection Using Kafka and Machine Learning

Introduction

Fraud detection is a critical component of modern financial systems. Real-time identification of suspicious activities can prevent financial losses and protect user trust. In this tutorial, we demonstrate how to build a real-time fraud detection system using Apache Kafka for data streaming and a pre-trained machine learning model for anomaly detection.

The proof-of-concept (POC) application is built with Kafka, Python, and a basic machine learning model. It simulates a continuous stream of transactions, reads them with a Kafka consumer, classifies each one with rule-based checks and a trained ML model, and routes the result to one of two Kafka topics: one for alerts and one for archiving. The goal is to show how event-streaming infrastructure and machine learning can be combined to build scalable, responsive systems for real-time insights.

Architecture Overview

The underlying model in this POC is an Isolation Forest, a lightweight unsupervised anomaly detection algorithm. It works well for outlier detection on small datasets, but production-grade systems may instead use deep learning models (e.g., LSTM for temporal behavior), graph-based models (e.g., Graph Neural Networks for fraud rings), ensemble methods, or hybrid approaches that blend rules and ML predictions. Such models are typically trained on large volumes of labeled data and retrained regularly as fraud patterns evolve.

This project is meant as a foundational blueprint: the pipeline, architecture, and stream processing techniques shown here can be extended with more sophisticated features and detection mechanisms. Kafka and Python let developers and data scientists quickly prototype real-time analytics and decision-making workflows in domains ranging from fintech and banking to e-commerce and telecom.

The system architecture consists of three main components:

  • Kafka Producer: Generates simulated transactions.
  • Kafka Consumer: Consumes transactions and classifies them using rules and a machine learning model.
  • Kafka Topics: transactions carries the raw stream; fraud_alerts and classified_transactions receive the classified output.

ETL system architecture

Setting Up Kafka and Redpanda

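The Docker Compose file shown later in this post runs a Kafka broker (with ZooKeeper) and uses Redpanda Console as the web UI. Start the services with Docker Compose and make sure the broker is running and reachable at kafka:9092. Topics and messages can then be inspected in the Redpanda Console UI, which the Compose file exposes on port 8080.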
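
Compose's depends_on only controls start order, so the producer and consumer containers may start before the broker accepts connections. Below is a minimal sketch, using only the Python standard library, of polling the broker port before creating a Kafka client. The helper names broker_reachable and wait_for_broker and the retry defaults are illustrative assumptions, not part of the original project, and an open TCP port is only a rough signal that the broker is ready.

```python
import socket
import time


def broker_reachable(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, unresolved host names, and timeouts.
        return False


def wait_for_broker(host: str = "kafka", port: int = 9092,
                    attempts: int = 30, delay: float = 2.0) -> bool:
    """Poll the broker port until it accepts connections or attempts run out."""
    for _ in range(attempts):
        if broker_reachable(host, port):
            return True
        time.sleep(delay)
    return False
```

Calling wait_for_broker() at the top of producer.py and consumer.py, and exiting if it returns False, is one way to avoid a crash on startup while the broker is still booting.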

Transaction Producer Script

This script generates simulated transaction data with fields such as user, amount, location, merchant, device, and transaction type. It sends one transaction every 0.5 seconds to the transactions Kafka topic.

from kafka import KafkaProducer
import json, random, time
from datetime import datetime

# Serialize each transaction dict to UTF-8 encoded JSON before sending.
producer = KafkaProducer(bootstrap_servers='kafka:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# Value pools from which random transactions are assembled.
users = ['John', 'Peter', 'Sam']
locations = ['US', 'IN', 'FR', 'BR', 'JP']
devices = ['mobile', 'desktop', 'tablet']
merchants = ['Amazon', 'Walmart', 'Apple', 'Nike', 'XYZ_Merchant']
txn_types = ['purchase', 'transfer', 'withdrawal', 'deposit']

while True:
    data = {
        "user": random.choice(users),
        "amount": round(random.uniform(1.0, 500.0), 2),
        "location": random.choice(locations),
        "device": random.choice(devices),
        "merchant": random.choice(merchants),
        "txn_type": random.choice(txn_types),
        "timestamp": datetime.utcnow().isoformat()  # naive UTC timestamp, ISO 8601
    }
    producer.send('transactions', data)
    time.sleep(0.5)  # one transaction every 0.5 seconds

Kafka Topic:

Producer Data

Fraud Detection Consumer Script

The consumer listens to the transactions topic and classifies each transaction. It applies rule-based checks and uses a pre-trained ML model loaded from fraud_model.pkl to detect anomalies.

from kafka import KafkaConsumer, KafkaProducer
import json
import pandas as pd
import joblib

model = joblib.load("fraud_model.pkl")

consumer = KafkaConsumer(
    'transactions',
    bootstrap_servers='kafka:9092',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='fraud-detector'
)

producer = KafkaProducer(
    bootstrap_servers='kafka:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

print("Fraud detection consumer running...")

user_last_location = {}

for message in consumer:
    txn = message.value

    # RULE-BASED DETECTIONS
    suspicious = False
    reasons = []

    # 1. High amount
    if txn["amount"] > 400:
        suspicious = True
        reasons.append("High amount")

    # 2. Location differs from this user's previous transaction (elapsed time is not checked)
    user = txn["user"]
    if user in user_last_location and user_last_location[user] != txn["location"]:
        suspicious = True
        reasons.append("Location change from {} to {}".format(user_last_location[user], txn["location"]))
    user_last_location[user] = txn["location"]

    # 3. Suspicious merchant
    if txn["merchant"] in ["XYZ_Merchant", "UnknownShop"]:
        suspicious = True
        reasons.append(f"Merchant {txn['merchant']} flagged")

    # ML ANOMALY DETECTION
    # Column names must match the features the model was trained on.
    data_point = pd.DataFrame([{
        "amount": txn["amount"],
        "txn_per_hour": 3,   # placeholder constant; replace with the user's actual hourly count
        # Additional numeric fields can be added for ML
    }])

    ml_result = model.predict(data_point)[0]
    if ml_result == -1:
        suspicious = True
        reasons.append("ML anomaly detection")

    txn["fraudulent"] = suspicious
    txn["reason"] = reasons

    if suspicious:
        print(f"Fraud Alert: {txn}")
        producer.send('fraud_alerts', txn)
    else:
        print(f"Legit Txn: {txn}")
        producer.send('classified_transactions', txn)

Fraud Detection Logic Explained

Fraud detection is performed using a combination of rule-based heuristics and a machine learning anomaly detector. The detection logic includes:

  • High transaction amount: Flags transactions above 400 units.
  • Location change: Flags a transaction whose country differs from the same user's previous transaction. The POC does not yet check how much time passed between the two.
  • Merchant verification: Certain merchants (e.g., XYZ_Merchant, UnknownShop) are flagged as high risk.
  • ML Model: A trained Isolation Forest model flags anomalies. In this POC it sees only the transaction amount and a fixed placeholder for transactions per hour.

Every classified transaction carries a fraudulent flag and a reason list. For suspicious transactions, reason names the checks that triggered the alert; for legitimate ones it is empty.
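
The rule-based checks can also be pulled out of the consumer loop into a plain function, which makes them testable without a running broker. The following is a minimal sketch under that assumption; apply_rules, HIGH_AMOUNT_THRESHOLD, and FLAGGED_MERCHANTS are illustrative names that do not appear in the original consumer, although the thresholds and messages mirror it.

```python
from typing import Dict, List

HIGH_AMOUNT_THRESHOLD = 400
FLAGGED_MERCHANTS = {"XYZ_Merchant", "UnknownShop"}


def apply_rules(txn: dict, last_location: Dict[str, str]) -> List[str]:
    """Return the reasons a transaction looks suspicious (empty list if none).

    last_location maps each user to the location of their previous
    transaction and is updated in place on every call.
    """
    reasons = []

    # 1. High amount
    if txn["amount"] > HIGH_AMOUNT_THRESHOLD:
        reasons.append("High amount")

    # 2. Location differs from the user's previous transaction
    user = txn["user"]
    previous = last_location.get(user)
    if previous is not None and previous != txn["location"]:
        reasons.append(f"Location change from {previous} to {txn['location']}")
    last_location[user] = txn["location"]

    # 3. Merchant on the high-risk list
    if txn["merchant"] in FLAGGED_MERCHANTS:
        reasons.append(f"Merchant {txn['merchant']} flagged")

    return reasons
```

Inside the consumer loop, the result could be used as reasons = apply_rules(txn, user_last_location), with suspicious set to True whenever the list is non-empty or the model returns -1.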

Sample Output and Monitoring

Fraudulent transactions are sent to the fraud_alerts topic. Legitimate ones go to classified_transactions.

fraud_alerts:

fraud_alerts

classified_transactions:

classified_transactions

You can monitor these in the Redpanda Console or via custom UIs built for fraud review teams.

Docker Configs:

services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper


  producer:
    build: .
    command: python producer.py
    depends_on:
      - kafka

  consumer:
    build: .
    command: python consumer.py
    depends_on:
      - kafka

  redpanda-console:
    image: redpandadata/console:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_BROKERS: kafka:9092
    depends_on:
      - kafka

Dockerfile:

FROM python:3.10-slim
WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Train the model during build
RUN python train_model.py

CMD ["python", "consumer.py"]
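
The Dockerfile runs train_model.py, which is not shown in this post. The following is only a sketch of what such a script might look like, not the author's actual training code. It assumes synthetic training data, the two features the consumer passes to the model (amount and txn_per_hour), and guessed hyperparameters; the function names are hypothetical.

```python
import random

import joblib
import pandas as pd
from sklearn.ensemble import IsolationForest

# Must match the DataFrame columns the consumer builds for prediction.
FEATURES = ["amount", "txn_per_hour"]


def make_training_frame(n_samples: int = 1000, seed: int = 42) -> pd.DataFrame:
    """Build synthetic transactions (assumed distributions, not real data)."""
    rng = random.Random(seed)
    rows = [
        {"amount": round(rng.uniform(1.0, 500.0), 2),
         "txn_per_hour": rng.randint(1, 6)}
        for _ in range(n_samples)
    ]
    return pd.DataFrame(rows, columns=FEATURES)


def train_model(frame: pd.DataFrame) -> IsolationForest:
    """Fit an Isolation Forest; contamination=0.05 is an assumed value."""
    model = IsolationForest(n_estimators=100, contamination=0.05, random_state=42)
    model.fit(frame[FEATURES])
    return model


if __name__ == "__main__":
    # Write the file that consumer.py loads with joblib.load("fraud_model.pkl").
    joblib.dump(train_model(make_training_frame()), "fraud_model.pkl")
```

The contamination parameter sets the share of training samples the model treats as outliers, so with uniformly random training data roughly that share of ordinary transactions will be flagged. Real training data would give the model something more meaningful to separate.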


Future Enhancements

It is important to emphasise that this implementation is intentionally simplified for educational and prototyping purposes. In real-world environments, fraud detection systems are significantly more complex and tailored to the organization’s specific threat landscape and user behavior. Attributes such as user session duration, transaction location history, IP address reputation, device fingerprinting, login velocity, spending behavior patterns, merchant credibility, and historical fraud signals are often used to detect anomalies.

Potential improvements include:

  • Adding temporal tracking of user behavior (transactions per minute/hour).
  • Including geolocation data and device fingerprinting.
  • Real-time alert dashboards using WebSocket or gRPC APIs.
  • Model retraining based on analyst feedback.
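
As an illustration of the first item, temporal tracking could replace the fixed txn_per_hour value in the consumer with a real per-user count. Below is a minimal in-memory sketch using a sliding window; the class name TxnRateTracker is hypothetical, and it assumes each user's transactions arrive in timestamp order and that state may be lost when the consumer restarts.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta


class TxnRateTracker:
    """Count each user's transactions inside a sliding time window."""

    def __init__(self, window: timedelta = timedelta(hours=1)):
        self.window = window
        self._events = defaultdict(deque)  # user -> timestamps, oldest first

    def record(self, user: str, ts: datetime) -> int:
        """Register a transaction and return the user's count in the window."""
        events = self._events[user]
        events.append(ts)
        cutoff = ts - self.window
        # Drop timestamps that have fallen out of the window.
        while events and events[0] <= cutoff:
            events.popleft()
        return len(events)
```

In the consumer, the timestamp string written by the producer can be parsed with datetime.fromisoformat(txn["timestamp"]) and the returned count passed to the model as txn_per_hour.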

CodeBase: Github

Linkedin: Linkedin

Website: https://karthikpr.vercel.app/
