<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Stefen</title>
    <description>The latest articles on DEV Community by Stefen (@stefentaime).</description>
    <link>https://dev.to/stefentaime</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F980393%2Fd6e2f453-b86e-45f6-a281-c50b2a5e0814.png</url>
      <title>DEV Community: Stefen</title>
      <link>https://dev.to/stefentaime</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/stefentaime"/>
    <language>en</language>
    <item>
      <title>Automating Data Pipeline Deployment on AWS with Terraform: Utilizing Lambda, Glue, Crawler, Redshift, and S3</title>
      <dc:creator>Stefen</dc:creator>
      <pubDate>Wed, 26 Jul 2023 15:37:39 +0000</pubDate>
      <link>https://dev.to/stefentaime/automating-data-pipeline-deployment-on-aws-with-terraform-utilizing-lambda-glue-crawler-redshift-and-s3-3075</link>
      <guid>https://dev.to/stefentaime/automating-data-pipeline-deployment-on-aws-with-terraform-utilizing-lambda-glue-crawler-redshift-and-s3-3075</guid>
      <description>&lt;p&gt;&lt;a href="https://medium.com/@stefentaime_10958/automating-data-pipeline-deployment-on-aws-with-terraform-utilizing-lambda-glue-crawler-1621e0736edd" rel="noopener noreferrer"&gt;https://medium.com/@stefentaime_10958/automating-data-pipeline-deployment-on-aws-with-terraform-utilizing-lambda-glue-crawler-1621e0736edd&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F3840%2F1%2AX8GQswkaH8T278wAinFm0g.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F3840%2F1%2AX8GQswkaH8T278wAinFm0g.png" alt="Automating Data Pipeline Deployment on AWS with Terraform"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Table of Contents
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Objective&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Pre-requisites&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Components&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Source Systems&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Schedule &amp;amp; Orchestrate&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Extract&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Load&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Transform&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Data Visualization&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Choosing Tools &amp;amp; Frameworks&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Future Work &amp;amp; Improvements&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Further Reading&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Setup&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Important Note on Costs&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  Objective
&lt;/h2&gt;

&lt;p&gt;The objective of this guide is to demonstrate how to automate the deployment of a data pipeline on AWS using Terraform. The pipeline will utilize AWS services such as Lambda, Glue, Crawler, Redshift, and S3. The data for this pipeline will be extracted from a Stock Market API, processed, and transformed to create various views for data analysis.&lt;/p&gt;

&lt;h2&gt;
  
  
  Pre-requisites
&lt;/h2&gt;

&lt;p&gt;Before we begin, make sure you have the following:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Basic understanding of AWS services&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Familiarity with Terraform&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;An AWS account with IAM access configured:&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F2852%2F1%2ApKU5x1DsGE71zwBCwW9jTg.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F2852%2F1%2ApKU5x1DsGE71zwBCwW9jTg.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Terraform installed on your local machine&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Components
&lt;/h2&gt;

&lt;p&gt;The main components of our data pipeline are:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;AWS Lambda: Used for running serverless functions.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;AWS Glue: Used for ETL (Extract, Transform, Load) operations.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;AWS Glue Crawler: Used for cataloging data in the Glue Data Catalog.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Amazon Redshift: Used for data warehousing and analysis.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Amazon S3: Used for data storage.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Source Systems
&lt;/h2&gt;

&lt;p&gt;In this pipeline, the source system is an API from which Lambda1 extracts data.&lt;/p&gt;

&lt;h2&gt;
  
  
  Schedule &amp;amp; Orchestrate
&lt;/h2&gt;

&lt;p&gt;Lambda2 is scheduled to execute the Crawler and the Glue Job, orchestrating the flow of data through the pipeline.&lt;/p&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import boto3
import os
import time

def lambda_handler(event, context):
    s3 = boto3.client('s3')
    glue = boto3.client('glue')

    bucket_name = os.environ['BUCKET_NAME']
    crawler_name = os.environ['GLUE_CRAWLER_NAME']

    folders = ['AAPL', 'IBM', 'MSFT']

    # Fail early if any symbol folder has no JSON file to crawl
    for folder in folders:
        objects = s3.list_objects_v2(
            Bucket=bucket_name,
            Prefix=folder + '/'
        )

        if not any(obj['Key'].endswith('.json') for obj in objects.get('Contents', [])):
            raise Exception(f"No JSON files found in {folder} folder")

    glue.start_crawler(Name=crawler_name)

    # Wait until the crawler is idle again (READY); only then does
    # LastCrawl describe the run that was just started
    while True:
        crawler = glue.get_crawler(Name=crawler_name)
        if crawler['Crawler']['State'] == 'READY':
            break
        time.sleep(30)

    if crawler['Crawler']['LastCrawl']['Status'] != 'SUCCEEDED':
        raise Exception("Crawler run did not succeed")

    glue.start_job_run(JobName=os.environ['GLUE_JOB_NAME'])
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h2&gt;
  
  
  Extract
&lt;/h2&gt;

&lt;p&gt;Data is extracted from the API by Lambda1 and stored in an S3 bucket.&lt;/p&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import json
import boto3
import requests
import os
from datetime import datetime

def flatten_data(data):
    # Merge the shared metadata into every daily record
    metadata = data['Meta Data']
    time_series = data['Time Series (Daily)']
    new_data = []
    for date, values in time_series.items():
        flattened_record = metadata.copy()
        flattened_record.update(values)
        flattened_record['date'] = date
        new_data.append(flattened_record)
    return new_data

def lambda_handler(event, context):
    s3 = boto3.resource('s3')
    apikey = ''  # your Alpha Vantage API key
    symbols = ['MSFT', 'AAPL', 'IBM']

    bucket_name = os.environ['BUCKET_NAME']

    for symbol in symbols:
        url = f'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&amp;amp;symbol={symbol}&amp;amp;outputsize=full&amp;amp;apikey={apikey}'
        r = requests.get(url)
        r.raise_for_status()
        data = r.json()

        data = flatten_data(data)

        date_str = datetime.now().strftime('%Y-%m-%d')
        key = f'{symbol}/{date_str}-{symbol}.json'

        # Write one JSON object per line so the Crawler can infer the schema
        lines = ""
        for record in data[:100]:  # Only take the first 100 records
            line = json.dumps(record) + "\n"
            lines += line

        s3.Bucket(bucket_name).put_object(Key=key, Body=lines)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h2&gt;
  
  
  Load
&lt;/h2&gt;

&lt;p&gt;The Glue Crawler scans the S3 bucket and registers the data in a Glue Data Catalog database as three tables, one per stock symbol.&lt;/p&gt;
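&lt;p&gt;To check what the Crawler registered, the catalog can be queried with boto3. The sketch below is only an illustration and is not part of the project repository: the helper name list_catalog_tables is hypothetical, and the client is passed in so the function can be exercised without AWS access. It relies on the Glue get_tables call and follows NextToken when the result is paginated.&lt;/p&gt;

```python
def list_catalog_tables(glue_client, database_name):
    """Return the names of all tables registered in a Glue Data Catalog database.

    glue_client is expected to behave like boto3.client('glue').
    """
    names = []
    kwargs = {"DatabaseName": database_name}
    while True:
        response = glue_client.get_tables(**kwargs)
        names.extend(table["Name"] for table in response.get("TableList", []))
        token = response.get("NextToken")
        if not token:
            return names
        # Ask for the next page of results
        kwargs["NextToken"] = token
```

&lt;p&gt;Assuming the database name av_financial_analysis used by the Glue Job script, calling list_catalog_tables(boto3.client('glue'), 'av_financial_analysis') after a successful crawl would be expected to list the aapl, ibm, and msft tables.&lt;/p&gt;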

&lt;h2&gt;
  
  
  Transform
&lt;/h2&gt;

&lt;p&gt;The Glue Job transforms the data by reading it from the catalog, applying transformations, and writing the output back into the S3 bucket.&lt;/p&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import functions as F
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

table_names = ["aapl", "ibm", "msft"]

for table_name in table_names:
    # Read the table that the Crawler registered in the catalog
    S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
        database="av_financial_analysis",
        table_name=table_name,
        transformation_ctx="S3bucket_node1",
    )

    # Cast the string columns delivered by the API to proper types
    ApplyMapping_node2 = ApplyMapping.apply(
        frame=S3bucket_node1,
        mappings=[
            ("`1. information`", "string", "`1. information`", "string"),
            ("`2. symbol`", "string", "`2. symbol`", "string"),
            ("`3. last refreshed`", "string", "`3. last refreshed`", "date"),
            ("`4. output size`", "string", "`4. output size`", "string"),
            ("`5. time zone`", "string", "`5. time zone`", "string"),
            ("`1. open`", "string", "`1. open`", "double"),
            ("`2. high`", "string", "`2. high`", "double"),
            ("`3. low`", "string", "`3. low`", "double"),
            ("`4. close`", "string", "`4. close`", "double"),
            ("`5. volume`", "string", "`5. volume`", "bigint"),
            ("date", "string", "date", "date"),
            ("partition_0", "string", "partition_0", "string"),
        ],
        transformation_ctx="ApplyMapping_node2",
    )

    df = ApplyMapping_node2.toDF()

    # Group by the 'symbol' column and calculate the mean, min, max of the specified columns
    grouped_df = df.groupBy("`2. symbol`").agg(
        F.mean("`1. open`").alias("average_open"),
        F.min("`1. open`").alias("min_open"),
        F.max("`1. open`").alias("max_open"),

        F.mean("`4. close`").alias("average_close"),
        F.min("`4. close`").alias("min_close"),
        F.max("`4. close`").alias("max_close"),

        F.mean("`2. high`").alias("average_high"),
        F.min("`2. high`").alias("min_high"),
        F.max("`2. high`").alias("max_high"),

        F.mean("`3. low`").alias("average_low"),
        F.min("`3. low`").alias("min_low"),
        F.max("`3. low`").alias("max_low"),
    )

    # Convert back to DynamicFrame
    grouped_dyf = DynamicFrame.fromDF(grouped_df, glueContext, "grouped_dyf")

    glueContext.write_dynamic_frame.from_options(
        frame = grouped_dyf,
        connection_type = "s3",
        connection_options = {"path": f"s3://av-financial-analysis-bucket/output/{table_name}"},
        format = "csv",
    )

job.commit()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h2&gt;
  
  
  Data Visualization
&lt;/h2&gt;

&lt;p&gt;Redshift reads the data from the catalog and creates views for visualization. Here are some examples of the views that can be created:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Stock performance comparison view: This view compares the daily performance of three stocks (AAPL, IBM, and MSFT). It includes columns for date, stock symbol, opening price, closing price, highest price, lowest price, and stock volume.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Daily stock statistics view: This view calculates daily statistics for each stock, such as the percentage change between the opening and closing price, the difference between the highest and lowest price, and the total volume of shares traded.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Stock trends view: This view plots stock trends over a period of time. For example, it can calculate the moving average of closing prices over 7 days, 30 days, and 90 days for each stock.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Most-traded shares view: This view ranks stocks according to the total volume of shares traded each day. This can help identify the most popular or active stocks on the market.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Stock correlation view: This view examines the correlation between the price movements of different stocks. For example, if the price of the AAPL share rises, does the price of the IBM share also rise?&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Choosing Tools &amp;amp; Frameworks
&lt;/h2&gt;

&lt;p&gt;The tools and frameworks were chosen based on their integration with AWS and their ability to handle the tasks required for this pipeline. Terraform was chosen for its infrastructure as code capabilities, allowing for easy deployment and management of the pipeline.&lt;/p&gt;

&lt;h2&gt;
  
  
  Future Work &amp;amp; Improvements
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Regularly monitor and optimize your pipeline to ensure it remains efficient as your data grows.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Implement proper error handling and alerting mechanisms to quickly identify and resolve any issues.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Setup
&lt;/h2&gt;

&lt;p&gt;To get started with this project, follow the steps below:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Ensure you have configured your AWS environment using aws configure.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Clone the project repository to your local machine using the following command: git clone &lt;a href="https://github.com/Stefen-Taime/etl_onaws_deploy_with_terraform.git" rel="noopener noreferrer"&gt;https://github.com/Stefen-Taime/etl_onaws_deploy_with_terraform.git&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Navigate to the project directory: cd etl_onaws_deploy_with_terraform&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Familiarize yourself with the project structure using the tree command. The project structure should look like this:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;.
├── glue
│   ├── crawler
│   │   └── main.tf
│   └── job
│       ├── glue_job.py
│       └── main.tf
├── lambda_functions
│   ├── lambda1
│   │   ├── deploy.sh
│   │   ├── lambda_function.py
│   │   ├── main.tf
│   │   └── requirements.txt
│   └── lambda2
│       ├── deploy.sh
│       ├── lambda_function.py
│       └── main.tf
├── main.tf
├── outputs.tf
├── redshift
│   ├── network.tf
│   ├── outputs.tf
│   ├── provider.tf
│   ├── redshift-cluster.tf
│   ├── redshift-iam.tf
│   ├── security-group.tf
│   ├── terraform.tfstate
│   ├── terraform.tfvars
│   └── variables.tf
├── s3
│   └── main.tf
└── variables.tf
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Package the Lambda1 function. Navigate to the Lambda1 directory (cd lambda_functions/lambda1), grant execute permissions to the deployment script (chmod a+x deploy.sh), and run the script (./deploy.sh). You should see a deployment_package.zip file generated.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Return to the root directory of the project. At this point, we will deploy the first two essential modules for data ingestion: the Lambda1 function and the S3 bucket. In the main.tf file located at the root of the project, you can keep only the S3 and Lambda1 modules and comment out or temporarily remove the rest.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Once ready, run terraform init to initialize your Terraform workspace, followed by terraform plan. At this stage, you will need to enter the ARN of the Lambda function. You can enter an example ARN, such as MyArnLambda.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;After that, run terraform apply -var="account_id=". You can find your account ID in the AWS console at the top right. If everything goes well, you should see an output similar to this image:&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F2000%2F1%2AMQCNMiBRCjXrPQNsR_2_tQ.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F2000%2F1%2AMQCNMiBRCjXrPQNsR_2_tQ.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ol start="9"&gt;
&lt;li&gt;Go to the AWS console and execute the Lambda1 function. Check the S3 bucket, and you should see three folders: AAPL, IBM, and MSFT.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F2572%2F1%2AgBqMgz2F394h0Xp3V6PV8A.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F2572%2F1%2AgBqMgz2F394h0Xp3V6PV8A.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F3584%2F1%2AyfoHRj_E_EFMSta1Srcbzg.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F3584%2F1%2AyfoHRj_E_EFMSta1Srcbzg.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ol start="10"&gt;
&lt;li&gt;&lt;p&gt;Once deployed, return to the main.tf file at the root of the project and uncomment the Module2 section, which includes the Glue Crawler and Glue Job. Run terraform init, terraform plan, and terraform apply again.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;The output should look like this:&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F2000%2F1%2ATlQjxn_zExGIWD6I7qF98Q.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F2000%2F1%2ATlQjxn_zExGIWD6I7qF98Q.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F3122%2F1%2ASqN7rOTzxjyNZvUT8fJ64Q.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F3122%2F1%2ASqN7rOTzxjyNZvUT8fJ64Q.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ol start="12"&gt;
&lt;li&gt;&lt;p&gt;Deploy Module3, which includes the Lambda2 function. Do the same as you did with Lambda1: navigate to the Lambda2 directory (cd lambda_functions/lambda2), grant execute permissions to the deployment script (chmod a+x deploy.sh), and run the script (./deploy.sh).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Return to the root of the project and run terraform init, terraform plan, and terraform apply.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F2004%2F1%2Aj3wTYloA20vH8YDImNrptw.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F2004%2F1%2Aj3wTYloA20vH8YDImNrptw.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ol start="14"&gt;
&lt;li&gt;Once deployed, go to the AWS Lambda console and execute the second function. It should trigger the Crawler and the Glue Job. You can verify this in the AWS console, as shown in the screenshots below:&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F3010%2F1%2Adkwrb7BAiu17k7Hd3rZyng.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F3010%2F1%2Adkwrb7BAiu17k7Hd3rZyng.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F3532%2F1%2ApIiyJB0Bvk0m6i709N7N9w.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F3532%2F1%2ApIiyJB0Bvk0m6i709N7N9w.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F2000%2F1%2AJNs3aKKIbvpeK5zN7yvOJQ.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F2000%2F1%2AJNs3aKKIbvpeK5zN7yvOJQ.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F3044%2F1%2AcbtjpE65vhL2DnWy6lNncQ.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F3044%2F1%2AcbtjpE65vhL2DnWy6lNncQ.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Note: The Glue Job run may fail if it starts before the Crawler has finished, because the job reads the catalog tables that the Crawler creates. If that happens, wait for the Crawler to complete and re-run the Glue Job manually by clicking ‘Run’ at the top right.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F3640%2F1%2ApYQwosNmAnxi5Ei1U49Akg.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F3640%2F1%2ApYQwosNmAnxi5Ei1U49Akg.png"&gt;&lt;/a&gt;&lt;/p&gt;
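&lt;p&gt;One way to avoid the manual re-run is to start the Glue Job only after the Crawler has gone back to the READY state. The helper below is a sketch under stated assumptions rather than the repository's code: the name wait_for_crawler, the polling interval, and the poll limit are all hypothetical, and the defaults are chosen to stay below the 15-minute Lambda timeout. It uses only the Glue get_crawler call.&lt;/p&gt;

```python
import time


def wait_for_crawler(glue_client, crawler_name, poll_seconds=30, max_polls=25, sleep=time.sleep):
    """Block until the crawler is idle again, then return the status of its last crawl.

    glue_client is expected to behave like boto3.client('glue').
    Raises TimeoutError if the crawler is still busy after max_polls checks.
    """
    for _ in range(max_polls):
        crawler = glue_client.get_crawler(Name=crawler_name)["Crawler"]
        # A crawler reports READY only when it is neither running nor stopping
        if crawler["State"] == "READY":
            return crawler.get("LastCrawl", {}).get("Status")
        sleep(poll_seconds)
    raise TimeoutError(f"Crawler {crawler_name} did not finish in time")
```

&lt;p&gt;In Lambda2 this would be called right after start_crawler, and start_job_run would be issued only when the returned status is SUCCEEDED.&lt;/p&gt;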

&lt;ol start="15"&gt;
&lt;li&gt;Once all these are executed, you should have an output folder in the bucket and a database containing three tables in the catalog.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F3094%2F1%2AvHtC4Aeu6j44oiBR9orOKA.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F3094%2F1%2AvHtC4Aeu6j44oiBR9orOKA.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ol start="16"&gt;
&lt;li&gt;&lt;p&gt;The final step of the project will be to deploy the Redshift cluster. To do this, navigate to the redshift directory and fill in your AWS key and ID in the terraform.tfvars file.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Still in the redshift directory, run terraform init, terraform plan, and terraform apply.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F2132%2F1%2AWCzKqUiowWPRVBz1XqLk1Q.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F2132%2F1%2AWCzKqUiowWPRVBz1XqLk1Q.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ol start="18"&gt;
&lt;li&gt;
&lt;p&gt;Once deployed, connect to the data catalog in Redshift, which contains three tables. You can create scripts for various views such as comparison of stock performance, daily stock statistics, stock trends, most traded stocks, and correlation between stocks.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SELECT
  "date",
  symbol,
  "1. open" AS "open",
  "4. close" AS "close",
  "2. high" AS "high",
  "3. low" AS "low",
  "5. volume" AS "volume"
FROM
  (
    SELECT "date", 'AAPL' AS symbol, "1. open", "4. close", "2. high", "3. low", "5. volume"
    FROM test.aapl
    UNION ALL
    SELECT "date", 'IBM' AS symbol, "1. open", "4. close", "2. high", "3. low", "5. volume"
    FROM test.ibm
    UNION ALL
    SELECT "date", 'MSFT' AS symbol, "1. open", "4. close", "2. high", "3. low", "5. volume"
    FROM test.msft
  ) AS stocks
ORDER BY
  "date",
  symbol;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F3772%2F1%2AJcGy7hLdVJex2YrlFlPEIQ.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F3772%2F1%2AJcGy7hLdVJex2YrlFlPEIQ.png"&gt;&lt;/a&gt;&lt;/p&gt;
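&lt;p&gt;The stock trends view mentioned in the Data Visualization section relies on moving averages of closing prices. In Redshift this would normally be written as a window function; the plain Python sketch below only illustrates the calculation itself. The function name moving_average is hypothetical and the input is assumed to be a list of closing prices ordered by date.&lt;/p&gt;

```python
from collections import deque


def moving_average(closes, window):
    """Return the trailing moving average for each day once the window is full."""
    recent = deque(maxlen=window)  # keeps only the last `window` prices
    averages = []
    for price in closes:
        recent.append(price)
        if len(recent) == window:
            averages.append(sum(recent) / window)
    return averages
```

&lt;p&gt;For example, moving_average([1.0, 2.0, 3.0, 4.0], 2) returns [1.5, 2.5, 3.5]. The 7-day, 30-day, and 90-day trends would use window values of 7, 30, and 90.&lt;/p&gt;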

&lt;ol start="19"&gt;
&lt;li&gt;Once finished, return to the redshift directory and destroy the Redshift infrastructure with terraform destroy. This is crucial to avoid additional costs.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F2000%2F1%2ACMJg0kirNHlYN1mJ9CMT-w.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fcdn-images-1.medium.com%2Fmax%2F2000%2F1%2ACMJg0kirNHlYN1mJ9CMT-w.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;ol start="20"&gt;
&lt;li&gt;Also, in the root of the project, run terraform destroy to destroy the Lambda functions, S3 bucket, Crawler, and Glue. You may encounter an error saying that the bucket is not empty. Just empty the bucket manually and try again.&lt;/li&gt;
&lt;/ol&gt;
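&lt;p&gt;If you prefer not to empty the bucket by hand, the same can be done with boto3. This is a minimal sketch, not part of the project: the helper name empty_bucket is hypothetical, and it permanently deletes every object in the bucket it is given, so it should only be pointed at the bucket created for this tutorial.&lt;/p&gt;

```python
def empty_bucket(bucket):
    """Delete every object in the bucket so that terraform destroy can remove it.

    bucket is expected to behave like boto3.resource('s3').Bucket(name).
    """
    # Remove the current objects
    bucket.objects.all().delete()
    # Versioned buckets also keep old versions and delete markers
    bucket.object_versions.all().delete()
```

&lt;p&gt;A call would look like empty_bucket(boto3.resource('s3').Bucket(bucket_name)), where bucket_name is the name of the bucket Terraform created, followed by terraform destroy in the project root.&lt;/p&gt;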

&lt;p&gt;Congratulations! You are now capable of managing a deployment on AWS with Terraform.&lt;/p&gt;

&lt;h2&gt;
  
  
  Important Note on Costs
&lt;/h2&gt;

&lt;p&gt;Remember, AWS services are not free, and costs can accumulate over time. It’s crucial to destroy your environment when you’re done using it to avoid unnecessary charges. You can do this by running terraform destroy in your terminal. Please note that I am not responsible for any costs associated with running this pipeline in your AWS environment.&lt;/p&gt;

</description>
      <category>softwareengineering</category>
      <category>dataengineering</category>
      <category>aws</category>
      <category>terraform</category>
    </item>
    <item>
      <title>Building a Modern Data Pipeline: A Deep Dive into Terraform, AWS Lambda and S3, Snowflake, DBT, Mage AI, and Dash</title>
      <dc:creator>Stefen</dc:creator>
      <pubDate>Mon, 26 Jun 2023 05:44:10 +0000</pubDate>
      <link>https://dev.to/stefentaime/building-a-modern-data-pipeline-a-deep-dive-into-terraform-aws-lambda-and-s3-snowflake-dbt-mage-ai-and-dash-3jng</link>
      <guid>https://dev.to/stefentaime/building-a-modern-data-pipeline-a-deep-dive-into-terraform-aws-lambda-and-s3-snowflake-dbt-mage-ai-and-dash-3jng</guid>
      <description>&lt;p&gt;&lt;a href="https://medium.com/@stefentaime_10958/building-a-modern-data-pipeline-a-deep-dive-into-terraform-aws-lambda-and-s3-snowflake-dbt-cac6816f2100"&gt;https://medium.com/@stefentaime_10958/building-a-modern-data-pipeline-a-deep-dive-into-terraform-aws-lambda-and-s3-snowflake-dbt-cac6816f2100&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s---VEI-4-W--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/7vwan3laz6wa4gxb05bm.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s---VEI-4-W--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/7vwan3laz6wa4gxb05bm.png" alt="Image description" width="800" height="587"&gt;&lt;/a&gt;&lt;a href="https://link.medium.com/eVU7N3rcWAb"&gt;https://link.medium.com/eVU7N3rcWAb&lt;/a&gt;&lt;/p&gt;

</description>
      <category>mageai</category>
      <category>dbt</category>
      <category>terraform</category>
      <category>snowflake</category>
    </item>
    <item>
      <title>Creating an Election Monitoring System Using MongoDB, Spark, Twilio SMS Notifications, and Dash</title>
      <dc:creator>Stefen</dc:creator>
      <pubDate>Tue, 13 Jun 2023 00:55:25 +0000</pubDate>
      <link>https://dev.to/stefentaime/creating-a-election-monitoring-system-using-mongodb-spark-twilio-sms-notifications-and-dash-504o</link>
      <guid>https://dev.to/stefentaime/creating-a-election-monitoring-system-using-mongodb-spark-twilio-sms-notifications-and-dash-504o</guid>
      <description>&lt;h2&gt;
  
  
  Creating an Election Monitoring System Using MongoDB, Spark, Twilio SMS Notifications, and Dash
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--hid8GfSo--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3720/1%2AsjwZvn23nt0X6cuFt8pM0w.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--hid8GfSo--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3720/1%2AsjwZvn23nt0X6cuFt8pM0w.png" alt="" width="800" height="373"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In this article, we present a proof-of-concept (POC) for an election monitoring solution. It was devised for a government that approached a young digital company specializing in data, with a desire to make election results more transparent, more accessible, and available in real time.&lt;/p&gt;

&lt;p&gt;The system proposed is designed to ingest voter data, process and analyze it, alert the media and concerned parties via SMS once the results are ready, and finally display the results on an interactive map via a Dash application.&lt;/p&gt;

&lt;h2&gt;
  
  
  The Data Pipeline
&lt;/h2&gt;

&lt;p&gt;The Spark cluster is set up with one master and one worker node. The worker executes the tasks assigned by the Spark master, and more worker nodes can be added if the processing load needs to be split further.&lt;/p&gt;

&lt;p&gt;The system processes a synthetic dataset of voting records. A script using the Python library Faker generates this data, imitating voting behavior across all US states and the District of Columbia. The synthetic data is stored in MongoDB, a NoSQL database whose flexible document model suits nested records like these.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from datetime import datetime
from faker import Faker
from pymongo import MongoClient

# Init Faker
fake = Faker()

# Init MongoDB
client = MongoClient('mongodb://root:example@localhost:27017/')
db = client['admin']
collection = db['votes']

state_weights = {
    "Alabama": (0.60, 0.40),
    "Alaska": (0.55, 0.45),
    "Arizona": (0.15, 0.85),
    "Arkansas": (0.20, 0.80),
    "California": (0.15, 0.85),
    "Colorado": (0.70, 0.30),
    "Connecticut": (0.10, 0.90),
    "Delaware": (0.34, 0.66),
    "Florida": (0.82, 0.18),
    "Georgia": (0.95, 0.05),
    "Hawaii": (0.50, 0.50),
    "Idaho": (0.67, 0.33),
    "Illinois": (0.60, 0.40),
    "Indiana": ((0.15, 0.85)),
    "Iowa": (0.45, 0.55),
    "Kansas": (0.40, 0.60),
    "Kentucky": (0.62, 0.38),
    "Louisiana": (0.58, 0.42),
    "Maine": (0.60, 0.40),
    "Maryland": (0.55, 0.45),
    "Massachusetts": (0.63, 0.37),
    "Michigan": (0.62, 0.38),
    "Minnesota": (0.61, 0.39),
    "Mississippi": (0.41, 0.59),
    "Missouri": (0.60, 0.40),
    "Montana": (0.57, 0.43),
    "Nebraska": (0.56, 0.44),
    "Nevada": (0.55, 0.45),
    "New Hampshire": (0.54, 0.46),
    "New Jersey": (0.53, 0.47),
    "New Mexico": (0.52, 0.48),
    "New York": (0.51, 0.49),
    "North Carolina": (0.50, 0.50),
    "North Dakota": (0.05, 0.95),
    "Ohio": (0.58, 0.42),
    "Oklahoma": (0.57, 0.43),
    "Oregon": (0.56, 0.44),
    "Pennsylvania": (0.55, 0.45),
    "Rhode Island": (0.50, 0.50),
    "South Carolina": (0.53, 0.47),
    "South Dakota": (0.48, 0.52),
    "Tennessee": (0.51, 0.49),
    "Texas": (0.60, 0.40),
    "Utah": (0.59, 0.41),
    "Vermont": (0.58, 0.42),
    "Virginia": (0.57, 0.43),
    "Washington": (0.44, 0.56),
    "West Virginia": (0.55, 0.45),
    "Wisconsin": (0.46, 0.54),
    "Wyoming": (0.53, 0.47),
    "District of Columbia": ((0.15, 0.85))  
}

def generate_vote(state):
    # Weights for the state, defaulting to an even split.
    # Assumption: each tuple is ordered (Republican share, Democrat share).
    weights = state_weights.get(state, (0.50, 0.50))
    republican_chance = round(weights[0] * 100)
    # Draw the party according to the state's weights instead of a 50/50 pick
    party = 'Republican' if fake.boolean(chance_of_getting_true=republican_chance) else 'Democrat'
    vote = {
        "voting_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
        "voter": {
            "voter_id": str(fake.unique.random_number(digits=9)),
            "first_name": fake.first_name(),
            "last_name": fake.last_name(),
            "address": {
                "street": fake.street_address(),
                "city": fake.city(),
                "state": state,
                "zip_code": fake.zipcode()
            },
            "birth_date": str(fake.date_of_birth(minimum_age=18, maximum_age=90)),
            "gender": fake.random_element(elements=('Male', 'Female')),
        },
        "vote": {
            "voting_date": "2023-06-06",
            "voting_location": fake.address(),
            "election": {
                "type": "Presidential Election",
                "year": "2023"
            },
            "vote_valid": "Yes",
            "voting_choice": {
                "party": party,
            }
        }
    }

    return vote

# List of states
states = ["Alabama", "Alaska", "Arizona", "Arkansas", "California", "Colorado",
          "Connecticut", "Delaware", "Florida", "Georgia", "Hawaii", "Idaho", "Illinois",
          "Indiana", "Iowa", "Kansas", "Kentucky", "Louisiana", "Maine", "Maryland",
          "Massachusetts", "Michigan", "Minnesota", "Mississippi", "Missouri", "Montana",
          "Nebraska", "Nevada", "New Hampshire", "New Jersey", "New Mexico", "New York",
          "North Carolina", "North Dakota", "Ohio", "Oklahoma", "Oregon", "Pennsylvania",
          "Rhode Island", "South Carolina", "South Dakota", "Tennessee", "Texas", "Utah",
          "Vermont", "Virginia", "Washington", "West Virginia", "Wisconsin", "Wyoming",
          "District of Columbia"]


# Generate 60 fake votes for each state and insert them into MongoDB
for state in states:
    for _ in range(60):
        vote = generate_vote(state)
        collection.insert_one(vote)

print("All votes saved to MongoDB")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;For each state, the synthetic data simulates voter choices based on predefined probabilities, reflecting historical voting patterns. This data, consisting of 60 voters for each state, serves as the input for the Spark processing system.&lt;/p&gt;
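
&lt;p&gt;As a minimal sketch of how such per-state probabilities could drive the party choice, the snippet below uses the standard library's random.choices. It assumes the first value of each tuple is the Republican share and the second the Democratic share (the article does not state the order), and pick_party is a hypothetical helper name.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import random
from collections import Counter

PARTIES = ("Republican", "Democrat")  # assumed order of each weight tuple


def pick_party(weights, rng=random):
    """Return one party, drawn with the given (Republican, Democrat) weights."""
    return rng.choices(PARTIES, weights=weights, k=1)[0]


# Simulate 60 voters for a state weighted (0.60, 0.40), as in state_weights
rng = random.Random(42)  # fixed seed so the run is repeatable
sample = Counter(pick_party((0.60, 0.40), rng) for _ in range(60))
print(sample)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;With only 60 voters per state, the observed split can drift noticeably from the configured weights, so a close state may flip between runs.&lt;/p&gt;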

&lt;p&gt;The Spark system processes the data, determining the winning party in each state. It then calculates the percentage of votes the winning party received. This information is fed into an SMS notification system that alerts media outlets and the relevant parties as soon as the results are ready. The services are defined in the following Docker Compose file: a Spark master and worker, MongoDB, and mongo-express for browsing the database.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;version: '3.1'
services:

  # ===================== #
  #     Apache Spark      #
  # ===================== #
  spark:
    image: bitnami/spark:3.3.1
    environment:
      - SPARK_MODE=master
    ports:
      - '8080:8080'
      - '7077:7077'
    volumes:
      - ./data:/data
      - ./src:/src
  spark-worker:
    image: bitnami/spark:3.3.1
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=4G
      - SPARK_EXECUTOR_MEMORY=4G
      - SPARK_WORKER_CORES=4
    ports:
      - '8081:8081'
    volumes:
      - ./data:/data
      - ./src:/src

  # ===================== #
  #        MongoDB        #
  # ===================== #
  mongo:
    image: mongo:4.4
    volumes:
      - ./mongo:/data/db
    ports:
      - '27017:27017'
    environment:
      - MONGO_INITDB_ROOT_USERNAME=root
      - MONGO_INITDB_ROOT_PASSWORD=example
  mongo-express:
    image: mongo-express
    ports:
      - '8091:8081'
    environment:
      - ME_CONFIG_MONGODB_ADMINUSERNAME=root
      - ME_CONFIG_MONGODB_ADMINPASSWORD=example
      - ME_CONFIG_MONGODB_SERVER=mongo
      - ME_CONFIG_MONGODB_PORT=27017
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h2&gt;
  
  
  Data Processing with PySpark (Job 1)
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Create a SparkSession: The code initiates a SparkSession, which is an entry point to any Spark functionality. When it starts, it connects to the MongoDB database where the data is stored.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Load Data: The code then reads the data from MongoDB and loads it into a DataFrame, which is a distributed collection of data organized into named columns. It’s similar to a table in a relational database and can be manipulated in similar ways.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Data Processing: The code selects the relevant fields from the DataFrame (state, party, and validity of the vote), groups them by state and party, and counts the number of valid votes for each party in each state.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Find Winners: Next, the code finds the party with the most votes in each state. It does this by ranking the parties within each state based on the number of votes they got and then selecting the one with the highest rank (i.e., the one with the most votes).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Calculate Percentages: The code then calculates the percentage of votes each winning party got in its state. It does this by dividing the number of votes the winning party got by the total votes in that state and multiplying by 100.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Write Results: Finally, the code saves the results, which include the winning party and their vote percentage in each state, back to MongoDB but in a different collection called ‘election_results’.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;So in essence, this code processes voting records to determine the party that won the most votes in each state and calculates what percentage of the total votes in that state the winning party received. This analysis can give a clear picture of the distribution of votes in an election.&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from pyspark.sql import SparkSession
from pyspark.sql.functions import count, col
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Create a SparkSession
spark = SparkSession.builder \
    .appName('MongoDBIntegration') \
    .config("spark.mongodb.input.uri", "mongodb://root:example@mongo:27017/admin.votes") \
    .getOrCreate()

# Load the MongoDB data into a DataFrame
df = spark.read.format("mongo").load()

# Extract relevant fields and group by state and party
result = df.select(
    df["voter.address.state"].alias("state"),
    df["vote.voting_choice.party"].alias("party"),
    df["vote.vote_valid"].alias("validity")
).where(col("validity") == "Yes").groupby("state", "party").agg(count("validity").alias("votes"))

# Find the party with the most votes in each state (on a tie, row_number() keeps one party arbitrarily)
winners = result.withColumn("rn", F.row_number().over(Window.partitionBy("state").orderBy(F.desc("votes")))).filter(col("rn") == 1).drop("rn")

# Calculate the percentage of victory
total_votes = result.groupby("state").agg(F.sum("votes").alias("total_votes"))
winners_with_percentage = winners.join(total_votes, "state").withColumn("percentage", (col("votes") / col("total_votes")) * 100)

# Write the result to MongoDB
winners_with_percentage.write.format("mongo").mode("overwrite").option("spark.mongodb.output.uri", "mongodb://root:example@mongo:27017/admin.election_results").save()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Output:&lt;/p&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
    _id: ObjectId('64873b3df42ba41d32f3d1a6'),
    state: 'Utah',
    party: 'Republican',
    votes: 127,
    total_votes: 240,
    percentage: 52.916666666666664
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
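
&lt;p&gt;The same aggregation can be checked without Spark on a handful of records. The sketch below is a plain-Python approximation of the steps in Job 1, not the job itself: tally_winners is a hypothetical helper, the 113 Democrat votes are simply the remainder of the 240 valid votes in the output above, and the five invalid votes are made up to show the filter.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from collections import Counter, defaultdict


def tally_winners(records):
    """Count valid votes per (state, party) and return each state's winner.

    records: iterable of (state, party, validity) tuples, mirroring the
    three fields selected by the Spark job.
    """
    counts = defaultdict(Counter)
    for state, party, validity in records:
        if validity == "Yes":
            counts[state][party] += 1

    results = {}
    for state, by_party in counts.items():
        party, votes = by_party.most_common(1)[0]
        total = sum(by_party.values())
        results[state] = {
            "party": party,
            "votes": votes,
            "total_votes": total,
            "percentage": votes / total * 100,
        }
    return results


# Rebuild the Utah figures: 127 of 240 valid votes
records = (
    [("Utah", "Republican", "Yes")] * 127
    + [("Utah", "Democrat", "Yes")] * 113
    + [("Utah", "Democrat", "No")] * 5  # hypothetical invalid votes, ignored
)
utah = tally_winners(records)["Utah"]
print(utah["party"], utah["votes"], utah["total_votes"], round(utah["percentage"], 2))
# prints: Republican 127 240 52.92
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;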
&lt;h2&gt;
  
  
  Data Processing with PySpark (Job 2)
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Create a SparkSession and Load Data: The script starts a SparkSession and then loads data from a MongoDB collection.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Set Electoral Votes by State: The United States uses a system called the Electoral College to decide the outcome of presidential elections. Each state has a number of votes in the Electoral College that is largely proportional to its population. This script creates a dictionary that maps each state to its number of electoral votes. Then it converts this dictionary into a DataFrame.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Join Electoral Votes with Election Data: The script combines the election results data with the electoral votes data, based on the state name. This gives us a DataFrame where each row has the state name, the party, the votes that party received, and the number of electoral votes that state has.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Calculate Nationwide Votes: The script calculates the total votes received by each party nationwide.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Identify the Nationwide Winner: The script determines the party that got the most votes nationwide.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Calculate Maximum State Votes and Handle Ties: The script identifies the maximum number of votes received in each state and handles ties by giving the electoral votes to the nationwide winner.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Calculate Total Grand Electors for Each Party: The script then calculates the total number of electoral votes (“grand electors”) each party received nationwide, considering the rule of tie-breaking.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Save the Results: The script saves the electoral votes results back to MongoDB.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Notify the Results via SMS: Using Twilio, an online messaging service, the script then sends an SMS with the election results. The results are formatted as a string which includes each party and the number of electoral votes they won.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Stop the SparkSession: Lastly, the script stops the SparkSession, releasing its resources.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import when
from pyspark.sql.window import Window
from twilio.rest import Client

spark = SparkSession.builder \
    .appName("ElectionResults") \
    .config("spark.mongodb.input.uri", "mongodb://root:example@mongo:27017/admin.election_results") \
    .getOrCreate()

df = spark.read.format("mongo").load()

# Create a dictionary of grand electors by state
electors_dict = {
    "Alabama": 9,
    "Alaska": 3,
    "Arizona": 11,
    "Arkansas": 6,
    "California": 55,
    "Colorado": 9,
    "Connecticut": 7,
    "Delaware": 3,
    "Florida": 29,
    "Georgia": 16,
    "Hawaii": 4,
    "Idaho": 4,
    "Illinois": 20,
    "Indiana": 11,
    "Iowa": 6,
    "Kansas": 6,
    "Kentucky": 8,
    "Louisiana": 8,
    "Maine": 4,
    "Maryland": 10,
    "Massachusetts": 11,
    "Michigan": 16,
    "Minnesota": 10,
    "Mississippi": 6,
    "Missouri": 10,
    "Montana": 3,
    "Nebraska": 5,
    "Nevada": 6,
    "New Hampshire": 4,
    "New Jersey": 14,
    "New Mexico": 5,
    "New York": 29,
    "North Carolina": 15,
    "North Dakota": 3,
    "Ohio": 18,
    "Oklahoma": 7,
    "Oregon": 7,
    "Pennsylvania": 20,
    "Rhode Island": 4,
    "South Carolina": 9,
    "South Dakota": 3,
    "Tennessee": 11,
    "Texas": 38,
    "Utah": 6,
    "Vermont": 3,
    "Virginia": 13,
    "Washington": 12,
    "West Virginia": 5,
    "Wisconsin": 10,
    "Wyoming": 3,
    "District of Columbia": 3
}

# Convert dictionary to DataFrame
electors_df = spark.createDataFrame([(k, v) for k, v in electors_dict.items()], ["state", "electors"])

df = df.join(electors_df, on="state", how="inner")

nationwide_df = df.groupBy("party").agg(F.sum("votes").alias("total_votes"))

nationwide_winner = nationwide_df.orderBy(F.desc("total_votes")).first()[0]

# Identify maximum votes in each state
state_max_df = df.groupBy("state").agg(F.max("votes").alias("max_votes"))

df = df.join(state_max_df, on="state", how="inner")

window = Window.partitionBy(df['state'])

df = df.withColumn('winners', F.sum(when(df.votes == df.max_votes, 1).otherwise(0)).over(window))

df = df.withColumn('final_party', when(df.winners &amp;gt; 1, nationwide_winner).otherwise(df.party))

result_df = df.groupBy("final_party").sum("electors")

# Save the result to MongoDB
result_df.write.format("mongo").option("uri", "mongodb://root:example@mongo:27017/admin.election_results_out").mode("overwrite").save()

account_sid = ''
auth_token = ''
client = Client(account_sid, auth_token)

result = result_df.collect()

result_str = "\n".join([f"{row['final_party']}: {row['sum(electors)']} electors" for row in result])

message_body = f"Dear recipient, \n\nWe are pleased to share with you the final election results:\n\n{result_str}\n\nWe would like to express our gratitude for your patience and interest in our democratic process. For more detailed results, please visit our official website.\n\nBest regards,\n[Election Committee]"

message = client.messages.create(
    from_='',
    body=message_body,
    to=''
)

print(f"Message sent with id {message.sid}")

# Stop the SparkSession
spark.stop()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Output:&lt;/p&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
    _id: ObjectId('6487445c358709227a7e9c71'),
    final_party: 'Republican',
    'sum(electors)': 201
}

{
    _id: ObjectId('6487445c358709227a7e9c72'),
    final_party: 'Democrat',
    'sum(electors)': 337
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
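
&lt;p&gt;The allocation rule described for Job 2 (winner takes all electors, ties go to the nationwide winner) can be sketched in plain Python to make the logic easier to follow. This is an illustration under stated assumptions rather than the Spark job itself: allocate_electors is a hypothetical helper, and the vote counts for Vermont and Hawaii are made up so that one state ends in a tie.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from collections import Counter


def allocate_electors(state_rows, electors):
    """Sum electoral votes per party, following the steps listed for Job 2.

    state_rows: list of (state, party, votes) tuples.
    electors: dict mapping state name to its number of electoral votes.
    A state whose top vote count is shared by several parties goes to the
    party with the most votes nationwide.
    """
    nationwide = Counter()
    by_state = {}
    for state, party, votes in state_rows:
        nationwide[party] += votes
        by_state.setdefault(state, []).append((party, votes))
    nationwide_winner = nationwide.most_common(1)[0][0]

    totals = Counter()
    for state, rows in by_state.items():
        top = max(votes for _, votes in rows)
        leaders = [party for party, votes in rows if votes == top]
        winner = leaders[0] if len(leaders) == 1 else nationwide_winner
        totals[winner] += electors[state]
    return dict(totals)


# Made-up vote counts for three states; elector counts match electors_dict
rows = [
    ("Utah", "Republican", 127), ("Utah", "Democrat", 113),
    ("Vermont", "Republican", 100), ("Vermont", "Democrat", 140),
    ("Hawaii", "Republican", 120), ("Hawaii", "Democrat", 120),  # tie
]
electors = {"Utah": 6, "Vermont": 3, "Hawaii": 4}
print(allocate_electors(rows, electors))
# prints: {'Republican': 6, 'Democrat': 7}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;In this sample the Democrats lead nationwide (373 votes to 347), so the tied state of Hawaii and its 4 electors go to them. As a consistency check on the real output, 201 + 337 equals the 538 electors in electors_dict.&lt;/p&gt;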
&lt;h2&gt;
  
  
  Notification of Results
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s---vOBZzFZ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/2160/1%2ADXpFgmETQw5WMKybNM4V_w.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s---vOBZzFZ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/2160/1%2ADXpFgmETQw5WMKybNM4V_w.png" alt="" width="800" height="992"&gt;&lt;/a&gt;&lt;/p&gt;
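
&lt;p&gt;Building the message text in a small function makes it possible to check the wording before any SMS is sent. The sketch below is one possible way to do that; format_results is a hypothetical helper, and the body is shortened compared with the message used in Job 2.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def format_results(totals):
    """Build the SMS body from a dict mapping party to electoral votes."""
    lines = [f"{party}: {count} electors" for party, count in totals.items()]
    result_str = "\n".join(lines)
    return (
        "Dear recipient,\n\n"
        "We are pleased to share with you the final election results:\n\n"
        f"{result_str}\n\n"
        "Best regards,\n[Election Committee]"
    )


# Totals taken from the Job 2 output
body = format_results({"Republican": 201, "Democrat": 337})
print(body)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The returned string can then be passed as the body argument of client.messages.create, as in Job 2.&lt;/p&gt;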

&lt;h2&gt;
  
  
  Visualization with Dash
&lt;/h2&gt;

&lt;p&gt;The final step is to visualize the results with Dash, a Python framework for building analytical web applications. It lets us build an interactive map of the United States in which each state is colored according to the party that won the most votes: blue for Democrats and red for Republicans. Users can read the election results at a glance.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Connect to a Database: The script connects to a database (specifically MongoDB) where the election results are stored.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Define the Geographic Data: The script contains a list of states with their latitude and longitude coordinates. This data will help to plot each state accurately on the map.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Create a State Name to Abbreviation Dictionary: This dictionary is used to map full state names to their abbreviations (like “New York” to “NY”), because the map uses abbreviations.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Set Up the Application: The script sets up an app using a framework called Dash, which helps in building interactive web applications.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Define the Application Layout: The layout of the app is defined to include a graphical element (a map in this case).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;Update the Map: A function is defined that updates the map each time it’s called. This function does a few things:&lt;/p&gt;
&lt;ol type="a"&gt;
&lt;li&gt;&lt;p&gt;Get Election Results: The function fetches the election results from the database.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Process Results: It processes these results to extract the necessary data. For each state, it gets the party that won and the percentage of votes that party received. Parties are assigned a numerical value to color-code them later (0 for Republican and 1 for Democrat).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Prepare Hover Text: This is the text that appears when you hover over a state on the map. It shows the party that won and the percentage of votes they received.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Create the Map: The function creates a map of the United States, with each state color-coded based on the party that won there (blue for Democrats and red for Republicans).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Add Legends: Legends are added to the map to indicate which color corresponds to which party.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Adjust the Layout: Finally, the function adjusts the layout of the map and returns it. The map is displayed in the web application.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
&lt;/li&gt;
&lt;/ol&gt;
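
&lt;p&gt;The data preparation part of the steps above can be sketched in plain Python. This is a hedged illustration, not the original script: prepare_map_data, STATE_ABBREV and PARTY_CODE are hypothetical names, and the abbreviation map is deliberately partial.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Partial name-to-abbreviation map; a full version would list all 51 entries
STATE_ABBREV = {"Utah": "UT", "New York": "NY", "District of Columbia": "DC"}

# Numeric codes used for coloring, as described in the list above
PARTY_CODE = {"Republican": 0, "Democrat": 1}


def prepare_map_data(results):
    """Turn election_results documents into parallel lists for a map.

    results: iterable of dicts with "state", "party" and "percentage" keys,
    the shape written by Job 1.
    """
    locations, codes, hover = [], [], []
    for doc in results:
        abbrev = STATE_ABBREV.get(doc["state"])
        if abbrev is None:
            continue  # skip states missing from the abbreviation map
        locations.append(abbrev)
        codes.append(PARTY_CODE[doc["party"]])
        hover.append(f"{doc['state']}: {doc['party']} ({doc['percentage']:.1f}%)")
    return locations, codes, hover


docs = [{"state": "Utah", "party": "Republican", "percentage": 52.916666666666664}]
print(prepare_map_data(docs))
# prints: (['UT'], [0], ['Utah: Republican (52.9%)'])
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The three lists could then feed a Plotly choropleth trace, for example plotly.graph_objects.Choropleth with locations, z and text set from them and locationmode set to "USA-states", displayed in a Dash Graph component.&lt;/p&gt;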

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--hid8GfSo--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3720/1%2AsjwZvn23nt0X6cuFt8pM0w.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--hid8GfSo--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3720/1%2AsjwZvn23nt0X6cuFt8pM0w.png" alt="" width="800" height="373"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;I hope this guide will give you a better understanding of how MongoDB, PySpark, Twilio and Dash can be used to build an efficient, high-performance data pipeline.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://medium.com/@stefentaime_10958/creating-a-real-time-election-monitoring-system-using-mongodb-spark-sms-notifications-and-dash-e3b276180dcb"&gt;Medium&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--FjJ8ySJi--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/auzp3iide37b21wky9sx.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--FjJ8ySJi--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://dev-to-uploads.s3.amazonaws.com/uploads/articles/auzp3iide37b21wky9sx.png" alt="Image description" width="800" height="373"&gt;&lt;/a&gt;&lt;/p&gt;

</description>
      <category>data</category>
      <category>spark</category>
      <category>dash</category>
      <category>mongodb</category>
    </item>
    <item>
      <title>End to end data engineering project with Spark, Mongodb, Minio, postgres and Metabase</title>
      <dc:creator>Stefen</dc:creator>
      <pubDate>Mon, 15 May 2023 14:25:20 +0000</pubDate>
      <link>https://dev.to/stefentaime/end-to-end-data-engineering-project-with-spark-mongodb-minio-postgres-and-metabase-3b36</link>
      <guid>https://dev.to/stefentaime/end-to-end-data-engineering-project-with-spark-mongodb-minio-postgres-and-metabase-3b36</guid>
      <description>&lt;h3&gt;
  
  
  Using open-source technologies to implement a data pipeline
&lt;/h3&gt;

&lt;h2&gt;
  
  
  Architecture
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--lOnz1TWm--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/2708/1%2A9e6xb4j1RO9SPpPKdPea4w.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--lOnz1TWm--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/2708/1%2A9e6xb4j1RO9SPpPKdPea4w.png" alt="" width="800" height="415"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Source Code
&lt;/h2&gt;

&lt;p&gt;All the source code demonstrated in this post is open source and available on &lt;a href="https://github.com/Stefen-Taime/projet_data"&gt;GitHub&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;git clone &lt;a href="https://github.com/Stefen-Taime/projet_data.git"&gt;https://github.com/Stefen-Taime/projet_data.git&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Prerequisites
&lt;/h2&gt;

&lt;p&gt;To follow this post, you will need the following resources:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;(1) Linux Machine;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;(1) Docker;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;(1) Docker Compose;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;(1) Virtualenv;&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;h3&gt;
  
  
  Setup
&lt;/h3&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;git clone https://github.com/Stefen-Taime/projet_data.git
cd projet_data/extractor

pip install -r requirements.txt
python main.py

# or, with Docker:

docker build --tag=extractor .
docker-compose up run

# This folder contains code that creates a downloads folder, iteratively downloads files from a list of URIs, unzips them, and deletes the zip files.
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
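
&lt;p&gt;For readers who want to see the shape of such an extractor, here is a minimal sketch using only the standard library. It is not the code from the repository: the function names are hypothetical, and the list of URIs is left to the caller.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import urllib.request
import zipfile
from pathlib import Path


def download(uri, dest_dir):
    """Download one file into dest_dir and return its local path."""
    dest_dir = Path(dest_dir)
    dest_dir.mkdir(parents=True, exist_ok=True)
    target = dest_dir / uri.rsplit("/", 1)[-1]
    urllib.request.urlretrieve(uri, target)
    return target


def unzip_and_delete(zip_path, dest_dir):
    """Extract an archive into dest_dir, delete it, and list the new files."""
    zip_path = Path(zip_path)
    with zipfile.ZipFile(zip_path) as archive:
        names = archive.namelist()
        archive.extractall(dest_dir)
    zip_path.unlink()
    return names


def run(uris, dest_dir="downloads"):
    """Download, unzip and clean up every archive in uris."""
    for uri in uris:
        unzip_and_delete(download(uri, dest_dir), dest_dir)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;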

&lt;p&gt;At this point, the extractor directory should contain a new downloads folder with 2 CSV files.&lt;/p&gt;

&lt;h3&gt;
  
  
  Then
&lt;/h3&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;cd ..
cd docker

docker-compose -f docker-compose-nosql.yml up -d  #for mongodb
docker-compose -f docker-compose-sql.yml up -d    #for postgres and adminer port 8085, metabase port 3000
docker-compose -f docker-compose-s3.yml up -d     #for minio port 9000
docker-compose -f docker-compose-spark.yml up -d  #for spark master and jupyter notebook port 8888


cd ..
cd loader
pip install -r requirements.txt

# Important: update the DATA and DATA_FOR_MONGODB path variables in .env

python loader.py mongodb  # upload data to MongoDB (on error, manually create an auto-mpg database with an auto collection and try again)
python loader.py minio    # upload data to MinIO (on error, manually create a landing bucket and try again)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;You should now have an auto-mpg database in MongoDB containing an auto collection with data in it, and the same data in MinIO.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--oEz2F3fr--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3520/1%2Aj00Z9SvxjH62D21N2KLF7w.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--oEz2F3fr--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3520/1%2Aj00Z9SvxjH62D21N2KLF7w.png" alt="" width="800" height="408"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--fT3ahGwx--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3784/1%2AMgnNor5dNPvEibVgEP77XQ.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--fT3ahGwx--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3784/1%2AMgnNor5dNPvEibVgEP77XQ.png" alt="" width="800" height="339"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  Then
&lt;/h3&gt;

&lt;p&gt;Go to localhost:8888 and log in with the password “stefen”. Once in Jupyter Notebook, run all cells.&lt;/p&gt;

&lt;p&gt;Go to localhost:8085 (Adminer):&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--KvBwE6zP--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3784/1%2AJoNepCQpLl6nS2MYoBsDWQ.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--KvBwE6zP--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3784/1%2AJoNepCQpLl6nS2MYoBsDWQ.png" alt="" width="800" height="342"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Go to localhost:3000 (Metabase):&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--f8DzqoJ9--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3754/1%2ApJ3ARMpJJ68j8hdpPK49IQ.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--f8DzqoJ9--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3754/1%2ApJ3ARMpJJ68j8hdpPK49IQ.png" alt="" width="800" height="302"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Cleaning Up
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://medium.com/@stefentaime_10958/end-to-end-data-engineering-project-with-spark-mongodb-minio-postgres-and-metabase-2c400672b50d"&gt;https://medium.com/@stefentaime_10958/end-to-end-data-engineering-project-with-spark-mongodb-minio-postgres-and-metabase-2c400672b50d&lt;/a&gt;&lt;/p&gt;

</description>
      <category>spark</category>
      <category>postgres</category>
      <category>metabase</category>
      <category>mongodb</category>
    </item>
    <item>
      <title>ELT Airflow Pipeline Project</title>
      <dc:creator>Stefen</dc:creator>
      <pubDate>Mon, 15 May 2023 14:23:29 +0000</pubDate>
      <link>https://dev.to/stefentaime/elt-airflow-pipeline-project-m5m</link>
      <guid>https://dev.to/stefentaime/elt-airflow-pipeline-project-m5m</guid>
      <description>&lt;h2&gt;
  
  
  About
&lt;/h2&gt;

&lt;p&gt;Project using data engineering concepts.&lt;/p&gt;

&lt;p&gt;The project is an ELT (Extract, Load, Transform) data pipeline, orchestrated with Apache Airflow through Docker containers.&lt;/p&gt;

&lt;p&gt;The Faker package is used to generate data in a MySQL database. The data is extracted from MySQL, transformed with pandas and SQL, and then loaded into a PostgreSQL OLAP database. An email notification is sent once the whole process is completed.&lt;/p&gt;

&lt;h2&gt;
  
  
  Architecture
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--YMUfYI6u--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/2708/1%2AmUtAQzioW_Ct5B4I5B8PDA.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--YMUfYI6u--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/2708/1%2AmUtAQzioW_Ct5B4I5B8PDA.png" alt="" width="800" height="460"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Prerequisites
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;a href="https://docs.docker.com/get-docker/"&gt;Docker&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;a href="https://docs.docker.com/compose/"&gt;Docker Compose&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;a href="https://mailtrap.io/"&gt;Mailtrap Account&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Setup mailtrap
&lt;/h2&gt;

&lt;p&gt;One platform to test, send, and control your emails:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--aSbvHVsm--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/2406/1%2AlgSWIBGFZdXurBntiHwQjA.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--aSbvHVsm--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/2406/1%2AlgSWIBGFZdXurBntiHwQjA.png" alt="" width="800" height="494"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Setup
&lt;/h2&gt;

&lt;p&gt;Clone the project to your desired location:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ git clone https://github.com/Stefen-Taime/airflow_etl.git
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Fill in AIRFLOW__SMTP__SMTP_USER, AIRFLOW__SMTP__SMTP_PASSWORD, and AIRFLOW__SMTP__SMTP_MAIL_FROM in the .envExample file:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;AIRFLOW_ADMIN_MAIL=airflow
AIRFLOW_ADMIN_FIRSTNAME=airflow
AIRFLOW_ADMIN_NAME=airflow
AIRFLOW_ADMIN_PASSWORD=airflowpassword
AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgres+psycopg2://airflow:airflowpassword@postgres:5432/airflow
AIRFLOW__CORE__FERNET_KEY=81HqDtbqAywKSOumSha3BhWNOdQ26slT6K0YaZeZyPs=
AIRFLOW_CONN_METADATA_DB=postgres+psycopg2://airflow:airflowpassword@postgres:5432/airflow
AIRFLOW_VAR__METADATA_DB_SCHEMA=airflow
AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC=5
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__SMTP__SMTP_HOST=smtp.mailtrap.io
AIRFLOW__SMTP__SMTP_PORT=2525
AIRFLOW__SMTP__SMTP_USER=xxxxxxxxxxx
AIRFLOW__SMTP__SMTP_PASSWORD=xxxxxxx
AIRFLOW__SMTP__SMTP_MAIL_FROM=your_email@gmail.com
AIRFLOW__WEBSERVER__BASE_URL=http://localhost:8080
POSTGRES_USER=airflow
POSTGRES_PASSWORD=airflowpassword
POSTGRES_DB=airflow
AIRFLOW_UID=1000
AIRFLOW_GID=0
PG_VER=14-alpine
POSTGRES_SRC_PASSWORD=Sup3rS3c3t
PORT=5432
POSTGRES_USER_OLAP=postgres
HOSTNAME=olap
CONTAINER_NAME=postgres
POSTGRES_DB_OLAP=postgres
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
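&lt;p&gt;Before building the containers, it can help to confirm that the three SMTP values are no longer placeholders. The helper below is a hypothetical sketch, not part of the repository: it parses KEY=VALUE lines and reports which of the required keys are missing, empty, or still made up only of the letter x.&lt;/p&gt;

```python
REQUIRED_KEYS = (
    "AIRFLOW__SMTP__SMTP_USER",
    "AIRFLOW__SMTP__SMTP_PASSWORD",
    "AIRFLOW__SMTP__SMTP_MAIL_FROM",
)


def parse_env(text):
    """Parse KEY=VALUE lines, skipping blank lines and comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split on the first "=" only, so values such as Fernet keys keep theirs.
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def unfilled_keys(text):
    """Return the required keys that are missing, empty, or still an 'xxx' placeholder."""
    values = parse_env(text)
    missing = []
    for key in REQUIRED_KEYS:
        value = values.get(key, "")
        if not value or set(value) == {"x"}:
            missing.append(key)
    return missing


if __name__ == "__main__":
    sample = "AIRFLOW__SMTP__SMTP_USER=xxxxxxxxxxx\nAIRFLOW__SMTP__SMTP_PASSWORD=secret\n"
    print(unfilled_keys(sample))
```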

&lt;p&gt;Grant execute permission to the bash script:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;chmod a+x build_Services.sh
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Run the script:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ ./build_Services.sh
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Build and start the containers:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker-compose up --build -d
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;When everything is done, you can check all the containers running:&lt;/p&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker ps
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h2&gt;
  
  
  OLTP Interface
&lt;/h2&gt;

&lt;p&gt;You can now open the Adminer web interface at &lt;a href="http://localhost:8085/"&gt;http://localhost:8085&lt;/a&gt; and log in with the default credentials defined in docker-compose.yml:&lt;/p&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;System      MySQL
Server      oltp
Username    root
Password    myrootpassword
Database    testdb
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h2&gt;
  
  
  OLAP Interface
&lt;/h2&gt;

&lt;p&gt;To inspect the OLAP database, open Adminer again at &lt;a href="http://localhost:8085/"&gt;http://localhost:8085&lt;/a&gt; and log in with the default credentials defined in docker-compose.yml:&lt;/p&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;System      PostgreSQL
Server      olap
Username    postgres
Password    Sup3rS3c3t
Database    postgres
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h2&gt;
  
  
  Airflow Interface
&lt;/h2&gt;

&lt;p&gt;You can now open the Airflow web interface at &lt;a href="http://localhost:8080/"&gt;http://localhost:8080&lt;/a&gt; and log in with the default user defined in docker-compose.yml. &lt;strong&gt;Username/Password: airflow/airflowpassword&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--4uY42mjJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3834/1%2Ag5dOEFcUbrFUKJmvGEKBcA.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--4uY42mjJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3834/1%2Ag5dOEFcUbrFUKJmvGEKBcA.png" alt="" width="800" height="244"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Airflow DAG
&lt;/h2&gt;

&lt;p&gt;Now you can run the Airflow ETL DAG:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--Ti2qOw_g--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3762/1%2AfCb9ndT0iYOf3buibMW3Nw.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--Ti2qOw_g--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3762/1%2AfCb9ndT0iYOf3buibMW3Nw.png" alt="" width="800" height="277"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Check the OLTP and OLAP databases
&lt;/h2&gt;

&lt;p&gt;:)&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--NYvhKC8v--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3570/1%2ACLgjyVaoAliEWOEX582FTQ.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--NYvhKC8v--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3570/1%2ACLgjyVaoAliEWOEX582FTQ.png" alt="" width="800" height="269"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s---w1FXoUo--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3376/1%2AotdNZLYZd7Sr5V0LDrSMlQ.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s---w1FXoUo--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3376/1%2AotdNZLYZd7Sr5V0LDrSMlQ.png" alt="" width="800" height="369"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Check your &lt;a href="http://mailtrap.io/inboxes"&gt;mailtrap.io/inboxes&lt;/a&gt;
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--weTsSIqp--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/2260/1%2AiugJpfXCbJv5rVgyigWU3w.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--weTsSIqp--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/2260/1%2AiugJpfXCbJv5rVgyigWU3w.png" alt="" width="800" height="239"&gt;&lt;/a&gt;&lt;/p&gt;
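&lt;p&gt;For reference, a completion email like the one above could be assembled roughly as follows. This is a standalone sketch rather than the project's code, since Airflow sends its own email using the SMTP settings from the .env file. The host and port come from that file; the function names, subject line, and the use of STARTTLS are assumptions.&lt;/p&gt;

```python
import smtplib
from email.message import EmailMessage

SMTP_HOST = "smtp.mailtrap.io"  # AIRFLOW__SMTP__SMTP_HOST in the .env example
SMTP_PORT = 2525                # AIRFLOW__SMTP__SMTP_PORT in the .env example


def build_notification(sender, recipient, dag_id):
    """Build the completion message (hypothetical subject and body wording)."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = recipient
    msg["Subject"] = f"Pipeline {dag_id} finished"
    msg.set_content(f"All tasks of {dag_id} completed successfully.")
    return msg


def send_notification(msg, user, password):
    """Send the message through the SMTP server (assumes it accepts STARTTLS)."""
    with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=10) as server:
        server.starttls()
        server.login(user, password)
        server.send_message(msg)


if __name__ == "__main__":
    message = build_notification("your_email@gmail.com", "team@example.com", "etl")
    print(message["Subject"])
```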

&lt;h2&gt;
  
  
  Shut down or restart Airflow
&lt;/h2&gt;

&lt;p&gt;If you need to make changes or shut down:&lt;/p&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ docker-compose down
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;h2&gt;
  
  
  References
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;a href="https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html"&gt;Apache Airflow Documentation&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;a href="https://api-docs.mailtrap.io/"&gt;Mailtrap API documentation&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;a href="https://faker.readthedocs.io/en/master/"&gt;Faker&lt;/a&gt;&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://medium.com/@stefentaime_10958/elt-airflow-pipeline-project-dcf834c1be17"&gt;https://medium.com/@stefentaime_10958/elt-airflow-pipeline-project-dcf834c1be17&lt;/a&gt;&lt;/p&gt;

</description>
      <category>airflow</category>
      <category>pipeline</category>
      <category>etl</category>
    </item>
    <item>
      <title>Building a Scalable RSS Feed Pipeline with Apache Airflow, Kafka, and MongoDB, Flask Api</title>
      <dc:creator>Stefen</dc:creator>
      <pubDate>Mon, 15 May 2023 14:20:29 +0000</pubDate>
      <link>https://dev.to/stefentaime/building-a-scalable-rss-feed-pipeline-with-apache-airflow-kafka-and-mongodb-flask-api-52gi</link>
      <guid>https://dev.to/stefentaime/building-a-scalable-rss-feed-pipeline-with-apache-airflow-kafka-and-mongodb-flask-api-52gi</guid>
      <description>&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--IIL5jIWK--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/2000/1%2ASCbdQEYl9SkUn-y51NK7rQ.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--IIL5jIWK--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/2000/1%2ASCbdQEYl9SkUn-y51NK7rQ.png" alt="" width="500" height="500"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In today’s data-driven world, processing large volumes of data in real time has become essential for many organizations. The Extract, Transform, Load (ETL) process is a common way to manage the flow of data between systems. In this article, we’ll walk through how to build a scalable ETL pipeline in Python using Apache Airflow, Kafka, MongoDB, and Flask.&lt;/p&gt;

&lt;p&gt;In this pipeline, the RSS feeds are scraped using a Python library called feedparser. This library is used to parse the XML data in the RSS feeds and extract the relevant information. The parsed data is then transformed into a standardized JSON format using Python's built-in json library. This format includes fields such as title, summary, link, published_date, and language, which make the data easier to analyze and consume.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;NEWS_FEEDS = {
        "en": [
            "https://www.cnn.com/rss/edition.rss",
            "https://www.bbc.com/news/10628494",
            "https://www.nbcnews.com/id/303207/device/rss/rss.xml",
            "https://www.foxnews.com/about/rss/"
        ],
        "pl": [
            "https://www.tvn24.pl/najnowsze.xml",
            "https://www.rmf24.pl/fakty/polska/feed",
            "https://wiadomosci.wp.pl/rss",
            "https://www.money.pl/rss/wszystkie"
        ],
        "es": [
            "https://www.elpais.com/rss/feed.html?feedId=1022",
            "https://www.abc.es/rss/feeds/abc_EspanaEspana.xml",
            "https://www.elconfidencial.com/rss/",
            "https://www.elperiodico.com/es/rss/"
        ],
        "de": [
            "https://www.tagesschau.de/xml/rss2",
            "https://www.faz.net/rss/aktuell/",
            "https://www.zeit.de/rss",
            "https://www.spiegel.de/schlagzeilen/tops/index.rss"
        ],
        "fr": [
            "https://www.lemonde.fr/rss/une.xml",
            "https://www.lefigaro.fr/rss/figaro_actualites.xml",
            "https://www.liberation.fr/rss/",
            "https://www.lci.fr/rss"
        ]
    }
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
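&lt;p&gt;As a rough illustration of the standardized format, the sketch below maps one parsed entry to a JSON record with the five fields listed above. A plain dict stands in for the entry that feedparser would return, and the to_record helper and sample values are hypothetical.&lt;/p&gt;

```python
import json

FIELDS = ("title", "summary", "link", "published_date", "language")


def to_record(entry, language):
    """Map a parsed feed entry (any dict-like object) to the standardized record."""
    return {
        "title": entry.get("title", "").strip(),
        "summary": entry.get("summary", "").strip(),
        "link": entry.get("link", ""),
        "published_date": entry.get("published", ""),
        "language": language,
    }


if __name__ == "__main__":
    sample_entry = {
        "title": "  Example headline ",
        "summary": "A short teaser for the article.",
        "link": "https://example.com/news/1",
        "published": "Mon, 15 May 2023 14:20:29 +0000",
    }
    print(json.dumps(to_record(sample_entry, "en"), ensure_ascii=False, indent=2))
```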
&lt;h2&gt;
  
  
  What is Apache Airflow?
&lt;/h2&gt;

&lt;p&gt;Apache Airflow is a platform used to programmatically author, schedule, and monitor workflows. It allows developers to create complex workflows by defining tasks and their dependencies. Airflow makes it easy to monitor the execution of tasks and provides an intuitive web interface to visualize the workflow.&lt;/p&gt;
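&lt;p&gt;The idea of tasks and dependencies can be sketched without Airflow at all. The example below uses Python's standard graphlib module to work out a valid execution order from a dependency map. The task names are borrowed from the DAG defined later in this article. Airflow's scheduler does far more than this (retries, scheduling, state tracking), so treat it only as an illustration of the ordering principle.&lt;/p&gt;

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on.
dependencies = {
    "extract_news": {"update_proxypool"},
    "validate_data": {"extract_news"},
    "send_to_kafka": {"validate_data"},
}


def execution_order(deps):
    """Return the tasks in an order that respects every dependency."""
    return list(TopologicalSorter(deps).static_order())


if __name__ == "__main__":
    print(execution_order(dependencies))
```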
&lt;h2&gt;
  
  
  What is Kafka?
&lt;/h2&gt;

&lt;p&gt;Apache Kafka is a distributed event streaming platform that allows you to publish and subscribe to streams of records. Kafka provides high-throughput, low-latency, and fault-tolerant data transport. Kafka can be used for real-time data processing, streaming analytics, and log aggregation.&lt;/p&gt;
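&lt;p&gt;To picture the publish and subscribe model, here is a toy, in-memory version of a topic. It is not Kafka's API and has none of its distribution or durability. The ToyTopic class is invented for this sketch and only shows that records are appended to a log and that each consumer group reads from its own offset.&lt;/p&gt;

```python
class ToyTopic:
    """In-memory, append-only log that mimics the publish/subscribe idea."""

    def __init__(self):
        self._log = []
        self._offsets = {}  # maps a consumer group name to its next offset

    def publish(self, record):
        """Append a record and return its offset in the log."""
        self._log.append(record)
        return len(self._log) - 1

    def poll(self, group, max_records=10):
        """Return the next batch for this group and advance its offset."""
        start = self._offsets.get(group, 0)
        batch = self._log[start:start + max_records]
        self._offsets[group] = start + len(batch)
        return batch


if __name__ == "__main__":
    topic = ToyTopic()
    topic.publish({"title": "first article"})
    topic.publish({"title": "second article"})
    print(topic.poll("mongo-sink"))
    print(topic.poll("mongo-sink"))
```

&lt;p&gt;Because every group keeps its own offset, a second consumer group can read the same records independently of the first.&lt;/p&gt;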
&lt;h2&gt;
  
  
  Implementing the ETL pipeline
&lt;/h2&gt;

&lt;p&gt;To implement the ETL pipeline, we’ll use Python and the following libraries:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;feedparser: A Python library that parses RSS feeds&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;beautifulsoup4: A Python library that extracts data from HTML and XML files&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;kafka-python: A Python library that provides a Kafka client&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;redis: A Python library that provides a Redis client&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;First, we’ll define a DAG (Directed Acyclic Graph) in Airflow to run the pipeline on a scheduled basis. The DAG consists of four tasks:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Update the proxy pool: This task retrieves a list of proxy servers from Redis or, if none are cached, from a public proxy list, caches the list in Redis, tests connectivity, and passes the working proxies to the next task. We’ll use the proxies to avoid getting blocked by the RSS feed servers.&lt;/li&gt;
&lt;li&gt;Extract news: This task reads the RSS feeds using the valid proxies, extracts the news articles, and stores them in a list. We’ll use concurrent programming to speed up the extraction process.&lt;/li&gt;
&lt;li&gt;Validate data: This task checks if the news articles have all the required fields (title, link, and summary), and stores the valid articles in a separate list.&lt;/li&gt;
&lt;li&gt;Send to Kafka: This task sends the validated news articles to a Kafka topic, serialized as JSON.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The full DAG file looks like this:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

from datetime import datetime, timedelta
import feedparser
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import requests
import random
import redis
import concurrent.futures
import html

NEWS_FEEDS = {
    "en": [
        "https://www.cnn.com/rss/edition.rss",
        "https://www.bbc.com/news/10628494",
        "https://www.nbcnews.com/id/303207/device/rss/rss.xml",
        "https://www.foxnews.com/about/rss/"
    ],
    "pl": [
        "https://www.tvn24.pl/najnowsze.xml",
        "https://www.rmf24.pl/fakty/polska/feed",
        "https://wiadomosci.wp.pl/rss",
        "https://www.money.pl/rss/wszystkie"
    ],
    "es": [
        "https://www.elpais.com/rss/feed.html?feedId=1022",
        "https://www.abc.es/rss/feeds/abc_EspanaEspana.xml",
        "https://www.elconfidencial.com/rss/",
        "https://www.elperiodico.com/es/rss/"
    ],
    "de": [
        "https://www.tagesschau.de/xml/rss2",
        "https://www.faz.net/rss/aktuell/",
        "https://www.zeit.de/rss",
        "https://www.spiegel.de/schlagzeilen/tops/index.rss"
    ],
    "fr": [
        "https://www.lemonde.fr/rss/une.xml",
        "https://www.lefigaro.fr/rss/figaro_actualites.xml",
        "https://www.liberation.fr/rss/",
        "https://www.lci.fr/rss"
    ]
}

# Browser-like header sets; one is picked at random for each request
headers_list = [
    {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:77.0) Gecko/20100101 Firefox/77.0",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Referer": "https://www.google.com/",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1"
    },
    {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:77.0) Gecko/20100101 Firefox/77.0",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Referer": "https://www.google.com/",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1"
    },
    {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "en-GB,en-US;q=0.9,en;q=0.8",
        "Dnt": "1",
        "Referer": "https://www.google.com/",
        "Upgrade-Insecure-Requests": "1",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36",
        "X-Amzn-Trace-Id": "Root=1-5ee7bae0-82260c065baf5ad7f0b3a3e3"
    },
    {
        "User-Agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:55.0) Gecko/20100101 Firefox/55.0",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "pl-PL,pl;q=0.9,en-US;q=0.8,en;q=0.7",
        "Referer": "https://www.reddit.com/",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1"
    }
]

# Define default_args dictionary to pass to the DAG
ARGS = {
    "owner": "stefentaime",
    "start_date": days_ago(0),
    "retries": 1,
    "retry_delay": timedelta(seconds=30)
}

dag = DAG(
    dag_id="ETL-Pipeline",
    default_args=ARGS,
    description="",
    schedule_interval="0 0 1 * *",
    tags=["ETL", "kafka", "Scrapting"]
)

REDIS_CONFIG = {'host': 'redis', 'port': 6379, 'decode_responses': True}
REDIS_KEY = 'proxies'
PROXY_WEBPAGE = 'https://free-proxy-list.net/'
TESTING_URL = 'https://httpbin.org/ip'
MAX_WORKERS = 20
PROXY_EXPIRATION = timedelta(minutes=5)

def get_proxies():
    r = redis.Redis(**REDIS_CONFIG)
    # Reuse the cached list while it exists; Redis drops the key itself once its TTL runs out
    proxies = r.lrange(REDIS_KEY, 0, -1)
    if proxies and r.ttl(REDIS_KEY) == -1:
        # The key has no expiration yet, so set one
        r.expire(REDIS_KEY, PROXY_EXPIRATION)
    if not proxies:
        # Nothing cached: scrape "ip:port" pairs from the public proxy list
        headers = random.choice(headers_list)
        page = requests.get(PROXY_WEBPAGE, headers=headers)
        soup = BeautifulSoup(page.content, 'html.parser')
        for row in soup.find('tbody').find_all('tr'):
            cells = row.find_all('td')
            proxies.append(cells[0].text + ':' + cells[1].text)
        if proxies:
            r.rpush(REDIS_KEY, *proxies)
            r.expire(REDIS_KEY, PROXY_EXPIRATION)
    return proxies

def test_single_proxy(proxy):
    # A proxy counts as valid if the test URL answers with HTTP 200 within 3 seconds
    headers = random.choice(headers_list)
    try:
        resp = requests.get(TESTING_URL, headers=headers, proxies={"http": proxy, "https": proxy}, timeout=3)
        return resp.status_code == 200
    except requests.RequestException:
        return False

def test_proxy(proxies):
    # executor.map keeps the input order, so results line up with proxies
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        results = list(executor.map(test_single_proxy, proxies))
    return (proxy for valid, proxy in zip(results, proxies) if valid)

# Define the task to update the proxypool
def update_proxypool(**kwargs):
    proxies = get_proxies()
    valid_proxies = list(test_proxy(proxies))
    kwargs['ti'].xcom_push(key='valid_proxies', value=valid_proxies)

next_id = 1

def extract_website_name(link):
    # Extract the website name from the link
    website_name = link.split('//')[1].split('/')[0]
    # Remove any leading "www." from the website name
    website_name = website_name.replace('www.', '')
    return website_name

def extract_article_data(entry, language):
    global next_id
    title = entry.title.encode('ascii', 'ignore').decode()
    soup = BeautifulSoup(entry.get('summary', ''), 'html.parser')
    summary = html.unescape(soup.get_text().strip().replace('\xa0', ' '))
    link = entry.link
    date_published = entry.get('published_parsed', None)
    if date_published is not None:
        date_published = datetime(*date_published[:6])
        time_since_published = datetime.utcnow() - date_published
        # Keep only articles published within the last hour
        if time_since_published &amp;lt; timedelta(hours=1):
            today = datetime.utcnow().strftime("%d-%m-%Y")
            website_name = extract_website_name(link)
            unique_id = f"{language.upper()}{next_id:02d}-{website_name}-01-{today}"
            next_id += 1
            return {
                'id': unique_id,
                'title': title,
                'link': link,
                'summary': summary,
                'language': language
            }
    return None

def extract_news_feed(feed_url, language, proxy):
    # Download the feed through the proxy, then let feedparser parse the XML
    headers = random.choice(headers_list)
    try:
        resp = requests.get(feed_url, headers=headers, proxies={"http": proxy, "https": proxy}, timeout=10)
    except requests.RequestException:
        return []
    feed = feedparser.parse(resp.content)
    articles = []
    extracted_articles = set()
    for entry in feed.entries:
        # Keep at most two articles per feed
        if len(articles) &amp;gt;= 2:
            break
        link = entry.link
        title = entry.title.encode('ascii', 'ignore').decode()
        unique_id = f'{language}-{link}-{title}'
        if unique_id in extracted_articles:
            continue
        extracted_articles.add(unique_id)
        article_data = extract_article_data(entry, language)
        if article_data is not None:
            articles.append(article_data)
    return articles

def extract_news(**kwargs):
    valid_proxies = kwargs['ti'].xcom_pull(key='valid_proxies', task_ids='update_proxypool') or []
    articles = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # Fetch each feed once, through a randomly chosen working proxy
        futures = [
            executor.submit(extract_news_feed, feed_url, language, random.choice(valid_proxies))
            for language in NEWS_FEEDS
            for feed_url in NEWS_FEEDS[language]
            if valid_proxies
        ]
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            if result is not None:
                articles.extend(result)
    kwargs['ti'].xcom_push(key='articles', value=articles)
    return articles

# Define the task to validate the quality of the data
def validate_data(**kwargs):
    articles = kwargs['ti'].xcom_pull(key='articles', task_ids='extract_news')
    validated_articles = [article for article in articles if all(article.get(k) for k in ('title', 'link', 'summary'))]
    kwargs['ti'].xcom_push(key='validated_articles', value=validated_articles)
    return validated_articles

# Define the task to send data to the Kafka topic
def send_to_kafka(**kwargs):
    validated_articles = kwargs['ti'].xcom_pull(key='validated_articles', task_ids='validate_data')
    producer = KafkaProducer(bootstrap_servers='broker:29092')
    for article in validated_articles:
        try:
            producer.send('rss_feeds', key=article['title'].encode(), value=json.dumps(article).encode())
        except KafkaError as e:
            print(f"Failed to send message to Kafka: {e}")
    # Flush once, after all messages have been queued
    producer.flush()
    print("Data sent to Kafka successfully.")

# Define the tasks
update_proxypool_task = PythonOperator(task_id='update_proxypool', python_callable=update_proxypool, provide_context=True, dag=dag)
extract_news_task = PythonOperator(task_id='extract_news', python_callable=extract_news, provide_context=True, dag=dag)
validate_data_task = PythonOperator(task_id='validate_data', python_callable=validate_data, provide_context=True, dag=dag)
send_to_kafka_task = PythonOperator(task_id='send_to_kafka', python_callable=send_to_kafka, provide_context=True, dag=dag)

# Set the task dependencies
update_proxypool_task &amp;gt;&amp;gt; extract_news_task &amp;gt;&amp;gt; validate_data_task &amp;gt;&amp;gt; send_to_kafka_task
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
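&lt;p&gt;The proxy check in the first task fans the connectivity tests out over a thread pool. The pattern can be tried in isolation with the sketch below, which swaps the real network request for a hypothetical fake_check function, so it needs neither Redis nor internet access. The proxy addresses are made up.&lt;/p&gt;

```python
import concurrent.futures


def filter_working(candidates, check, max_workers=4):
    """Run check on every candidate in parallel and keep those that pass."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() returns results in input order, so they line up with candidates.
        results = list(executor.map(check, candidates))
    return [candidate for candidate, ok in zip(candidates, results) if ok]


def fake_check(proxy):
    # Hypothetical stand-in for a real request: only port 8080 "responds".
    return proxy.endswith(":8080")


if __name__ == "__main__":
    proxies = ["10.0.0.1:8080", "10.0.0.2:3128", "10.0.0.3:8080"]
    print(filter_working(proxies, fake_check))
```

&lt;p&gt;Because the checks are dominated by network waits, threads are a reasonable fit here even with Python's global interpreter lock.&lt;/p&gt;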

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--uqZiyW2I--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3752/1%2A7YyfecNx4iCIn4ouBE5yXg.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--uqZiyW2I--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3752/1%2A7YyfecNx4iCIn4ouBE5yXg.png" alt="" width="800" height="376"&gt;&lt;/a&gt;&lt;/p&gt;
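&lt;p&gt;The validation step of the DAG can also be exercised on its own. The sketch below applies the same rule (title, link, and summary must all be present and non-empty) to a few made-up articles. The split_valid helper, which also returns the rejected items, is an addition for illustration.&lt;/p&gt;

```python
REQUIRED_FIELDS = ("title", "link", "summary")


def split_valid(articles):
    """Separate articles that have every required field from the rest."""
    valid, rejected = [], []
    for article in articles:
        if all(article.get(field) for field in REQUIRED_FIELDS):
            valid.append(article)
        else:
            rejected.append(article)
    return valid, rejected


if __name__ == "__main__":
    sample = [
        {"title": "Complete", "link": "https://example.com/1", "summary": "ok"},
        {"title": "No summary", "link": "https://example.com/2", "summary": ""},
        {"title": "No link", "summary": "missing link"},
    ]
    valid, rejected = split_valid(sample)
    print(len(valid), "valid,", len(rejected), "rejected")
```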

&lt;p&gt;Next, we’ll deploy a Kafka connector to consume the news articles from the Kafka topic and load them into MongoDB. We’ll use the MongoSinkConnector from the mongo-kafka-connect library, which provides an efficient and reliable way to integrate Kafka with MongoDB. The connector is configured to read the news articles from the Kafka topic, and write them to a MongoDB collection in the demo database.&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
    "name": "mongodb-sink-connector",
    "config": {
      "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
      "tasks.max": "1",
      "topics": "rss_feeds",
      "connection.uri": "mongodb://debezium:dbz@mongo:27017/demo?authSource=admin",
      "database": "demo",
      "collection": "rss_feeds_collection",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false"
    }
  }
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
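&lt;p&gt;One common way to deploy such a configuration is to POST it to the Kafka Connect REST API. The sketch below only builds that request. The URL assumes a Connect worker listening on localhost:8083, which may differ in your setup, and the config is a trimmed copy of the one above, so use the full JSON in practice.&lt;/p&gt;

```python
import json
import urllib.request

# Assumed address of the Kafka Connect REST API; adjust it to your deployment.
CONNECT_URL = "http://localhost:8083/connectors"


def build_register_request(connector, url=CONNECT_URL):
    """Build (but do not send) the POST request that registers a connector."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Trimmed copy of the connector definition shown above.
connector = {
    "name": "mongodb-sink-connector",
    "config": {
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "topics": "rss_feeds",
        "database": "demo",
        "collection": "rss_feeds_collection",
    },
}

request = build_register_request(connector)

# To register the connector against a running Connect worker:
# with urllib.request.urlopen(request, timeout=10) as response:
#     print(response.status)
```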

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--qrvOg1l3--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3172/1%2ApdQRa9G6KgwuSLsiK7RCFA.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--qrvOg1l3--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3172/1%2ApdQRa9G6KgwuSLsiK7RCFA.png" alt="" width="800" height="345"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;To run the pipeline, you need to set up the following components:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Apache Airflow: Use pip to install Airflow, and create a Python script that defines the DAG.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Redis: Set up a Redis instance to store the proxy servers.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Kafka: Install and configure a Kafka cluster with a single broker, and create a Kafka topic named rss_feeds.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;MongoDB: Install and configure a MongoDB cluster, and create a database named demo.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Kafka Connector: Deploy the mongo-kafka-connect connector to your Kafka cluster, and configure it to read from the rss_feeds topic and write to the rss_feeds_collection collection in the demo database.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Finally, a Flask web application serves the news articles stored in MongoDB. It exposes JSON endpoints to list, fetch, update, and delete articles, plus an HTML page with pagination and language filtering:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from pymongo import MongoClient
from bson.objectid import ObjectId
from flask import Flask, request, jsonify, render_template

client = MongoClient('mongodb://debezium:dbz@localhost:27017/?authSource=admin')
db = client['demo']
collection = db['rss_feeds_collection']


app = Flask(__name__, template_folder='/path/template')

# get all news articles
@app.route('/news', methods=['GET'])
def get_all_news():
    cursor = collection.find({}, {"_id": 0})
    news = []
    for item in cursor:
        news.append({'title': item['title'], 'summary': item['summary'], 'link': item['link'], 'language': item['language'], 'id': item['id']})
    return jsonify({'news': news})

# get a news article by id
@app.route('/news/&amp;lt;id&amp;gt;', methods=['GET'])
def get_news_by_id(id):
    item = collection.find_one({'id': id})
    if item:
        return jsonify({'_id': str(item['_id']), 'title': item['title'], 'summary': item['summary'], 'link': item['link'], 'language': item['language']})
    else:
        return jsonify({'error': 'News article not found'}), 404

# update a news article by id
@app.route('/news/&amp;lt;id&amp;gt;', methods=['PUT'])
def update_news_by_id(id):
    item = collection.find_one({'id': id})
    if item:
        data = request.get_json()
        collection.update_one({'id': id}, {'$set': data})
        return jsonify({'message': 'News article updated successfully'})
    else:
        return jsonify({'error': 'News article not found'}), 404

# delete a news article by id
@app.route('/news/&amp;lt;id&amp;gt;', methods=['DELETE'])
def delete_news_by_id(id):
    item = collection.find_one({'id': id})
    if item:
        collection.delete_one({'id': id})
        return jsonify({'message': 'News article deleted successfully'})
    else:
        return jsonify({'error': 'News article not found'}), 404


# render a web page with news articles
@app.route('/', methods=['GET'])
def news_page():
    page = request.args.get('page', 1, type=int)
    language = request.args.get('language')

    # build query for language filtering
    query = {} if not language else {'language': language}

    # retrieve total count and paginated news articles
    count = collection.count_documents(query)
    cursor = collection.find(query, {"_id": 0}).skip((page-1)*8).limit(8)
    news = []
    for item in cursor:
        news.append({'title': item['title'], 'summary': item['summary'], 'link': item['link'], 'language': item['language'], 'id': item['id']})

    # calculate number of pages for pagination
    num_pages = count // 8 + (1 if count % 8 &amp;gt; 0 else 0)

    return render_template('index.html', news=news, page=page, language=language, num_pages=num_pages)

if __name__ == '__main__':
    app.run(debug=True)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;/news: GET all news articles from the database&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;/news/&amp;lt;id&amp;gt;: GET a news article with the specified id from the database&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;/news/&amp;lt;id&amp;gt;: PUT updates the news article with the specified id in the database&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;/news/&amp;lt;id&amp;gt;: DELETE deletes the news article with the specified id from the database&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;/: GET a web page that displays paginated news articles with an optional language filter&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
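&lt;p&gt;The pagination behind the / endpoint comes down to two small calculations. The helpers below are a minimal sketch, assuming a page size of 8 to match the limit used in the code; the function names are illustrative and not part of the application.&lt;/p&gt;

```python
def page_window(page, page_size=8):
    """Return (skip, limit) for a 1-based page number."""
    page = max(1, page)  # treat zero or negative pages as page 1
    return (page - 1) * page_size, page_size


def page_count(total, page_size=8):
    """Number of pages needed to show `total` items (ceiling division)."""
    return (total + page_size - 1) // page_size
```

&lt;p&gt;For example, page 3 skips the first 16 documents and returns up to 8, and 9 documents need 2 pages. The skip and the limit must use the same page size, otherwise pages overlap or drop items.&lt;/p&gt;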

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--hPVj--8J--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3762/1%2A8bV7pbg24O2SBjGy6TjX9w.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--hPVj--8J--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3762/1%2A8bV7pbg24O2SBjGy6TjX9w.png" alt="" width="800" height="409"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Prerequisites
&lt;/h2&gt;

&lt;p&gt;Before we start, make sure you have the following installed:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Python 3&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Docker and Docker Compose&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;A text editor&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Steps To Run:
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;Clone the project to your desired location:&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;$ git clone &lt;a href="https://github.com/Stefen-Taime/Scalable-RSS-Feed-Pipeline.git"&gt;https://github.com/Stefen-Taime/Scalable-RSS-Feed-Pipeline.git&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Run the following command to create the .env file containing the Airflow UID that docker-compose needs:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;$ echo -e "AIRFLOW_UID=$(id -u)" &amp;gt; .env&lt;/li&gt;
&lt;/ul&gt;
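&lt;p&gt;If you prefer to script this step, the same file can be written from Python. This is a small sketch; the function name is illustrative, and os.getuid is only available on POSIX systems such as Linux and macOS.&lt;/p&gt;

```python
import os
from pathlib import Path


def write_airflow_env(directory="."):
    """Write AIRFLOW_UID with the current user id to .env; return the path."""
    env_path = Path(directory) / ".env"
    # Overwrites any existing .env, like the shell redirection does.
    env_path.write_text(f"AIRFLOW_UID={os.getuid()}\n")
    return env_path
```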

&lt;p&gt;Build the Docker images:&lt;/p&gt;

&lt;p&gt;$ docker-compose build&lt;/p&gt;

&lt;p&gt;Initialize Airflow database:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;$ docker-compose up airflow-init&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Start Containers:&lt;/p&gt;

&lt;p&gt;$ docker-compose up -d&lt;/p&gt;

&lt;p&gt;Once the containers are up, list them to confirm they are running:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;$ docker ps&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;You can now open the Airflow web interface at &lt;a href="http://localhost:8080/"&gt;http://localhost:8080&lt;/a&gt; and log in with the default user defined in docker-compose.yml (username and password are both airflow). From there, trigger the DAG and watch its tasks run.&lt;/p&gt;
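&lt;p&gt;Before opening the browser, you can also check that the webserver and scheduler are ready. The sketch below assumes an Airflow 2.x webserver, which serves a JSON status document at /health; the function names are illustrative.&lt;/p&gt;

```python
import json
from urllib.request import urlopen


def is_healthy(payload):
    """True when both the metadatabase and the scheduler report 'healthy'."""
    return all(
        payload.get(part, {}).get("status") == "healthy"
        for part in ("metadatabase", "scheduler")
    )


def fetch_health(base_url="http://localhost:8080"):
    """Fetch and decode the /health document from the Airflow webserver."""
    with urlopen(f"{base_url}/health", timeout=5) as resp:
        return json.load(resp)
```

&lt;p&gt;Calling is_healthy(fetch_health()) returns True once both components report a healthy status.&lt;/p&gt;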

&lt;p&gt;To set up Kafka and MongoDB, change into the mongo-kafka directory:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;$ cd mongo-kafka&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Start Kafka and MongoDB containers:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;$ docker-compose up -d&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Run the following command to create the MongoDB sink connector:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;$ curl -X POST -H "Content-Type: application/json" --data @mongo-sink.json &lt;a href="http://localhost:8083/connectors"&gt;http://localhost:8083/connectors&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;
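&lt;p&gt;After creating the connector, it is worth confirming that it actually started. The Kafka Connect REST API reports this under /connectors/NAME/status. The sketch below is one way to read that response; the connector name is whatever mongo-sink.json defines, so it is passed in as a parameter, and the function names are illustrative.&lt;/p&gt;

```python
import json
from urllib.parse import quote
from urllib.request import urlopen


def connector_is_running(status):
    """True when the connector and every listed task are in state RUNNING."""
    states = [status.get("connector", {}).get("state")]
    states += [task.get("state") for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


def fetch_status(name, base_url="http://localhost:8083"):
    """Fetch the status document for one connector from Kafka Connect."""
    with urlopen(f"{base_url}/connectors/{quote(name)}/status", timeout=5) as resp:
        return json.load(resp)
```

&lt;p&gt;A task in state FAILED usually carries a trace field in the same response, which is the first place to look when no documents arrive in MongoDB.&lt;/p&gt;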

&lt;p&gt;Run the following command to start the API:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;$ python api.py&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Conclusion:
&lt;/h2&gt;

&lt;p&gt;In conclusion, this article has covered a variety of topics related to building a scalable RSS feed pipeline. We started by discussing RSS feeds and how to scrape them using Python. We then explored the use of Apache Airflow for orchestrating the pipeline and scheduling tasks.&lt;/p&gt;

&lt;p&gt;Next, we looked at how to use Kafka as a message broker to handle the data flow between the different components of the pipeline. We also examined the use of Kafka Connect to integrate Kafka with MongoDB and to enable easy data ingestion.&lt;/p&gt;

&lt;p&gt;To visualize the data ingested into MongoDB, we built a simple Flask API with Jinja templates to render a web page with paginated news articles. We used Bootstrap to make the page responsive and added filtering capabilities based on the language of the news articles.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://medium.com/@stefentaime_10958/building-a-scalable-rss-feed-pipeline-with-apache-airflow-kafka-and-mongodb-flask-api-da379cc2e3fb"&gt;https://medium.com/@stefentaime_10958/building-a-scalable-rss-feed-pipeline-with-apache-airflow-kafka-and-mongodb-flask-api-da379cc2e3fb&lt;/a&gt;&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Real-Time Data Processing with MySQL, Redpanda, MinIO, and Apache Spark Using Delta Lake</title>
      <dc:creator>Stefen</dc:creator>
      <pubDate>Mon, 15 May 2023 14:18:45 +0000</pubDate>
      <link>https://dev.to/stefentaime/real-time-data-processing-with-mysql-redpanda-minio-and-apache-spark-using-delta-lake-2i9o</link>
      <guid>https://dev.to/stefentaime/real-time-data-processing-with-mysql-redpanda-minio-and-apache-spark-using-delta-lake-2i9o</guid>
      <description>&lt;p&gt;In this article, you will learn how to set up a real-time data processing and analytics environment using Docker, MySQL, Redpanda, MinIO, and Apache Spark. We will create a system that generates fake data simulating sensors on a bridge that flash car plates at each passage. The data will be stored in a MySQL database, and processed in real-time using Redpanda and Kafka Connect. We will then use MinIO as a distributed object storage and Apache Spark to further process and analyze the data. Additionally, we will integrate the Twilio API for real-time notifications.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--_dYRJpQz--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/2974/1%2AsPBGohDjls781VcZfJLnhQ.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--_dYRJpQz--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/2974/1%2AsPBGohDjls781VcZfJLnhQ.png" alt="" width="800" height="408"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Table of Contents
&lt;/h2&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Introduction&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Setting up the environment&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Docker Compose configuration&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Data generation and storage in MySQL&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Creating an API for data ingestion&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Setting up connectors for data streaming and storage&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;ol start="3"&gt;
&lt;li&gt;Real-time data processing with Apache Spark&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Reading data from MinIO&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Data transformation and storage in the data warehouse&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Integrating Twilio for real-time notifications&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;ol start="4"&gt;
&lt;li&gt;Conclusion&lt;/li&gt;
&lt;/ol&gt;

&lt;h2&gt;
  
  
  1. Introduction
&lt;/h2&gt;

&lt;p&gt;In this article, we will walk through the process of setting up a real-time data processing and analytics environment for vehicle plate recognition. We will use Docker to manage our services, MySQL for data storage, Redpanda as a streaming platform, MinIO as an object storage server, and Apache Spark for data processing and analysis. We will also integrate the Twilio API to send SMS notifications in real-time based on the processed data.&lt;/p&gt;

&lt;h2&gt;
  
  
  2. Setting up the environment
&lt;/h2&gt;

&lt;h2&gt;
  
  
  Docker Compose configuration
&lt;/h2&gt;

&lt;p&gt;To begin, we will create a Docker Compose file that defines all the necessary services, networks, and volumes for our environment. The services include Redpanda, MinIO, MySQL, Kafka Connect, Adminer, Spark Master, Spark Workers, Jupyter Notebook, a data generator, and an API.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;version: "3.7"
services:
  redpanda:
    image: vectorized/redpanda
    container_name: redpanda
    ports:
      - "9092:9092"
      - "29092:29092"
    command:
      - redpanda
      - start
      - --overprovisioned
      - --smp
      - "1"
      - --memory
      - "1G"
      - --reserve-memory
      - "0M"
      - --node-id
      - "0"
      - --kafka-addr
      - PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      - --advertise-kafka-addr
      - PLAINTEXT://redpanda:29092,OUTSIDE://redpanda:9092
      - --check=false
    networks:
      - spark_network  

  redpanda-console:
    image: vectorized/console
    container_name: redpanda_console
    depends_on:
      - redpanda
    ports:
      - "5000:8080"
    env_file:
      - .env
    networks:
      - spark_network  

  minio:
    hostname: minio
    image: "minio/minio"
    container_name: minio
    ports:
      - "9001:9001"
      - "9000:9000"
    command: [ "server", "/data", "--console-address", ":9001" ]
    volumes:
      - ./minio/data:/data
    env_file:
      - .env
    networks:
      - spark_network  

  mc:
    image: minio/mc
    container_name: mc
    hostname: mc
    environment:
      - AWS_ACCESS_KEY_ID=minio
      - AWS_SECRET_ACCESS_KEY=minio123
      - AWS_REGION=us-east-1
    entrypoint: &amp;gt;
      /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo '...waiting...' &amp;amp;&amp;amp; sleep 1; done; /usr/bin/mc mb minio/warehouse; /usr/bin/mc policy set public minio/warehouse; exit 0; "
    depends_on:
      - minio
    networks:
      - spark_network  

  mysql:
    image: debezium/example-mysql:1.6
    container_name: mysql
    volumes:
      - ./mysql/data:/var/lib/mysql
    ports:
      - "3306:3306"
    env_file:
      - .env
    networks:
      - spark_network  

  kafka-connect:
    build:
      context: ./kafka
      dockerfile: ./Dockerfile
    container_name: kafka_connect
    depends_on:
      - redpanda
    ports:
      - "8083:8083"
    env_file:
      - .env
    networks:
      - spark_network  

  adminer:
    image: adminer:latest
    ports:
      - 8085:8080/tcp
    deploy:
     restart_policy:
       condition: on-failure 
    networks:
      - spark_network      

  spark-master:
    build:
      context: ./spark
      dockerfile: ./Dockerfile
    container_name: "spark-master"
    environment:
      - SPARK_MODE=master
      - SPARK_LOCAL_IP=spark-master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    ports:
      - "7077:7077"
      - "8080:8080"
    volumes:
      - ./spark/spark-defaults.conf:/opt/bitnami/spark/conf/spark-defaults.conf
    networks:
      - spark_network

  spark-worker-1:
    image: docker.io/bitnami/spark:3.3
    container_name: "spark-worker-1"
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=4G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    networks:
      - spark_network

  spark-worker-2:
    image: docker.io/bitnami/spark:3.3
    container_name: "spark-worker-2"
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=4G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    networks:
      - spark_network

  spark-notebook:
    build:
      context: ./notebooks
      dockerfile: ./Dockerfile
    container_name: "spark-notebook"
    user: root
    environment:
      - JUPYTER_ENABLE_LAB="yes"
      - GRANT_SUDO="yes"
    volumes:
      - ./notebooks:/home/jovyan/work
      - ./notebooks/spark-defaults.conf:/usr/local/spark/conf/spark-defaults.conf
    ports:
      - "8888:8888"
      - "4040:4040"
    networks:
      - spark_network

  generate_data:
    build: ./generate_data
    container_name: generate_data
    command: python generate_data.py
    depends_on:
      - mysql
    networks:
      - spark_network

  api:
    build: ./api
    ports:
      - "8000:8000"
    depends_on:
      - mysql          


networks:
  spark_network:
    driver: bridge
    name: spark_network

# Start the stack from the directory containing this file with: docker-compose up --build -d
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
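&lt;p&gt;Once the stack is up, a quick way to see which services are reachable is to probe the host ports published in the Compose file. The following is a minimal sketch using only the standard library; the dictionary keys are labels chosen here for readability, not names Docker knows about.&lt;/p&gt;

```python
import socket

# Host ports published by the Compose file above (labels are illustrative).
PUBLISHED_PORTS = {
    "redpanda": 9092,
    "redpanda-console": 5000,
    "minio-api": 9000,
    "minio-console": 9001,
    "mysql": 3306,
    "kafka-connect": 8083,
    "adminer": 8085,
    "spark-master-ui": 8080,
    "spark-notebook": 8888,
    "api": 8000,
}


def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def report(host="localhost"):
    """Map each service label to whether its port accepts connections."""
    return {name: port_open(host, port) for name, port in PUBLISHED_PORTS.items()}
```

&lt;p&gt;An open port only shows that the container is listening; it does not prove the service inside has finished starting.&lt;/p&gt;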
&lt;h2&gt;
  
  
  Data generation and storage in MySQL
&lt;/h2&gt;

&lt;p&gt;Once our environment is set up, we will generate fake data simulating sensors on a bridge that flash car plates at each passage. The data will include vehicle and owner information, subscription status, and other relevant fields. This data will be stored in a MySQL database and serve as the source of our real-time data processing pipeline.&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import random
import uuid
from faker import Faker
import pandas as pd
import mysql.connector
from datetime import datetime, timedelta

# Initialize Faker
fake = Faker()

# Number of data points to generate
num_records = 1000

# Generate synthetic data
data = []

for _ in range(num_records):
    unique_id = str(uuid.uuid4())
    plate_number = f"{random.randint(1000, 9999)}-{fake.random_element(elements=('AAA', 'BBB', 'CCC', 'DDD', 'EEE', 'FFF', 'GGG', 'HHH', 'III', 'JJJ', 'KKK', 'LLL', 'MMM', 'NNN', 'OOO', 'PPP', 'QQQ', 'RRR', 'SSS', 'TTT', 'UUU', 'VVV', 'WWW', 'XXX', 'YYY', 'ZZZ'))}"

    car_info = {
        "make": fake.random_element(elements=("Toyota", "Honda", "Ford", "Chevrolet", "Nissan", "Volkswagen", "BMW", "Mercedes-Benz")),
        "year": random.randint(2000, 2023)
    }

    owner_info = {
        "name": fake.name(),
        "address": fake.address(),
        "phone_number": fake.phone_number().replace("x", " ext. ")  # Modify phone number format
    }

    subscription_status = fake.random_element(elements=("active", "expired", "none"))

    if subscription_status != "none":
        subscription_start = fake.date_between(start_date='-3y', end_date='today')
        subscription_end = subscription_start + timedelta(days=365)
    else:
        subscription_start = None
        subscription_end = None

    balance = round(random.uniform(0, 500), 2)

    timestamp = fake.date_time_between(start_date='-30d', end_date='now').strftime('%Y-%m-%d %H:%M:%S')


    record = {
        "id": unique_id,
        "plate_number": plate_number,
        "car_make": car_info["make"],
        "car_year": car_info["year"],
        "owner_name": owner_info["name"],
        "owner_address": owner_info["address"],
        "owner_phone_number": owner_info["phone_number"],
        "subscription_status": subscription_status,
        "subscription_start": subscription_start,
        "subscription_end": subscription_end,
        "balance": balance,
        "timestamp": timestamp
    }

    data.append(record)

# Convert data to a pandas DataFrame
df = pd.DataFrame(data)

# Connect to the MySQL database
db_config = {
    "host": "mysql",
    "user": "root",
    "password": "debezium",
    "database": "inventory"
}
conn = mysql.connector.connect(**db_config)

# Create a cursor
cursor = conn.cursor()

# Create the 'customers' table if it doesn't exist
create_table_query = '''
CREATE TABLE IF NOT EXISTS customers (
    id VARCHAR(255) NOT NULL,
    plate_number VARCHAR(255) NOT NULL,
    car_make VARCHAR(255) NOT NULL,
    car_year INT NOT NULL,
    owner_name VARCHAR(255) NOT NULL,
    owner_address TEXT NOT NULL,
    owner_phone_number VARCHAR(255) NOT NULL,
    subscription_status ENUM('active', 'expired', 'none') NOT NULL,
    subscription_start DATE,
    subscription_end DATE,
    balance DECIMAL(10, 2) NOT NULL,
    timestamp TIMESTAMP NOT NULL
)
'''
cursor.execute(create_table_query)

# Store the synthetic data in the 'customers' table
for index, row in df.iterrows():
    insert_query = '''
    INSERT INTO customers (id, plate_number, car_make, car_year, owner_name, owner_address, owner_phone_number, subscription_status, subscription_start, subscription_end, balance, timestamp)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    '''
    cursor.execute(insert_query, (
        row['id'],
        row['plate_number'],
        row['car_make'],
        row['car_year'],
        row['owner_name'],
        row['owner_address'],
        row['owner_phone_number'],
        row['subscription_status'],
        row['subscription_start'],
        row['subscription_end'],
        row['balance'],
        row['timestamp']
    ))

# Commit the changes and close the cursor
conn.commit()
cursor.close()

# Close the database connection
conn.close()

print("Synthetic data stored in the 'customers' table in the MySQL database")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
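&lt;p&gt;The script above issues one INSERT per row. For larger volumes, a batched insert with executemany is a common alternative. The sketch below uses the standard-library sqlite3 module and a reduced three-column table purely as a stand-in so that it runs without a MySQL server; with mysql.connector the same call exists on the cursor, and the placeholder is %s instead of ?.&lt;/p&gt;

```python
import sqlite3


def insert_batch(conn, rows, placeholder="?"):
    """Insert (id, plate_number, balance) tuples with one executemany call."""
    marks = ", ".join([placeholder] * 3)
    query = f"INSERT INTO customers (id, plate_number, balance) VALUES ({marks})"
    cur = conn.cursor()
    cur.executemany(query, rows)
    conn.commit()
    cur.close()
    return len(rows)


# Stand-in demo: an in-memory SQLite database with a reduced table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id TEXT, plate_number TEXT, balance REAL)")
inserted = insert_batch(conn, [("a1", "1234-AAA", 100.0), ("b2", "5678-BBB", 42.5)])
```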

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--wm9jOqpR--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3786/1%2AeKCKb9bHMhGhBRXOWqio1A.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--wm9jOqpR--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3786/1%2AeKCKb9bHMhGhBRXOWqio1A.png" alt="" width="800" height="351"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Creating an API for data ingestion
&lt;/h2&gt;

&lt;p&gt;To facilitate data ingestion, we will create an API that allows us to send data as JSON objects. This API will be used to insert new data into the MySQL database, simulating the real-time data flow from the sensors on the bridge.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from flask import Flask, request, jsonify, render_template
import mysql.connector
import pandas as pd

app = Flask(__name__, template_folder='template')

db_config = {
        "host": "10.0.0.25",
        "user": "root",
        "password": "debezium",
        "database": "inventory"
    }

@app.route('/send_data', methods=['POST'])
def send_data():
    data = request.get_json()


    conn = mysql.connector.connect(**db_config)

    cursor = conn.cursor()

    insert_query = '''
    INSERT INTO customers (id, plate_number, car_make, car_year, owner_name, owner_address, owner_phone_number, subscription_status, subscription_start, subscription_end, balance, timestamp)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    '''
    cursor.execute(insert_query, (
        data['id'],
        data['plate_number'],
        data['car_make'],
        data['car_year'],
        data['owner_name'],
        data['owner_address'],
        data['owner_phone_number'],
        data['subscription_status'],
        data['subscription_start'],
        data['subscription_end'],
        data['balance'],
        data['timestamp']
    ))

    conn.commit()

    cursor.close()
    conn.close()

    return jsonify({"status": "success"}), 200

@app.route('/customers', methods=['GET'])
def customers():
    plate_number = request.args.get('plate_number', '')
    page = int(request.args.get('page', 1))
    items_per_page = 10

    conn = mysql.connector.connect(**db_config)

    # Create a cursor
    cursor = conn.cursor()

    # Fetch customers filtered by plate_number and apply pagination
    select_query = '''
    SELECT * FROM customers
    WHERE plate_number LIKE %s
    LIMIT %s OFFSET %s
    '''
    cursor.execute(select_query, (f"%{plate_number}%", items_per_page, (page - 1) * items_per_page))
    customers = cursor.fetchall()

    # Get the total number of customers
    cursor.execute("SELECT COUNT(*) FROM customers WHERE plate_number LIKE %s", (f"%{plate_number}%",))
    total_customers = cursor.fetchone()[0]

    # Close the cursor and connection
    cursor.close()
    conn.close()

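    # Ceiling division, with at least one page so the template always has a page to show
    total_pages = max(1, (total_customers + items_per_page - 1) // items_per_page)
    return render_template('customers.html', customers=customers, plate_number=plate_number, page=page, total_pages=total_pages)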


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
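&lt;p&gt;The /send_data handler reads twelve keys from the JSON body, so a request that omits one of them fails with a KeyError. A small check like the one below could run before the INSERT. It is a sketch: the helper name is illustrative, and the field list simply mirrors the columns of the INSERT statement.&lt;/p&gt;

```python
# Keys the /send_data handler reads, in the order of the INSERT columns.
REQUIRED_FIELDS = (
    "id", "plate_number", "car_make", "car_year", "owner_name",
    "owner_address", "owner_phone_number", "subscription_status",
    "subscription_start", "subscription_end", "balance", "timestamp",
)


def missing_fields(payload):
    """Return the required keys absent from a JSON payload, in column order."""
    if not isinstance(payload, dict):
        return list(REQUIRED_FIELDS)
    return [name for name in REQUIRED_FIELDS if name not in payload]
```

&lt;p&gt;In the handler, a non-empty result could be turned into a 400 response that names the missing keys instead of letting the request fail with a server error.&lt;/p&gt;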
&lt;h2&gt;
  
  
  Testing the API
&lt;/h2&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import requests

# Sample passage record; replace owner_phone_number with a real number to receive the SMS
data = {
    "id": "5a5c562e-4386-44ad-bf6f-bab91081781e",
    "plate_number": "7695-OOO",
    "car_make": "Ford",
    "car_year": 2012,
    "owner_name": "Stefen",
    "owner_address": "92834 Kim Unions\nPort Harryport, MD 61729",
    "owner_phone_number": "your phone number",
    "subscription_status": "active",
    "subscription_start": None,
    "subscription_end": None,
    "balance": 100.0,
    "timestamp": "2023-03-03T14:37:49",
}

# The API container publishes port 8000 on the host
response = requests.post("http://localhost:8000/send_data", json=data)

print(response.status_code)
print(response.json())

# Save this script as request.py and run it with: python request.py
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--BZ6GTDCE--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3726/1%2AKNIiMZAr2kji-wp815GPLw.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--BZ6GTDCE--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3726/1%2AKNIiMZAr2kji-wp815GPLw.png" alt="" width="800" height="170"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;My initial balance is $100.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Setting up connectors for data streaming and storage
&lt;/h2&gt;

&lt;p&gt;With our data stored in MySQL, we will set up Kafka Connect connectors to stream the data from MySQL to Redpanda and then store it in MinIO, which will serve as our distributed object storage. This data storage will act as the “bronze” table in our data warehouse.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# create connector source for MySQL
curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
  "name": "src-mysql",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.include.list": "inventory",
    "decimal.handling.mode": "double",
    "topic.prefix": "dbserver1",
    "schema.history.internal.kafka.bootstrap.servers": "redpanda:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory"
  }
}'

# create connector sink MySQL to S3
curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
  "name": "sink_aws-s3",
  "config": {
    "topics.regex": "dbserver1.inventory.*",
    "topics.dir": "inventory",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "flush.size": "1",
    "store.url": "http://minio:9000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.region": "us-east-1",
    "s3.bucket.name": "warehouse",
    "aws.access.key.id": "minio",
    "aws.secret.access.key": "minio123"
  }
}'
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
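&lt;p&gt;Each object the sink writes to MinIO holds Debezium change events rather than plain rows. An event wraps the row in an envelope with before, after, and op fields, and when the JSON converter has schemas enabled, that envelope sits under a payload key. The helper below is a minimal sketch of pulling the current row out of such an event; the function name is illustrative, and whether the payload wrapper is present depends on the Kafka Connect worker settings, which are not shown here.&lt;/p&gt;

```python
def extract_after(event):
    """Return the row state after the change, or None for deletes and tombstones."""
    if event is None:
        return None  # tombstone record
    # With schemas enabled, the change envelope sits under "payload".
    envelope = event.get("payload", event)
    if envelope.get("op") == "d":
        return None  # deletes carry no "after" state
    return envelope.get("after")
```

&lt;p&gt;The op field is c for inserts, u for updates, d for deletes, and r for rows read during the initial snapshot.&lt;/p&gt;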

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--D9Q56nz5--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3714/1%2A68kAe4CjJYIyLYLlPW9ywg.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--D9Q56nz5--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3714/1%2A68kAe4CjJYIyLYLlPW9ywg.png" alt="" width="800" height="372"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--ujTlPVqz--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3800/1%2Ajf3Vhvq2uiKVoHmHX1M9Vw.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--ujTlPVqz--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3800/1%2Ajf3Vhvq2uiKVoHmHX1M9Vw.png" alt="" width="800" height="371"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  3. Real-time data processing with Apache Spark
&lt;/h2&gt;

&lt;h2&gt;
  
  
  Reading data from MinIO
&lt;/h2&gt;

&lt;p&gt;Using Apache Spark, we will read the data stored in MinIO and process it further. This processing will involve selecting relevant fields and transforming the data into a more suitable format for analysis.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--uMnxRjTf--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3162/1%2Aql8_YlFoEwSHfAN37DYeDw.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--uMnxRjTf--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3162/1%2Aql8_YlFoEwSHfAN37DYeDw.png" alt="" width="800" height="419"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--ZCag1j3y--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3096/1%2A0RkeGySa93Zp5kQG_knY9A.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--ZCag1j3y--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3096/1%2A0RkeGySa93Zp5kQG_knY9A.png" alt="" width="800" height="378"&gt;&lt;/a&gt;&lt;/p&gt;
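&lt;p&gt;For Spark to read from MinIO, the S3A filesystem has to be pointed at the MinIO endpoint. The spark-defaults.conf mounted by the Compose file is not shown in this article, so the following is only a sketch of the kind of settings it would contain, using the endpoint and credentials from the Compose file. The helper names and the bronze path are assumptions.&lt;/p&gt;

```python
def s3a_settings(endpoint="http://minio:9000", access_key="minio", secret_key="minio123"):
    """Spark properties that point the S3A filesystem at a MinIO endpoint."""
    return {
        "spark.hadoop.fs.s3a.endpoint": endpoint,
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
        # MinIO serves buckets by path rather than by virtual-host subdomain.
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
    }


def as_spark_defaults(settings):
    """Render the properties as spark-defaults.conf lines."""
    return "\n".join(f"{key} {value}" for key, value in sorted(settings.items()))


# Assumed object prefix: bucket, then topics.dir, then the Debezium topic name.
BRONZE_PATH = "s3a://warehouse/inventory/dbserver1.inventory.customers/"
```

&lt;p&gt;With these settings in place, a notebook cell such as spark.read.json(BRONZE_PATH) would load the change events written by the sink connector.&lt;/p&gt;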

&lt;h2&gt;
  
  
  Data transformation and storage in the data warehouse
&lt;/h2&gt;

&lt;p&gt;Once we have processed the data, we will store it in a “silver” table in our data warehouse. This table will be used for further analysis and processing.&lt;/p&gt;
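&lt;p&gt;The transformation itself is not listed here, so the following is a hedged sketch of what a bronze-to-silver step might do for a single row, written in plain Python for clarity. It assumes Debezium's default temporal encoding, in which MySQL DATE columns arrive as the number of days since 1970-01-01, and it keeps only the fields that the notification script reads. The field list and function names are assumptions.&lt;/p&gt;

```python
from datetime import date, timedelta

# Fields the notification script reads from each row (assumed silver schema).
SILVER_FIELDS = (
    "id", "plate_number", "owner_name", "owner_phone_number",
    "subscription_status", "subscription_start", "subscription_end",
    "balance", "timestamp",
)


def days_to_date(days):
    """Convert a days-since-epoch integer to an ISO date string, or None."""
    if days is None:
        return None
    return (date(1970, 1, 1) + timedelta(days=days)).isoformat()


def to_silver(after):
    """Keep the selected fields and decode the two DATE columns."""
    row = {name: after.get(name) for name in SILVER_FIELDS}
    for name in ("subscription_start", "subscription_end"):
        if isinstance(row[name], int):
            row[name] = days_to_date(row[name])
    return row
```

&lt;p&gt;In Spark the same idea would be expressed with a select over the after struct plus a date conversion, followed by a write to the silver location.&lt;/p&gt;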

&lt;h2&gt;
  
  
  Integrating Twilio for real-time notifications
&lt;/h2&gt;

&lt;p&gt;To enhance our real-time data processing pipeline, we will integrate the Twilio API, allowing us to send SMS notifications based on specific conditions or events. For example, we could send an SMS to the vehicle owner when their subscription is about to expire. The script below sends a message after a passage, stating the rate charged and the new balance.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from datetime import datetime as dt, timedelta, timezone
from typing import Optional

import mysql.connector
from mysql.connector import Error
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType
from twilio.rest import Client

TWILIO_ACCOUNT_SID = ''
TWILIO_AUTH_TOKEN = ''
TWILIO_PHONE_NUMBER = ''

client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
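# Reuse the active Spark session, or create one if none exists yet
spark = SparkSession.builder.getOrCreate()
silver_data = spark.read.parquet("s3a://warehouse/inventory/silver_data")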

def get_rate_for_customer(timestamp, subscription_status):
    if subscription_status == 'active':
        if 0 &amp;lt;= timestamp.hour &amp;lt; 6 or 11 &amp;lt;= timestamp.hour &amp;lt; 16:
            return 2.99
        elif 6 &amp;lt;= timestamp.hour &amp;lt; 11 or 16 &amp;lt;= timestamp.hour &amp;lt; 23:
            return 3.99
    else:
        return 9.99

    # Add a default rate value to avoid NoneType issues
    return 0.0


def is_subscription_active(subscription_start: dt, subscription_end: dt, current_time: dt) -&amp;gt; bool:
    return subscription_start &amp;lt;= current_time &amp;lt;= subscription_end

def get_subscription_status(subscription_end: dt, current_time: dt) -&amp;gt; bool:
    grace_period = timedelta(days=7)
    return current_time &amp;lt;= subscription_end + grace_period


def send_sms(phone_number, message):
    try:
        client.messages.create(
            body=message,
            from_=TWILIO_PHONE_NUMBER,
            to=phone_number
        )
        print(f"SMS sent to {phone_number}: {message}")
    except Exception as e:
        print(f"Error sending SMS: {e}")

from pyspark.sql.functions import col

def is_valid_balance(value):
    try:
        float(value)
        return True
    except ValueError:
        return False

valid_balance_udf = udf(is_valid_balance, BooleanType())

silver_data = silver_data.filter(valid_balance_udf(col("balance")))

# Database configuration
db_config = {
    "host": "mysql",
    "user": "root",
    "password": "debezium",
    "database": "inventory"
}

def update_customer_balance(customer_id, new_balance):
    # Start as None so the cleanup below is safe if connect() or cursor() raises
    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(**db_config)
        cursor = connection.cursor()
        update_query = "UPDATE customers SET balance = %s WHERE id = %s"
        cursor.execute(update_query, (new_balance, customer_id))
        connection.commit()
        print(f"Updated balance for customer {customer_id}: {new_balance}")
    except Error as e:
        print(f"Error updating balance: {e}")
    finally:
        if cursor is not None:
            cursor.close()
        if connection is not None and connection.is_connected():
            connection.close()

from datetime import datetime, timezone

def safe_date_conversion(date_string: Optional[str]) -&amp;gt; dt:
    if date_string is None or not isinstance(date_string, str):
        return dt(1970, 1, 1, tzinfo=timezone.utc)
    try:
        return dt.fromisoformat(date_string[:-1]).replace(tzinfo=timezone.utc)
    except ValueError:
        return dt(1970, 1, 1, tzinfo=timezone.utc)

def process_plate(row: Row) -&amp;gt; None:
    print(f"Processing plate: {row.plate_number}")
    current_time = dt.now(timezone.utc)
    try:
        plate_timestamp = dt.fromisoformat(row.timestamp[:-1]).replace(tzinfo=timezone.utc)
    except ValueError:
        plate_timestamp = dt.fromtimestamp(0, timezone.utc)

    subscription_start = safe_date_conversion(row.subscription_start)
    subscription_end = safe_date_conversion(row.subscription_end)

    is_active = is_subscription_active(subscription_start, subscription_end, current_time)
    rate = get_rate_for_customer(plate_timestamp, row.subscription_status)

    balance = float(row.balance)
    new_balance = balance - rate

    if row.subscription_status == 'none':
        message = f"Dear {row.owner_name}, your car with plate number {row.plate_number} is not registered. The rate of ${rate} has been charged for your recent passage. Your new balance is ${new_balance:.2f}."
        send_sms(row.owner_phone_number, message)
    elif is_active:  # Changed from row.subscription_status == 'active'
        message = f"Dear {row.owner_name}, your subscription is active. The rate of ${rate} has been charged for your recent passage. Your new balance is ${new_balance:.2f}."
        send_sms(row.owner_phone_number, message)
    elif not get_subscription_status(subscription_end, current_time):
        message = f"Dear {row.owner_name}, your subscription has expired. The rate of ${rate} has been charged for your recent passage. Your new balance is ${new_balance:.2f}."
        send_sms(row.owner_phone_number, message)

    # Persist the new balance for every charged passage, not only expired subscriptions
    update_customer_balance(row.id, new_balance)

silver_data.foreach(process_plate)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;This script is designed to process a dataset containing information about car passages and their owners, including subscription status, balance, plate numbers, and owner details. It reads data from a “silver” table in a data warehouse, processes the data in real-time, sends SMS notifications to the car owners via the Twilio API, and updates the customer’s balance in a MySQL database.&lt;/p&gt;

&lt;p&gt;Here’s a breakdown of the script:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Import necessary libraries and modules for the script.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Define Twilio credentials (account SID, auth token, and phone number) for sending SMS notifications.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Create a SparkSession to read data from the “silver” table.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Define utility functions:&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;get_rate_for_customer: Calculate the rate based on timestamp and subscription status.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;is_subscription_active: Check if a subscription is active.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;get_subscription_status: Check if a subscription is within the grace period.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;send_sms: Send an SMS using the Twilio API.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;is_valid_balance: Check if a given balance is valid (convertible to a float).&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;update_customer_balance: Update the customer balance in the MySQL database.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;safe_date_conversion: Convert a date string to a datetime object, handling errors and missing values.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;process_plate: Process each plate record, calculate the rate, send SMS notifications, and update the customer balance.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;ol start="5"&gt;
&lt;li&gt;&lt;p&gt;Register a User-Defined Function (UDF) valid_balance_udf that filters records with valid balance values.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Filter the dataset to keep records with valid balances using the valid_balance_udf.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Define database configuration for connecting to the MySQL database.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Use the foreach action to process each plate record using the process_plate function. This includes checking subscription status, calculating the rate, sending SMS notifications, and updating the customer balance.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
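
&lt;p&gt;To make the date handling concrete, here is a minimal, self-contained sketch of how the active-window and grace-period checks behave. It mirrors is_subscription_active and get_subscription_status from the script; the helper names parse_utc and within_grace_period and the sample dates are hypothetical and chosen only for illustration.&lt;/p&gt;

```python
from datetime import datetime, timedelta, timezone

GRACE_PERIOD = timedelta(days=7)  # same 7-day grace period as the script


def parse_utc(date_string):
    """Parse an ISO-8601 string ending in 'Z' into an aware UTC datetime.

    Hypothetical helper: falls back to the Unix epoch on bad input,
    like safe_date_conversion in the script.
    """
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    if not isinstance(date_string, str):
        return epoch
    try:
        # Drop the trailing 'Z' before parsing, then attach UTC explicitly.
        return datetime.fromisoformat(date_string[:-1]).replace(tzinfo=timezone.utc)
    except ValueError:
        return epoch


def is_subscription_active(start, end, now):
    # Active when start is at or before now, and now is at or before end.
    return end >= now >= start


def within_grace_period(end, now):
    # Still tolerated for up to 7 days after the subscription end date.
    return end + GRACE_PERIOD >= now


start = parse_utc("2023-01-01T00:00:00Z")
end = parse_utc("2023-06-30T23:59:59Z")
three_days_late = parse_utc("2023-07-03T12:00:00Z")
ten_days_late = parse_utc("2023-07-10T12:00:00Z")

print(is_subscription_active(start, end, three_days_late))  # False
print(within_grace_period(end, three_days_late))            # True
print(within_grace_period(end, ten_days_late))              # False
```

&lt;p&gt;In this sketch, a passage three days after the end date is inactive but still inside the grace period, so process_plate would not send the "expired" message for it.&lt;/p&gt;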

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--kkjVyWCN--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/2000/1%2A6ARIgXlgfMVRQ7JjL_ZZQw.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--kkjVyWCN--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/2000/1%2A6ARIgXlgfMVRQ7JjL_ZZQw.jpeg" alt="" width="800" height="1647"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;gold_data.write.parquet("s3a://warehouse/inventory/gold_data", mode="overwrite")


import pyspark.sql.functions as F
from pyspark.sql import SparkSession

class MetricsAdapter:
    def __init__(self, silver_table, warehouse_path):
        self.silver_table = silver_table
        self.warehouse_path = warehouse_path

    def show_metrics(self):
        daily_metrics = spark.read.format('delta').load(self.warehouse_path + '/gold/daily_metrics')
        weekly_metrics = spark.read.format('delta').load(self.warehouse_path + '/gold/weekly_metrics')
        monthly_metrics = spark.read.format('delta').load(self.warehouse_path + '/gold/monthly_metrics')
        quarterly_metrics = spark.read.format('delta').load(self.warehouse_path + '/gold/quarterly_metrics')
        yearly_metrics = spark.read.format('delta').load(self.warehouse_path + '/gold/yearly_metrics')
        subscription_status_count = self.silver_table.groupBy("subscription_status").count()

        print("Daily Metrics:")
        daily_metrics.show(5)

        print("Weekly Metrics:")
        weekly_metrics.show(5)

        print("Monthly Metrics:")
        monthly_metrics.show(5)

        print("Quarterly Metrics:")
        quarterly_metrics.show(5)

        print("Yearly Metrics:")
        yearly_metrics.show(5)    

    def transform(self):
        # Calculate the week, month, quarter, and year from the timestamp
        time_based_metrics = self.silver_table.withColumn("date", F.to_date("timestamp")) \
            .withColumn("year", F.year("timestamp")) \
            .withColumn("quarter", F.quarter("timestamp")) \
            .withColumn("month", F.month("timestamp")) \
            .withColumn("week_of_year", F.weekofyear("timestamp")) \
            .withColumn("total_passages", F.lit(1)) \
            .withColumn("total_revenue", F.when(self.silver_table.timestamp.substr(12, 2).cast("int") &amp;lt; 12, 2.99).otherwise(3.99))


        # Daily metrics
        daily_metrics = time_based_metrics.groupBy("date").agg(
            F.count("*").alias("total_passages"),
            F.sum(F.when(time_based_metrics.timestamp.substr(12, 2).cast("int") &amp;lt; 12, 2.99).otherwise(3.99)).alias("total_revenue")
        )
        daily_metrics.write.format('delta').mode('overwrite').option("mergeSchema", "true").save(self.warehouse_path + '/gold/daily_metrics')

        # Weekly metrics
        weekly_metrics = time_based_metrics.groupBy("year", "week_of_year").agg(
            F.sum("total_passages").alias("total_passages"),
            F.sum("total_revenue").alias("total_revenue")
        )
        weekly_metrics.write.format('delta').mode('overwrite').option("mergeSchema", "true").save(self.warehouse_path + '/gold/weekly_metrics')

        # Monthly metrics
        monthly_metrics = time_based_metrics.groupBy("year", "month").agg(
            F.sum("total_passages").alias("total_passages"),
            F.sum("total_revenue").alias("total_revenue")
        )
        monthly_metrics.write.format('delta').mode('overwrite').option("mergeSchema", "true").save(self.warehouse_path + '/gold/monthly_metrics')

        # Quarterly metrics
        quarterly_metrics = time_based_metrics.groupBy("year", "quarter").agg(
            F.sum("total_passages").alias("total_passages"),
            F.sum("total_revenue").alias("total_revenue")
        )
        quarterly_metrics.write.format('delta').mode('overwrite').option("mergeSchema", "true").save(self.warehouse_path + '/gold/quarterly_metrics')

        # Yearly metrics
        yearly_metrics = time_based_metrics.groupBy("year").agg(
            F.sum("total_passages").alias("total_passages"),
            F.sum("total_revenue").alias("total_revenue")
        )
        yearly_metrics.write.format('delta').mode('overwrite').option("mergeSchema", "true").save(self.warehouse_path + '/gold/yearly_metrics')

# Example usage
spark = SparkSession.builder.getOrCreate()
silver_data = spark.read.parquet("s3a://warehouse/inventory/silver_data")
warehouse_path = "s3a://warehouse/inventory/gold_data"
metrics_adapter = MetricsAdapter(silver_data, warehouse_path)
metrics_adapter.transform()

metrics_adapter.show_metrics()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The code calculates daily, weekly, monthly, quarterly, and yearly metrics, such as total passages and total revenue. It also defines a MetricsAdapter class that encapsulates the data transformation and metrics display logic.&lt;/p&gt;

&lt;p&gt;The first line of code:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;gold_data.write.parquet("s3a://warehouse/inventory/gold_data", mode="overwrite")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;writes the gold_data DataFrame to the specified S3 bucket in Parquet format, with the overwrite mode, which replaces any existing data in the destination.&lt;/p&gt;

&lt;p&gt;The MetricsAdapter class has two primary methods: transform() and show_metrics().&lt;/p&gt;

&lt;p&gt;transform() method:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Calculates the date, year, quarter, month, and week of the year from the timestamp.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Aggregates the data based on different time granularities (daily, weekly, monthly, quarterly, and yearly) using the groupBy and agg functions.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Writes the aggregated metrics to the specified S3 bucket in Delta Lake format (Parquet data files plus a transaction log), which provides ACID transactions, versioning, and schema evolution for large-scale data lakes.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;
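
&lt;p&gt;The pricing rule inside transform() can be easier to follow outside Spark. The sketch below is plain Python, not the article's PySpark code: it assumes timestamps are ISO-8601 strings such as 2023-05-01T08:15:00Z, so that substr(12, 2) (1-based in Spark) selects the hour. The function names and sample values are hypothetical.&lt;/p&gt;

```python
from collections import defaultdict

MORNING_RATE = 2.99    # charged when the hour is before 12
AFTERNOON_RATE = 3.99  # charged from 12:00 onwards


def passage_revenue(timestamp):
    # Characters 12-13 (1-based) hold the hour, i.e. the slice [11:13] in Python.
    hour = int(timestamp[11:13])
    return AFTERNOON_RATE if hour >= 12 else MORNING_RATE


def daily_metrics(timestamps):
    """Group passages by calendar date and sum passages and revenue."""
    totals = defaultdict(lambda: {"total_passages": 0, "total_revenue": 0.0})
    for ts in timestamps:
        day = ts[:10]  # the 'YYYY-MM-DD' prefix
        totals[day]["total_passages"] += 1
        totals[day]["total_revenue"] += passage_revenue(ts)
    # Round once at the end to avoid displaying floating-point noise.
    return {
        day: {
            "total_passages": m["total_passages"],
            "total_revenue": round(m["total_revenue"], 2),
        }
        for day, m in totals.items()
    }


# Hypothetical sample passages (not taken from the article's data set).
sample = [
    "2023-05-01T08:15:00Z",
    "2023-05-01T14:30:00Z",
    "2023-05-02T11:59:59Z",
]
print(daily_metrics(sample))
```

&lt;p&gt;The weekly, monthly, quarterly, and yearly tables follow the same pattern with a coarser grouping key.&lt;/p&gt;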

&lt;p&gt;show_metrics() method:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Reads each metrics table back from the S3 bucket using the Delta Lake reader.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Displays the first 5 rows of the daily, weekly, monthly, quarterly, and yearly metrics using the show() function.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Finally, the example usage part of the code initializes a SparkSession, reads the silver_data from the S3 bucket, creates a MetricsAdapter instance with silver_data and the warehouse path, calls the transform() method to aggregate the data, and then calls the show_metrics() method to display the results.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--sMI1uAwV--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3830/1%2A0JELvCj7pc9-3FtOy3HmuQ.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--sMI1uAwV--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3830/1%2A0JELvCj7pc9-3FtOy3HmuQ.png" alt="" width="800" height="307"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--ORjy3CMJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3816/1%2Av0EzzKKIB7ObPKd47-t34g.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--ORjy3CMJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3816/1%2Av0EzzKKIB7ObPKd47-t34g.png" alt="" width="800" height="277"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Conclusion&lt;/p&gt;

&lt;p&gt;In this article, we have demonstrated how to set up a real-time data processing and analytics environment using Docker, MySQL, Redpanda, MinIO, and Apache Spark. We created a system that generates fake data simulating a sensor, stores it in a MySQL database, and processes it in real-time using Redpanda and Kafka Connect. We then utilized MinIO as a distributed object storage and Apache Spark to further process and analyze the data. Additionally, we integrated the Twilio API for real-time notifications.&lt;/p&gt;

&lt;p&gt;This project showcases the potential of using modern data processing tools to handle real-time scenarios, such as monitoring car passages on a bridge and notifying car owners about their subscription status and balance. The combination of these technologies enables scalable and efficient data processing, as well as the ability to respond quickly to changes in the data.&lt;/p&gt;

&lt;p&gt;The knowledge gained from this project can be applied to various other real-time data processing and analytics use cases. By understanding and implementing these technologies, you can build powerful and efficient systems that are able to handle large amounts of data and provide valuable insights in real-time.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://github.com/Stefen-Taime/stream-ingestion-redpanda-minio.git"&gt;https://github.com/Stefen-Taime/stream-ingestion-redpanda-minio.git&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://medium.com/@stefentaime_10958/real-time-data-processing-and-analytics-with-docker-mysql-redpanda-minio-and-apache-spark-eca83f210ef6"&gt;https://medium.com/@stefentaime_10958/real-time-data-processing-and-analytics-with-docker-mysql-redpanda-minio-and-apache-spark-eca83f210ef6&lt;/a&gt;&lt;/p&gt;

</description>
      <category>minio</category>
      <category>spark</category>
      <category>redpanda</category>
      <category>deltalake</category>
    </item>
    <item>
      <title>Visualizing Bitcoin to USD Exchange Rates using FastAPI, Prometheus, Grafana, Deploy with jenkins</title>
      <dc:creator>Stefen</dc:creator>
      <pubDate>Mon, 15 May 2023 14:15:42 +0000</pubDate>
      <link>https://dev.to/stefentaime/visualizing-bitcoin-to-usd-exchange-rates-using-fastapi-prometheus-grafana-deploy-with-jenkins-41d9</link>
      <guid>https://dev.to/stefentaime/visualizing-bitcoin-to-usd-exchange-rates-using-fastapi-prometheus-grafana-deploy-with-jenkins-41d9</guid>
      <description>&lt;h2&gt;
  
  
  Visualizing Bitcoin to USD Exchange Rates using FastAPI, Prometheus, Grafana, Deployed with Jenkins on Localhost Ubuntu Server 20.04
&lt;/h2&gt;

&lt;p&gt;In this article, we’ll explore how to visualize the exchange rate of Bitcoin to USD using FastAPI, Prometheus, Grafana, and Docker. We will create a simple FastAPI application to import exchange rate data from an API, store it in a database, and expose it as metrics using Prometheus. Then, we’ll use Grafana to create dashboards that visualize the data, and deploy the whole setup using Docker and Jenkins. This project was inspired by an article from &lt;a href="https://amlanscloud.com/kubechallenge/"&gt;amlanscloud&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://devopscube.com/setup-slaves-on-jenkins-2/"&gt;Setup Jenkins Agent/Slave Using SSH [Password &amp;amp; SSH Key]&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Overview
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--ceMIo1hW--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3816/1%2Aoc4RMB_AVLUm0MPzM0tGnA.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--ceMIo1hW--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3816/1%2Aoc4RMB_AVLUm0MPzM0tGnA.png" alt="" width="800" height="365"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Here’s an outline of our plan:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Data Source: The process commences by acquiring data from a public data source. In this case, we’ll use an API that offers free exchange rates for Bitcoin to USD. The API delivers the exchange rate between Bitcoin and USD at the moment the API is called. By invoking the API multiple times, we can obtain time series data for the fluctuations in the exchange rate.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Data Importer App: An importer app reads the data from the data source API mentioned above. This app operates daily, invoking the data source API to acquire the exchange rate. Subsequently, the importer app stores this rate in a database. Each day’s exchange rate is stored as a separate entry in the database.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Prometheus Data Scraper: This component is a data scraper job defined within Prometheus. The Prometheus data scraper fetches data from an API endpoint and incorporates it as a metric in Prometheus. A custom API has been developed for this purpose, which, upon invocation, retrieves the day’s exchange rate from the database and returns the data in a format that can be easily read and imported as a metric in Prometheus.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Visualize Data: Prometheus stores the data scraped as a distinct metric. This metric serves as a data source for creating visualization dashboards on Grafana. These dashboards display various trends in the exchange rate fluctuations as documented by the metrics.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Deployment: We will deploy the entire setup using Jenkins, Docker, and Docker Compose.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from fastapi import FastAPI, Response
from fastapi.responses import PlainTextResponse, JSONResponse
import uvicorn
from dotenv import load_dotenv
import os
import requests
import json
from prometheus_client import Gauge, CollectorRegistry, generate_latest
from datetime import date
import redis

load_dotenv()

def scrape_data():
    qry_url = f'{os.environ.get("STOCK_URL")}?function=CURRENCY_EXCHANGE_RATE&amp;amp;from_currency=BTC&amp;amp;to_currency=USD&amp;amp;apikey={os.environ.get("API_KEY")}'
    response = requests.request("GET", qry_url)
    respdata = response.json()
    rate = respdata['Realtime Currency Exchange Rate']['5. Exchange Rate']
    float_rate = "{:.2f}".format(float(rate))
    return float_rate

registry = CollectorRegistry()
exchange_rate_btc_usd = Gauge('exchange_rate_btc_usd', 'Exchange rate between BTC and USD', registry=registry)

app = FastAPI()

@app.get("/", response_class=JSONResponse)
async def root():
    return {"message": "Hello World"}

@app.get("/exchangemetrics", response_class=PlainTextResponse)
async def get_exchange_metrics():
    r = redis.Redis(host="redis", port=os.environ.get('REDIS_PORT'), password=os.environ.get('REDIS_PASSWORD'), db=0)
    scraped_data = 0.0
    try:
        todays_date = str(date.today())
        redis_key = f'exchange_rate-{todays_date}'
        tmpdata = r.get(redis_key)
        scraped_data = tmpdata.decode("utf-8")
        exchange_rate_btc_usd.set(float(scraped_data))
    except Exception as e:
        print(e)
        print('responding default value')
    return Response(generate_latest(registry), media_type="text/plain")

@app.get("/getexchangedata", response_class=PlainTextResponse)
async def get_exchange_data():
    r = redis.Redis(host="redis", port=os.environ.get('REDIS_PORT'), password=os.environ.get('REDIS_PASSWORD'), db=0)
    scraped_data = 0.0
    respdata = "error"
    try:
        scraped_data = scrape_data()
        todays_date = str(date.today())
        r.set(f'exchange_rate-{todays_date}', scraped_data)
        respdata = "done"
    except Exception as e:
        print(e)
        print('responding default value')
        todays_date = str(date.today())
        r.set(f'exchange_rate-{todays_date}', scraped_data)
    return respdata

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=5000, reload=True)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The code above is a FastAPI application that fetches the Bitcoin to USD exchange rate from an external API and stores it in a Redis database. It also exposes the exchange rate as a Prometheus metric. Let’s break down the code:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Import necessary modules: The code starts by importing the required modules such as FastAPI, uvicorn, dotenv, os, requests, json, prometheus_client, datetime, and redis.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Load environment variables: The load_dotenv() function is called to load the environment variables from the .env file.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Scrape exchange rate data: The scrape_data() function fetches the exchange rate data by making a GET request to the external API using the requests library. It then extracts the exchange rate and formats it as a string with two decimal places.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Initialize Prometheus Gauge: A Prometheus Gauge named exchange_rate_btc_usd is initialized to store the exchange rate data.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Create FastAPI app: A FastAPI application named app is created.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Define endpoints:&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;GET /: A simple Hello World endpoint that returns a JSON response.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;GET /exchangemetrics: This endpoint fetches the exchange rate data for the current day from the Redis database and sets the value of the Prometheus Gauge exchange_rate_btc_usd. It then returns the Prometheus metrics as a plain text response.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;GET /getexchangedata: This endpoint fetches the exchange rate data by calling the scrape_data() function, stores the data in the Redis database, and returns a plain text response indicating whether the operation was successful.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;ol start="7"&gt;
&lt;li&gt;Run FastAPI app: The FastAPI app is run using uvicorn with the host set to “0.0.0.0” and port 5000. The reload=True parameter enables hot-reloading during development.&lt;/li&gt;
&lt;/ol&gt;
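
&lt;p&gt;The hand-off between the two endpoints can be sketched without any external services. In the minimal example below, a plain dictionary stands in for Redis and the metrics text is written by hand as an approximation of what prometheus_client's generate_latest produces; the function names save_rate and render_metrics, the sample rate, and the date are hypothetical.&lt;/p&gt;

```python
from datetime import date

store = {}  # stand-in for Redis in this sketch; the app uses redis.Redis


def redis_key(day):
    # Same key layout as the app: exchange_rate-YYYY-MM-DD
    return f"exchange_rate-{day}"


def save_rate(raw_rate, day):
    # Like scrape_data(), keep the rate as a string with two decimals.
    formatted = "{:.2f}".format(float(raw_rate))
    store[redis_key(day)] = formatted.encode("utf-8")  # Redis returns bytes
    return formatted


def render_metrics(day):
    # A fresh Gauge reports 0.0 until set; the sketch mimics that for a missing key.
    raw = store.get(redis_key(day))
    value = float(raw.decode("utf-8")) if raw is not None else 0.0
    # Hand-written approximation of the Prometheus text exposition format.
    return (
        "# HELP exchange_rate_btc_usd Exchange rate between BTC and USD\n"
        "# TYPE exchange_rate_btc_usd gauge\n"
        f"exchange_rate_btc_usd {value}\n"
    )


today = date(2023, 5, 15)  # hypothetical date for the example
save_rate("27123.456789", today)
print(render_metrics(today))
```

&lt;p&gt;Because the key includes the date, each day's rate is stored separately, and the metrics endpoint only ever reads the current day's entry.&lt;/p&gt;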

&lt;h2&gt;
  
  
  Prometheus
&lt;/h2&gt;

&lt;p&gt;The configuration below sets up Prometheus to scrape metrics from two sources: the Prometheus server itself and the FastAPI application. Let’s break down the configuration:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Global settings: The global settings apply to all the scrape jobs defined in the configuration.&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;scrape_interval: The interval at which Prometheus scrapes metrics from the targets. In this case, it's set to 15 seconds.&lt;/li&gt;
&lt;/ul&gt;

&lt;ol start="2"&gt;
&lt;li&gt;Scrape configs: The scrape_configs section defines the jobs that Prometheus will use to scrape metrics from different sources.&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;job_name: 'prometheus': The first job is named 'prometheus' and is configured to scrape metrics from the Prometheus server itself.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;static_configs: This section specifies the target for this job. In this case, it's set to scrape metrics from the Prometheus server running on 'localhost:9090'.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;job_name: 'fastapi_app': The second job is named 'fastapi_app' and is configured to scrape metrics from the FastAPI application.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;metrics_path: The path to the metrics endpoint in the FastAPI application, which is '/exchangemetrics' in this case.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;static_configs: This section specifies the target for this job. In this case, it's set to scrape metrics from the FastAPI application running on 'fastapi_app:5000' (the application's hostname and port).&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

  - job_name: 'fastapi_app'
    metrics_path: '/exchangemetrics'
    static_configs:
      - targets: ['fastapi_app:5000']
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;h2&gt;
  
  
  Jenkins
&lt;/h2&gt;

&lt;p&gt;The Jenkinsfile below defines a Jenkins pipeline for building and deploying the project. The pipeline consists of four stages, plus a post section that runs after all stages have completed. Let's break down the pipeline:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;Agent: Specifies that the pipeline will run on a Jenkins agent with the label ‘ubuntu’.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Stages: Contains the sequential stages to be executed in the pipeline.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;Stage 1: Build and Deploy prometheus, grafana, and redis:&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;This stage changes the working directory to /home/stefen/deploy/adminer.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;It prints the current working directory and its contents.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Then, it runs the docker-compose up -d command to deploy Prometheus, Grafana, and Redis using Docker Compose.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Stage 2: Build and Deploy API:&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;This stage waits for 30 seconds before proceeding.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;It changes the working directory to /home/stefen/deploy/api.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;It prints the current working directory and its contents.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Then, it runs the docker-compose up --build -d command to build and deploy the FastAPI application using Docker Compose.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Stage 3: Fetch and Print getexchangedata:&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;This stage makes an HTTP request to the FastAPI application’s /getexchangedata endpoint.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;It prints the response received from the API.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;Stage 4: Fetch and Print Exchangemetrics:&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;This stage makes an HTTP request to the FastAPI application’s /exchangemetrics endpoint.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;It prints the response received from the API.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;ol start="3"&gt;
&lt;li&gt;Post: Specifies actions to be executed after all the stages have completed, regardless of their success or failure.&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;In this case, it prints the URLs for Prometheus, Grafana, Redis, and the FastAPI application.&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;pipeline {
    agent {
        label 'ubuntu'
    }
    stages {
        stage('Build and Deploy prometheus, grafana, and redis') {
            steps {
                dir('/home/stefen/deploy/adminer') {
                    sh 'pwd' // Print the working directory to verify it
                    sh 'ls -la'
                    sh 'docker-compose up -d'
                }
            }
        }
        stage('Build and Deploy API') {
            steps {
                sleep(time: 30, unit: 'SECONDS') // Wait for 30 seconds
                dir('/home/stefen/deploy/api') {
                    sh 'pwd' // Check the current working directory
                    sh 'ls -la'
                    sh 'docker-compose up --build -d'
                }
            }
        }
        stage('Fetch and Print getexchangedata') {
            steps {
                script {
                    def response = httpRequest 'http://localhost:5000/getexchangedata'
                    echo "Response: ${response.content}"
                }
            }
        }
        stage('Fetch and Print Exchangemetrics') {
            steps {
                script {
                    def response = httpRequest 'http://localhost:5000/exchangemetrics'
                    echo "Response: ${response.content}"
                }
            }
        }
    }
    post {
        always {
            script {
                def prometheusURL = 'http://localhost:9090'
                def grafanaURL = 'http://localhost:3000'
                def redisURL = 'http://localhost:6379'
                def apiURL = 'http://localhost:5000'

                echo "Prometheus URL: ${prometheusURL}"
                echo "Grafana URL: ${grafanaURL}"
                echo "Redis URL: ${redisURL}"
                echo "API URL: ${apiURL}"
            }
        }
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;h3&gt;
  
  
  Grafana Dashboard:
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--_NFhlOV4--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3834/1%2Ahqkhn_cI3UNKGa__tSkg5g.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--_NFhlOV4--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3834/1%2Ahqkhn_cI3UNKGa__tSkg5g.png" alt="" width="800" height="364"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion:
&lt;/h2&gt;

&lt;p&gt;In conclusion, the provided code snippets and configurations showcase an end-to-end deployment process of a monitoring and data visualization system using Jenkins, Docker, Prometheus, Grafana, and FastAPI. The pipeline defined in the Jenkinsfile automates the build and deployment process, ensuring a smooth and streamlined workflow for the project. This setup allows developers to efficiently monitor and visualize Bitcoin to USD exchange rate data, making it easier to identify trends and understand the data’s behavior over time. The use of FastAPI, Redis, and the provided API ensures a robust and efficient architecture for the system, while Docker and Jenkins enable seamless deployment and automation. Overall, this project demonstrates a practical application of modern technologies to create an effective and reliable monitoring and data visualization system.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://github.com/Stefen-Taime/Visualizing-Bitcoin"&gt;Source code on GitHub&lt;/a&gt;&lt;/p&gt;

</description>
      <category>jenkins</category>
      <category>prometheus</category>
      <category>grafana</category>
      <category>fastapi</category>
    </item>
    <item>
      <title>Analyzing Uber and Uber Eats Expenses Using DBT, Postgres, Gmail, Python, SQL And PowerBI</title>
      <dc:creator>Stefen</dc:creator>
      <pubDate>Mon, 15 May 2023 14:13:15 +0000</pubDate>
      <link>https://dev.to/stefentaime/analyzing-uber-and-uber-eats-expenses-using-dbt-postgres-gmail-python-sql-and-powerbi-1j9g</link>
      <guid>https://dev.to/stefentaime/analyzing-uber-and-uber-eats-expenses-using-dbt-postgres-gmail-python-sql-and-powerbi-1j9g</guid>
      <description>&lt;p&gt;Unveiling the true cost of your ride-sharing and food delivery habits with an ELT data pipeline, PostgreSQL, dbt, and Power BI.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--JQV1fC5j--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://miro.medium.com/v2/resize:fit:1050/1%2AjmyS3SW2x15UPpbppBKx8w.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--JQV1fC5j--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://miro.medium.com/v2/resize:fit:1050/1%2AjmyS3SW2x15UPpbppBKx8w.png" alt="" width="800" height="447"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Introduction&lt;/p&gt;

&lt;p&gt;As a regular user of Uber and Uber Eats products, I realized that I wanted to gain better insights into how much I spend on these services per month, year, or quarter. As a digital content creator and data engineer, I decided to create a proof-of-concept (POC) for a data analysis project to track my expenses on these platforms.&lt;/p&gt;

&lt;p&gt;In this article, I will walk you through the process of building the “My Uber Project” pipeline. This pipeline utilizes an ELT (Extract, Load, Transform) approach to extract data from PDF receipts, clean and structure the data, store the data in a PostgreSQL database, perform transformations using dbt (Data Build Tool), and finally visualize the results with Power BI.&lt;/p&gt;

&lt;p&gt;Data Extraction: PDF Receipts&lt;/p&gt;

&lt;p&gt;The first step in the My Uber Project pipeline is to extract data from the PDF receipts received via email after each Uber ride or Uber Eats order. To achieve this, we can use Python libraries like PyPDF2 or pdfplumber to parse the PDF files and extract the relevant information.&lt;/p&gt;

&lt;p&gt;Data Cleaning and Structuring&lt;/p&gt;

&lt;p&gt;After extracting the raw data, the next step is to clean and structure it. This involves tasks such as parsing dates, normalizing amounts (the receipts use a comma as the decimal separator), and standardizing column names. The cleaned and structured data will be stored in two separate CSV files:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt; uber_eats.csv: Contains information related to Uber Eats orders with columns: type, date, total, and restaurant.&lt;/li&gt;
&lt;li&gt; uber_ride.csv: Contains information related to Uber rides with columns: type, date, total, and driver.
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import pdfplumber
import re
import os
import pandas as pd


def extract_data(pdf_path):
    # Read the text of the receipt's first page
    with pdfplumber.open(pdf_path) as pdf:
        page = pdf.pages[0]
        content = page.extract_text()

    # Date written as day, month name, year
    date_pattern = r'\d{1,2} \w+ \d{4}'
    date = re.search(date_pattern, content).group(0)

    # Total with a decimal comma, converted to a decimal point
    total_pattern = r'Total (\d+\,\d{2}) \$CA'
    total = re.search(total_pattern, content).group(1).replace(',', '.')

    # A driver line marks a ride; a restaurant line marks an Uber Eats order
    driver_pattern = r'Votre chauffeur était (\w+)'
    driver_match = re.search(driver_pattern, content)
    restaurant_pattern = r'restaurant suivant : (.+?)\.'
    restaurant_match = re.search(restaurant_pattern, content)

    if driver_match:
        return {'type': 'Uber', 'date': date, 'total': total, 'driver': driver_match.group(1)}
    elif restaurant_match:
        return {'type': 'Uber Eats', 'date': date, 'total': total, 'restaurant': restaurant_match.group(1)}
    else:
        return {'error': 'Invalid receipt format'}


pdf_directory = '/home/stefen/uber/data'
pdf_files = [f for f in os.listdir(pdf_directory) if f.endswith('.pdf')]

uber_data = []
uber_eats_data = []

# Sort each receipt into the ride list or the Uber Eats list
for pdf_file in pdf_files:
    pdf_path = os.path.join(pdf_directory, pdf_file)
    extracted_data = extract_data(pdf_path)
    if 'error' in extracted_data:
        print(f"Error processing file {pdf_file}: {extracted_data['error']}")
    elif extracted_data['type'] == 'Uber':
        uber_data.append(extracted_data)
    elif extracted_data['type'] == 'Uber Eats':
        uber_eats_data.append(extracted_data)

uber_df = pd.DataFrame(uber_data)
uber_eats_df = pd.DataFrame(uber_eats_data)

uber_df.to_csv('uber_receipts.csv', index=False)
uber_eats_df.to_csv('uber_eats_receipts.csv', index=False)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Here’s an explanation of each part of the code:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt; Import the necessary libraries:
&lt;ul&gt;
&lt;li&gt;  pdfplumber: To extract text from PDF files&lt;/li&gt;
&lt;li&gt;  re: To perform regular expression operations&lt;/li&gt;
&lt;li&gt;  os: To interact with the operating system, e.g., working with directories and files&lt;/li&gt;
&lt;li&gt;  pandas: To work with data in DataFrame format and save to CSV&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt; Define the &lt;code&gt;extract_data&lt;/code&gt; function, which takes a PDF file path as input:
&lt;ul&gt;
&lt;li&gt;  Open the PDF file using pdfplumber and get the first page.&lt;/li&gt;
&lt;li&gt;  Extract the text content from the page.&lt;/li&gt;
&lt;li&gt;  Use regular expressions to find the date, total, driver (if available), and restaurant (if available) in the text.&lt;/li&gt;
&lt;li&gt;  If a driver is found, return the extracted data as a dictionary with the 'type' key set to 'Uber'.&lt;/li&gt;
&lt;li&gt;  If a restaurant is found, return the extracted data as a dictionary with the 'type' key set to 'Uber Eats'.&lt;/li&gt;
&lt;li&gt;  If neither a driver nor a restaurant is found, return an error dictionary indicating an invalid receipt format.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt; Specify the directory containing the PDF files and create a list of all PDF files in the directory.&lt;/li&gt;
&lt;li&gt; Initialize empty lists &lt;code&gt;uber_data&lt;/code&gt; and &lt;code&gt;uber_eats_data&lt;/code&gt; to store extracted data.&lt;/li&gt;
&lt;li&gt; Iterate through each PDF file in the list, call the &lt;code&gt;extract_data&lt;/code&gt; function to extract the data, and append it to the appropriate list based on the 'type' key value. If an error is encountered, print the error message.&lt;/li&gt;
&lt;li&gt; Create separate DataFrames for Uber and Uber Eats data using the pandas library.&lt;/li&gt;
&lt;li&gt; Save the DataFrames to CSV files (uber_receipts.csv and uber_eats_receipts.csv) without including the index column. The dbt models later reference these datasets as &lt;code&gt;uber_ride&lt;/code&gt; and &lt;code&gt;uber_eats&lt;/code&gt;, so the files need those names (uber_ride.csv and uber_eats.csv) when they are placed in the &lt;code&gt;seeds&lt;/code&gt; folder.&lt;/li&gt;
&lt;/ol&gt;
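&lt;p&gt;To try the regular expressions without a PDF at hand, the matching logic can be run on plain text. The sketch below reuses the four patterns from the script; the &lt;code&gt;parse_receipt_text&lt;/code&gt; helper and the two sample receipts are hypothetical, made up for illustration, and the extra check for a missing date or total is an addition that the original script does not have.&lt;/p&gt;

```python
import re

# Patterns copied from the extraction script.
DATE_PATTERN = r'\d{1,2} \w+ \d{4}'
TOTAL_PATTERN = r'Total (\d+\,\d{2}) \$CA'
DRIVER_PATTERN = r'Votre chauffeur était (\w+)'
RESTAURANT_PATTERN = r'restaurant suivant : (.+?)\.'


def parse_receipt_text(content):
    """Classify one receipt from its already-extracted text (hypothetical helper)."""
    date_match = re.search(DATE_PATTERN, content)
    total_match = re.search(TOTAL_PATTERN, content)
    if date_match is None or total_match is None:
        # Addition: report missing fields instead of raising AttributeError.
        return {'error': 'Missing date or total'}

    record = {
        'date': date_match.group(0),
        # Receipts use a decimal comma; normalize it to a dot.
        'total': total_match.group(1).replace(',', '.'),
    }
    driver_match = re.search(DRIVER_PATTERN, content)
    restaurant_match = re.search(RESTAURANT_PATTERN, content)
    if driver_match:
        return {'type': 'Uber', **record, 'driver': driver_match.group(1)}
    if restaurant_match:
        return {'type': 'Uber Eats', **record, 'restaurant': restaurant_match.group(1)}
    return {'error': 'Invalid receipt format'}


# Hypothetical receipt snippets, made up for illustration only.
ride_text = "12 mai 2023\nTotal 18,75 $CA\nVotre chauffeur était Alex"
eats_text = "3 avril 2023\nTotal 24,10 $CA\nVous avez commandé au restaurant suivant : Chez Exemple."

ride = parse_receipt_text(ride_text)
eats = parse_receipt_text(eats_text)
print(ride)
print(eats)
```

&lt;p&gt;Keeping the parsing separate from the PDF reading makes it easier to test the patterns against new receipt layouts before running the full script.&lt;/p&gt;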

&lt;p&gt;After the data extraction and processing, the next step is to create the architecture for the PostgreSQL database and pgAdmin. In this section, we will use Docker and docker-compose to set up the services:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;version: "3.8"
services:
  postgres:
    image: postgres:latest
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: mysecretpassword
    ports:
      - "0.0.0.0:5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./postgres-init:/docker-entrypoint-initdb.d
  pgadmin:
    image: dpage/pgadmin4:latest
    environment:
      PGADMIN_DEFAULT_EMAIL: admin@example.com
      PGADMIN_DEFAULT_PASSWORD: mysecretpassword
    ports:
      - "8080:80"
    depends_on:
      - postgres
volumes:
  postgres_data:
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once the PostgreSQL database and pgAdmin have been set up, the next step is to initialize and configure our dbt project. After running the &lt;code&gt;dbt init&lt;/code&gt; command, we can start setting up the project structure. Here's an overview of the dbt project structure:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;C:.
├───dbt_packages
├───logs
├───macros
├───models
│   ├───intermediate
│   ├───marts
│   │   ├───eats_dept
│   │   └───rides_dept
│   └───staging
├───seeds
└───target
    ├───compiled
    │   └───my_uber_project
    │       └───models
    │           ├───intermediate
    │           ├───marts
    │           │   ├───eats_dept
    │           │   └───rides_dept
    │           └───staging
    └───run
        └───my_uber_project
            ├───models
            │   ├───intermediate
            │   ├───marts
            │   │   ├───eats_dept
            │   │   └───rides_dept
            │   └───staging
            └───seeds
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The project structure contains the following folders:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt; &lt;code&gt;dbt_packages&lt;/code&gt;: Contains packages installed via the &lt;code&gt;packages.yml&lt;/code&gt; file.&lt;/li&gt;
&lt;li&gt; &lt;code&gt;logs&lt;/code&gt;: Stores log files generated during dbt execution.&lt;/li&gt;
&lt;li&gt; &lt;code&gt;macros&lt;/code&gt;: Contains custom macros for the project.&lt;/li&gt;
&lt;li&gt; &lt;code&gt;models&lt;/code&gt;: Holds the dbt models, organized into subdirectories for intermediate, staging, and marts (eats_dept and rides_dept) layers.&lt;/li&gt;
&lt;li&gt; &lt;code&gt;seeds&lt;/code&gt;: Contains CSV files with seed data to be loaded into the database.&lt;/li&gt;
&lt;li&gt; &lt;code&gt;target&lt;/code&gt;: Stores the output of dbt commands (compiled and run). This folder has subdirectories for compiled and run models, each with the same structure as the &lt;code&gt;models&lt;/code&gt; folder (intermediate, staging, and marts layers).&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;By following this structure, we can keep our dbt project organized and easy to maintain. Each subdirectory within the &lt;code&gt;models&lt;/code&gt; folder serves a specific purpose, helping to separate different stages of data transformation and analysis.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;profiles.yml&lt;/code&gt; file is the configuration file dbt uses to define connection settings. A profile contains one or more targets (environments), and each target holds the connection settings for one database. In this example, a single profile named &lt;code&gt;default&lt;/code&gt; defines two targets, &lt;code&gt;dev&lt;/code&gt; and &lt;code&gt;prod&lt;/code&gt;, each pointing at a PostgreSQL database.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;profiles.yml&lt;/code&gt; file contents:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt; &lt;code&gt;default&lt;/code&gt;: The name of the profile. You can define multiple profiles in the same file if needed.&lt;/li&gt;
&lt;li&gt; &lt;code&gt;outputs&lt;/code&gt;: A dictionary containing the targets defined for the profile.&lt;/li&gt;
&lt;li&gt; &lt;code&gt;dev&lt;/code&gt;: The development target, with the following connection settings:
&lt;ul&gt;
&lt;li&gt;  &lt;code&gt;type&lt;/code&gt;: The type of database being used (in this case, PostgreSQL).&lt;/li&gt;
&lt;li&gt;  &lt;code&gt;threads&lt;/code&gt;: The number of concurrent threads dbt should use when executing queries.&lt;/li&gt;
&lt;li&gt;  &lt;code&gt;host&lt;/code&gt;, &lt;code&gt;port&lt;/code&gt;, &lt;code&gt;user&lt;/code&gt;, &lt;code&gt;pass&lt;/code&gt;, &lt;code&gt;dbname&lt;/code&gt;, &lt;code&gt;schema&lt;/code&gt;: Connection settings for the PostgreSQL database (host, port, username, password, database name, and schema) in the development environment.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt; &lt;code&gt;prod&lt;/code&gt;: The production target, with the same kind of connection settings as &lt;code&gt;dev&lt;/code&gt;. Replace the placeholders (&lt;code&gt;[host]&lt;/code&gt;, &lt;code&gt;[port]&lt;/code&gt;, &lt;code&gt;[prod_username]&lt;/code&gt;, &lt;code&gt;[prod_password]&lt;/code&gt;, &lt;code&gt;[dbname]&lt;/code&gt;, and &lt;code&gt;[prod_schema]&lt;/code&gt;) with the actual values for your production environment.&lt;/li&gt;
&lt;li&gt; &lt;code&gt;target&lt;/code&gt;: Specifies the default target to use when running dbt commands. In this case, it is set to &lt;code&gt;dev&lt;/code&gt;.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;By defining different targets, you can easily switch between development and production environments when running dbt commands, allowing you to test and develop transformations in one environment before deploying them to another. To switch between targets, you can change the &lt;code&gt;target&lt;/code&gt; value in the &lt;code&gt;profiles.yml&lt;/code&gt; file or use the &lt;code&gt;--target&lt;/code&gt; flag when running dbt commands.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;default:
  outputs:
    dev:
      type: postgres
      threads: 3
      host: localhost
      port: 5432
      user: dbt
      pass: dbt_password
      dbname: olap
      schema: public
    prod:
      type: postgres
      threads: 1
      host: [host]
      port: [port]
      user: [prod_username]
      pass: [prod_password]
      dbname: [dbname]
      schema: [prod_schema]
  target: dev
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Once the dbt project is set up, one of the first things to do is to manage the date format in the Uber receipts, which are in French. To handle the French month names, you can create a custom function in your PostgreSQL database to translate them into English month names. Here’s a step-by-step explanation of the process:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt; Connect to your &lt;code&gt;olap&lt;/code&gt; PostgreSQL database using your preferred database client or pgAdmin.&lt;/li&gt;
&lt;li&gt; Create a new function called &lt;code&gt;translate_french_month_to_english&lt;/code&gt; that accepts a single &lt;code&gt;TEXT&lt;/code&gt; parameter representing the French month name.&lt;/li&gt;
&lt;li&gt; Inside the function, use a &lt;code&gt;CASE&lt;/code&gt; statement to map the French month names (in lowercase) to their corresponding English month names.&lt;/li&gt;
&lt;li&gt; Return the translated English month name or &lt;code&gt;NULL&lt;/code&gt; if no match is found.&lt;/li&gt;
&lt;li&gt; The function is defined using the &lt;code&gt;plpgsql&lt;/code&gt; language.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Here’s the SQL code for the function:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE OR REPLACE FUNCTION translate_french_month_to_english(month TEXT)
RETURNS TEXT AS $$
BEGIN
  RETURN CASE
    WHEN lower(month) = 'janvier' THEN 'January'
    WHEN lower(month) = 'février' THEN 'February'
    WHEN lower(month) = 'mars' THEN 'March'
    WHEN lower(month) = 'avril' THEN 'April'
    WHEN lower(month) = 'mai' THEN 'May'
    WHEN lower(month) = 'juin' THEN 'June'
    WHEN lower(month) = 'juillet' THEN 'July'
    WHEN lower(month) = 'août' THEN 'August'
    WHEN lower(month) = 'septembre' THEN 'September'
    WHEN lower(month) = 'octobre' THEN 'October'
    WHEN lower(month) = 'novembre' THEN 'November'
    WHEN lower(month) = 'décembre' THEN 'December'
    ELSE NULL
  END;
END;
$$ LANGUAGE plpgsql;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;By adding this function to your PostgreSQL database, you can easily translate the French month names in your Uber receipts data to their English counterparts. This will help standardize the date format and make it easier to work with the data in dbt and other data processing tools.&lt;/p&gt;
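&lt;p&gt;The same idea can be checked quickly in Python before touching the database. This is only a sketch of the mapping, not the SQL function itself: the &lt;code&gt;parse_french_date&lt;/code&gt; helper is hypothetical, it assumes dates written as day, month name, and year separated by spaces, and it maps month names to month numbers rather than to English names.&lt;/p&gt;

```python
from datetime import date

# Same month names as the SQL function: lowercase French month -> month number.
FRENCH_MONTHS = {
    'janvier': 1, 'février': 2, 'mars': 3, 'avril': 4,
    'mai': 5, 'juin': 6, 'juillet': 7, 'août': 8,
    'septembre': 9, 'octobre': 10, 'novembre': 11, 'décembre': 12,
}


def parse_french_date(text):
    """Parse a 'day month year' string such as '15 mai 2023'.

    Returns None when the month is not recognised, mirroring the
    ELSE NULL branch of the SQL function.
    """
    day, month_name, year = text.split()
    month = FRENCH_MONTHS.get(month_name.lower())
    if month is None:
        return None
    return date(int(year), month, int(day))


print(parse_french_date('15 mai 2023'))
print(parse_french_date('1 Août 2022'))
print(parse_french_date('3 smarch 2023'))
```

&lt;p&gt;Lowercasing the month before the lookup plays the same role as &lt;code&gt;lower(month)&lt;/code&gt; in the SQL version, so capitalized month names are handled too.&lt;/p&gt;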

&lt;p&gt;Once the &lt;code&gt;translate_french_month_to_english&lt;/code&gt; function is created, you can now create your first staging models for both Uber Eats and Uber rides data. In each model, you will use the custom date parsing function to convert the French date format to a standardized format.&lt;/p&gt;

&lt;ol&gt;
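&lt;li&gt; Create a new model for staging Uber Eats data (the intermediate model references it as &lt;code&gt;uber_eating&lt;/code&gt;, so the file is named &lt;code&gt;uber_eating.sql&lt;/code&gt;):
&lt;/li&gt;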
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{{ config(materialized='table') }}

SELECT *,
       {{ parse_custom_date('date') }} as transaction_date
FROM {{ ref('uber_eats') }}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This model uses the &lt;code&gt;parse_custom_date&lt;/code&gt; macro (which should be defined in your &lt;code&gt;macros&lt;/code&gt; folder) to convert the French date format in the &lt;code&gt;date&lt;/code&gt; column. The resulting standardized date is stored in a new column called &lt;code&gt;transaction_date&lt;/code&gt;.&lt;/p&gt;

&lt;ol&gt;
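&lt;li&gt; Create a new model for staging Uber rides data (the intermediate model references it as &lt;code&gt;uber_riding&lt;/code&gt;, so the file is named &lt;code&gt;uber_riding.sql&lt;/code&gt;):
&lt;/li&gt;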
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{{ config(materialized='table') }}

SELECT *,
       {{ parse_custom_date('date') }} as transaction_date
FROM {{ ref('uber_ride') }}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;With the staging models in place, run the following dbt commands:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;  &lt;code&gt;dbt seed&lt;/code&gt;: This command loads the seed data from the CSV files in the &lt;code&gt;seeds&lt;/code&gt; folder into your database.&lt;/li&gt;
&lt;li&gt;  &lt;code&gt;dbt run&lt;/code&gt;: This command executes the models in your project. It will create the staging tables for both Uber Eats and Uber rides data, applying the custom date parsing to standardize the date format.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;After creating the staging models, you can create an intermediate model called &lt;code&gt;uber_transactions.sql&lt;/code&gt; in the &lt;code&gt;models/intermediate&lt;/code&gt; folder. This model combines the Uber Eats and Uber rides data into a single table, which can be useful for further analysis and reporting. Here's a breakdown of the code in this model:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt; Set the materialization type to ‘table’ using the &lt;code&gt;config&lt;/code&gt; function:
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{{ config(materialized='table') }}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;2. Create a Common Table Expression (CTE) named &lt;code&gt;eats&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;WITH eats AS (
    SELECT 'eats' as type,
           transaction_date,
           total,
           restaurant
    FROM {{ ref('uber_eating') }}
),
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This CTE selects data from the &lt;code&gt;uber_eating&lt;/code&gt; staging model, adding a new column called &lt;code&gt;type&lt;/code&gt; with a value of 'eats' to identify the source of the data.&lt;/p&gt;

&lt;p&gt;3. Create another CTE named &lt;code&gt;rides&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;rides AS (
    SELECT 'rides' as type,
           transaction_date,
           total,
           driver
    FROM {{ ref('uber_riding') }}
)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Similar to the &lt;code&gt;eats&lt;/code&gt; CTE, this CTE selects data from the &lt;code&gt;uber_riding&lt;/code&gt; staging model and adds a &lt;code&gt;type&lt;/code&gt; column with a value of 'rides' to identify the source of the data.&lt;/p&gt;

&lt;p&gt;4. Combine the &lt;code&gt;eats&lt;/code&gt; and &lt;code&gt;rides&lt;/code&gt; CTEs using the &lt;code&gt;UNION ALL&lt;/code&gt; operator:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;SELECT *
FROM eats
UNION ALL
SELECT *
FROM rides
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



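&lt;p&gt;The &lt;code&gt;UNION ALL&lt;/code&gt; operator combines the results of the two SELECT statements into a single result set, keeping duplicate rows. Columns are matched by position, not by name, so the fourth column takes its name from the first SELECT (&lt;code&gt;restaurant&lt;/code&gt;) and holds the driver name for ride rows. The result is a single table containing both Uber Eats and Uber rides data, with the &lt;code&gt;type&lt;/code&gt; column indicating the source of each row.&lt;/p&gt;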
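&lt;p&gt;The shape of the combined table is easy to inspect with an in-memory database. The sketch below uses SQLite from the Python standard library as a stand-in for PostgreSQL, with plain tables in place of the dbt &lt;code&gt;ref()&lt;/code&gt; calls; the two sample rows are made up. It shows that the column names of the result come from the first SELECT only.&lt;/p&gt;

```python
import sqlite3

# In-memory SQLite database standing in for the PostgreSQL staging tables.
conn = sqlite3.connect(':memory:')
conn.executescript("""
    CREATE TABLE uber_eating (transaction_date TEXT, total REAL, restaurant TEXT);
    CREATE TABLE uber_riding (transaction_date TEXT, total REAL, driver TEXT);
    INSERT INTO uber_eating VALUES ('2023-05-01', 24.10, 'Chez Exemple');
    INSERT INTO uber_riding VALUES ('2023-05-02', 18.75, 'Alex');
""")

cursor = conn.execute("""
    WITH eats AS (
        SELECT 'eats' AS type, transaction_date, total, restaurant FROM uber_eating
    ),
    rides AS (
        SELECT 'rides' AS type, transaction_date, total, driver FROM uber_riding
    )
    SELECT * FROM eats
    UNION ALL
    SELECT * FROM rides
""")

# Column names come from the first SELECT; rows from both CTEs are kept.
column_names = [description[0] for description in cursor.description]
rows = cursor.fetchall()
conn.close()

print(column_names)
for row in rows:
    print(row)
```

&lt;p&gt;If a neutral column name is preferred for downstream reports, one option is to alias both columns to the same name (for example &lt;code&gt;counterparty&lt;/code&gt;, a hypothetical name) in each CTE.&lt;/p&gt;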

&lt;p&gt;Full-Code:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;-- models/intermediate/uber_transactions.sql
{{ config(materialized='table') }}

WITH eats AS (
    SELECT 'eats' as type,
           transaction_date,
           total,
           restaurant
    FROM {{ ref('uber_eating') }}
),
rides AS (
    SELECT 'rides' as type,
           transaction_date,
           total,
           driver
    FROM {{ ref('uber_riding') }}
)

SELECT *
FROM eats
UNION ALL
SELECT *
FROM rides
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--QXLoC2ls--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://miro.medium.com/v2/resize:fit:1050/1%2AdR3nWRNo2n5fuZn68eWtIQ.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--QXLoC2ls--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://miro.medium.com/v2/resize:fit:1050/1%2AdR3nWRNo2n5fuZn68eWtIQ.png" alt="" width="800" height="517"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;After creating the intermediate model, the next step is to create a series of models. These models will generate various aggregated metrics for the rides data, such as average expense, and expenses by week, month, quarter, and year.&lt;/p&gt;

&lt;p&gt;Here’s a brief overview of the models:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt; &lt;code&gt;average_expense_rides.sql&lt;/code&gt;: Calculates the average expense of Uber rides.&lt;/li&gt;
&lt;li&gt; &lt;code&gt;monthly_expenses_rides.sql&lt;/code&gt;: Aggregates the total expenses of Uber rides on a monthly basis.&lt;/li&gt;
&lt;li&gt; &lt;code&gt;quarterly_expenses_rides.sql&lt;/code&gt;: Aggregates the total expenses of Uber rides on a quarterly basis.&lt;/li&gt;
&lt;li&gt; &lt;code&gt;weekly_expenses_rides.sql&lt;/code&gt;: Aggregates the total expenses of Uber rides on a weekly basis.&lt;/li&gt;
&lt;li&gt; &lt;code&gt;yearly_expenses_rides.sql&lt;/code&gt;: Aggregates the total expenses of Uber rides on a yearly basis.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;With these models in place, you can use Power BI to analyze and visualize your Uber rides expenses over different time periods. This gives a clear picture of your spending patterns and helps you make more informed decisions about your transportation budget.&lt;/p&gt;
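&lt;p&gt;The SQL of the aggregate models is not shown in this article, so the following is only a sketch of the kind of calculation they perform, written in plain Python on made-up rows. It groups ride totals by year and month and computes the overall average, which is roughly what a &lt;code&gt;GROUP BY&lt;/code&gt; on the month and an &lt;code&gt;AVG(total)&lt;/code&gt; would return.&lt;/p&gt;

```python
from collections import defaultdict
from datetime import date
from decimal import Decimal

# Hypothetical ride rows (transaction_date, total), made up for illustration.
rides = [
    (date(2023, 4, 28), Decimal('12.50')),
    (date(2023, 5, 2), Decimal('18.75')),
    (date(2023, 5, 20), Decimal('9.25')),
]


def monthly_expenses(rows):
    """Sum totals per (year, month), similar to grouping by month in SQL."""
    totals = defaultdict(Decimal)
    for transaction_date, total in rows:
        totals[(transaction_date.year, transaction_date.month)] += total
    return dict(sorted(totals.items()))


def average_expense(rows):
    """Mean total across all rows, similar to AVG(total) in SQL."""
    amounts = [total for _, total in rows]
    return sum(amounts, Decimal('0')) / len(amounts)


print(monthly_expenses(rides))
print(average_expense(rides))
```

&lt;p&gt;Using &lt;code&gt;Decimal&lt;/code&gt; instead of floats avoids rounding surprises when summing currency amounts.&lt;/p&gt;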

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--66qzzmgw--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://miro.medium.com/v2/resize:fit:630/1%2AwegAW7DAfK-xMBBWDNiyrA.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--66qzzmgw--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://miro.medium.com/v2/resize:fit:630/1%2AwegAW7DAfK-xMBBWDNiyrA.png" alt="" width="420" height="535"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;In conclusion&lt;/strong&gt;, this project demonstrates the process of building a data pipeline for analyzing Uber and Uber Eats expenses. By leveraging tools such as Python, PostgreSQL, dbt, and Power BI, you can extract, clean, and transform data from various sources, then visualize it in a way that provides valuable insights.&lt;/p&gt;

&lt;p&gt;Throughout this project, you:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt; Extracted data from Uber and Uber Eats PDF receipts using Python and pdfplumber.&lt;/li&gt;
&lt;li&gt; Created a PostgreSQL database and a pgAdmin container using Docker Compose.&lt;/li&gt;
&lt;li&gt; Loaded the extracted data into the database and configured a dbt project.&lt;/li&gt;
&lt;li&gt; Created a custom PostgreSQL function to handle date translations from French to English.&lt;/li&gt;
&lt;li&gt; Built a series of dbt models for staging, intermediate, and aggregated data.&lt;/li&gt;
&lt;li&gt; Analyzed and visualized the data using Power BI (not covered in detail here but assumed as part of the project).&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://medium.com/@stefentaime_10958/uber-project-analyzing-personal-uber-and-uber-eats-expenses-with-elt-data-pipeline-using-dbt-91ead4aea5df"&gt;https://medium.com/@stefentaime_10958/uber-project-analyzing-personal-uber-and-uber-eats-expenses-with-elt-data-pipeline-using-dbt-91ead4aea5df&lt;/a&gt;&lt;/p&gt;

</description>
      <category>powerbi</category>
      <category>dbt</category>
      <category>sql</category>
      <category>python</category>
    </item>
    <item>
      <title>AI-Powered Accommodation Search: Harnessing the Power of Hadoop, MongoDB, Spark, GPT-3, React, and Flask</title>
      <dc:creator>Stefen</dc:creator>
      <pubDate>Mon, 15 May 2023 14:06:14 +0000</pubDate>
      <link>https://dev.to/stefentaime/ai-powered-accommodation-search-harnessing-the-power-of-hadoop-mongodb-spark-gpt-3-react-and-flask-1a69</link>
      <guid>https://dev.to/stefentaime/ai-powered-accommodation-search-harnessing-the-power-of-hadoop-mongodb-spark-gpt-3-react-and-flask-1a69</guid>
      <description>&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--VE09heG---/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/4000/1%2ABD-N-PRO9gXaxJB9jEhXOA.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--VE09heG---/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/4000/1%2ABD-N-PRO9gXaxJB9jEhXOA.png" alt="" width="800" height="587"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In today’s dynamic and data-driven world, the ability to harness information effectively and deliver user-specific results has become paramount. This is particularly true in the accommodation industry, where customer preferences can vary enormously. Leveraging AI and Big Data technologies, I’ve created an intelligent data pipeline capable of tailoring accommodation search results to individual needs.&lt;/p&gt;

&lt;p&gt;This article outlines the process of building an AI data pipeline using Hadoop HDFS, MongoDB, Spark, GPT-3, React, and Flask. The goal is to develop an intuitive platform where users can search for Airbnb apartments based on a target city, budget, and duration of stay, all powered by the intelligent language model, GPT-3.&lt;/p&gt;

&lt;h2&gt;
  
  
  Step 1: Data Acquisition and Upload to HDFS
&lt;/h2&gt;

&lt;p&gt;The data used in this project was derived from a dataset comprising listings from Airbnb, Booking, and Hotels.com. This dataset is focused on exploring the pricing landscape within the most popular European capitals. Each city contributed 500 hotels from each platform, culminating in a total of 7500 hotel listings.&lt;/p&gt;

&lt;p&gt;Example Berlin.json:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "airbnbHotels": [
    {
      "thumbnail": "https://a0.muscache.com/im/pictures/miso/Hosting-647664199858827562/original/cfc2fc4c-d703-4827-bc25-f1acb07e0025.jpeg?im_w=720",
      "title": "Private room in Tempelhof",
      "subtitles": ["Privatzimmer in Tempelhofer Feld", "1 bed", "Jul 24 – 31"],
      "price": { "currency": "$", "value": 31, "period": "night" },
      "rating": 5,
      "link": "https://www.airbnb.com/rooms/647664199858827562"
    },
    {
      "thumbnail": "https://a0.muscache.com/im/pictures/b9cb8b8c-51b3-46c4-b9cd-d27053f7d628.jpg?im_w=720",
      "title": "Private room in Mitte",
      "subtitles": ["Tiny, individual Room with private Bathroom", "1 small double bed", "Sep 1 – 8"],
      "price": { "currency": "$", "value": 40, "period": "night" },
      "rating": 4.96,
      "link": "https://www.airbnb.com/rooms/41220512"
    }
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Paris.json:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "airbnbHotels": [
    {
      "thumbnail": "https://a0.muscache.com/im/pictures/b9bc653d-df43-4f91-8162-0be5c912a3b4.jpg?im_w=720",
      "title": "Apartment in Paris",
      "subtitles": ["A nice little space and cute", "1 single bed", "May 8 – 13"],
      "price": { "currency": "$", "value": 57, "period": "night" },
      "rating": 4.46,
      "link": "https://www.airbnb.com/rooms/7337703"
    },
    {
      "thumbnail": "https://a0.muscache.com/im/pictures/261b3fb2-fec9-4009-b8b9-90d9976597fd.jpg?im_w=720",
      "title": "Apartment in Paris",
      "subtitles": ["Small cozy cocoon in Paris! 1 person studio", "1 double bed", "Jul 30 – Aug 5"],
      "price": { "currency": "$", "value": 107, "period": "night" },
      "rating": 4.98,
      "link": "https://www.airbnb.com/rooms/25820315"
    }
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;After downloading the data onto my local system, I created a Makefile to transfer it to Hadoop’s distributed file system (HDFS). The target copies the JSON files from the local directory into the Hadoop container with Docker, then into HDFS with Hadoop commands, so the data is stored in a distributed manner for efficient processing. To run the copy, use &lt;strong&gt;make copy_files_to_hdfs&lt;/strong&gt;.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;.PHONY: copy_files_to_hdfs
copy_files_to_hdfs:
 @for local_path in $(local_dir_hdfs)/*.json; do \
  filename=$$(basename -- "$$local_path"); \
  docker cp $$local_path $(container_id_hdfs):$(docker_dir_hdfs)/$$filename; \
  docker exec $(container_id_hdfs) hadoop fs -copyFromLocal $(docker_dir_hdfs)/$$filename $(hdfs_dir_hdfs)/$$filename; \
 done
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
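&lt;p&gt;For readers who prefer to script the copy in Python, the loop can be sketched as a function that builds the same two commands per file. The &lt;code&gt;build_copy_commands&lt;/code&gt; helper and the sample values (container name, directories) are hypothetical stand-ins for the Makefile variables, which are not shown in this article.&lt;/p&gt;

```python
import posixpath
from pathlib import Path


def build_copy_commands(local_files, container_id, docker_dir, hdfs_dir):
    """Return the docker commands the Makefile loop would run, as argument lists."""
    commands = []
    for local_path in local_files:
        filename = Path(local_path).name
        staged_path = posixpath.join(docker_dir, filename)
        # Step 1: copy the file from the host into the container.
        commands.append(['docker', 'cp', str(local_path), f'{container_id}:{staged_path}'])
        # Step 2: copy it from the container's filesystem into HDFS.
        commands.append([
            'docker', 'exec', container_id,
            'hadoop', 'fs', '-copyFromLocal',
            staged_path, posixpath.join(hdfs_dir, filename),
        ])
    return commands


# Hypothetical values standing in for the Makefile variables.
commands = build_copy_commands(
    local_files=['data/Berlin.json'],
    container_id='namenode',
    docker_dir='/tmp/input',
    hdfs_dir='/input',
)
for command in commands:
    print(' '.join(command))
```

&lt;p&gt;Each argument list can then be passed to &lt;code&gt;subprocess.run(command, check=True)&lt;/code&gt; so that a failed copy stops the script.&lt;/p&gt;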
&lt;h2&gt;
  
  
  Step 2: Data Processing with Spark and Storage in MongoDB
&lt;/h2&gt;

&lt;p&gt;Once the data is in HDFS, the next step is to process and clean it. For this purpose, Apache Spark comes into play. Apache Spark is an open-source, distributed computing system that handles data processing and analytics on large datasets, making it perfect for the task at hand.&lt;/p&gt;

&lt;p&gt;Let’s delve deeper into how Spark processes the data and loads it into MongoDB.&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, lit, col, from_json
from pyspark.sql.types import *

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Data Processing") \
    .master("spark://spark:7077") \
    .getOrCreate()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The first step involves creating a Spark session, which is the entry point to any Spark functionality. Here, we specify the name of the application and the master URL to connect to, which in this case is a Spark standalone cluster.&lt;/p&gt;

&lt;p&gt;Next, we define the schema of our data. A Spark schema is a blueprint of what our DataFrame should look like, including the names and types of the fields it contains.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Define the schema
schema = StructType([
    StructField("airbnbHotels", ArrayType(
        StructType([
            StructField("thumbnail", StringType()),
            StructField("title", StringType()),
            StructField("subtitles", ArrayType(StringType())),
            StructField("price", StructType([
                StructField("currency", StringType()),
                StructField("value", DoubleType()),
                StructField("period", StringType())
            ])),
            StructField("rating", DoubleType()),
            StructField("link", StringType())
        ])
    ))
])
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Once the schema is defined, we load the data from HDFS and process it. The data processing includes adding a new column with the city name and exploding the “airbnbHotels” field into separate rows. We then select the individual fields from the “airbnbHotels” objects.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;for city in cities:
    # Load the data for the current city, applying the schema defined above
    df = spark.read.schema(schema).option("multiline", "true").json(f'hdfs://namenode:9000/input/{city}.json')

    # Add a new column with the city name
    df = df.withColumn("city", lit(city))

    # Explode the "airbnbHotels" field into separate rows
    df = df.select("city", explode(df.airbnbHotels).alias("airbnbHotels"))

    # Select the individual fields from the "airbnbHotels" objects
    df = df.select(
        lit(city).alias("city"),
        df.airbnbHotels.thumbnail.alias("thumbnail"),
        df.airbnbHotels.title.alias("title"),
        df.airbnbHotels.subtitles.alias("subtitles"),
        df.airbnbHotels.price.alias("price"),
        df.airbnbHotels.rating.alias("rating"),
        df.airbnbHotels.link.alias("link")
    )
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
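&lt;p&gt;For readers less familiar with explode, the following sketch reproduces the same reshaping in plain Python, without Spark: one nested document becomes one flat row per listing, each tagged with the city. The explode_listings() helper and the sample values are hypothetical and exist only for illustration.&lt;/p&gt;

```python
def explode_listings(city, document):
    """Return one flat row per entry in document["airbnbHotels"]."""
    rows = []
    for listing in document.get("airbnbHotels", []):
        rows.append({
            "city": city,
            "thumbnail": listing.get("thumbnail"),
            "title": listing.get("title"),
            "subtitles": listing.get("subtitles"),
            "price": listing.get("price"),
            "rating": listing.get("rating"),
            "link": listing.get("link"),
        })
    return rows


# Hypothetical input with two listings; the values are placeholders.
document = {
    "airbnbHotels": [
        {"title": "Flat A", "price": {"currency": "$", "value": 90.0, "period": "night"}},
        {"title": "Flat B", "price": {"currency": "$", "value": 150.0, "period": "night"}},
    ]
}

rows = explode_listings("Berlin", document)
for row in rows:
    print(row["city"], row["title"], row["price"]["value"])
```

&lt;p&gt;Fields that are missing from a listing come back as None here, which is similar to the null values Spark produces for absent fields.&lt;/p&gt;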

&lt;p&gt;After the processing is done, the data is ready to be loaded into MongoDB. MongoDB is a popular NoSQL database known for its flexibility and scalability.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;    # MongoDB connection URI for MongoDB Atlas (a free-tier account is sufficient)
    MONGODB_URI = "mongodb+srv://&amp;lt;username&amp;gt;:&amp;lt;password&amp;gt;@cluster0.mongodb.net/?retryWrites=true&amp;amp;w=majority"
    # For a local MongoDB running in Docker, use mongodb://root:example@mongo:27017 instead
    # Write the DataFrame to MongoDB
    df.write.format("com.mongodb.spark.sql.DefaultSource") \
        .mode("append") \
        .option("uri", MONGODB_URI) \
        .option("database", "booking") \
        .option("collection", "airbnb") \
        .save()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The DataFrame is written into MongoDB using the MongoDB Spark Connector. This allows data to be written from Spark into MongoDB by specifying the MongoDB connection URI, the database, and the collection where the data should be stored.&lt;/p&gt;

&lt;p&gt;Finally, the SparkSession is stopped to free up resources:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Stop the SparkSession
spark.stop()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;This step flattens the nested listings into one record per apartment, tagged with its city, so that later stages of the pipeline can query the data directly.&lt;/p&gt;

&lt;p&gt;With Spark handling the processing and MongoDB handling the storage, the cleaned data is ready for the next steps in our pipeline, where it is used to generate tailored search results.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--aXFtFb5Q--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3424/1%2AUYiQPga1noAniFrdo3XF0A.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--aXFtFb5Q--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/3424/1%2AUYiQPga1noAniFrdo3XF0A.png" alt="" width="800" height="357"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Step 3: Flask API and GPT-3 Integration
&lt;/h2&gt;

&lt;p&gt;In step 3, a Flask API is built to serve as the interface between the GPT-3 model and the MongoDB database. It lets users query the data stored in MongoDB in natural language; the code below calls the gpt-3.5-turbo chat model.&lt;/p&gt;

&lt;p&gt;To begin with, the necessary modules are imported and a Flask application is set up. A custom JSONEncoder class extends the JSON encoder to convert MongoDB ObjectId values to strings, so that documents returned by MongoDB can be serialized to JSON.&lt;/p&gt;
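&lt;p&gt;The mechanism behind the custom encoder is the default() hook of the standard library's json.JSONEncoder, which is called for any object the encoder cannot serialize on its own. The sketch below shows the idea in isolation. StandInObjectId is a hypothetical stand-in for bson.ObjectId so that the example runs without pymongo installed, and the id value is a placeholder.&lt;/p&gt;

```python
import json


class StandInObjectId:
    """Hypothetical stand-in for bson.ObjectId; not part of any library."""

    def __init__(self, hex_value):
        self.hex_value = hex_value

    def __str__(self):
        return self.hex_value


class ObjectIdEncoder(json.JSONEncoder):
    """Serialize stand-in ids as strings and defer everything else."""

    def default(self, o):
        if isinstance(o, StandInObjectId):
            return str(o)
        # Fall back to the base class, which raises TypeError for unknown types.
        return super().default(o)


document = {"_id": StandInObjectId("64c1f0a2e4b0a1b2c3d4e5f6"), "title": "Example apartment"}
encoded = json.dumps(document, cls=ObjectIdEncoder)
print(encoded)
```

&lt;p&gt;Without the cls argument, json.dumps() raises a TypeError for the same document, which is the failure the custom encoder is meant to avoid.&lt;/p&gt;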

&lt;p&gt;The Flask application also sets up logging to help with debugging and monitoring the application:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;logger = logging.getLogger('my_logger')
logger.setLevel(logging.DEBUG)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;A MongoDB client is established with the relevant MongoDB URI. The database and collection that will be interacted with are then specified.&lt;/p&gt;

&lt;p&gt;The function process_query() communicates with the GPT-3 model, sending a system message and the user's search query. The function then extracts and returns the content from the GPT-3 response.&lt;/p&gt;

&lt;p&gt;Next, the function translate_to_mongo_query() takes the output from process_query() and translates it into a MongoDB query. It does this by extracting the city, budget, and dates from the processed query and using these values to create a query dictionary.&lt;/p&gt;
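&lt;p&gt;As a worked example, the sketch below mirrors the regular expressions used by translate_to_mongo_query() in the full listing and runs them on two sample strings. Both input strings are hypothetical model replies written for illustration. The first follows the expected format; the second is free-form prose and matches none of the patterns.&lt;/p&gt;

```python
import re


def translate_to_mongo_query(processed_query):
    """Build a MongoDB filter from 'city: ..., budget: ..., dates: ... - ....' text."""
    query_dict = {}

    city = re.search(r"city: (.*?),", processed_query, re.IGNORECASE)
    if city:
        query_dict["city"] = city.group(1).strip()

    budget = re.search(r"budget: \$?(\d+)", processed_query, re.IGNORECASE)
    if budget:
        query_dict["price.value"] = {"$lte": int(budget.group(1))}

    dates = re.search(r"dates: (.*?) - (.*?)\.", processed_query, re.IGNORECASE)
    if dates:
        # The stored subtitles use an en dash between the two dates.
        query_dict["subtitles"] = {"$regex": f"{dates.group(1)} – {dates.group(2)}"}

    return query_dict


# Hypothetical reply that follows the expected format.
structured = translate_to_mongo_query("city: Berlin, budget: $700, dates: May 8 - May 13.")
print(structured)

# Hypothetical free-form reply: no pattern matches, so the filter is empty.
unstructured = translate_to_mongo_query("Sure, I can help you find an apartment!")
print(unstructured)
```

&lt;p&gt;An empty filter matches every document in the collection, so the quality of the search depends on the model replying in the expected format.&lt;/p&gt;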

&lt;p&gt;The find_results() function takes a user's query, processes it through GPT-3, translates it into a MongoDB query, then executes the query on the MongoDB collection. It then returns the results as a string.&lt;/p&gt;

&lt;p&gt;The generate_preamble() function asks the GPT-3 model for a friendly introduction to the list of apartment options.&lt;/p&gt;

&lt;p&gt;The format_results() function takes the result set and formats each document into a string with the apartment's title, city, price, dates, rating, and link. In the listing below, neither of these two helpers is called by home().&lt;/p&gt;

&lt;p&gt;Finally, the home() function in the Flask app receives the POST request containing the user's query. It uses the find_results() function to get the MongoDB results, which it then sends to the GPT-3 model. The GPT-3 model generates a response, which the function returns as a JSON object. If there are any errors during this process, the function catches the exception and returns the error message.&lt;/p&gt;

&lt;p&gt;In summary, this Flask API serves as the glue that connects the user’s search request, GPT-3’s natural language processing, and the MongoDB database.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import os
import logging
import re

import openai
from bson import ObjectId
from flask import Flask, request, jsonify, json
from flask_cors import CORS
from pymongo import MongoClient

app = Flask(__name__)
CORS(app)

class JSONEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, ObjectId):
            return str(o)
        return json.JSONEncoder.default(self, o)

app.json_encoder = JSONEncoder

logger = logging.getLogger('my_logger')
logger.setLevel(logging.DEBUG)

# Get the API key from the OPENAI_KEY environment variable (never print or log it)
api_key = os.getenv("OPENAI_KEY")
if not api_key:
    raise ValueError("Missing OpenAI API key")

# Set the OpenAI API key
openai.api_key = api_key

# Connect to MongoDB
client = MongoClient('mongodb+srv://&amp;lt;username&amp;gt;:&amp;lt;password&amp;gt;@cluster0.td8y4zr.mongodb.net/?retryWrites=true&amp;amp;w=majority')
db = client['booking']
collection = db['airbnb']

def process_query(query):
    # Use GPT-3 to process the query (the openai library sends the API key itself)
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "You are a helpful assistant. You can retrieve information from a database and present it to the user. You can communicate information about an apartment such as its title, location, price, availability, and the link to its page. The user will provide you with their requirements in the following format: 'city: city_name, budget: budget_amount, dates: start_date to end_date'."},
            {"role": "user", "content": query}
        ],
        max_tokens=100
    )

    # Get the desired information from the response
    processed_query = response.choices[0].message['content'].strip()

    return processed_query

def translate_to_mongo_query(processed_query):
    # Initialize the MongoDB query dictionary
    query_dict = {}

    # Search for city in the processed query
    city_search = re.search(r'city: (.*?),', processed_query, re.IGNORECASE)
    if city_search:
        query_dict['city'] = city_search.group(1).strip()

    # Search for budget in the processed query
    budget_search = re.search(r'budget: \$?(\d+)', processed_query, re.IGNORECASE)
    if budget_search:
        # Convert the budget to the same currency as in your database
        budget_in_dollars = int(budget_search.group(1))
        query_dict['price.value'] = {'$lte': budget_in_dollars}

    # Search for the dates in the processed query
    date_search = re.search(r'dates: (.*?) - (.*?)\.', processed_query, re.IGNORECASE)
    if date_search:
        # Format the dates string to match your database format
        dates_string = f"{date_search.group(1)} – {date_search.group(2)}"
        query_dict['subtitles'] = {"$regex": dates_string}

    return query_dict

def find_results(query):
    logger.debug('Starting to find results')

    # Use GPT-3 to understand the query
    processed_query = process_query(query)
    print("Processed Query:", processed_query)
    logger.debug('Processed query with GPT-3')

    # Translate the processed query into MongoDB query
    mongo_query = translate_to_mongo_query(processed_query)
    logger.debug('Translated query to MongoDB query')
    print("MongoDB query:", mongo_query)

    # Run the query and get the results
    results = collection.find(mongo_query).limit(5)  
    logger.debug('Got results from MongoDB')

    # Transform the results into a string, including less information for each result
    results_string = "\n".join([f"Apartment title: {result['title']}, price: {result['price']['value']}, link: {result['link']}" for result in results])

    return results_string

def generate_preamble():
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Provide a friendly introduction to a list of apartment options."}
        ],
        max_tokens=100
    )

    # Get the generated preamble from the response
    preamble = response.choices[0].message['content'].strip()

    return preamble

def format_results(results):
    formatted_results = []
    for result in results:
        formatted_result = f"Title: {result['title']}\nCity: {result['city']}\nPrice: {result['price']['value']} {result['price']['currency']} per {result['price']['period']}\nDates: {', '.join(result['subtitles'])}\nRating: {result['rating']}\nLink: {result['link']}"
        formatted_results.append(formatted_result)
    return formatted_results

@app.route('/', methods=['POST'])
def home():
    try:
        data = request.get_json(force=True)
        query = data['query']

        # Get the results string from find_results
        results_string = find_results(query)

        # Create a GPT-3 prompt with the results
        prompt = f"{query}\n{results_string}"

        # Rough size guard: this counts whitespace-separated words, not model tokens,
        # so it underestimates the real token count
        total_tokens = len(query.split()) + len(results_string.split())
        if total_tokens &amp;gt; 4097:
            raise ValueError(f"Total token count ({total_tokens}) exceeds the model's limit (4097)")

        # Use GPT-3 to generate a response
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": prompt}
            ],
        )
        gpt3_response = response.choices[0].message['content'].strip()

        return jsonify({"response": gpt3_response})
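    except Exception as e:
        # Return errors as JSON so the client can always parse the reply
        logger.exception('Request failed')
        return jsonify({"error": str(e)}), 500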

if __name__ == '__main__':
    app.run(debug=True)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
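&lt;p&gt;The size guard in home() counts whitespace-separated words, which is not the same as model tokens. A minimal sketch of a slightly more conservative estimate follows. It assumes the common rule of thumb of roughly four characters per token for English text, which is only an approximation; a tokenizer library such as tiktoken would give exact counts. The function names and the reserved_for_reply default are hypothetical choices for this illustration, while the 4097 limit is the one used in the listing above.&lt;/p&gt;

```python
import math

CHARS_PER_TOKEN = 4  # Assumption: rough rule of thumb for English text.


def estimate_tokens(text):
    """Estimate the token count of text from its character length."""
    return math.ceil(len(text) / CHARS_PER_TOKEN)


def remaining_tokens(prompt, limit=4097, reserved_for_reply=500):
    """Return the estimated tokens left after the prompt and a reserved reply.

    A negative result means the prompt is probably too long.
    reserved_for_reply is a hypothetical value chosen for illustration.
    """
    return limit - reserved_for_reply - estimate_tokens(prompt)


# Hypothetical prompt in the same shape that home() builds: query, then results.
prompt = "I want an apartment in Berlin\nApartment title: Example, price: 90.0, link: https://example.com"
print(estimate_tokens(prompt), remaining_tokens(prompt))
```

&lt;p&gt;Returning the remaining budget rather than a yes/no answer lets the caller decide whether to trim the results or reject the request.&lt;/p&gt;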

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--xbsYAcpp--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/2820/1%2AGvFJKcSS5bj0s4c0bazSoA.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--xbsYAcpp--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/2820/1%2AGvFJKcSS5bj0s4c0bazSoA.png" alt="" width="800" height="75"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Step 4: User Interface with React
&lt;/h2&gt;

&lt;p&gt;In step 4, a React application is built as the user interface where users can enter their search criteria such as city, budget, and period of stay. This is done by creating a functional component SearchBar that maintains two states: query and results.&lt;/p&gt;

&lt;p&gt;Here’s how the main components of this script work:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;useState(): This React Hook adds state to a function component. query is initialized as an empty string and updated through the setQuery function; results and setResults work the same way.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;handleSubmit(): This is an asynchronous function that is triggered when the user submits the search form. It sends a POST request to the Flask backend (running on &lt;a href="http://localhost:5000"&gt;http://localhost:5000&lt;/a&gt;) with the search query as JSON in the request body. It then waits for the response from the backend, parses the JSON response, and sets the results state with the returned data.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;return: The component's return statement yields the JSX that renders the search bar and the results. It includes a form with an input field and a submit button. The input field's value is bound to the query state, and any change in the input field updates that state. When the form is submitted, the handleSubmit function is called. The results are then displayed in a div element.&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import React, { useState } from 'react';
import './Search.css';

function SearchBar() {
  const [query, setQuery] = useState("");
  const [results, setResults] = useState("");

  const handleSubmit = async (event) =&amp;gt; {
    event.preventDefault();
    const response = await fetch('http://localhost:5000', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ query })
    });
    const data = await response.json();
    setResults(data.response);
  };

  // A form with an input and a submit button, followed by the results in a div
  return (
    &amp;lt;div&amp;gt;
      &amp;lt;form onSubmit={handleSubmit}&amp;gt;
        &amp;lt;input
          className="search-input"
          type="text"
          value={query}
          onChange={event =&amp;gt; setQuery(event.target.value)}
        /&amp;gt;
        &amp;lt;button type="submit"&amp;gt;Search&amp;lt;/button&amp;gt;
      &amp;lt;/form&amp;gt;
      &amp;lt;div&amp;gt;{results}&amp;lt;/div&amp;gt;
    &amp;lt;/div&amp;gt;
  );
}

export default SearchBar;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;/li&gt;
&lt;/ol&gt;
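&lt;p&gt;The request that handleSubmit sends can also be reproduced outside the browser, which is handy for testing the API on its own. The sketch below builds the same POST request with Python's standard library. The build_search_request() and search() helpers are hypothetical names for this illustration; the URL, header, and JSON body match what the React component sends.&lt;/p&gt;

```python
import json
import urllib.request

API_URL = "http://localhost:5000"  # Same address the React component posts to.


def build_search_request(query, url=API_URL):
    """Build the POST request that the React form sends, without sending it."""
    body = json.dumps({"query": query}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def search(query, url=API_URL):
    """Send the request and return the 'response' field of the JSON reply."""
    with urllib.request.urlopen(build_search_request(query, url)) as reply:
        return json.loads(reply.read().decode("utf-8"))["response"]


# Building the request needs no running server; search() does.
prepared = build_search_request("I want an apartment in Berlin under 700$")
print(prepared.get_method(), prepared.full_url)
print(prepared.data.decode("utf-8"))
```

&lt;p&gt;Calling search() requires the Flask API to be running locally; building the request does not, so the payload can be inspected first.&lt;/p&gt;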

&lt;p&gt;In the given example, the user enters “I want an apartment in Berlin with a budget of amount 700$ between May 8–13” in the search bar. The React application sends this query to the Flask backend, which processes the query using GPT-3, forms a MongoDB query, retrieves matching results from the MongoDB database, and sends the results back to the React application. The application then displays the results, which include a list of apartments that match the user’s criteria with a link to each apartment.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--9ZujEkdg--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/2406/1%2AHEd7O9eR6WIabsTo3P2DTA.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--9ZujEkdg--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_800/https://cdn-images-1.medium.com/max/2406/1%2AHEd7O9eR6WIabsTo3P2DTA.png" alt="" width="800" height="387"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In this article, we have explored an innovative and efficient system that integrates GPT-3, Apache Spark, MongoDB, and React to create a user-friendly search application.&lt;/p&gt;

&lt;p&gt;The first step involves data collection from a source like Airbnb, transforming and storing it in a distributed file system like HDFS. Apache Spark is used for this purpose, demonstrating its power in handling large scale data processing tasks.&lt;/p&gt;

&lt;p&gt;Next, the data stored in HDFS is processed and loaded into MongoDB using PySpark, showcasing the flexibility and ease of combining big data technologies with NoSQL databases.&lt;/p&gt;

&lt;p&gt;The third step introduces GPT-3, a language model developed by OpenAI. GPT-3 is used to interpret the user’s search query, which is then transformed into a MongoDB query to retrieve relevant results from the database.&lt;/p&gt;

&lt;p&gt;Finally, a user-friendly interface is developed using React, a popular JavaScript library for building UIs. This interface allows users to input their search queries in a natural language format and get results in real-time, providing a seamless user experience.&lt;/p&gt;

&lt;p&gt;In conclusion, this system shows how big data technologies, AI, and modern frontend development can be combined into a practical natural-language search application. The methods and technologies discussed here can be extended or modified for other datasets and use cases.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://github.com/Stefen-Taime/IA_Data_Pipeline"&gt;GitHub&lt;/a&gt;&lt;br&gt;
&lt;a href="https://medium.com/@stefentaime_10958/ai-powered-accommodation-search-harnessing-the-power-of-hadoop-mongodb-spark-gpt-3-react-and-7e0bfc41bf26"&gt;Medium&lt;/a&gt;&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;To run this project, follow these steps:

Step 1: Clone the Repository
First, clone the repository from GitHub:

git clone https://github.com/Stefen-Taime/IA_Data_Pipeline.git

Step 2: Navigate to the Project Directory
Next, change into the project directory:

cd IA_Data_Pipeline

Step 3: Use the Makefile Commands
The Makefile provided in the project wraps the commands needed to run each stage. Run them in this order.

Copy Files to HDFS: copies the necessary files to the Hadoop Distributed File System (HDFS).

make copy_files_to_hdfs

Load Data to MongoDB: loads the data from HDFS into MongoDB.

make run_load_to_mongo

Run API: starts the Flask API, which uses GPT-3 to process the user's queries.

make run_api

Start Frontend: starts the React application, which serves as the user interface for the project.

make start_front_end

Important Note: This project uses OpenAI's GPT-3, so you need an OpenAI API key to run the application. Set the key in your environment variables before starting the API; the Flask code shown earlier reads it from OPENAI_KEY.

If you encounter any issues while setting up or running the project, refer to the project's documentation or open an issue in the GitHub repository. This project is an example of how to integrate various technologies, and you might need to adjust some settings for your specific environment and setup.
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

</description>
      <category>database</category>
      <category>ia</category>
      <category>openai</category>
      <category>data</category>
    </item>
  </channel>
</rss>
