Denzel Kanyeki

Building a Fraud Detection Pipeline using Python, PostgreSQL, Apache Kafka, PySpark, Grafana and Scikit-learn

Introduction

Fraud doesn’t happen once in a while; it happens every second. Every card swipe, every online purchase, every wire transfer has a chance of being fraudulent. And by the time fraud is detected in batch reports, the damage is usually done.

I wanted to explore a question: what if fraud could be detected as it happens? Could a pipeline be built that not only processes financial transactions in real time, but also applies machine learning to flag suspicious activity on the fly?

In this project, I built a fraud detection system that:

  • Produces synthetic transaction data using Faker. Read more here about why synthetic data is often used for model training instead of real data.

  • Trains an Isolation Forest machine learning model on transactional data.

  • Uses Streamlit to visualize model performance and provide explainability.

  • Streams transactions into PostgreSQL, which integrates with Grafana for real-time dashboards and monitoring.

  • Is fully containerized with Docker for deployment.

Project Architecture

The flowchart below shows the architecture used in this project:

Project Architecture

Project Workflow

1. Kafka Producer

The Kafka producer:

  • Uses Faker to generate synthetic users and transactions.
import uuid

from faker import Faker

faker = Faker()
Faker.seed(42)

def generate_users():
    """Generate a single synthetic user record."""
    first_name = faker.first_name()
    last_name = faker.last_name()
    email = faker.email()
    location = faker.country()
    user_id = str(uuid.uuid4())
    user = {
        'user_id': user_id,
        'first_name': first_name,
        'last_name': last_name,
        'full_name': first_name + ' ' + last_name,
        'email': email,
        'location': location
    }

    return user

This is how each record is structured in our Kafka topic:

{
  "user_id": "c5223ee0-c3b4-4149-ac2c-93daaedb16a7",
  "transaction_id": "8026c2d5-aaf6-4197-9242-0ecf633853f2",
  "timestamp": "2025-08-06T06:12:33.763770",
  "first_name": "Marie",
  "last_name": "Walton",
  "location": "Malaysia",
  "transaction_location": "Malaysia",
  "amount": 14129.53,
  "device": "Tablet",
  "is_fraud": 1
}
  • Simulates both legitimate and fraudulent events:

    • Fraud rules include high transaction amounts, location mismatches, and unusual devices.

  • Streams messages to the Kafka topic (transactions-data) with keys as hostnames and values as JSON payloads.

  • Configured with KafkaProducer and SASL/SCRAM authentication for a secure cloud Kafka cluster hosted on Redpanda (a minimal sketch follows this list).
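
The snippet below is a minimal sketch of that producer setup, assuming the kafka-python client; the environment variable names and the enrichment step are illustrative, not the repo's exact code:

import json
import os
import socket

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=os.getenv("KAFKA_BOOTSTRAP"),        # Redpanda cloud broker (illustrative env var)
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username=os.getenv("KAFKA_USERNAME"),
    sasl_plain_password=os.getenv("KAFKA_PASSWORD"),
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Key each message by hostname and send the JSON transaction payload;
# the real producer enriches the user record with transaction fields and fraud rules.
transaction = generate_users()
producer.send("transactions-data", key=socket.gethostname(), value=transaction)
producer.flush()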

2. Kafka Consumers

a. transaction_consumer

  • Built on Spark Structured Streaming.

  • Reads data continuously via readStream() from the Kafka topic.

  • Defines a schema for the transaction JSON payload.

  • Transforms raw Kafka records into a structured DataFrame (a sketch follows this list) with:

    • user_id, transaction_id, amount, device, timestamp, etc.
  • Writes all non-fraud transactions via writeStream() into the database table:

    • shop.transactions, used for downstream analytics and model training.
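
For context, here is a minimal sketch of the readStream() and schema step; the field names mirror the sample payload above, the broker address is a placeholder, and SASL options are omitted for brevity:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType

spark = SparkSession.builder.appName("transaction_consumer").getOrCreate()

# Schema for the JSON transaction payload
schema = StructType([
    StructField("user_id", StringType()),
    StructField("transaction_id", StringType()),
    StructField("timestamp", StringType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("location", StringType()),
    StructField("transaction_location", StringType()),
    StructField("amount", DoubleType()),
    StructField("device", StringType()),
    StructField("is_fraud", IntegerType()),
])

raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "<broker>")
       .option("subscribe", "transactions-data")
       .load())

# Kafka values arrive as bytes: cast to string, parse the JSON, and flatten the struct
parsed = (raw.selectExpr("CAST(value AS STRING) AS json")
          .select(from_json(col("json"), schema).alias("data"))
          .select("data.*"))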

b. fraud_consumer

  • Also powered by Spark Structured Streaming.

  • Reads from the same Kafka topic but filters only fraudulent events (is_fraud = 1) for counterchecking (a short sketch follows this list).

  • Converts timestamps into SQL timestamp type.

  • Persists flagged frauds into the database table:

    • shop.fraud_transactions, used for fraud analysis, validation, and dashboards
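
A short sketch of that filter, assuming a parsed DataFrame shaped like the one in the transaction_consumer sketch above:

from pyspark.sql.functions import col, to_timestamp

fraud_df = (parsed
            .filter(col("is_fraud") == 1)                              # keep only flagged events
            .withColumn("timestamp", to_timestamp(col("timestamp"))))  # string -> SQL timestamp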

For transaction_consumer, the code snippet below shows how data is written into Postgres using writeStream() from PySpark's Structured Streaming:

'''
transform_data() reads streaming data from the topic and transforms it.
write_as_batch() streams data in micro-batches, suitable for writing to Postgres.
'''
def write_to_transactions():
    df = transform_data()
    path = "/tmp/fraud_detection/transactions"
    try:
        return df.writeStream \
          .foreachBatch(write_as_batch) \
          .outputMode('append') \
          .option('checkpointLocation', path) \
          .start() \
          .awaitTermination()
    except Exception as e:
        print(f"Error streaming data to shop.transactions: {e}")
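
write_as_batch() is referenced above but defined elsewhere in the repo. The sketch below shows one way such a micro-batch writer could look, assuming JDBC connection details come from illustrative environment variables:

import os

def write_as_batch(batch_df, batch_id):
    # Each micro-batch is a static DataFrame, so the standard JDBC writer applies
    (batch_df.write
        .format("jdbc")
        .option("url", os.getenv("POSTGRES_JDBC_URL"))   # e.g. jdbc:postgresql://host:5432/db
        .option("dbtable", "shop.transactions")
        .option("user", os.getenv("POSTGRES_USER"))
        .option("password", os.getenv("POSTGRES_PASSWORD"))
        .option("driver", "org.postgresql.Driver")
        .mode("append")
        .save())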

3. Database and visualization

  • PostgreSQL acts as the sink for both consumers.

  • Grafana connects to PostgreSQL for real-time analytics:

    • Transaction trends
    • Fraud vs non-fraud comparisons
    • Top 5 fraudulent transactions
  • A Streamlit application provides ML model performance visualization and explainability (a minimal sketch follows this list).
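
A minimal sketch of how such a Streamlit app could surface the results; the secret name, table, and charts are illustrative, not the repo's exact code:

import pandas as pd
import streamlit as st
from sqlalchemy import create_engine

st.title("Fraud Detection - Model Overview")

engine = create_engine(st.secrets["POSTGRES_URL"])
df = pd.read_sql_table("fraud_transactions", con=engine, schema="shop")

st.metric("Flagged transactions", len(df))
st.bar_chart(df["device"].value_counts())                              # devices seen in flagged events
st.dataframe(df.sort_values("amount", ascending=False).head(10))       # largest flagged transactions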

Below are snapshots of the Grafana dashboard and Streamlit visualization. To access the dashboard, please follow this link.

Grafana dashboard

Streamlit

Streamlit

4. Machine Learning: Teaching the System to Spot the Odd Ones Out

Fraudulent behavior often hides in subtle patterns. To detect anomalies, an Isolation Forest model is trained offline on features like:

  • Transaction amount

  • Device frequency (how common a device is across transactions)

  • Location change (user’s home vs transaction location)

The trained model is saved (jobs/isolation_forest.pkl) and fraud predictions are visualized in Streamlit, providing explainability alongside Grafana’s real-time metrics.

The code snippet below shows how the model is trained offline by reading data from a PostgreSQL table and saving the fitted model to a .pkl file.

import os

import joblib
import pandas as pd
from sklearn.ensemble import IsolationForest
from sqlalchemy import create_engine


def train_model():
    try:
        engine = create_engine(os.getenv("URL"))
        df = pd.read_sql_table('transactions', con=engine, schema='shop')

        # Feature engineering: how common each device is, and whether the
        # transaction location differs from the user's home location
        device_freq = df["device"].value_counts(normalize=True)
        df['device_freq'] = df["device"].map(device_freq)
        df["location_change"] = (df["location"] != df["transaction_location"]).astype('int')

        # Devices seen in fewer than 5% of transactions (kept for reference, not fed to the model)
        threshold = 0.05
        rare_devices = df[df['device_freq'] < threshold]

        features = df[['amount', 'device_freq', 'location_change']].copy()

        n_estimators = 100
        contamination = 0.1  # expecting that ~10% of transactions are fraudulent

        iso_forest = IsolationForest(
            n_estimators=n_estimators,
            contamination=contamination,
            random_state=42
        )
        iso_forest.fit(features)

        path = "jobs/isolation_forest.pkl"
        if os.path.exists(path):
            print(f"Model already exists at {path}. Overwriting...")
        joblib.dump(iso_forest, path)
        print(f"Model saved to {path} successfully.")

    except Exception as e:
        print(f'Error training model: {e}')

if __name__ == '__main__':
    train_model()
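
Once trained, the saved model can be loaded to score new transactions. The helper below is a hypothetical sketch that mirrors the training features; it is not the repo's exact code:

import joblib
import pandas as pd

def score_transactions(df: pd.DataFrame) -> pd.DataFrame:
    model = joblib.load("jobs/isolation_forest.pkl")

    # Recompute the same features used in training
    # (device frequencies are recomputed on the incoming data for simplicity)
    device_freq = df["device"].value_counts(normalize=True)
    features = pd.DataFrame({
        "amount": df["amount"],
        "device_freq": df["device"].map(device_freq),
        "location_change": (df["location"] != df["transaction_location"]).astype("int"),
    })

    # IsolationForest.predict() returns -1 for anomalies and 1 for inliers
    df["predicted_fraud"] = (model.predict(features) == -1).astype("int")
    return df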

Conclusion

Fraud detection is more than a data problem; it’s a streaming, scaling, real-time challenge.

This project showed how Kafka + Spark can handle the firehose of financial events, while PostgreSQL + Grafana turn raw data into live insights. On top of that, machine learning with Isolation Forest gives the system predictive power to flag anomalies before they cause damage.

What excites me most is how modular this architecture is. Want to replace Isolation Forest with a deep learning model? Easy. Want to scale Kafka across clusters? Done. Want to plug Grafana into Prometheus for infrastructure monitoring? Straightforward.

With open-source tools, fraud detection at scale is no longer locked away in banks’ back rooms; anyone can build it, understand it, and improve it.

You can access the GitHub repository here.

If you’re curious, clone the repo, run it locally with Docker, and see fraud detection in action in real time. I’d appreciate your feedback!
