DEV Community

Anthony Ikeda
Anthony Ikeda

Posted on

Open Source Data Ingestion Pipeline

In this post I'm going to set up the foundation for a data ingestion pipeline integrating RDBMS and BLOB Storage.

Table of contents

  • Install and Set up Minio
  • Setup and configure PostgreSQL
  • Install and Set up Apache Drill
  • Your First Data Transfer

Install and Set up Minio

On macOS run the following command:

$ brew install minio/stable/minio
Enter fullscreen mode Exit fullscreen mode

You will now have a new service configured for Minio:

$ brew services list
Name          Status  User         Plist
minio         stopped 
Enter fullscreen mode Exit fullscreen mode

You can start Minio with the following command:

$ brew services start minio
==> Successfully started `minio` (label: homebrew.mxcl.minio)
$ brew services list
Name          Status  User         Plist
minio         started anthonyikeda /Users/user_a/Library/LaunchAgents/homebrew.mxcl.minio.plist
Enter fullscreen mode Exit fullscreen mode

You can access the web console on http://localhost:9000 in your web browser with the default credentials minioadmin/minioadmin

Minio Login

Minio Folders

Part of the Minio tool kit is the client mc. In order to use the client from the command line, Minio expects an alias to be configured to the URL.

You can set up a new alias to connect to with:

$ mc alias minioserv http://localhost:9000 minioadmin minioadmin
Added `minioserv` successfully.
Enter fullscreen mode Exit fullscreen mode

You should now be able to execute commands against your Minio instance:

$ mc mb minioserve/apachedrill
Bucket created successfully `minioserv/apachedrill`.
$ mc ls minioserv
[2021-06-11 16:15:29 PDT]     0B apachedrill/
Enter fullscreen mode Exit fullscreen mode

In order to avoid using our admin user and privileges, let's create a new user that external apps will use.

From the command line, execute the following command to create a new user apachedrill with password letmeintotheparty:

$ mc admin user add minioserv apachedrill letmeintotheparty
Added user `apachedrill` successfully.
Enter fullscreen mode Exit fullscreen mode

Our user, apachedrill can now be used to access Minio, however, they have no privileges to do anything. Thankfully, Minio is S3 compliant so we can use AWS policies to enable our user to interact with Minio.

Create a new file minio-privileges.json with the following content:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
            "s3:ListBucket",
            "s3:PutObject",
            "s3:GetObject",
            "s3:DeleteObject"
        ],
      "Effect": "Allow",
      "Resource": [
        "arn:aws:s3:::apachedrill/*",
        "arn:aws:s3:::apachedrill",
        "arn:aws:s3:::drilltmp/*",
        "arn:aws:s3:::drilltmp"
      ],
      "Sid": "BucketAccessForUser"
    }
  ]
}
Enter fullscreen mode Exit fullscreen mode

Create the policy in Minio:

$ mc admin policy add minioserv drill-bucket-policy /path/to/minio-privileges.json
Enter fullscreen mode Exit fullscreen mode

And allow our user to make use of the policy:

$ mc admin policy set minioserv drill-bucket-policy user=apachedrill
Policy `drill-bucket-policy` is set on user `apachedrill`
Enter fullscreen mode Exit fullscreen mode

This allows your new user apachedrill to perform actions at a bucket level in Minio.

Set up and Configure PostgreSQL

If you haven't installed postgres before, we will follow a similar approach like we did with Minio:

$ brew install postgres
$ brew services start postgresql
Enter fullscreen mode Exit fullscreen mode

You can access postgres with:

$ psql postgres
psql (13.3)
Type "help" for help.

postgres=# 
Enter fullscreen mode Exit fullscreen mode

For this article we will:

  • Create a database
  • Create a database user
  • Grant the user control over the database
  • Create some tables and data

Create the Database

Straight forward, we will create the database with:

postgres=# create database address_book_db;
CREATE DATABASE
postgres=# 
Enter fullscreen mode Exit fullscreen mode

Let's add a user and grant them control:

postgres=# create role drill_user with password 'letmein';
CREATE ROLE
postgres=# alter user drill_user with login;
postres=# grant all on database address_book_db to drill_user;
GRANT
Enter fullscreen mode Exit fullscreen mode

For this article I have some tables and mock data here [https://github.com/anthonyikeda/contact-api/tree/main/src/main/resources/db/data]

Now for the heart of the article...

Install and Set up Apache Drill

Apache Drill integrates different datasources and uses a common syntax. Typically you'd run Drill in a clustered environment to handle the different streams of data processing; for this article we will stick with standalone.

Download the latest release from: https://drill.apache.org/download/

Move it to a local accessible folder, unzip it and open a command line to the drill home folder.


NOTE

You may need to install your JDBC Driver. You can easily download the JAR file from mvnrepository.org and copy the files to <DRILL_HOME>/jars/3rdparty/


Starting drill is easy:

$ bin/drill-embedded
Apache Drill 1.18.0
"Your Drill is the Drill that will pierce the heavens."
apache drill> 
Enter fullscreen mode Exit fullscreen mode

This starts the server and command line tool allowing you to browse the web console and execute commands from the terminal.

Open a web browser at: http://localhost:8047

Alt Text

Typically in a clustered environment you'd see more nodes, but since we are running in standalone mode you should see your single instance that is running.

Next we are going to connect to PostgreSQL.

Connect to PostgreSQL

Navigate to the Storage Tab and hit the Create button. This will bring up a rudimentary editor to paste our configuration.

Give the Storage option a name (address_book_db) and paste the following customizing for your environment:

{
  "type": "jdbc",
  "driver": "org.postgresql.Driver",
  "url": "jdbc:postgresql://localhost:5432/address_book_db",
  "username": "drill_user",
  "password": "letmein",
  "caseInsensitiveTableNames": false,
  "sourceParameters": {
    "minimumIdle": 5,
    "autoCommit": false,
    "connectionTestQuery": "select version() as postgresql_version",
    "dataSource.cachePrepStmts": true,
    "dataSource.prepStmtCacheSize": 250
  },
  "enabled": true
}
Enter fullscreen mode Exit fullscreen mode

Hit Create and the new Storage should test the connection to the database and be ready to use.

If you return to the drill-cli you can now inspect your database connection:

apache drill> show databases;
+----------------------------------+
|           SCHEMA_NAME            |
+----------------------------------+
| addressbookdb.address_book_db    |
| addressbookdb.information_schema |
| addressbookdb.pg_catalog         |
| addressbookdb.public             |
| addressbookdb                    |
| sys                              |
+----------------------------------+
Enter fullscreen mode Exit fullscreen mode

Now we will connect to the database and view the tables:

apache drill> use addressbookdb.address_book_db
+------+-----------------------------------------------------------+
|  ok  |                          summary                          |
+------+-----------------------------------------------------------+
| true | Default schema changed to [addressbookdb.address_book_db] |
+------+-----------------------------------------------------------+
1 row selected (0.13 seconds)
apache drill (addressbookdb.address_book_db)> show tables;
+-------------------------------+--------------------------------+
|      TABLE_SCHEMA             |            TABLE_NAME          |
+-------------------------------+--------------------------------+
| addressbookdb.address_book_db | flyway_schema_history_pk       |
| addressbookdb.address_book_db | flyway_schema_history_s_idx.   |
| addressbookdb.address_book_db | pk_address_id                  |
| addressbookdb.address_book_db | pk_contact_id                  |
| addressbookdb.address_book_db | address_address_id_seq         |
| addressbookdb.address_book_db | contacts_contact_id_seq        |
| addressbookdb.address_book_db | address                        |
| addressbookdb.address_book_db | contacts                       |
| addressbookdb.address_book_db | flyway_schema_history          |
+-------------------------------+--------------------------------+
9 rows selected (0.347 seconds)

apache drill (addressbookdb.address_book_db)> 
Enter fullscreen mode Exit fullscreen mode

We can query the tables using standard SQL syntax:

apache drill (addressbookdb.address_book_db)> select * from addressbookdb.contacts limit 10;
+------------+--------------------+-------------------+---------------------------+
| contact_id | contact_first_name | contact_last_name |   contact_email_address   |
+------------+--------------------+-------------------+---------------------------+
| 100        | Bert               | Gedling           | bgedling0@myspace.com     |
| 101        | Dacie              | Beddingham        | dbeddingham1@usda.gov     |
| 102        | Lina               | Wey               | lwey2@deviantart.com      |
| 103        | Savina             | Housbie           | shousbie3@google.com.au   |
| 104        | Delphinia          | Lente             | dlente4@delicious.com     |
| 105        | Zonda              | Blinder           | zblinder5@google.es       |
| 106        | Charmian           | Costley           | ccostley6@java.com        |
| 107        | Darcy              | Bulluck           | dbulluck7@flavors.me      |
| 108        | Tim                | Duesberry         | tduesberry8@google.com.br |
| 109        | Torr               | Cordero           | tcordero9@ftc.gov         |
+------------+--------------------+-------------------+---------------------------+
10 rows selected (0.228 seconds)

Enter fullscreen mode Exit fullscreen mode

It's time to turn our attention back to Minio!

Wiring up Minio and pushing data

Open up the Storage tab in Drill again and create a new Datasource.

Give it a name (minio) and paste the following configuration:

{
  "type": "file",
  "connection": "s3a://apachedrill",
  "config": {
    "fs.s3a.secret.key": "letmeintotheparty",
    "fs.s3a.access.key": "apachedrill",
    "fs.s3a.connection.ssl.enabled": "false",
    "fs.s3a.endpoint": "http://127.0.0.1:9000",
    "fs.s3a.path.style.access": "false"
  },
  "workspaces": {
    "tmp": {
      "location": "/tmp",
      "writable": true,
      "defaultInputFormat": null,
      "allowAccessOutsideWorkspace": false
    },
    "root": {
      "location": "/data",
      "writable": true,
      "defaultInputFormat": null,
      "allowAccessOutsideWorkspace": false
    }
  },
  "formats": {
    "parquet": {
      "type": "parquet"
    },
    "avro": {
      "type": "avro",
      "extensions": [
        "avro"
      ]
    },
    "json": {
      "type": "json",
      "extensions": [
        "json"
      ]
    },
    "pcap": {
      "type": "pcap",
      "extensions": [
        "pcap"
      ]
    },
    "csvh": {
      "type": "text",
      "extensions": [
        "csvh"
      ],
      "extractHeader": true
    },
    "sequencefile": {
      "type": "sequencefile",
      "extensions": [
        "seq"
      ]
    },
    "pcapng": {
      "type": "pcapng",
      "extensions": [
        "pcapng"
      ]
    },
    "psv": {
      "type": "text",
      "extensions": [
        "tbl"
      ],
      "fieldDelimiter": "|"
    },
    "tsv": {
      "type": "text",
      "extensions": [
        "tsv"
      ],
      "fieldDelimiter": "\t"
    },
    "csv": {
      "type": "text",
      "extensions": [
        "csv"
      ]
    },
    "syslog": {
      "type": "syslog",
      "extensions": [
        "syslog"
      ],
      "maxErrors": 10
    },
    "ltsv": {
      "type": "ltsv",
      "extensions": [
        "ltsv"
      ]
    },
    "hdf5": {
      "type": "hdf5",
      "extensions": [
        "h5"
      ],
      "defaultPath": null
    },
    "spss": {
      "type": "spss",
      "extensions": [
        "sav"
      ]
    },
    "shp": {
      "type": "shp",
      "extensions": [
        "shp"
      ]
    },
    "excel": {
      "type": "excel",
      "extensions": [
        "xlsx"
      ],
      "lastRow": 1048576
    }
  },
  "enabled": true
}
Enter fullscreen mode Exit fullscreen mode

Key things to note:

S3 Connection

 "connection": "s3a://apachedrill",
Enter fullscreen mode Exit fullscreen mode

This is the bucket we are targeting when we connect to Minio. Any workspaces we set up are subfolders of this bucket. Earlier we created a policy that outlined what can be done with this bucket and applied the policy to our user apachedrill.

S3 Configuration

"config": {
    "fs.s3a.secret.key": "letmeintotheparty",
    "fs.s3a.access.key": "apachedrill",
    "fs.s3a.connection.ssl.enabled": "false",
    "fs.s3a.endpoint": "http://127.0.0.1:9000",
    "fs.s3a.path.style.access": "false"
  }
Enter fullscreen mode Exit fullscreen mode

Here we are configuring the connection to the Minio server, from which endpoint to use (fs.s3a.endpoint) to the credentials used to access Minio (fs.s3a.access.key, fs.s3a.secret.key)

It's important to override the fs.s3a.endpoint otherwise Drill will attempt to connect to S3 on AWS which we don't want right now.

Workspace Directories

"workspaces": {
    "tmp": {
      "location": "/tmp",
      "writable": true,
      "defaultInputFormat": null,
      "allowAccessOutsideWorkspace": false
    },
    "root": {
      "location": "/data",
      "writable": true,
      "defaultInputFormat": null,
      "allowAccessOutsideWorkspace": false
    }
  }
Enter fullscreen mode Exit fullscreen mode

These are the directories in the bucket we have nominated that we can put data in. When we push data to Minio from Drill, we will reference these directories as writeable targets.

Push data to Minio

What we will do next is create a materialized view from the data in postgresql and write the materialized view into Minio.

In the Apache Drill console type:

apache drill> use minio.root;
+------+----------------------------------------+
|  ok  |                summary                 |
+------+----------------------------------------+
| true | Default schema changed to [minio.root] |
+------+----------------------------------------+
1 row selected (0.195 seconds)
apache drill (minio.root)>
Enter fullscreen mode Exit fullscreen mode

Now, we will create a table in Minio using data from PostgreSQL:

apache drill (minio.root)> create table contact_cities (contact_id, contact_first_name, contact_last_name, city) AS
2...............semicolon> select a.contact_id, c.contact_first_name, c.contact_last_name, a.city
3...............semicolon> from addressbookdb.address a
4...............semicolon> join addressbookdb.contacts c 
5...............semicolon> on c.contact_id = a.contact_id;
+----------+---------------------------+
| Fragment | Number of records written |
+----------+---------------------------+
| 0_0      | 1001                      |
+----------+---------------------------+
1 row selected (1.357 seconds)
Enter fullscreen mode Exit fullscreen mode

What we have just done is taken the data in the form of our query (select a.contact_id, c.contact_first_name, c.contact_last_name, a.city from addressbookdb.address a join addressbookdb.contacts c on c.contact_id = a.contact_id;) and created a new view in Minio stored in parquet format.

If open your Minio browser and navigate to the apachedrill/data folder you should see a new folder: /contact_cities

Parquet View

In there should be a file called 0_0_0.parquet;

Let's see if we got the results we were after:

apache drill (minio.root)> select * from contact_cities limit 10;
+------------+--------------------+-------------------+------------+
| contact_id | contact_first_name | contact_last_name |    city    |
+------------+--------------------+-------------------+------------+
| 278        | Jania              | Kilalea           | Rochester  |
| 1          | Jonah              | Kent              | Lalor Park |
| 100        | Bert               | Gedling           | Newark     |
| 101        | Dacie              | Beddingham        | Sioux City |
| 102        | Lina               | Wey               | Lexington  |
| 103        | Savina             | Housbie           | Clearwater |
| 104        | Delphinia          | Lente             | Spokane    |
| 105        | Zonda              | Blinder           | New Castle |
| 106        | Charmian           | Costley           | Orlando    |
| 107        | Darcy              | Bulluck           | Dayton     |
+------------+--------------------+-------------------+------------+
10 rows selected (0.774 seconds)
apache drill (minio.root)> select count(*) from contact_cities;
+--------+
| EXPR$0 |
+--------+
| 1001   |
+--------+
1 row selected (0.491 seconds)
Enter fullscreen mode Exit fullscreen mode

Looks like we were successful!

In the next article we will set up Kafka and query records coming from Kafka in Apache Drill!

Top comments (0)