<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: totalSophie</title>
    <description>The latest articles on DEV Community by totalSophie (@sophie_toffee).</description>
    <link>https://dev.to/sophie_toffee</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1063599%2Fc440a4f2-d017-4121-8163-bcbb51366b61.png</url>
      <title>DEV Community: totalSophie</title>
      <link>https://dev.to/sophie_toffee</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/sophie_toffee"/>
    <language>en</language>
    <item>
      <title>Ingesting Data to Postgres</title>
      <dc:creator>totalSophie</dc:creator>
      <pubDate>Thu, 11 Jan 2024 10:55:20 +0000</pubDate>
      <link>https://dev.to/sophie_toffee/ingesting-data-to-postgres-lk5</link>
      <guid>https://dev.to/sophie_toffee/ingesting-data-to-postgres-lk5</guid>
      <description>&lt;p&gt;DE &lt;a href="https://github.com/DataTalksClub/data-engineering-zoomcamp/tree/main"&gt;Zoomcamp&lt;/a&gt; study notes&lt;/p&gt;

&lt;p&gt;To set up PostgreSQL in Docker, run the pgcli command, and execute SQL statements, you can follow these steps:&lt;/p&gt;

&lt;h3&gt;
  
  
  Step 1: Install Docker
&lt;/h3&gt;

&lt;p&gt;Make sure you have Docker installed on your machine. You can download Docker from the official website: &lt;a href="https://www.docker.com/"&gt;Docker&lt;/a&gt;.&lt;/p&gt;

&lt;h3&gt;
  
  
  Step 2: Pull PostgreSQL Docker Image
&lt;/h3&gt;

&lt;p&gt;Open a terminal and pull the official PostgreSQL Docker image:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker pull postgres
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Step 3: Run PostgreSQL Container
&lt;/h3&gt;

&lt;p&gt;Run a PostgreSQL container with a specified password for the default user 'postgres':&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker run &lt;span class="nt"&gt;--name&lt;/span&gt; mypostgres &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="nv"&gt;POSTGRES_USER&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"root"&lt;/span&gt; &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="nv"&gt;POSTGRES_PASSWORD&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"password"&lt;/span&gt; &lt;span class="nt"&gt;-e&lt;/span&gt; &lt;span class="nv"&gt;POSTGRES_DB&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"ny_taxi"&lt;/span&gt; &lt;span class="nt"&gt;-v&lt;/span&gt; &lt;span class="si"&gt;$(&lt;/span&gt;&lt;span class="nb"&gt;pwd&lt;/span&gt;&lt;span class="si"&gt;)&lt;/span&gt;/ny_taxi_postgres_data:/var/lib/postgresql/data &lt;span class="nt"&gt;-p&lt;/span&gt; 5432:5432 &lt;span class="nt"&gt;-d&lt;/span&gt; postgres

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This command starts a PostgreSQL container named 'mypostgres' with the password 'password' and exposes port 5432 on the host.&lt;br&gt;
e declares the environment variables.&lt;br&gt;
v declares volume path&lt;/p&gt;
&lt;h3&gt;
  
  
  Step 4: Install pgcli
&lt;/h3&gt;

&lt;p&gt;Install pgcli, a command-line interface for PostgreSQL, on your local machine:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;pip &lt;span class="nb"&gt;install &lt;/span&gt;pgcli
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Step 5: Connect to PostgreSQL using pgcli
&lt;/h3&gt;

&lt;p&gt;Connect to the PostgreSQL database using pgcli:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;pgcli &lt;span class="nt"&gt;-h&lt;/span&gt; localhost &lt;span class="nt"&gt;-p&lt;/span&gt; 5432 &lt;span class="nt"&gt;-U&lt;/span&gt; root &lt;span class="nt"&gt;-d&lt;/span&gt; ny_taxi &lt;span class="nt"&gt;-W&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;h declares the host variable which is localhost connection port.&lt;br&gt;
u is the username.&lt;br&gt;
d is the database name&lt;br&gt;
-W prompts the user for the password. After entering the command&lt;/p&gt;

&lt;p&gt;Enter the password when prompted (use 'password' if you followed the previous steps).&lt;/p&gt;
&lt;h3&gt;
  
  
  Step 6: Execute SQL Statements
&lt;/h3&gt;

&lt;p&gt;Once connected, you can execute SQL statements directly in the pgcli interface. For example:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;-- Create a new database&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;DATABASE&lt;/span&gt; &lt;span class="n"&gt;mydatabase&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="c1"&gt;-- Switch to the new database&lt;/span&gt;
&lt;span class="err"&gt;\&lt;/span&gt;&lt;span class="k"&gt;c&lt;/span&gt; &lt;span class="n"&gt;mydatabase&lt;/span&gt;

&lt;span class="c1"&gt;-- Create a table&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;mytable&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="nb"&gt;serial&lt;/span&gt; &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="n"&gt;age&lt;/span&gt; &lt;span class="nb"&gt;INT&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="c1"&gt;-- Insert some data&lt;/span&gt;
&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;mytable&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;age&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;VALUES&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'John'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;25&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'Jane'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;30&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="c1"&gt;-- Query the data&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;mytable&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Feel free to modify the SQL statements according to your requirements.&lt;/p&gt;

&lt;h3&gt;
  
  
  Step 7: To Exit pgcli and Stop the PostgreSQL Container
&lt;/h3&gt;

&lt;p&gt;To exit pgcli, type &lt;code&gt;\q&lt;/code&gt;. After that, stop and remove the PostgreSQL container:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker stop mypostgres
docker &lt;span class="nb"&gt;rm &lt;/span&gt;mypostgres
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Data Ingestion from CSV to PostgreSQL using Pandas and SQLAlchemy
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;Used Jupyter notebook to insert the data in chunks.&lt;/li&gt;
&lt;li&gt;Downloaded the &lt;a href="https://data.cityofnewyork.us/api/views/m6nq-qud6/rows.csv?accessType=DOWNLOAD"&gt;NY taxi 2021 data&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Step 1: Setting Up the Environment:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Use Pandas to read the CSV file in chunks for more efficient processing.&lt;/li&gt;
&lt;li&gt;Define a PostgreSQL connection string using SQLAlchemy.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Step 2: Creating the Table Schema:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Read the first chunk of data to create the initial table schema in the database.&lt;/li&gt;
&lt;li&gt;Utilize Pandas' &lt;code&gt;to_sql&lt;/code&gt; method to replace or create the table in the PostgreSQL database.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Step 3: Iterative Data Insertion:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Iterate through the remaining chunks of the CSV file.&lt;/li&gt;
&lt;li&gt;Optimize timestamp data types using Pandas' &lt;code&gt;to_datetime&lt;/code&gt;.&lt;/li&gt;
&lt;li&gt;Append each chunk to the existing PostgreSQL table.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;strong&gt;Final Code:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;sqlalchemy&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;create_engine&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;pandas&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;

&lt;span class="c1"&gt;# specify the database you want to use based on the docker run command we had
# postgresql://username:password@localhost:port/dbname
&lt;/span&gt;&lt;span class="n"&gt;db_url&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;postgresql://root:password@localhost:5432/ny_taxi&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;
&lt;span class="n"&gt;engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;create_engine&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;db_url&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Chunksize for reading CSV and inserting into the database
&lt;/span&gt;&lt;span class="n"&gt;chunk_size&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;100000&lt;/span&gt;

&lt;span class="c1"&gt;# Create an iterator for reading CSV in chunks
&lt;/span&gt;&lt;span class="n"&gt;csv_iter&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;read_csv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;2021_Yellow_Taxi_Trip_Data.csv&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;iterator&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;chunksize&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;chunk_size&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Get the first chunk to create the table schema
&lt;/span&gt;&lt;span class="n"&gt;first_chunk&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;next&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;csv_iter&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="n"&gt;first_chunk&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_sql&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;yellow_taxi_data&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;con&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;engine&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;if_exists&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;replace&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;index&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c1"&gt;# Loop through the remaining chunks and append to the table
&lt;/span&gt;&lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;chunk&lt;/span&gt; &lt;span class="ow"&gt;in&lt;/span&gt; &lt;span class="n"&gt;csv_iter&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;t_start&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;time&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="c1"&gt;# Fix timestamp type issue
&lt;/span&gt;    &lt;span class="n"&gt;chunk&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;tpep_pickup_datetime&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_datetime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;chunk&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;tpep_pickup_datetime&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;
    &lt;span class="n"&gt;chunk&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;tpep_dropoff_datetime&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_datetime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;chunk&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;tpep_dropoff_datetime&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;])&lt;/span&gt;

    &lt;span class="c1"&gt;# Append data to the existing table
&lt;/span&gt;    &lt;span class="n"&gt;chunk&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_sql&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;yellow_taxi_data&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;con&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;engine&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;if_exists&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;append&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;index&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;False&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Print a message and benchmark the time
&lt;/span&gt;    &lt;span class="n"&gt;t_end&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;time&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Inserted another chunk... took &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;t_end&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;t_start&lt;/span&gt;&lt;span class="si"&gt;:&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;3&lt;/span&gt;&lt;span class="n"&gt;f&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; second(s)&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Extra, Extra!!!
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://i.giphy.com/media/3ohjUPZcfmzytBQGlO/giphy.gif" class="article-body-image-wrapper"&gt;&lt;img src="https://i.giphy.com/media/3ohjUPZcfmzytBQGlO/giphy.gif" alt="Animated GIF" width="480" height="360"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Using &lt;code&gt;argparse&lt;/code&gt; to Parse Command Line Arguments
&lt;/h2&gt;

&lt;p&gt;Utilizing the &lt;code&gt;argparse&lt;/code&gt; standard library to efficiently parse command line arguments, this script downloads a CSV file from a specified URL and ingests its data into a PostgreSQL database.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight python"&gt;&lt;code&gt;&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;
&lt;span class="kn"&gt;from&lt;/span&gt; &lt;span class="n"&gt;sqlalchemy&lt;/span&gt; &lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;create_engine&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;pandas&lt;/span&gt; &lt;span class="k"&gt;as&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;argparse&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="n"&gt;os&lt;/span&gt;

&lt;span class="k"&gt;def&lt;/span&gt; &lt;span class="nf"&gt;main&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;params&lt;/span&gt;&lt;span class="p"&gt;):&lt;/span&gt;
    &lt;span class="n"&gt;user&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;params&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;user&lt;/span&gt;
    &lt;span class="n"&gt;password&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;params&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;password&lt;/span&gt;
    &lt;span class="n"&gt;host&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;params&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;host&lt;/span&gt;
    &lt;span class="n"&gt;port&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;params&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;port&lt;/span&gt;
    &lt;span class="n"&gt;db&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;params&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;db&lt;/span&gt;
    &lt;span class="n"&gt;table_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;params&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;table_name&lt;/span&gt;
    &lt;span class="n"&gt;url&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;params&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;url&lt;/span&gt;

    &lt;span class="n"&gt;csv_name&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;output.csv&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;

    &lt;span class="c1"&gt;# Download the CSV using the os system function to execute command line arguments from Python
&lt;/span&gt;    &lt;span class="n"&gt;os&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;system&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;wget &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;url&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt; -O &lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;csv_name&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;engine&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;create_engine&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sa"&gt;f&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;postgresql://&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;user&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;:&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;password&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;@&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;host&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;:&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;port&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="s"&gt;/&lt;/span&gt;&lt;span class="si"&gt;{&lt;/span&gt;&lt;span class="n"&gt;db&lt;/span&gt;&lt;span class="si"&gt;}&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;df_iter&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;read_csv&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;csv_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;iterator&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;chunksize&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;100000&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;next&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df_iter&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tpep_pickup_datetime&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_datetime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tpep_pickup_datetime&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tpep_dropoff_datetime&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_datetime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tpep_dropoff_datetime&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Adding the column names
&lt;/span&gt;    &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;head&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;n&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;to_sql&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;table_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;con&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;engine&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;if_exists&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;replace&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c1"&gt;# Adding the first batch of rows
&lt;/span&gt;    &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_sql&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;table_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;con&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;engine&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;if_exists&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;append&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="bp"&gt;True&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
        &lt;span class="n"&gt;t_start&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;time&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

        &lt;span class="n"&gt;df&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;next&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df_iter&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tpep_pickup_datetime&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_datetime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tpep_pickup_datetime&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tpep_dropoff_datetime&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;pd&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_datetime&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tpep_dropoff_datetime&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="n"&gt;df&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;to_sql&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;table_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;con&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="n"&gt;engine&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;if_exists&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;append&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="n"&gt;t_end&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;time&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

        &lt;span class="nf"&gt;print&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;Inserted another chunk... took %.3f second(s)&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt; &lt;span class="o"&gt;%&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;t_end&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="n"&gt;t_start&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;

&lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;__name__&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;__main__&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;
    &lt;span class="n"&gt;parser&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;argparse&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;ArgumentParser&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;description&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;Ingest CSV data to Postgres&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;parser&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_argument&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;--user&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;help&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;user name for postgres&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;parser&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_argument&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;--password&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;help&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;password for postgres&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;parser&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_argument&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;--host&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;help&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;host for postgres&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;parser&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_argument&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;--port&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;help&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;port for postgres&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;parser&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_argument&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;--db&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;help&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;database name for postgres&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;parser&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_argument&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;--table_name&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;help&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;name of the table where we will write the results to&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;parser&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;add_argument&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="s"&gt;--url&lt;/span&gt;&lt;span class="sh"&gt;'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;help&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;url of the CSV&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;args&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;parser&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;parse_args&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

&lt;span class="c1"&gt;# Dockerizing Ingestion Script
&lt;/span&gt;
&lt;span class="n"&gt;In&lt;/span&gt; &lt;span class="n"&gt;the&lt;/span&gt; &lt;span class="n"&gt;provided&lt;/span&gt; &lt;span class="n"&gt;Dockerfile&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;

&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;**&lt;/span&gt;&lt;span class="n"&gt;Dockerfile&lt;/span&gt;&lt;span class="o"&gt;**&lt;/span&gt;
&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="sb"&gt;``&lt;/span&gt;&lt;span class="err"&gt;`&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="o"&gt;%&lt;/span&gt; &lt;span class="n"&gt;endraw&lt;/span&gt; &lt;span class="o"&gt;%&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="n"&gt;docker&lt;/span&gt;
&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;python&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="mf"&gt;3.9&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;
&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; 
&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;RUN&lt;/span&gt; &lt;span class="n"&gt;apt&lt;/span&gt;&lt;span class="o"&gt;-&lt;/span&gt;&lt;span class="n"&gt;get&lt;/span&gt; &lt;span class="n"&gt;install&lt;/span&gt; &lt;span class="n"&gt;wget&lt;/span&gt;
&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;RUN&lt;/span&gt; &lt;span class="n"&gt;pip&lt;/span&gt; &lt;span class="n"&gt;install&lt;/span&gt; &lt;span class="n"&gt;pandas&lt;/span&gt; &lt;span class="n"&gt;sqlalchemy&lt;/span&gt; &lt;span class="n"&gt;psycopg2&lt;/span&gt;
&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; 
&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;WORKDIR&lt;/span&gt; &lt;span class="o"&gt;/&lt;/span&gt;&lt;span class="n"&gt;app&lt;/span&gt;
&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;COPY&lt;/span&gt; &lt;span class="n"&gt;ingest_data&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;py&lt;/span&gt; &lt;span class="n"&gt;ingest_data&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;py&lt;/span&gt;
&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; 
&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;ENTRYPOINT&lt;/span&gt; &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;python&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="s"&gt;ingest_data.py&lt;/span&gt;&lt;span class="sh"&gt;"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
&lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="o"&gt;%&lt;/span&gt; &lt;span class="n"&gt;raw&lt;/span&gt; &lt;span class="o"&gt;%&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The &lt;code&gt;psychopg2&lt;/code&gt; package is included to facilitate access to the PostgreSQL database from Python, serving as a valuable "database wrapper."&lt;/p&gt;

&lt;p&gt;To build the Docker image, execute the following command:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
bash
docker build -t taxi_ingest:v001 .


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Now run the image instead of the script with the network argument and changing the database host...&lt;/p&gt;

&lt;p&gt;&lt;em&gt;You can serve the local file over HTTP on your machine and access it through your IP address by running this in its location&lt;/em&gt;&lt;br&gt;
&lt;code&gt;python3 -m http.server&lt;/code&gt;&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
bash
# If your file is local
URL="http://192.x.x.x:8000/2021_Yellow_Taxi_Trip_Data.csv"
docker run -it \
  --network=pg-network \
  taxi_ingest:v001 \
  --user=root \
  --password=password \
  --host=pg-database \
  --port=5432 \
  --db=ny_taxi \
  --table_name=yellow_taxi_trips \
  --url="${URL}"


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;&lt;a href="https://i.giphy.com/media/836DqJPm7SnlAk1nh5/giphy.gif" class="article-body-image-wrapper"&gt;&lt;img src="https://i.giphy.com/media/836DqJPm7SnlAk1nh5/giphy.gif" width="480" height="480"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Not yet...&lt;/p&gt;

&lt;h2&gt;
  
  
  Connecting pgAdmin and Postgres
&lt;/h2&gt;

&lt;p&gt;pgCLI allows for quickly looking into data. But the more convenient way to work with a postgres database is to use the pgAdmin tool which is a web based GUI tool.&lt;/p&gt;

&lt;p&gt;To install pgAdmin in a Docker container, you can follow these steps:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Pull the pgAdmin Docker Image:&lt;/strong&gt;
Use the following command to pull the official pgAdmin Docker image from Docker Hub.&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
bash
   docker pull dpage/pgadmin4


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Create a Docker Network:&lt;/strong&gt;
It's a good practice to create a Docker network to facilitate communication between the PostgreSQL container and the pgAdmin container.&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
bash
   docker network create pgadmin-network


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Run the PostgreSQL Container:&lt;/strong&gt;
Now modify the postgres db run command&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
bash
docker run --name pg-database \
--network pgadmin-network \
-e POSTGRES_USER="root" \
-e POSTGRES_PASSWORD="password" \
-e POSTGRES_DB="ny_taxi" \
-v $(pwd)/ny_taxi_postgres_data:/var/lib/postgresql/data \
-p 5432:5432 \
-d postgres


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Replace &lt;code&gt;password&lt;/code&gt; with your desired PostgreSQL password.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Run the pgAdmin Container:&lt;/strong&gt;
Now, you can run the pgAdmin container and link it to the PostgreSQL container.&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
bash
   docker run --name pgadmin-container \
              --network pgadmin-network \
              -e PGADMIN_DEFAULT_EMAIL=myemail@example.com \
              -e PGADMIN_DEFAULT_PASSWORD=mypassword \
              -p 5055:80 \
              -d dpage/pgadmin4


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Replace &lt;code&gt;myemail@example.com&lt;/code&gt; and &lt;code&gt;mypassword&lt;/code&gt; with your desired pgAdmin login credentials.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Access pgAdmin:&lt;/strong&gt;&lt;br&gt;
Open your web browser and navigate to &lt;code&gt;http://localhost:5055&lt;/code&gt;. Log in with the credentials you provided in the previous step.&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;Add PostgreSQL Server:&lt;/strong&gt;&lt;br&gt;
In pgAdmin, click on "Add New Server" and fill in the necessary details to connect to the PostgreSQL server running in the Docker container.&lt;/p&gt;&lt;/li&gt;
&lt;/ol&gt;

&lt;ul&gt;
&lt;li&gt;Host name/address: &lt;code&gt;postgres-container&lt;/code&gt; (the name of your PostgreSQL container)&lt;/li&gt;
&lt;li&gt;Port: &lt;code&gt;5432&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Username: &lt;code&gt;postgres&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Password: (the password you set in step 3)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Now, you should be able to manage your PostgreSQL server using pgAdmin in a Docker container. Adjust the commands and parameters according to your specific requirements and environment.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Rather, we can also use Docker Compose&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Create a &lt;code&gt;docker-compose.yml&lt;/code&gt;.. Now, you don't specify the network&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;

services:
  pgdatabase:
    image: postgres:latest
    environment:
      - POSTGRES_USER=root
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=ny_taxi
    volumes:
      - "./ny_taxi_postgres_data:/var/lib/postgresql/data:rw"
    ports:
      - "5432:5432"
    container_name: mypostgres

  pgadmin:
    image: dpage/pgadmin4
    environment:
      - PGADMIN_DEFAULT_EMAIL=myemail@example.com
      - PGADMIN_DEFAULT_PASSWORD=mypassword
    ports:
      - "5055:80"
    container_name: pgadmin


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To start Docker Compose &lt;code&gt;docker-compose up&lt;/code&gt;&lt;br&gt;
To run Docker Compose in the background &lt;code&gt;docker-compose up -d&lt;/code&gt;&lt;br&gt;
To view Docker Compose containers &lt;code&gt;docker-compose ps&lt;/code&gt;&lt;br&gt;
To stop Docker Compose &lt;code&gt;docker-compose down&lt;/code&gt;&lt;br&gt;
To stop Docker Compose if you used the -d flag &lt;code&gt;docker-compose down -v&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://i.giphy.com/media/l0IycQmt79g9XzOWQ/giphy.gif" class="article-body-image-wrapper"&gt;&lt;img src="https://i.giphy.com/media/l0IycQmt79g9XzOWQ/giphy.gif" width="480" height="270"&gt;&lt;/a&gt;&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Beginner's guide to Apache Flink</title>
      <dc:creator>totalSophie</dc:creator>
      <pubDate>Mon, 08 Jan 2024 14:28:42 +0000</pubDate>
      <link>https://dev.to/sophie_toffee/beginners-guide-to-apache-flink-5f3d</link>
      <guid>https://dev.to/sophie_toffee/beginners-guide-to-apache-flink-5f3d</guid>
      <description>&lt;p&gt;Apache Flink is a powerful and versatile open-source stream processing framework that goes beyond traditional batch processing to handle real-time data streaming and analytics. In this article, we will explore the fundamental concepts of Apache Flink, compare batch and stream processing, highlight the differences between Flink and Apache Spark, delve into system requirements, installation procedures, Maven usage, and discuss Flink's APIs and transformations.&lt;/p&gt;

&lt;h2&gt;
  
  
  Understanding Apache Flink
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://i.giphy.com/media/f6z5TkrTIBZILYOd1t/giphy.gif" class="article-body-image-wrapper"&gt;&lt;img src="https://i.giphy.com/media/f6z5TkrTIBZILYOd1t/giphy.gif" width="540" height="540"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;
  
  
  What is Apache Flink?
&lt;/h3&gt;

&lt;p&gt;Apache Flink is a distributed stream processing framework designed for big data processing and analytics. It excels in handling large volumes of data with low-latency processing capabilities, making it suitable for real-time applications. Flink supports event time processing, fault-tolerance, and stateful processing, enabling developers to build robust and scalable data processing applications.&lt;/p&gt;

&lt;h2&gt;
  
  
  Batch Processing vs. Stream Processing
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Batch Processing
&lt;/h3&gt;

&lt;p&gt;Batch processing involves processing data in chunks or batches at a time. It is suitable for scenarios where data can be collected and processed in a non-continuous manner. Examples of batch processing include nightly ETL (Extract, Transform, Load) jobs, data warehousing, and large-scale data analysis.&lt;/p&gt;

&lt;h3&gt;
  
  
  Stream Processing
&lt;/h3&gt;

&lt;p&gt;Stream processing deals with the continuous and real-time processing of data as it is generated. It is ideal for scenarios where low-latency and near-real-time insights are crucial. &lt;br&gt;
Examples of stream processing include fraud detection, monitoring systems, and real-time analytics on social media feeds.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ft3oi1ha4erwm89uih7uy.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ft3oi1ha4erwm89uih7uy.png" alt="Image description of batch vs stream analytics[](url)" width="710" height="183"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
  Apache Flink vs. Apache Spark
&lt;/h2&gt;
&lt;h3&gt;
  
  
  Core Differences
&lt;/h3&gt;

&lt;p&gt;While both Apache Flink and Apache Spark are powerful big data processing frameworks, they have some key differences:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Processing Model:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Flink focuses on event time processing and supports true event-driven stream processing.&lt;/li&gt;
&lt;li&gt;Spark primarily follows a micro-batch processing model, which introduces slight latency in stream processing.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;State Management:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Flink emphasizes on stateful processing, offering built-in support for managing state.&lt;/li&gt;
&lt;li&gt;Spark typically relies on external storage solutions for state management.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Fault Tolerance:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Flink achieves fault tolerance through distributed snapshots and state replication.&lt;/li&gt;
&lt;li&gt;Spark employs lineage information and recomputation for fault tolerance.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ul&gt;
&lt;h2&gt;
  
  
  Layers of Apache Flink Ecosystem
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fvh3s0be288c9waxmzmzv.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fvh3s0be288c9waxmzmzv.png" alt="Apache Flink Ecosystem" width="576" height="423"&gt;&lt;/a&gt;&lt;/p&gt;
&lt;h2&gt;
  
  
  Apache Flink System Requirements
&lt;/h2&gt;

&lt;p&gt;Before diving into Apache Flink, ensure that your system meets the following requirements:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Java:&lt;/strong&gt; Flink is a Java-based framework, so ensure that Java is installed on your machine.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Memory:&lt;/strong&gt; Sufficient RAM to accommodate Flink's processes.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Disk Space:&lt;/strong&gt; Adequate disk space for Flink's data storage requirements.&lt;/li&gt;
&lt;/ul&gt;
&lt;h2&gt;
  
  
  Installation and Maven Usage
&lt;/h2&gt;
&lt;h3&gt;
  
  
  Installing Apache Flink
&lt;/h3&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Download:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Visit the &lt;a href="https://flink.apache.org/downloads.html"&gt;Apache Flink download page&lt;/a&gt; and select the desired version.&lt;/li&gt;
&lt;li&gt;Follow the installation instructions provided for your operating system.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;strong&gt;Configuration:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Customize Flink's configuration files based on your specific requirements.&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ol&gt;
&lt;h3&gt;
  
  
  Using Maven with Apache Flink
&lt;/h3&gt;

&lt;p&gt;Maven simplifies the management of project dependencies and builds. To use Flink with Maven:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Add Dependency:&lt;/strong&gt;&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Include the Flink dependency in your &lt;code&gt;pom.xml&lt;/code&gt; file:
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight xml"&gt;&lt;code&gt;   &lt;span class="nt"&gt;&amp;lt;dependencies&amp;gt;&lt;/span&gt;
       &lt;span class="nt"&gt;&amp;lt;dependency&amp;gt;&lt;/span&gt;
           &lt;span class="nt"&gt;&amp;lt;groupId&amp;gt;&lt;/span&gt;org.apache.flink&lt;span class="nt"&gt;&amp;lt;/groupId&amp;gt;&lt;/span&gt;
           &lt;span class="nt"&gt;&amp;lt;artifactId&amp;gt;&lt;/span&gt;flink-java&lt;span class="nt"&gt;&amp;lt;/artifactId&amp;gt;&lt;/span&gt;
           &lt;span class="nt"&gt;&amp;lt;version&amp;gt;&lt;/span&gt;${flink.version}&lt;span class="nt"&gt;&amp;lt;/version&amp;gt;&lt;/span&gt;
       &lt;span class="nt"&gt;&amp;lt;/dependency&amp;gt;&lt;/span&gt;
   &lt;span class="nt"&gt;&amp;lt;/dependencies&amp;gt;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;&lt;strong&gt;Or&lt;/strong&gt;&lt;br&gt;
&lt;strong&gt;You can create a project based on an Archetype with the Maven command below:&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;$ mvn archetype:generate                \
  -DarchetypeGroupId=org.apache.flink   \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.18.0
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Build and Run
&lt;/h2&gt;

&lt;p&gt;To compile and execute your Apache Flink project, follow these steps:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Build Project:&lt;/strong&gt;
Execute the following Maven command to build your Flink project:
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;   mvn clean package 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Run the Flink application using the generated JAR file.&lt;/p&gt;

&lt;h2&gt;
  
  
  Apache Flink APIs and Transformations
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Flink APIs
&lt;/h3&gt;

&lt;p&gt;Flink provides high-level APIs for Java and Scala:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;DataStream API:&lt;/strong&gt; Used for stream processing applications.&lt;br&gt;
Supported by Java, Scala and Python&lt;/p&gt;&lt;/li&gt;
&lt;li&gt;&lt;p&gt;&lt;strong&gt;DataSet API:&lt;/strong&gt; Designed for batch processing applications.&lt;br&gt;
Supported by Java and Scala&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Data Sources for DataStream API
&lt;/h3&gt;

&lt;p&gt;&lt;a href="https://i.giphy.com/media/13hFjqkckEAxzy/giphy.gif" class="article-body-image-wrapper"&gt;&lt;img src="https://i.giphy.com/media/13hFjqkckEAxzy/giphy.gif" width="500" height="281"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The DataStream API in Apache Flink supports various data sources for stream processing applications. Common data sources include:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Kafka:&lt;/strong&gt; Flink can consume and process data from Kafka topics in real-time.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Socket Streams:&lt;/strong&gt; Flink allows the ingestion of data from socket streams, making it versatile for various streaming scenarios.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;File Systems:&lt;/strong&gt; Data can be read from various file systems, such as HDFS or local file systems, providing flexibility in handling different data storage formats.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Transformations in Flink
&lt;/h3&gt;

&lt;p&gt;Flink transformations are the building blocks of data processing pipelines. Key transformations include:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Map:&lt;/strong&gt; Applies a function to each element in the dataset.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Filter:&lt;/strong&gt; Retains elements satisfying a specified condition.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;KeyBy:&lt;/strong&gt; Groups elements based on a key.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Window:&lt;/strong&gt; Defines time or count-based windows for stream processing.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Reduce:&lt;/strong&gt; Aggregates elements in a window.&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;Let's create a project!&lt;/p&gt;

</description>
      <category>flink</category>
      <category>dataengineering</category>
      <category>dataprocessing</category>
    </item>
    <item>
      <title>Demystifying MLOps: Week 1</title>
      <dc:creator>totalSophie</dc:creator>
      <pubDate>Sun, 18 Jun 2023 06:51:37 +0000</pubDate>
      <link>https://dev.to/sophie_toffee/demystifying-mlops-week-1-1foi</link>
      <guid>https://dev.to/sophie_toffee/demystifying-mlops-week-1-1foi</guid>
      <description>&lt;p&gt;Notes from &lt;a href="https://www.youtube.com/watch?v=MqI8vt3-cag&amp;amp;list=PL3MmuxUbc_hIhxl5Ji8t4O6lPAOpHaCLR"&gt;MLOps ZoomCamp&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  1.1 What is MLOps
&lt;/h2&gt;

&lt;p&gt;MLOps (Machine Learning Operations) refers to the practices, processes, and tools used to manage the entire lifecycle of machine learning models. It bridges the gap between data scientists, software engineers, and operations teams to ensure successful deployment and maintenance of ML models.&lt;/p&gt;

&lt;h3&gt;
  
  
  Key Components
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;Data Management and Versioning&lt;/li&gt;
&lt;li&gt; Model Training and Evaluation&lt;/li&gt;
&lt;li&gt; Deployment and Infrastructure&lt;/li&gt;
&lt;li&gt; Continuous Integration and Delivery&lt;/li&gt;
&lt;li&gt; Monitoring and Governance&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  1.2 Environment Preparation
&lt;/h2&gt;

&lt;p&gt;You can use an EC2 instance or your local environment&lt;/p&gt;

&lt;h2&gt;
  
  
  Step 1
&lt;/h2&gt;

&lt;p&gt;Download and install the Anaconda distribution of Python:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;wget https://repo.anaconda.com/archive/Anaconda3-2022.05-Linux-x86_64.sh
bash Anaconda3-2022.05-Linux-x86_64.sh
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Step 2
&lt;/h2&gt;

&lt;p&gt;Update existing packages:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;sudo apt update
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Step 3
&lt;/h2&gt;

&lt;p&gt;Install Docker:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;sudo apt install docker.io
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Step 4
&lt;/h2&gt;

&lt;p&gt;Create a separate directory for the installation and get the latest release of Docker Compose:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;mkdir soft
cd soft
wget https://github.com/docker/compose/releases/download/v2.18.0/docker-compose-linux-x86_64 -O docker-compose
chmod +x docker-compose
nano ~/.bashrc
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Add the following line to the .bashrc file:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;export PATH="${HOME}/soft:${PATH}"
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Save and exit the .bashrc file, then apply the changes:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;source ~/.bashrc
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Step 5
&lt;/h2&gt;

&lt;p&gt;Run Docker to check if it's working:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker run hello-world
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  1.3 Training a ride duration prediction model
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Dataset
&lt;/h3&gt;

&lt;p&gt;Dataset used is &lt;a href="https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page"&gt;2022 NYC green taxi trip records&lt;/a&gt;&lt;br&gt;
&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fwag2uy1hpe3e7y8nx3qw.jpeg" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fwag2uy1hpe3e7y8nx3qw.jpeg" alt="Photo of a green cab" width="320" height="158"&gt;&lt;/a&gt;&lt;br&gt;
More information on the data is found at &lt;a href="https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf"&gt;https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Download the dataset&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;!wget https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-01.parquet
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Imports
&lt;/h4&gt;

&lt;p&gt;Import required packages&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;import pandas as pd 
import pickle 
import seaborn as sns 
import matplotlib.pyplot as plt 

from sklearn.feature_extraction import DictVectorizer 
from sklearn.linear_model import LinearRegression 
from sklearn.linear_model import Lasso
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_squared_error
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Reading the file:
&lt;/h4&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;jan_data = pd.read_parquet("./data/green_tripdata_2022-01.parquet")
jan_data.head()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;&lt;/th&gt;
&lt;th&gt;VendorID&lt;/th&gt;
&lt;th&gt;lpep_pickup_datetime&lt;/th&gt;
&lt;th&gt;lpep_dropoff_datetime&lt;/th&gt;
&lt;th&gt;store_and_fwd_flag&lt;/th&gt;
&lt;th&gt;RatecodeID&lt;/th&gt;
&lt;th&gt;PULocationID&lt;/th&gt;
&lt;th&gt;DOLocationID&lt;/th&gt;
&lt;th&gt;passenger_count&lt;/th&gt;
&lt;th&gt;trip_distance&lt;/th&gt;
&lt;th&gt;fare_amount&lt;/th&gt;
&lt;th&gt;extra&lt;/th&gt;
&lt;th&gt;mta_tax&lt;/th&gt;
&lt;th&gt;tip_amount&lt;/th&gt;
&lt;th&gt;tolls_amount&lt;/th&gt;
&lt;th&gt;ehail_fee&lt;/th&gt;
&lt;th&gt;improvement_surcharge&lt;/th&gt;
&lt;th&gt;total_amount&lt;/th&gt;
&lt;th&gt;payment_type&lt;/th&gt;
&lt;th&gt;trip_type&lt;/th&gt;
&lt;th&gt;congestion_surcharge&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;0&lt;/td&gt;
&lt;td&gt;2&lt;/td&gt;
&lt;td&gt;2022-01-01 00:14:21&lt;/td&gt;
&lt;td&gt;2022-01-01 00:15:33&lt;/td&gt;
&lt;td&gt;N&lt;/td&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;42&lt;/td&gt;
&lt;td&gt;42&lt;/td&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;0.44&lt;/td&gt;
&lt;td&gt;3.5&lt;/td&gt;
&lt;td&gt;0.5&lt;/td&gt;
&lt;td&gt;0.5&lt;/td&gt;
&lt;td&gt;0&lt;/td&gt;
&lt;td&gt;0&lt;/td&gt;
&lt;td&gt;&lt;/td&gt;
&lt;td&gt;0.3&lt;/td&gt;
&lt;td&gt;4.8&lt;/td&gt;
&lt;td&gt;2&lt;/td&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;0&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;2022-01-01 00:20:55&lt;/td&gt;
&lt;td&gt;2022-01-01 00:29:38&lt;/td&gt;
&lt;td&gt;N&lt;/td&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;116&lt;/td&gt;
&lt;td&gt;41&lt;/td&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;2.1&lt;/td&gt;
&lt;td&gt;9.5&lt;/td&gt;
&lt;td&gt;0.5&lt;/td&gt;
&lt;td&gt;0.5&lt;/td&gt;
&lt;td&gt;0&lt;/td&gt;
&lt;td&gt;0&lt;/td&gt;
&lt;td&gt;&lt;/td&gt;
&lt;td&gt;0.3&lt;/td&gt;
&lt;td&gt;10.8&lt;/td&gt;
&lt;td&gt;2&lt;/td&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;0&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;2&lt;/td&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;2022-01-01 00:57:02&lt;/td&gt;
&lt;td&gt;2022-01-01 01:13:14&lt;/td&gt;
&lt;td&gt;N&lt;/td&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;41&lt;/td&gt;
&lt;td&gt;140&lt;/td&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;3.7&lt;/td&gt;
&lt;td&gt;14.5&lt;/td&gt;
&lt;td&gt;3.25&lt;/td&gt;
&lt;td&gt;0.5&lt;/td&gt;
&lt;td&gt;4.6&lt;/td&gt;
&lt;td&gt;0&lt;/td&gt;
&lt;td&gt;&lt;/td&gt;
&lt;td&gt;0.3&lt;/td&gt;
&lt;td&gt;23.15&lt;/td&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;2.75&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;3&lt;/td&gt;
&lt;td&gt;2&lt;/td&gt;
&lt;td&gt;2022-01-01 00:07:42&lt;/td&gt;
&lt;td&gt;2022-01-01 00:15:57&lt;/td&gt;
&lt;td&gt;N&lt;/td&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;181&lt;/td&gt;
&lt;td&gt;181&lt;/td&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;1.69&lt;/td&gt;
&lt;td&gt;8&lt;/td&gt;
&lt;td&gt;0.5&lt;/td&gt;
&lt;td&gt;0.5&lt;/td&gt;
&lt;td&gt;0&lt;/td&gt;
&lt;td&gt;0&lt;/td&gt;
&lt;td&gt;&lt;/td&gt;
&lt;td&gt;0.3&lt;/td&gt;
&lt;td&gt;9.3&lt;/td&gt;
&lt;td&gt;2&lt;/td&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;0&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;4&lt;/td&gt;
&lt;td&gt;2&lt;/td&gt;
&lt;td&gt;2022-01-01 00:07:50&lt;/td&gt;
&lt;td&gt;2022-01-01 00:28:52&lt;/td&gt;
&lt;td&gt;N&lt;/td&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;33&lt;/td&gt;
&lt;td&gt;170&lt;/td&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;6.26&lt;/td&gt;
&lt;td&gt;22&lt;/td&gt;
&lt;td&gt;0.5&lt;/td&gt;
&lt;td&gt;0.5&lt;/td&gt;
&lt;td&gt;5.21&lt;/td&gt;
&lt;td&gt;0&lt;/td&gt;
&lt;td&gt;&lt;/td&gt;
&lt;td&gt;0.3&lt;/td&gt;
&lt;td&gt;31.26&lt;/td&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;1&lt;/td&gt;
&lt;td&gt;2.75&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;p&gt;&lt;code&gt;jan_data.info()&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&amp;lt;class 'pandas.core.frame.DataFrame'&amp;gt;
RangeIndex: 62495 entries, 0 to 62494
Data columns (total 20 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   VendorID               62495 non-null  int64         
 1   lpep_pickup_datetime   62495 non-null  datetime64[ns]
 2   lpep_dropoff_datetime  62495 non-null  datetime64[ns]
 3   store_and_fwd_flag     56200 non-null  object        
 4   RatecodeID             56200 non-null  float64       
 5   PULocationID           62495 non-null  int64         
 6   DOLocationID           62495 non-null  int64         
 7   passenger_count        56200 non-null  float64       
 8   trip_distance          62495 non-null  float64       
 9   fare_amount            62495 non-null  float64       
 10  extra                  62495 non-null  float64       
 11  mta_tax                62495 non-null  float64       
 12  tip_amount             62495 non-null  float64       
 13  tolls_amount           62495 non-null  float64       
 14  ehail_fee              0 non-null      object        
 15  improvement_surcharge  62495 non-null  float64       
 16  total_amount           62495 non-null  float64       
 17  payment_type           56200 non-null  float64       
 18  trip_type              56200 non-null  float64       
 19  congestion_surcharge   56200 non-null  float64       
dtypes: datetime64[ns](2), float64(13), int64(3), object(2)
memory usage: 9.5+ MB
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Calculate duration of trip from dropoff and pickup times
&lt;/h4&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;jan_dropoff = pd.to_datetime(jan_data["lpep_dropoff_datetime"])
jan_pickup = pd.to_datetime(jan_data["lpep_pickup_datetime"])

jan_data["duration"] = jan_dropoff - jan_pickup

# Convert the values to minutes
jan_data["duration"] = jan_data.duration.apply(lambda td: td.total_seconds()/60)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Check the distribution of the duration
&lt;/h4&gt;

&lt;p&gt;&lt;code&gt;jan_data.duration.describe(percentiles=[0.95, 0.98, 0.99])&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;count    62495.000000
mean        19.019387
std         78.215732
min          0.000000
50%         11.583333
95%         35.438333
98%         49.722667
99%         68.453000
max       1439.466667
Name: duration, dtype: float64
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;sns.distplot(jan_data.duration)&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fs6ieeiocdq8maf39160b.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fs6ieeiocdq8maf39160b.png" alt="Picture of the distplot" width="584" height="438"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;We can see the data is skewed due to the presence of outliers&lt;br&gt;
Keeping only the records with the duration between 1 and 70 minutes&lt;/p&gt;

&lt;p&gt;&lt;code&gt;jan_data = jan_data[(jan_data.duration &amp;gt;= 1) &amp;amp; (jan_data.duration &amp;lt;= 60)]&lt;/code&gt;&lt;/p&gt;
&lt;h4&gt;
  
  
  One Hot Encoding
&lt;/h4&gt;

&lt;p&gt;Using Dictionary Vectorizer for One Hot Encoding&lt;br&gt;
Our categorical values that I will consider are the pickup and dropoff locations&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;categorical = ["PULocationID", "DOLocationID"]
numerical = ["trip_distance"]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Convert the column type to string from integers&lt;br&gt;
&lt;br&gt;&lt;br&gt;
&lt;code&gt;jan_data.loc[:, categorical] = jan_data[categorical].astype(str)&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Change our values to dictionaries

train_jan_data = jan_data[categorical + numerical].to_dict(orient='records')
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;dv = DictVectorizer()
X_train_jan = dv.fit_transform(train_jan_data)

# Convert the feature matrix to an array
fm_array = X_train_jan.toarray()
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Get the dimensionality of the feature matrix
fm_array.shape
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;(59837, 471)&lt;/code&gt;&lt;/p&gt;

&lt;h4&gt;
  
  
  Python function that would do the above steps
&lt;/h4&gt;

&lt;p&gt;Custom function to read and preprocess the data&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;def read_dataframe(filename):
    # Read the parquet file
    df = pd.read_parquet(filename)

    # Calculate the duration
    df_dropoff = pd.to_datetime(df["lpep_dropoff_datetime"])
    df_pickup = pd.to_datetime(df["lpep_pickup_datetime"])
    df["duration"] = df_dropoff - df_pickup

    # Remove outliers
    df["duration"] = df.duration.apply(lambda td: td.total_seconds()/60)
    df = df[(jan_data.duration &amp;gt;= 1) &amp;amp; (df.duration &amp;lt;= 60)]

    # Preparation for OneHotEncoding using DictVectorizer
    categorical = ["PULocationID", "DOLocationID"]
    df[categorical] = df[categorical].astype(str)

    return df

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h4&gt;
  
  
  Fitting Linear Regression Model
&lt;/h4&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;# Using January data as train and Feb as Validation

df_train = read_dataframe("./data/green_tripdata_2022-01.parquet")
df_val = read_dataframe("./data/green_tripdata_2022-02.parquet")
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;dv = DictVectorizer()

categorical = ["PULocationID", "DOLocationID"]
numerical = ["trip_distance"]


train_dicts= df_train[categorical + numerical].to_dict(orient='records')
X_train = dv.fit_transform(train_dicts)

val_dicts= df_val[categorical + numerical].to_dict(orient='records')
X_val = dv.transform(val_dicts)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;target = 'duration'
y_train = df_train[target].values
y_val = df_val[target].values
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;lr = LinearRegression()
lr.fit(X_train, y_train)

y_pred = lr.predict(X_val)

mean_squared_error(y_val, y_pred, squared=False)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;8.364575685718151&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;Try other models like lasso and Ridge&lt;/p&gt;

&lt;h4&gt;
  
  
  Save the model
&lt;/h4&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;with open('models/lin_reg.bin', 'wb') as f_out:
    pickle.dump((dv, lr), f_out)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;a href="https://i.giphy.com/media/7yojoQtevjOCI/giphy.gif" class="article-body-image-wrapper"&gt;&lt;img src="https://i.giphy.com/media/7yojoQtevjOCI/giphy.gif" width="480" height="342"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Cover Photo by &lt;a href="https://unsplash.com/@alinnnaaaa?utm_source=unsplash&amp;amp;utm_medium=referral&amp;amp;utm_content=creditCopyText"&gt;Alina Grubnyak&lt;/a&gt; on &lt;a href="https://unsplash.com/s/photos/neural-networks?utm_source=unsplash&amp;amp;utm_medium=referral&amp;amp;utm_content=creditCopyText"&gt;Unsplash&lt;/a&gt;&lt;/p&gt;

</description>
      <category>machinelearning</category>
      <category>mlops</category>
      <category>aws</category>
    </item>
  </channel>
</rss>
