Real-Time Data Processing with MySQL, Redpanda, MinIO, and Apache Spark Using Delta Lake

In this article, you will learn how to set up a real-time data processing and analytics environment using Docker, MySQL, Redpanda, MinIO, and Apache Spark. We will create a system that generates fake data simulating sensors on a bridge that flash car plates at each passage. The data will be stored in a MySQL database and streamed in real time using Redpanda and Kafka Connect. We will then use MinIO as distributed object storage and Apache Spark to further process and analyze the data. Additionally, we will integrate the Twilio API for real-time notifications.

Table of Contents

  1. Introduction

  2. Setting up the environment

  • Docker Compose configuration

  • Data generation and storage in MySQL

  • Creating an API for data ingestion

  • Setting up connectors for data streaming and storage

  3. Real-time data processing with Apache Spark

  • Reading data from MinIO

  • Data transformation and storage in the data warehouse

  • Integrating Twilio for real-time notifications

  4. Conclusion

1. Introduction

In this article, we will walk through the process of setting up a real-time data processing and analytics environment for vehicle plate recognition. We will use Docker to manage our services, MySQL for data storage, Redpanda as a streaming platform, MinIO as an object storage server, and Apache Spark for data processing and analysis. We will also integrate the Twilio API to send SMS notifications in real-time based on the processed data.

2. Setting up the environment

Docker Compose configuration

To begin, we will create a Docker Compose file that defines all the necessary services, networks, and volumes for our environment. The services include Redpanda, MinIO, MySQL, Kafka Connect, Adminer, Spark Master, Spark Workers, Jupyter Notebook, a data generator, and an API.

version: "3.7"
services:
  redpanda:
    image: vectorized/redpanda
    container_name: redpanda
    ports:
      - "9092:9092"
      - "29092:29092"
    command:
      - redpanda
      - start
      - --overprovisioned
      - --smp
      - "1"
      - --memory
      - "1G"
      - --reserve-memory
      - "0M"
      - --node-id
      - "0"
      - --kafka-addr
      - PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      - --advertise-kafka-addr
      - PLAINTEXT://redpanda:29092,OUTSIDE://redpanda:9092
      - --check=false
    networks:
      - spark_network  

  redpanda-console:
    image: vectorized/console
    container_name: redpanda_console
    depends_on:
      - redpanda
    ports:
      - "5000:8080"
    env_file:
      - .env
    networks:
      - spark_network  

  minio:
    hostname: minio
    image: "minio/minio"
    container_name: minio
    ports:
      - "9001:9001"
      - "9000:9000"
    command: [ "server", "/data", "--console-address", ":9001" ]
    volumes:
      - ./minio/data:/data
    env_file:
      - .env
    networks:
      - spark_network  

  mc:
    image: minio/mc
    container_name: mc
    hostname: mc
    environment:
      - AWS_ACCESS_KEY_ID=minio
      - AWS_SECRET_ACCESS_KEY=minio123
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo '...waiting...' && sleep 1; done; /usr/bin/mc mb minio/warehouse; /usr/bin/mc policy set public minio/warehouse; exit 0; "
    depends_on:
      - minio
    networks:
      - spark_network  

  mysql:
    image: debezium/example-mysql:1.6
    container_name: mysql
    volumes:
      - ./mysql/data:/var/lib/mysql
    ports:
      - "3306:3306"
    env_file:
      - .env
    networks:
      - spark_network  

  kafka-connect:
    build:
      context: ./kafka
      dockerfile: ./Dockerfile
    container_name: kafka_connect
    depends_on:
      - redpanda
    ports:
      - "8083:8083"
    env_file:
      - .env
    networks:
      - spark_network  

  adminer:
    image: adminer:latest
    ports:
      - 8085:8080/tcp
    deploy:
     restart_policy:
       condition: on-failure 
    networks:
      - spark_network      

  spark-master:
    build:
      context: ./spark
      dockerfile: ./Dockerfile
    container_name: "spark-master"
    environment:
      - SPARK_MODE=master
      - SPARK_LOCAL_IP=spark-master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    ports:
      - "7077:7077"
      - "8080:8080"
    volumes:
      - ./spark/spark-defaults.conf:/opt/bitnami/spark/conf/spark-defaults.conf
    networks:
      - spark_network

  spark-worker-1:
    image: docker.io/bitnami/spark:3.3
    container_name: "spark-worker-1"
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=4G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    networks:
      - spark_network

  spark-worker-2:
    image: docker.io/bitnami/spark:3.3
    container_name: "spark-worker-2"
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=4G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    networks:
      - spark_network

  spark-notebook:
    build:
      context: ./notebooks
      dockerfile: ./Dockerfile
    container_name: "spark-notebook"
    user: root
    environment:
      - JUPYTER_ENABLE_LAB="yes"
      - GRANT_SUDO="yes"
    volumes:
      - ./notebooks:/home/jovyan/work
      - ./notebooks/spark-defaults.conf:/usr/local/spark/conf/spark-defaults.conf
    ports:
      - "8888:8888"
      - "4040:4040"
    networks:
      - spark_network

  generate_data:
    build: ./generate_data
    container_name: generate_data
    command: python generate_data.py
    depends_on:
      - mysql
    networks:
      - spark_network

  api:
    build: ./api
    ports:
      - "8000:8000"
    depends_on:
      - mysql          


networks:
  spark_network:
    driver: bridge
    name: spark_network

To start all the services, run:

docker-compose up --build -d

Data generation and storage in MySQL

Once our environment is set up, we will generate fake data simulating sensors on a bridge that flash car plates at each passage. The data will include vehicle and owner information, subscription status, and other relevant fields. This data will be stored in a MySQL database and serve as the source of our real-time data processing pipeline.

import random
import uuid
from faker import Faker
import pandas as pd
import mysql.connector
from datetime import datetime, timedelta

# Initialize Faker
fake = Faker()

# Number of data points to generate
num_records = 1000

# Generate synthetic data
data = []

for _ in range(num_records):
    unique_id = str(uuid.uuid4())
    plate_number = f"{random.randint(1000, 9999)}-{fake.random_element(elements=('AAA', 'BBB', 'CCC', 'DDD', 'EEE', 'FFF', 'GGG', 'HHH', 'III', 'JJJ', 'KKK', 'LLL', 'MMM', 'NNN', 'OOO', 'PPP', 'QQQ', 'RRR', 'SSS', 'TTT', 'UUU', 'VVV', 'WWW', 'XXX', 'YYY', 'ZZZ'))}"

    car_info = {
        "make": fake.random_element(elements=("Toyota", "Honda", "Ford", "Chevrolet", "Nissan", "Volkswagen", "BMW", "Mercedes-Benz")),
        "year": random.randint(2000, 2023)
    }

    owner_info = {
        "name": fake.name(),
        "address": fake.address(),
        "phone_number": fake.phone_number().replace("x", " ext. ")  # Modify phone number format
    }

    subscription_status = fake.random_element(elements=("active", "expired", "none"))

    if subscription_status != "none":
        subscription_start = fake.date_between(start_date='-3y', end_date='today')
        subscription_end = subscription_start + timedelta(days=365)
    else:
        subscription_start = None
        subscription_end = None

    balance = round(random.uniform(0, 500), 2)

    timestamp = fake.date_time_between(start_date='-30d', end_date='now').strftime('%Y-%m-%d %H:%M:%S')


    record = {
        "id": unique_id,
        "plate_number": plate_number,
        "car_make": car_info["make"],
        "car_year": car_info["year"],
        "owner_name": owner_info["name"],
        "owner_address": owner_info["address"],
        "owner_phone_number": owner_info["phone_number"],
        "subscription_status": subscription_status,
        "subscription_start": subscription_start,
        "subscription_end": subscription_end,
        "balance": balance,
        "timestamp": timestamp
    }

    data.append(record)

# Convert data to a pandas DataFrame
df = pd.DataFrame(data)

# Connect to the MySQL database
db_config = {
    "host": "mysql",
    "user": "root",
    "password": "debezium",
    "database": "inventory"
}
conn = mysql.connector.connect(**db_config)

# Create a cursor
cursor = conn.cursor()

# Create the 'customers' table if it doesn't exist
create_table_query = '''
CREATE TABLE IF NOT EXISTS customers (
    id VARCHAR(255) NOT NULL,
    plate_number VARCHAR(255) NOT NULL,
    car_make VARCHAR(255) NOT NULL,
    car_year INT NOT NULL,
    owner_name VARCHAR(255) NOT NULL,
    owner_address TEXT NOT NULL,
    owner_phone_number VARCHAR(255) NOT NULL,
    subscription_status ENUM('active', 'expired', 'none') NOT NULL,
    subscription_start DATE,
    subscription_end DATE,
    balance DECIMAL(10, 2) NOT NULL,
    timestamp TIMESTAMP NOT NULL
)
'''
cursor.execute(create_table_query)

# Store the synthetic data in the 'customers' table
for index, row in df.iterrows():
    insert_query = '''
    INSERT INTO customers (id, plate_number, car_make, car_year, owner_name, owner_address, owner_phone_number, subscription_status, subscription_start, subscription_end, balance, timestamp)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    '''
    cursor.execute(insert_query, (
        row['id'],
        row['plate_number'],
        row['car_make'],
        row['car_year'],
        row['owner_name'],
        row['owner_address'],
        row['owner_phone_number'],
        row['subscription_status'],
        row['subscription_start'],
        row['subscription_end'],
        row['balance'],
        row['timestamp']
    ))

# Commit the changes and close the cursor
conn.commit()
cursor.close()

# Close the database connection
conn.close()

print("Synthetic data stored in the 'customers' table in the MySQL database")

Creating an API for data ingestion

To facilitate data ingestion, we will create an API that allows us to send data as JSON objects. This API will be used to insert new data into the MySQL database, simulating the real-time data flow from the sensors on the bridge.

from flask import Flask, request, jsonify, render_template
import mysql.connector
import pandas as pd

app = Flask(__name__, template_folder='template')

db_config = {
        "host": "10.0.0.25",
        "user": "root",
        "password": "debezium",
        "database": "inventory"
    }

@app.route('/send_data', methods=['POST'])
def send_data():
    data = request.get_json()


    conn = mysql.connector.connect(**db_config)

    cursor = conn.cursor()

    insert_query = '''
    INSERT INTO customers (id, plate_number, car_make, car_year, owner_name, owner_address, owner_phone_number, subscription_status, subscription_start, subscription_end, balance, timestamp)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    '''
    cursor.execute(insert_query, (
        data['id'],
        data['plate_number'],
        data['car_make'],
        data['car_year'],
        data['owner_name'],
        data['owner_address'],
        data['owner_phone_number'],
        data['subscription_status'],
        data['subscription_start'],
        data['subscription_end'],
        data['balance'],
        data['timestamp']
    ))

    conn.commit()

    cursor.close()
    conn.close()

    return jsonify({"status": "success"}), 200

@app.route('/customers', methods=['GET'])
def customers():
    plate_number = request.args.get('plate_number', '')
    page = int(request.args.get('page', 1))
    items_per_page = 10

    conn = mysql.connector.connect(**db_config)

    # Create a cursor
    cursor = conn.cursor()

    # Fetch customers filtered by plate_number and apply pagination
    select_query = '''
    SELECT * FROM customers
    WHERE plate_number LIKE %s
    LIMIT %s OFFSET %s
    '''
    cursor.execute(select_query, (f"%{plate_number}%", items_per_page, (page - 1) * items_per_page))
    customers = cursor.fetchall()

    # Get the total number of customers
    cursor.execute("SELECT COUNT(*) FROM customers WHERE plate_number LIKE %s", (f"%{plate_number}%",))
    total_customers = cursor.fetchone()[0]

    # Close the cursor and connection
    cursor.close()
    conn.close()

    return render_template('customers.html', customers=customers, plate_number=plate_number, page=page, total_pages=(total_customers // items_per_page) + 1)


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000)

Testing the API

import requests

data = {
    "id": "5a5c562e-4386-44ad-bf6f-bab91081781e",
    "plate_number": "7695-OOO",
    "car_make": "Ford",
    "car_year": 2012,
    "owner_name": "Stefen",
    "owner_address": "92834 Kim Unions\nPort Harryport, MD 61729",
    "owner_phone_number": "your number phone",
    "subscription_status": "active",
    "subscription_start": None,
    "subscription_end": None,
    "balance": 100.0,
    "timestamp": "2023-03-03T14:37:49",
}

response = requests.post("http://localhost:8000/send_data", json=data)

print(response.status_code)
print(response.json())

python request.py

My initial balance is $100.

Setting up connectors for data streaming and storage

With our data stored in MySQL, we will set up Kafka Connect connectors to stream the data from MySQL to Redpanda and then store it in MinIO, which will serve as our distributed object storage. This data storage will act as the “bronze” table in our data warehouse.

# create connector source for MySQL
curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
  "name": "src-mysql",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.include.list": "inventory",
    "decimal.handling.mode": "double",
    "topic.prefix": "dbserver1",
    "schema.history.internal.kafka.bootstrap.servers": "redpanda:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory"
  }
}'

# create connector sink MySQL to S3
curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
  "name": "sink_aws-s3",
  "config": {
    "topics.regex": "dbserver1.inventory.*",
    "topics.dir": "inventory",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "flush.size": "1",
    "store.url": "http://minio:9000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.region": "us-east-1",
    "s3.bucket.name": "warehouse",
    "aws.access.key.id": "minio",
    "aws.secret.access.key": "minio123"
  }
}'
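Once both requests return successfully, you can confirm that the connectors are running through Kafka Connect's REST API. Below is a minimal sketch using Python's requests library; the connector names match the configurations above, and the port mapping comes from the Compose file:

import requests

# Kafka Connect's REST API is exposed on localhost:8083 by docker-compose
CONNECT_URL = "http://localhost:8083"

for name in ("src-mysql", "sink_aws-s3"):
    # /connectors/<name>/status reports the connector state and its task states
    status = requests.get(f"{CONNECT_URL}/connectors/{name}/status").json()
    connector_state = status["connector"]["state"]
    task_states = [task["state"] for task in status["tasks"]]
    print(f"{name}: connector={connector_state}, tasks={task_states}")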

3. Real-time data processing with Apache Spark

Reading data from MinIO

Using Apache Spark, we will read the data stored in MinIO and process it further. This processing will involve selecting relevant fields and transforming the data into a more suitable format for analysis.
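The notebook code for this step is not reproduced in full here, but reading the raw JSON that the S3 sink connector wrote to the warehouse bucket looks roughly like the sketch below. The s3a endpoint, credentials, and object path are assumptions taken from the Compose file and connector config above; in practice, these settings can also live in spark-defaults.conf.

from pyspark.sql import SparkSession

# Minimal sketch: point the s3a filesystem at the MinIO service from docker-compose.
# Skip the config lines if spark-defaults.conf already sets them.
spark = (
    SparkSession.builder
    .appName("bronze-reader")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minio")
    .config("spark.hadoop.fs.s3a.secret.key", "minio123")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)

# The S3 sink writes one JSON object per record under topics.dir/<topic>/<partition>/;
# the exact layout depends on the sink's partitioner, so a glob is used here.
bronze_data = spark.read.json(
    "s3a://warehouse/inventory/dbserver1.inventory.customers/*/*.json"
)
bronze_data.printSchema()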

Data transformation and storage in the data warehouse

Once we have processed the data, we will store it in a “silver” table in our data warehouse. This table will be used for further analysis and processing.
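As a rough sketch of that transformation, assuming the bronze data was read as in the previous snippet and that the Debezium change events keep the row image under payload.after (the default with the JSON converter and schemas enabled), the silver table used later in this article could be produced along these lines:

from pyspark.sql.functions import col

# Sketch only: pull the after-image out of the Debezium envelope and keep
# the columns the downstream jobs use; drop events without an after-image.
silver_data = bronze_data.select(
    col("payload.after.id").alias("id"),
    col("payload.after.plate_number").alias("plate_number"),
    col("payload.after.owner_name").alias("owner_name"),
    col("payload.after.owner_phone_number").alias("owner_phone_number"),
    col("payload.after.subscription_status").alias("subscription_status"),
    col("payload.after.subscription_start").alias("subscription_start"),
    col("payload.after.subscription_end").alias("subscription_end"),
    col("payload.after.balance").alias("balance"),
    col("payload.after.timestamp").alias("timestamp"),
).dropna(subset=["id"])

# Written as Parquet so the notification job below can read it back from MinIO
silver_data.write.mode("overwrite").parquet("s3a://warehouse/inventory/silver_data")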

Integrating Twilio for real-time notifications

To enhance our real-time data processing pipeline, we will integrate the Twilio API, allowing us to send SMS notifications based on specific conditions or events. For example, we could send an SMS to the vehicle owner when their subscription is about to expire or when their balance is updated after a passage is charged.

from datetime import datetime as dt, timedelta, timezone
from typing import Optional

import mysql.connector
from mysql.connector import Error
from twilio.rest import Client
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType

# Twilio credentials for sending SMS notifications
TWILIO_ACCOUNT_SID = ''
TWILIO_AUTH_TOKEN = ''
TWILIO_PHONE_NUMBER = ''

client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)

# Create (or reuse) the SparkSession and load the silver table from MinIO
spark = SparkSession.builder.getOrCreate()
silver_data = spark.read.parquet("s3a://warehouse/inventory/silver_data")

def get_rate_for_customer(timestamp, subscription_status):
    if subscription_status == 'active':
        if 0 <= timestamp.hour < 6 or 11 <= timestamp.hour < 16:
            return 2.99
        elif 6 <= timestamp.hour < 11 or 16 <= timestamp.hour < 23:
            return 3.99
    else:
        return 9.99

    # Add a default rate value to avoid NoneType issues
    return 0.0


def is_subscription_active(subscription_start: dt, subscription_end: dt, current_time: dt) -> bool:
    return subscription_start <= current_time <= subscription_end

def get_subscription_status(subscription_end: dt, current_time: dt) -> bool:
    grace_period = timedelta(days=7)
    return current_time <= subscription_end + grace_period


def send_sms(phone_number, message):
    try:
        client.messages.create(
            body=message,
            from_=TWILIO_PHONE_NUMBER,
            to=phone_number
        )
        print(f"SMS sent to {phone_number}: {message}")
    except Exception as e:
        print(f"Error sending SMS: {e}")


def is_valid_balance(value):
    try:
        float(value)
        return True
    except ValueError:
        return False

valid_balance_udf = udf(is_valid_balance, BooleanType())

silver_data = silver_data.filter(valid_balance_udf(col("balance")))

# Database configuration
db_config = {
    "host": "mysql",
    "user": "root",
    "password": "debezium",
    "database": "inventory"
}

def update_customer_balance(customer_id, new_balance):
    try:
        connection = mysql.connector.connect(**db_config)
        cursor = connection.cursor()
        update_query = "UPDATE customers SET balance = %s WHERE id = %s"
        cursor.execute(update_query, (new_balance, customer_id))
        connection.commit()
        print(f"Updated balance for customer {customer_id}: {new_balance}")
    except Error as e:
        print(f"Error updating balance: {e}")
    finally:
        if connection.is_connected():
            cursor.close()
            connection.close() 

def safe_date_conversion(date_string: Optional[str]) -> dt:
    if date_string is None or not isinstance(date_string, str):
        return dt(1970, 1, 1, tzinfo=timezone.utc)
    try:
        return dt.fromisoformat(date_string[:-1]).replace(tzinfo=timezone.utc)
    except ValueError:
        return dt(1970, 1, 1, tzinfo=timezone.utc)

def process_plate(row: Row) -> None:
    print(f"Processing plate: {row.plate_number}")
    current_time = dt.now(timezone.utc)
    try:
        plate_timestamp = dt.fromisoformat(row.timestamp[:-1]).replace(tzinfo=timezone.utc)
    except ValueError:
        plate_timestamp = dt.fromtimestamp(0, timezone.utc)

    subscription_start = safe_date_conversion(row.subscription_start)
    subscription_end = safe_date_conversion(row.subscription_end)

    is_active = is_subscription_active(subscription_start, subscription_end, current_time)
    rate = get_rate_for_customer(plate_timestamp, row.subscription_status)

    balance = float(row.balance)
    new_balance = balance - rate

    if row.subscription_status == 'none':
        message = f"Dear {row.owner_name}, your car with plate number {row.plate_number} is not registered. The rate of ${rate} has been charged for your recent passage. Your new balance is ${new_balance:.2f}."
        send_sms(row.owner_phone_number, message)
    elif is_active:  # Changed from row.subscription_status == 'active'
        message = f"Dear {row.owner_name}, your subscription is active. The rate of ${rate} has been charged for your recent passage. Your new balance is ${new_balance:.2f}."
        send_sms(row.owner_phone_number, message)
    elif not get_subscription_status(subscription_end, current_time):
        message = f"Dear {row.owner_name}, your subscription has expired. The rate of ${rate} has been charged for your recent passage. Your new balance is ${new_balance:.2f}."
        send_sms(row.owner_phone_number, message)

    # Persist the new balance for every processed passage
    update_customer_balance(row.id, new_balance)

silver_data.foreach(process_plate)

This script is designed to process a dataset containing information about car passages and their owners, including subscription status, balance, plate numbers, and owner details. It reads data from a “silver” table in a data warehouse, processes the data in real-time, sends SMS notifications to the car owners via the Twilio API, and updates the customer’s balance in a MySQL database.

Here’s a breakdown of the script:

  1. Import necessary libraries and modules for the script.

  2. Define Twilio credentials (account SID, auth token, and phone number) for sending SMS notifications.

  3. Create a SparkSession to read data from the “silver” table.

  4. Define utility functions:

  • get_rate_for_customer: Calculate the rate based on timestamp and subscription status.

  • is_subscription_active: Check if a subscription is active.

  • get_subscription_status: Check if a subscription is within the grace period.

  • send_sms: Send an SMS using the Twilio API.

  • is_valid_balance: Check if a given balance is valid (convertible to a float).

  • update_customer_balance: Update the customer balance in the MySQL database.

  • safe_date_conversion: Convert a date string to a datetime object, handling errors and missing values.

  • process_plate: Process each plate record, calculate the rate, send SMS notifications, and update the customer balance.

  5. Register a User-Defined Function (UDF) valid_balance_udf that checks whether a balance value is valid.

  6. Filter the dataset to keep records with valid balances using the valid_balance_udf.

  7. Define database configuration for connecting to the MySQL database.

  8. Use the foreach action to process each plate record using the process_plate function. This includes checking subscription status, calculating the rate, sending SMS notifications, and updating the customer balance (a foreachPartition variant of this step is sketched just below).
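One caveat: foreach runs inside the Spark executors, so per-row side effects such as the MySQL update are commonly wrapped in foreachPartition, which lets each partition open a single database connection instead of one per row. A minimal sketch of that variant, reusing db_config, safe_date_conversion, and get_rate_for_customer from the script above and focusing only on the balance update:

def process_partition(rows):
    # One MySQL connection per partition instead of per row
    connection = mysql.connector.connect(**db_config)
    cursor = connection.cursor()
    try:
        for row in rows:
            rate = get_rate_for_customer(
                safe_date_conversion(row.timestamp), row.subscription_status
            )
            new_balance = float(row.balance) - rate
            cursor.execute(
                "UPDATE customers SET balance = %s WHERE id = %s",
                (new_balance, row.id),
            )
        connection.commit()
    finally:
        cursor.close()
        connection.close()

silver_data.foreachPartition(process_partition)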

gold_data.write.parquet("s3a://warehouse/inventory/gold_data", mode="overwrite")


import pyspark.sql.functions as F
from pyspark.sql import SparkSession

class MetricsAdapter:
    def __init__(self, silver_table, warehouse_path):
        self.silver_table = silver_table
        self.warehouse_path = warehouse_path

    def show_metrics(self):
        daily_metrics = spark.read.format('delta').load(self.warehouse_path + '/gold/daily_metrics')
        weekly_metrics = spark.read.format('delta').load(self.warehouse_path + '/gold/weekly_metrics')
        monthly_metrics = spark.read.format('delta').load(self.warehouse_path + '/gold/monthly_metrics')
        quarterly_metrics = spark.read.format('delta').load(self.warehouse_path + '/gold/quarterly_metrics')
        yearly_metrics = spark.read.format('delta').load(self.warehouse_path + '/gold/yearly_metrics')
        subscription_status_count = self.silver_table.groupBy("subscription_status").count()

        print("Daily Metrics:")
        daily_metrics.show(5)

        print("Weekly Metrics:")
        weekly_metrics.show(5)

        print("Monthly Metrics:")
        monthly_metrics.show(5)

        print("Quarterly Metrics:")
        quarterly_metrics.show(5)

        print("Yearly Metrics:")
        yearly_metrics.show(5)    

    def transform(self):
        # Calculate the week, month, quarter, and year from the timestamp
        time_based_metrics = self.silver_table.withColumn("date", F.to_date("timestamp")) \
            .withColumn("year", F.year("timestamp")) \
            .withColumn("quarter", F.quarter("timestamp")) \
            .withColumn("month", F.month("timestamp")) \
            .withColumn("week_of_year", F.weekofyear("timestamp")) \
            .withColumn("total_passages", F.lit(1)) \
            .withColumn("total_revenue", F.when(self.silver_table.timestamp.substr(12, 2).cast("int") < 12, 2.99).otherwise(3.99))


        # Daily metrics
        daily_metrics = time_based_metrics.groupBy("date").agg(
            F.count("*").alias("total_passages"),
            F.sum(F.when(time_based_metrics.timestamp.substr(12, 2).cast("int") < 12, 2.99).otherwise(3.99)).alias("total_revenue")
        )
        daily_metrics.write.format('delta').mode('overwrite').option("mergeSchema", "true").save(self.warehouse_path + '/gold/daily_metrics')

        # Weekly metrics
        weekly_metrics = time_based_metrics.groupBy("year", "week_of_year").agg(
            F.sum("total_passages").alias("total_passages"),
            F.sum("total_revenue").alias("total_revenue")
        )
        weekly_metrics.write.format('delta').mode('overwrite').option("mergeSchema", "true").save(self.warehouse_path + '/gold/weekly_metrics')

        # Monthly metrics
        monthly_metrics = time_based_metrics.groupBy("year", "month").agg(
            F.sum("total_passages").alias("total_passages"),
            F.sum("total_revenue").alias("total_revenue")
        )
        monthly_metrics.write.format('delta').mode('overwrite').option("mergeSchema", "true").save(self.warehouse_path + '/gold/monthly_metrics')

        # Quarterly metrics
        quarterly_metrics = time_based_metrics.groupBy("year", "quarter").agg(
            F.sum("total_passages").alias("total_passages"),
            F.sum("total_revenue").alias("total_revenue")
        )
        quarterly_metrics.write.format('delta').mode('overwrite').option("mergeSchema", "true").save(self.warehouse_path + '/gold/quarterly_metrics')

        # Yearly metrics
        yearly_metrics = time_based_metrics.groupBy("year").agg(
            F.sum("total_passages").alias("total_passages"),
            F.sum("total_revenue").alias("total_revenue")
        )
        yearly_metrics.write.format('delta').mode('overwrite').option("mergeSchema", "true").save(self.warehouse_path + '/gold/yearly_metrics')

# Example usage
spark = SparkSession.builder.getOrCreate()
silver_data = spark.read.parquet("s3a://warehouse/inventory/silver_data")
warehouse_path = "s3a://warehouse/inventory/gold_data"
metrics_adapter = MetricsAdapter(silver_data, warehouse_path)
metrics_adapter.transform()

metrics_adapter.show_metrics()

The code calculates daily, weekly, monthly, quarterly, and yearly metrics, such as total passages and total revenue. It also defines a MetricsAdapter class that encapsulates the data transformation and metrics display logic.

The first line of code:

gold_data.write.parquet("s3a://warehouse/inventory/gold_data", mode="overwrite")

writes the gold_data DataFrame to the specified S3 bucket in Parquet format, with the overwrite mode, which replaces any existing data in the destination.

The MetricsAdapter class has two primary methods: transform() and show_metrics().

transform() method:

  1. Calculates the date, year, quarter, month, and week of the year from the timestamp.

  2. Aggregates the data based on different time granularities (daily, weekly, monthly, quarterly, and yearly) using the groupBy and agg functions.

  3. Writes the aggregated metrics to the specified S3 bucket in Delta Lake format, which provides ACID transactions, versioning, and schema evolution for large-scale data lakes (the versioning is illustrated in the short time-travel sketch after this walkthrough).

show_metrics() method:

  1. Reads the metrics tables back from the S3 bucket in Delta Lake format.

  2. Displays the top 5 records of daily, weekly, monthly, quarterly, and yearly metrics using the show() function.

Finally, the example usage part of the code initializes a SparkSession, reads the silver_data from the S3 bucket, creates a MetricsAdapter instance with silver_data and the warehouse path, calls the transform() method to aggregate the data, and then calls the show_metrics() method to display the results.
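Because the gold tables are stored in Delta Lake format, the versioning mentioned above is available out of the box. As a small sketch, assuming the same spark session and warehouse_path as in the example usage, an earlier version of the daily metrics can be read back with Delta's time travel option:

# Read the daily metrics as they were at an earlier table version (time travel)
previous_daily_metrics = (
    spark.read.format("delta")
    .option("versionAsOf", 0)  # version 0 corresponds to the first write of the table
    .load(warehouse_path + "/gold/daily_metrics")
)
previous_daily_metrics.show(5)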

Conclusion

In this article, we have demonstrated how to set up a real-time data processing and analytics environment using Docker, MySQL, Redpanda, MinIO, and Apache Spark. We created a system that generates fake data simulating a sensor, stores it in a MySQL database, and processes it in real-time using Redpanda and Kafka Connect. We then used MinIO as distributed object storage and Apache Spark to further process and analyze the data. Additionally, we integrated the Twilio API for real-time notifications.

This project showcases the potential of using modern data processing tools to handle real-time scenarios, such as monitoring car passages on a bridge and notifying car owners about their subscription status and balance. The combination of these technologies enables scalable and efficient data processing, as well as the ability to respond quickly to changes in the data.

The knowledge gained from this project can be applied to various other real-time data processing and analytics use cases. By understanding and implementing these technologies, you can build powerful and efficient systems that are able to handle large amounts of data and provide valuable insights in real-time.

https://github.com/Stefen-Taime/stream-ingestion-redpanda-minio.git

https://medium.com/@stefentaime_10958/real-time-data-processing-and-analytics-with-docker-mysql-redpanda-minio-and-apache-spark-eca83f210ef6
