DEV Community

Anthony Ikeda


Open Source Data Ingestion Pipelines - Part 2

In the previous post we set up Apache Drill, MinIO and PostgreSQL to build a basic data processing pipeline.

In this installment we are also going to incorporate Kafka and show how to merge relational data with real-time events.

Connecting to Kafka

With an existing Kafka setup running, open the Storage tab in the Apache Drill web console and click Update next to the inactive Kafka plugin. After saving the configuration, enable the plugin so it can be queried:

Kafka Storage
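For reference, Drill's Kafka storage plugin is configured with a JSON document of type kafka whose kafkaConsumerProps map holds standard Kafka consumer settings. The sketch below builds such a document in Python and prints it, ready to paste into the Update dialog. It assumes the broker runs on localhost:9092 (the address used by the commands in this post); the group.id value and the remaining consumer settings are assumptions to adjust for your own cluster.

```python
import json

# Assumption: a single local broker, as used by the Kafka commands in this post.
BOOTSTRAP_SERVERS = "localhost:9092"


def kafka_plugin_config(bootstrap_servers: str = BOOTSTRAP_SERVERS) -> dict:
    """Build a Drill Kafka storage plugin config (a sketch; values are assumptions)."""
    byte_deserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    return {
        "type": "kafka",
        "kafkaConsumerProps": {
            "bootstrap.servers": bootstrap_servers,
            # Hypothetical consumer group name used for Drill's queries.
            "group.id": "drill-query-consumer",
            # Read each topic from the start so earlier test messages are visible.
            "auto.offset.reset": "earliest",
            "key.deserializer": byte_deserializer,
            "value.deserializer": byte_deserializer,
            "enable.auto.commit": "true",
            "session.timeout.ms": "30000",
        },
        "enabled": True,
    }


if __name__ == "__main__":
    # Print the JSON to paste into the Kafka plugin's Update dialog.
    print(json.dumps(kafka_plugin_config(), indent=2))
```

Generating the JSON from code keeps the broker address in one place if you later script the setup, but typing the same JSON directly into the dialog works just as well.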

Sending Data to Kafka

Create a topic

Open a terminal in your KAFKA_HOME and run the topic creation command:

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic purchase-events --partitions 1 --replication-factor 1
Created topic purchase-events.

Publish some JSON

Open a terminal in your KAFKA_HOME and run the kafka-console-producer command:

$ bin/kafka-console-producer.sh --topic purchase-events --bootstrap-server localhost:9092
> 

The scenario we will use: once the main system has processed a payment, it sends a message to Kafka with the transaction details and the approval status (SUCCESS or FAILED). Below are some example JSON messages. Paste them into the producer one per line, since each line you enter becomes a separate Kafka message:

{ "txn_id" :  12346, "store_id" :  "45873AD", "cust_id" :  "02145SYD", "amnt" :  122.49, "status" : "SUCCESS" }
{ "txn_id" :  12348, "store_id" :  "78534AS", "cust_id" :  "26354MLB", "amnt" :  16.01, "status" : "FAILED" }
{ "txn_id" :  12345, "store_id" :  "98369AS", "cust_id" :  "98764TGH", "amnt" :  34.99, "status" : "SUCCESS" }
{ "txn_id" :  12348, "store_id" :  "78534AS", "cust_id" :  "26354MLB", "amnt" :  16.01, "status" : "SUCCESS" }
{ "txn_id" :  12334, "store_id" :  "92374SS", "cust_id" :  "45268PER", "amnt" :  89.99, "status" : "FAILED" }
{ "txn_id" :  12352, "store_id" :  "98674DC", "cust_id" :  "25478MLB", "amnt" :  30.12, "status" : "SUCCESS" }
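A malformed line typed into the producer only surfaces later, when Drill tries to read it. If you generate these messages from a script instead of typing them, a small pre-flight check can catch problems earlier. The sketch below is a hypothetical helper (parse_event is not part of Kafka or Drill) that checks one line against the field names and types used in the example messages and the two statuses described above.

```python
import json

# The two approval statuses described in this post.
VALID_STATUSES = {"SUCCESS", "FAILED"}

# Field names and Python types, taken from the example messages above.
EXPECTED_FIELDS = {
    "txn_id": int,
    "store_id": str,
    "cust_id": str,
    "amnt": (int, float),
    "status": str,
}


def parse_event(line: str) -> dict:
    """Parse one producer line and check its shape.

    Raises ValueError on bad input (json.JSONDecodeError is a ValueError subclass).
    """
    event = json.loads(line)
    for field, expected_type in EXPECTED_FIELDS.items():
        if field not in event:
            raise ValueError(f"missing field {field!r}: {line}")
        if not isinstance(event[field], expected_type):
            raise ValueError(f"field {field!r} has an unexpected type: {line}")
    if event["status"] not in VALID_STATUSES:
        raise ValueError(f"unknown status {event['status']!r}: {line}")
    return event


if __name__ == "__main__":
    sample = '{ "txn_id" :  12346, "store_id" :  "45873AD", "cust_id" :  "02145SYD", "amnt" :  122.49, "status" : "SUCCESS" }'
    print(parse_event(sample)["status"])  # SUCCESS
```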

In your Drill Query console, run the following query:

select * from kafka.`purchase-events`;

You should get a table with columns for:

  • data
  • kafka topic
  • kafka partition id
  • message offset
  • message timestamp

Drill Kafka Results
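The same query can also be run from a script through Drill's REST API, which accepts a POST to /query.json with a JSON body containing queryType and query. The sketch below uses only the Python standard library. It assumes Drill is listening on localhost:8047 (Drill's default web port), and build_drill_query and run_drill_query are hypothetical helper names, not part of any Drill client library.

```python
import json
import urllib.request

# Assumption: Drill's web/REST server on its default port, on this machine.
DRILL_URL = "http://localhost:8047/query.json"


def build_drill_query(sql: str, url: str = DRILL_URL) -> urllib.request.Request:
    """Build (but do not send) a POST request for Drill's REST query endpoint."""
    body = json.dumps({"queryType": "SQL", "query": sql}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_drill_query(sql: str) -> list:
    """Send the query and return its result rows (needs a running Drill)."""
    with urllib.request.urlopen(build_drill_query(sql)) as response:
        return json.load(response)["rows"]
```

For example, run_drill_query("select * from kafka.`purchase-events`") should return the rows shown in the console as a list of dictionaries keyed by column name. Splitting request building from sending keeps the request easy to inspect without a live Drill instance.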

Merging RDBMS Data with Real-time Data

To make this data useful, we will work through the following scenario:

  1. A user submits an order, which is stored in PostgreSQL.
  2. Our Order Service sends a request to the Payment Service to clear the payment for the order.
  3. Once it has processed the request, the Payment Service sends the transaction details and payment status to Kafka.
  4. We then want to view a quick report of the orders being placed and their status.
  5. Successful orders are then sent to stores to be fulfilled.

To satisfy this, we will take the order information from our Order Service and process only the orders whose payment succeeded.

(In the next article we will add actions for orders whose payment failed.)

Create the order PostgreSQL table

Run the following script in psql as a PostgreSQL superuser to create the database, role and table, and insert some sample data:

create database sales_db;

create role sales_admin with password 'letmein' login;

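grant all on database sales_db to sales_admin;

-- psql meta-command: connect to the new database so the table is created there
\c sales_db

create table purchase_order (
  order_id bigint generated always as identity,
  order_total numeric(5,2),
  store_id varchar(12),
  customer_id varchar(12),
  order_status varchar(20) DEFAULT 'SUBMITTED'
);

alter table purchase_order add constraint pk_order PRIMARY KEY (order_id);

-- database-level grants do not cover tables, so grant sales_admin access to this one
grant all on table purchase_order to sales_admin;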

insert into purchase_order (order_total, store_id, customer_id) values (12.56, 'STORE_1', 'CUST_12');

insert into purchase_order (order_total, store_id, customer_id) values (122.65, 'STORE_15', 'CUST_89');

insert into purchase_order (order_total, store_id, customer_id) values (56.98, 'STORE_43', 'CUST_51');

insert into purchase_order (order_total, store_id, customer_id) values (35.00, 'STORE_1', 'CUST_61');

select * from purchase_order;
 order_id | order_total | store_id | customer_id | order_status 
----------+-------------+----------+-------------+--------------
        1 |       12.56 | STORE_1  | CUST_12     | SUBMITTED
        2 |      122.65 | STORE_15 | CUST_89     | SUBMITTED
        3 |       56.98 | STORE_43 | CUST_51     | SUBMITTED
        4 |       35.00 | STORE_1  | CUST_61     | SUBMITTED
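The join query in this post reads this table through a Drill storage plugin named salesdb. If you have not already registered one for this database, a Drill JDBC (RDBMS) plugin config along the lines below should work. This is a sketch that assumes PostgreSQL on localhost:5432, the sales_admin credentials created above, and the PostgreSQL JDBC driver jar copied into Drill's jars/3rdparty directory; postgres_plugin_config is a hypothetical helper name.

```python
import json


def postgres_plugin_config(host: str = "localhost", port: int = 5432,
                           database: str = "sales_db", user: str = "sales_admin",
                           password: str = "letmein") -> dict:
    """Drill JDBC storage plugin config for the sales database (a sketch).

    Assumes a local PostgreSQL and the role/password created in the script above.
    """
    return {
        "type": "jdbc",
        "driver": "org.postgresql.Driver",
        "url": f"jdbc:postgresql://{host}:{port}/{database}",
        "username": user,
        "password": password,
        "enabled": True,
    }


if __name__ == "__main__":
    # Create a plugin named "salesdb" in the Drill Storage tab and paste this JSON.
    print(json.dumps(postgres_plugin_config(), indent=2))
```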

Next, we need messages in the Kafka topic that match the orders in our database, so enter the following in the kafka-console-producer:

{ "txn_id" :  1, "store_id" :  "STORE_1", "cust_id" :  "CUST_12", "amnt" :  12.56, "status" : "SUCCESS" }
{ "txn_id" :  2, "store_id" :  "STORE_15", "cust_id" :  "CUST_89", "amnt" :  122.65, "status" : "FAILED" }
{ "txn_id" :  3, "store_id" :  "STORE_43", "cust_id" :  "CUST_51", "amnt" :  56.98, "status" : "FAILED" }
{ "txn_id" :  4, "store_id" :  "STORE_1", "cust_id" :  "CUST_61", "amnt" :  35.00, "status" : "SUCCESS" }

Create our query

Now that we have data in both the database and the topic, let's create a query to join them:

select p.*, e.*
from kafka.`purchase-events` e
join salesdb.`purchase_order` p
on p.order_id = e.txn_id;
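Conceptually, the query is an inner join: each Kafka message is paired with the order whose order_id equals its txn_id, and messages without a matching order drop out. Because the scenario only wants successful payments, the Drill query would also need a filter, something like where e.`status` = 'SUCCESS'. The plain-Python sketch below models both steps on the sample rows from this post (with the event fields trimmed for brevity). It illustrates the join semantics only; it is not how Drill executes the join internally, and inner_join and successful_orders are hypothetical names.

```python
# Orders as inserted into purchase_order above.
ORDERS = [
    {"order_id": 1, "order_total": 12.56, "store_id": "STORE_1", "customer_id": "CUST_12"},
    {"order_id": 2, "order_total": 122.65, "store_id": "STORE_15", "customer_id": "CUST_89"},
    {"order_id": 3, "order_total": 56.98, "store_id": "STORE_43", "customer_id": "CUST_51"},
    {"order_id": 4, "order_total": 35.00, "store_id": "STORE_1", "customer_id": "CUST_61"},
]

# A trimmed subset of the topic's messages; txn_id 12345 has no matching order.
EVENTS = [
    {"txn_id": 12345, "status": "SUCCESS"},
    {"txn_id": 1, "status": "SUCCESS"},
    {"txn_id": 2, "status": "FAILED"},
    {"txn_id": 3, "status": "FAILED"},
    {"txn_id": 4, "status": "SUCCESS"},
]


def inner_join(orders, events):
    """Pair each event with the order whose order_id equals its txn_id.

    Builds a lookup on order_id (the primary key), then probes it with each
    event; events with no matching order are dropped, as in an SQL inner join.
    """
    by_id = {order["order_id"]: order for order in orders}
    return [{**by_id[e["txn_id"]], **e} for e in events if e["txn_id"] in by_id]


def successful_orders(joined):
    """Keep only rows whose payment status is SUCCESS."""
    return [row for row in joined if row["status"] == "SUCCESS"]


joined = inner_join(ORDERS, EVENTS)
print([row["order_id"] for row in joined])                     # [1, 2, 3, 4]
print([row["order_id"] for row in successful_orders(joined)])  # [1, 4]
```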

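If the query succeeds, you should see the real-time payment events matched against the orders in your database. Because this is an inner join, only messages whose txn_id equals an order_id are returned; the earlier test messages (txn_id 12345 and the like) have no matching order and drop out.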

Joined Results

Next

In the next article we will orchestrate all of these steps using another Apache open source project, Airflow.
