Abstract
During the 1970s, the three-step Extract, Transform, and Load (ETL) process became popular for populating data warehouses. As processing power has improved and cloud-based systems have become more powerful and cost-effective, Extract, Load, and Transform (ELT), where raw data is loaded first and transformed inside the target system, has become practical. This article demonstrates using SingleStoreDB with a popular open-source ELT tool, Airbyte.
Introduction
Airbyte provides source and destination connectors that enable the construction of pipelines between different systems. In this article, we'll use Airbyte's MySQL source connector to read from SingleStoreDB Cloud and its Pulsar destination connector to send the data to Apache Pulsar.
Create a SingleStoreDB Cloud account
A previous article showed the steps required to create a free SingleStoreDB Cloud account. We'll use Airbyte Demo Group as our Workspace Group Name and airbyte-demo as our Workspace Name. We'll make a note of our password and host name.
Create the database and table
We'll use a subset of the inventory system example from a previous article, as it combines relational and JSON data. In the SQL Editor in SingleStoreDB Cloud, we'll create a database and a table:
CREATE DATABASE e_store;
USE e_store;
CREATE TABLE products (
id INT UNSIGNED NOT NULL AUTO_INCREMENT,
name VARCHAR(250) NOT NULL,
brand_id INT UNSIGNED NOT NULL,
category_id INT UNSIGNED NOT NULL,
attributes JSON NOT NULL,
PRIMARY KEY(id),
INDEX CATEGORY_ID(category_id ASC),
INDEX BRAND_ID(brand_id ASC)
);
Populate the database table
Let's now populate the products table:
-- Televisions
INSERT INTO products (name, brand_id, category_id, attributes) VALUES
('Prime', '1', '1', '{"screen" : "50 inch", "resolution" : "2048 x 1152 pixels", "ports" : {"hdmi" : 1, "usb" : 3}, "speakers" : {"left" : "10 watt", "right" : "10 watt"}}'),
('Octoview', '1', '1', '{"screen" : "40 inch", "resolution" : "1920 x 1080 pixels", "ports" : {"hdmi" : 1, "usb" : 2}, "speakers" : {"left" : "10 watt", "right" : "10 watt"}}'),
('Dreamer', '1', '1', '{"screen" : "30 inch", "resolution" : "1600 x 900 pixels", "ports" : {"hdmi" : 1, "usb" : 1}, "speakers" : {"left" : "10 watt", "right" : "10 watt"}}'),
('Bravia', '1', '1', '{"screen" : "25 inch", "resolution" : "1366 x 768 pixels", "ports" : {"hdmi" : 1, "usb" : 0}, "speakers" : {"left" : "5 watt", "right" : "5 watt"}}'),
('Proton', '1', '1', '{"screen" : "20 inch", "resolution" : "1280 x 720 pixels", "ports" : {"hdmi" : 0, "usb" : 0}, "speakers" : {"left" : "5 watt", "right" : "5 watt"}}');
-- Mobile Phones
INSERT INTO products (name, brand_id, category_id, attributes) VALUES
('Desire', '2', '2', JSON_BUILD_OBJECT("network",
JSON_ARRAY_PUSH_STRING('["GSM", "CDMA", "HSPA"]', 'EVDO'),
"body",
"5.11 x 2.59 x 0.46 inches",
"weight",
"143 grams",
"sim",
"Micro-SIM",
"display",
"4.5 inches",
"resolution",
"720 x 1280 pixels",
"os",
"Android Jellybean v4.3"
)
),
('Passion', '2', '2', JSON_BUILD_OBJECT("network",
JSON_ARRAY_PUSH_STRING('["GSM", "CDMA"]', 'HSPA'),
"body",
"6.11 x 3.59 x 0.46 inches",
"weight",
"145 grams",
"sim",
"Micro-SIM",
"display",
"4.5 inches",
"resolution",
"720 x 1280 pixels",
"os",
"Android Jellybean v4.3"
)
),
('Emotion', '2', '2', JSON_BUILD_OBJECT("network" ,
JSON_ARRAY_PUSH_STRING('["GSM", "CDMA"]', 'EVDO'),
"body",
"5.50 x 2.50 x 0.50 inches",
"weight",
"125 grams",
"sim",
"Micro-SIM",
"display",
"5.00 inches",
"resolution",
"720 x 1280 pixels",
"os",
"Android KitKat v4.3"
)
),
('Sensation', '2', '2', JSON_BUILD_OBJECT("network",
JSON_ARRAY_PUSH_STRING('["GSM", "HSPA"]', 'EVDO'),
"body",
"4.00 x 2.00 x 0.75 inches",
"weight",
"150 grams",
"sim",
"Micro-SIM",
"display",
"3.5 inches",
"resolution",
"720 x 1280 pixels",
"os",
"Android Lollipop v4.3"
)
),
('Joy', '2', '2', JSON_BUILD_OBJECT("network",
JSON_ARRAY_PUSH_STRING('["CDMA", "HSPA"]', 'EVDO'),
"body",
"7.00 x 3.50 x 0.25 inches",
"weight",
"250 grams",
"sim",
"Micro-SIM",
"display",
"6.5 inches",
"resolution",
"1920 x 1080 pixels",
"os",
"Android Marshmallow v4.3"
)
);
-- Cameras
INSERT INTO products (name, brand_id, category_id, attributes) VALUES
('Explorer', '3', '3', '{"sensor_type" : "CMOS", "processor" : "Digic DV III", "scanning_system" : "progressive", "mount_type" : "PL", "monitor_type" : "LCD"}'),
('Runner', '3', '3', '{"sensor_type" : "CMOS", "processor" : "Digic DV II", "scanning_system" : "progressive", "mount_type" : "PL", "monitor_type" : "LED"}'),
('Traveler', '3', '3', '{"sensor_type" : "CMOS", "processor" : "Digic DV II", "scanning_system" : "progressive", "mount_type" : "PL", "monitor_type" : "LCD"}'),
('Walker', '3', '3', '{"sensor_type" : "CMOS", "processor" : "Digic DV I", "scanning_system" : "progressive", "mount_type" : "PL", "monitor_type" : "LED"}'),
('Jumper', '3', '3', '{"sensor_type" : "CMOS", "processor" : "Digic DV I", "scanning_system" : "progressive", "mount_type" : "PL", "monitor_type" : "LCD"}');
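To make the mobile-phone rows above easier to follow, here is a minimal Python sketch of the document that the nested JSON_BUILD_OBJECT and JSON_ARRAY_PUSH_STRING calls produce for the Desire row. The two Python functions are hypothetical stand-ins written for illustration only; they are not SingleStoreDB code and ignore the type handling and edge cases of the real SQL functions.

```python
import json


def json_array_push_string(json_array: str, value: str) -> list:
    """Illustrative stand-in: append a string to a JSON array given as text."""
    items = json.loads(json_array)
    if not isinstance(items, list):
        raise ValueError("expected a JSON array")
    return items + [value]


def json_build_object(*args) -> dict:
    """Illustrative stand-in: build an object from alternating keys and values."""
    if len(args) % 2 != 0:
        raise ValueError("expected an even number of arguments")
    return dict(zip(args[0::2], args[1::2]))


# Same keys and values as the 'Desire' row in the INSERT statement.
desire = json_build_object(
    "network", json_array_push_string('["GSM", "CDMA", "HSPA"]', "EVDO"),
    "body", "5.11 x 2.59 x 0.46 inches",
    "weight", "143 grams",
    "sim", "Micro-SIM",
    "display", "4.5 inches",
    "resolution", "720 x 1280 pixels",
    "os", "Android Jellybean v4.3",
)

# Sorting the keys mirrors the key order seen in the consumer output
# later in this article.
print(json.dumps(desire, sort_keys=True, separators=(",", ":")))
```

The point to notice is that the network value is a real JSON array with EVDO appended, not a quoted string.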
Install and configure Apache Pulsar
We'll install Pulsar using the instructions on the website:
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.2/apache-pulsar-2.10.2-bin.tar.gz
tar xvfz apache-pulsar-2.10.2-bin.tar.gz
cd apache-pulsar-2.10.2
Next, we'll start Pulsar standalone:
bin/pulsar standalone
From another terminal window, we'll run a consumer, as follows:
bin/pulsar-client consume sample_topic -s "first-subscription" -n 0
The consumer will now wait for messages. We'll send messages to sample_topic from Airbyte.
Install and configure Airbyte
We'll install the open-source version of Airbyte using the instructions on the website:
git clone https://github.com/airbytehq/airbyte.git
cd airbyte
docker-compose up
After the deployment is complete, we'll launch a web browser and enter http://localhost:8000.
We'll be asked for a username and password. The defaults are airbyte and password, respectively.
Create Source
Once logged in, we'll select Sources from the left-hand navigation pane and create a source.
We'll choose MySQL as the Source type on the New connection page. This works because SingleStoreDB is compatible with the MySQL wire protocol.
On the Set up the source page, here is what we need to enter:
- Source type: MySQL
- Source name: SingleStoreDB
- Host: <host>
- Port: 3306
- Database: e_store
- Username: admin
- Password: <password>
- SSL Connection: Enabled
We'll replace <host> and <password> with the values from our SingleStoreDB Cloud account.
Next, we'll click the Set up source button. The connection should be successful.
Create Destination
We'll choose Pulsar as the Destination type on the New connection page.
On the Set up the destination page, here is what we need to enter:
- Destination type: Pulsar
- Destination name: Pulsar
- Pulsar brokers: localhost:6650
- Test topic: test_topic
- Topic type: persistent
- Topic tenant: public
- Producer name: airbyte_producer
- Topic pattern: sample_topic
Next, we'll click the Set up destination button. The connection should be successful.
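The destination settings and the consumer we started earlier need to refer to the same topic. In Pulsar, a full topic name has the form {persistent|non-persistent}://tenant/namespace/topic, and a short name such as sample_topic is resolved to the public tenant and the default namespace. The sketch below shows how the two line up. The function names are hypothetical, and the default namespace is an assumption, since the namespace is not among the settings listed above.

```python
def full_topic_name(topic_type: str, tenant: str, namespace: str, topic: str) -> str:
    """Compose a full Pulsar topic name from its four parts."""
    return f"{topic_type}://{tenant}/{namespace}/{topic}"


def expand_short_name(name: str) -> str:
    """Resolve a short topic name the way Pulsar does by default:
    persistent topic, 'public' tenant, 'default' namespace."""
    if "://" in name:
        return name  # already a full topic name
    return full_topic_name("persistent", "public", "default", name)


# Topic as configured in the Airbyte destination
# (the "default" namespace is an assumption, see above).
airbyte_topic = full_topic_name("persistent", "public", "default", "sample_topic")

# Topic as given to `bin/pulsar-client consume sample_topic ...`.
consumer_topic = expand_short_name("sample_topic")

print(airbyte_topic)
print(airbyte_topic == consumer_topic)
```

If the tenant, namespace, or topic type were changed in the destination settings, the consumer would need the corresponding full topic name instead of the short one.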
Create Connection
We'll now link our source to our destination.
We'll select SingleStoreDB and click the Use existing source button.
We'll select Pulsar and click the Use existing destination button.
For our initial test, we'll configure the connection as follows:
- Connection name: SingleStoreDB <> Pulsar
- Transfer > Replication frequency: Every 24 hours
- Streams > Destination Namespace: Mirror source structure
- Activate the streams you want to sync: Check (✔) the products table and select Full refresh | Append for sync mode
Next, we'll click the Set up connection button. Sync should start immediately.
In the terminal window where our consumer is running, we should see output similar to the following:
----- got message -----
key:[null], properties:[], content:{"_airbyte_ab_id":"10a95b94-ca07-48c3-8655-04e1160684ca","_airbyte_data":"eyJpZCI6MiwibmFtZSI6Ik9jdG92aWV3IiwiYnJhbmRfaWQiOjEsImNhdGVnb3J5X2lkIjoxLCJhdHRyaWJ1dGVzIjoie1wicG9ydHNcIjp7XCJoZG1pXCI6MSxcInVzYlwiOjJ9LFwicmVzb2x1dGlvblwiOlwiMTkyMCB4IDEwODAgcGl4ZWxzXCIsXCJzY3JlZW5cIjpcIjQwIGluY2hcIixcInNwZWFrZXJzXCI6e1wibGVmdFwiOlwiMTAgd2F0dFwiLFwicmlnaHRcIjpcIjEwIHdhdHRcIn19In0=","_airbyte_stream":"products","_airbyte_emitted_at":1666989683549}
----- got message -----
key:[null], properties:[], content:{"_airbyte_ab_id":"ac9a6d74-192f-4133-a77b-b2783a5a86a6","_airbyte_data":"eyJpZCI6NCwibmFtZSI6IkJyYXZpYSIsImJyYW5kX2lkIjoxLCJjYXRlZ29yeV9pZCI6MSwiYXR0cmlidXRlcyI6IntcInBvcnRzXCI6e1wiaGRtaVwiOjEsXCJ1c2JcIjowfSxcInJlc29sdXRpb25cIjpcIjEzNjYgeCA3NjggcGl4ZWxzXCIsXCJzY3JlZW5cIjpcIjI1IGluY2hcIixcInNwZWFrZXJzXCI6e1wibGVmdFwiOlwiNSB3YXR0XCIsXCJyaWdodFwiOlwiNSB3YXR0XCJ9fSJ9","_airbyte_stream":"products","_airbyte_emitted_at":1666989683549}
----- got message -----
key:[null], properties:[], content:{"_airbyte_ab_id":"51a5a79f-97a0-47e5-9a9c-55b36483d6c4","_airbyte_data":"eyJpZCI6NSwibmFtZSI6IlByb3RvbiIsImJyYW5kX2lkIjoxLCJjYXRlZ29yeV9pZCI6MSwiYXR0cmlidXRlcyI6IntcInBvcnRzXCI6e1wiaGRtaVwiOjAsXCJ1c2JcIjowfSxcInJlc29sdXRpb25cIjpcIjEyODAgeCA3MjAgcGl4ZWxzXCIsXCJzY3JlZW5cIjpcIjIwIGluY2hcIixcInNwZWFrZXJzXCI6e1wibGVmdFwiOlwiNSB3YXR0XCIsXCJyaWdodFwiOlwiNSB3YXR0XCJ9fSJ9","_airbyte_stream":"products","_airbyte_emitted_at":1666989683549}
----- got message -----
key:[null], properties:[], content:{"_airbyte_ab_id":"745fc969-3d29-4866-973b-db7fde3ad9f4","_airbyte_data":"eyJpZCI6NywibmFtZSI6IlBhc3Npb24iLCJicmFuZF9pZCI6MiwiY2F0ZWdvcnlfaWQiOjIsImF0dHJpYnV0ZXMiOiJ7XCJib2R5XCI6XCI2LjExIHggMy41OSB4IDAuNDYgaW5jaGVzXCIsXCJkaXNwbGF5XCI6XCI0LjUgaW5jaGVzXCIsXCJuZXR3b3JrXCI6W1wiR1NNXCIsXCJDRE1BXCIsXCJIU1BBXCJdLFwib3NcIjpcIkFuZHJvaWQgSmVsbHliZWFuIHY0LjNcIixcInJlc29sdXRpb25cIjpcIjcyMCB4IDEyODAgcGl4ZWxzXCIsXCJzaW1cIjpcIk1pY3JvLVNJTVwiLFwid2VpZ2h0XCI6XCIxNDUgZ3JhbXNcIn0ifQ==","_airbyte_stream":"products","_airbyte_emitted_at":1666989683549}
----- got message -----
key:[null], properties:[], content:{"_airbyte_ab_id":"7c64455b-0c5f-4292-a4bc-6ea29dd209f8","_airbyte_data":"eyJpZCI6OCwibmFtZSI6IkVtb3Rpb24iLCJicmFuZF9pZCI6MiwiY2F0ZWdvcnlfaWQiOjIsImF0dHJpYnV0ZXMiOiJ7XCJib2R5XCI6XCI1LjUwIHggMi41MCB4IDAuNTAgaW5jaGVzXCIsXCJkaXNwbGF5XCI6XCI1LjAwIGluY2hlc1wiLFwibmV0d29ya1wiOltcIkdTTVwiLFwiQ0RNQVwiLFwiRVZET1wiXSxcIm9zXCI6XCJBbmRyb2lkIEtpdEthdCB2NC4zXCIsXCJyZXNvbHV0aW9uXCI6XCI3MjAgeCAxMjgwIHBpeGVsc1wiLFwic2ltXCI6XCJNaWNyby1TSU1cIixcIndlaWdodFwiOlwiMTI1IGdyYW1zXCJ9In0=","_airbyte_stream":"products","_airbyte_emitted_at":1666989683549}
----- got message -----
key:[null], properties:[], content:{"_airbyte_ab_id":"2c3ce9ba-56a3-4cb5-90d9-035b79b2c06f","_airbyte_data":"eyJpZCI6MTAsIm5hbWUiOiJKb3kiLCJicmFuZF9pZCI6MiwiY2F0ZWdvcnlfaWQiOjIsImF0dHJpYnV0ZXMiOiJ7XCJib2R5XCI6XCI3LjAwIHggMy41MCB4IDAuMjUgaW5jaGVzXCIsXCJkaXNwbGF5XCI6XCI2LjUgaW5jaGVzXCIsXCJuZXR3b3JrXCI6W1wiQ0RNQVwiLFwiSFNQQVwiLFwiRVZET1wiXSxcIm9zXCI6XCJBbmRyb2lkIE1hcnNobWFsbG93IHY0LjNcIixcInJlc29sdXRpb25cIjpcIjE5MjAgeCAxMDgwIHBpeGVsc1wiLFwic2ltXCI6XCJNaWNyby1TSU1cIixcIndlaWdodFwiOlwiMjUwIGdyYW1zXCJ9In0=","_airbyte_stream":"products","_airbyte_emitted_at":1666989683549}
----- got message -----
key:[null], properties:[], content:{"_airbyte_ab_id":"1e20dd1d-a390-4f3f-96ff-dacc70f4ae41","_airbyte_data":"eyJpZCI6MTIsIm5hbWUiOiJSdW5uZXIiLCJicmFuZF9pZCI6MywiY2F0ZWdvcnlfaWQiOjMsImF0dHJpYnV0ZXMiOiJ7XCJtb25pdG9yX3R5cGVcIjpcIkxFRFwiLFwibW91bnRfdHlwZVwiOlwiUExcIixcInByb2Nlc3NvclwiOlwiRGlnaWMgRFYgSUlcIixcInNjYW5uaW5nX3N5c3RlbVwiOlwicHJvZ3Jlc3NpdmVcIixcInNlbnNvcl90eXBlXCI6XCJDTU9TXCJ9In0=","_airbyte_stream":"products","_airbyte_emitted_at":1666989683549}
----- got message -----
key:[null], properties:[], content:{"_airbyte_ab_id":"b225a1f3-c31f-4ee8-9dd9-922b36d7be96","_airbyte_data":"eyJpZCI6MTUsIm5hbWUiOiJKdW1wZXIiLCJicmFuZF9pZCI6MywiY2F0ZWdvcnlfaWQiOjMsImF0dHJpYnV0ZXMiOiJ7XCJtb25pdG9yX3R5cGVcIjpcIkxDRFwiLFwibW91bnRfdHlwZVwiOlwiUExcIixcInByb2Nlc3NvclwiOlwiRGlnaWMgRFYgSVwiLFwic2Nhbm5pbmdfc3lzdGVtXCI6XCJwcm9ncmVzc2l2ZVwiLFwic2Vuc29yX3R5cGVcIjpcIkNNT1NcIn0ifQ==","_airbyte_stream":"products","_airbyte_emitted_at":1666989683549}
----- got message -----
key:[null], properties:[], content:{"_airbyte_ab_id":"421a202e-6309-473d-b376-16736e6b733d","_airbyte_data":"eyJpZCI6MSwibmFtZSI6IlByaW1lIiwiYnJhbmRfaWQiOjEsImNhdGVnb3J5X2lkIjoxLCJhdHRyaWJ1dGVzIjoie1wicG9ydHNcIjp7XCJoZG1pXCI6MSxcInVzYlwiOjN9LFwicmVzb2x1dGlvblwiOlwiMjA0OCB4IDExNTIgcGl4ZWxzXCIsXCJzY3JlZW5cIjpcIjUwIGluY2hcIixcInNwZWFrZXJzXCI6e1wibGVmdFwiOlwiMTAgd2F0dFwiLFwicmlnaHRcIjpcIjEwIHdhdHRcIn19In0=","_airbyte_stream":"products","_airbyte_emitted_at":1666989683549}
----- got message -----
key:[null], properties:[], content:{"_airbyte_ab_id":"ed03e9bf-907f-4a1f-b051-6615b3ebeb3d","_airbyte_data":"eyJpZCI6MywibmFtZSI6IkRyZWFtZXIiLCJicmFuZF9pZCI6MSwiY2F0ZWdvcnlfaWQiOjEsImF0dHJpYnV0ZXMiOiJ7XCJwb3J0c1wiOntcImhkbWlcIjoxLFwidXNiXCI6MX0sXCJyZXNvbHV0aW9uXCI6XCIxNjAwIHggOTAwIHBpeGVsc1wiLFwic2NyZWVuXCI6XCIzMCBpbmNoXCIsXCJzcGVha2Vyc1wiOntcImxlZnRcIjpcIjEwIHdhdHRcIixcInJpZ2h0XCI6XCIxMCB3YXR0XCJ9fSJ9","_airbyte_stream":"products","_airbyte_emitted_at":1666989683549}
----- got message -----
key:[null], properties:[], content:{"_airbyte_ab_id":"e371792a-79da-4ae1-a644-271c4f0e8b8c","_airbyte_data":"eyJpZCI6NiwibmFtZSI6IkRlc2lyZSIsImJyYW5kX2lkIjoyLCJjYXRlZ29yeV9pZCI6MiwiYXR0cmlidXRlcyI6IntcImJvZHlcIjpcIjUuMTEgeCAyLjU5IHggMC40NiBpbmNoZXNcIixcImRpc3BsYXlcIjpcIjQuNSBpbmNoZXNcIixcIm5ldHdvcmtcIjpbXCJHU01cIixcIkNETUFcIixcIkhTUEFcIixcIkVWRE9cIl0sXCJvc1wiOlwiQW5kcm9pZCBKZWxseWJlYW4gdjQuM1wiLFwicmVzb2x1dGlvblwiOlwiNzIwIHggMTI4MCBwaXhlbHNcIixcInNpbVwiOlwiTWljcm8tU0lNXCIsXCJ3ZWlnaHRcIjpcIjE0MyBncmFtc1wifSJ9","_airbyte_stream":"products","_airbyte_emitted_at":1666989683549}
----- got message -----
key:[null], properties:[], content:{"_airbyte_ab_id":"ffd5ac6c-15cb-4110-9bd0-55125b9528f7","_airbyte_data":"eyJpZCI6OSwibmFtZSI6IlNlbnNhdGlvbiIsImJyYW5kX2lkIjoyLCJjYXRlZ29yeV9pZCI6MiwiYXR0cmlidXRlcyI6IntcImJvZHlcIjpcIjQuMDAgeCAyLjAwIHggMC43NSBpbmNoZXNcIixcImRpc3BsYXlcIjpcIjMuNSBpbmNoZXNcIixcIm5ldHdvcmtcIjpbXCJHU01cIixcIkhTUEFcIixcIkVWRE9cIl0sXCJvc1wiOlwiQW5kcm9pZCBMb2xsaXBvcCB2NC4zXCIsXCJyZXNvbHV0aW9uXCI6XCI3MjAgeCAxMjgwIHBpeGVsc1wiLFwic2ltXCI6XCJNaWNyby1TSU1cIixcIndlaWdodFwiOlwiMTUwIGdyYW1zXCJ9In0=","_airbyte_stream":"products","_airbyte_emitted_at":1666989683549}
----- got message -----
key:[null], properties:[], content:{"_airbyte_ab_id":"769cffc1-6c64-4440-82df-070c45b90990","_airbyte_data":"eyJpZCI6MTEsIm5hbWUiOiJFeHBsb3JlciIsImJyYW5kX2lkIjozLCJjYXRlZ29yeV9pZCI6MywiYXR0cmlidXRlcyI6IntcIm1vbml0b3JfdHlwZVwiOlwiTENEXCIsXCJtb3VudF90eXBlXCI6XCJQTFwiLFwicHJvY2Vzc29yXCI6XCJEaWdpYyBEViBJSUlcIixcInNjYW5uaW5nX3N5c3RlbVwiOlwicHJvZ3Jlc3NpdmVcIixcInNlbnNvcl90eXBlXCI6XCJDTU9TXCJ9In0=","_airbyte_stream":"products","_airbyte_emitted_at":1666989683549}
----- got message -----
key:[null], properties:[], content:{"_airbyte_ab_id":"874f205c-018a-464c-a6fa-8a6dcfafd4d7","_airbyte_data":"eyJpZCI6MTMsIm5hbWUiOiJUcmF2ZWxlciIsImJyYW5kX2lkIjozLCJjYXRlZ29yeV9pZCI6MywiYXR0cmlidXRlcyI6IntcIm1vbml0b3JfdHlwZVwiOlwiTENEXCIsXCJtb3VudF90eXBlXCI6XCJQTFwiLFwicHJvY2Vzc29yXCI6XCJEaWdpYyBEViBJSVwiLFwic2Nhbm5pbmdfc3lzdGVtXCI6XCJwcm9ncmVzc2l2ZVwiLFwic2Vuc29yX3R5cGVcIjpcIkNNT1NcIn0ifQ==","_airbyte_stream":"products","_airbyte_emitted_at":1666989683549}
----- got message -----
key:[null], properties:[], content:{"_airbyte_ab_id":"1929e8b9-c4ac-4ef6-875e-76f44d1640a2","_airbyte_data":"eyJpZCI6MTQsIm5hbWUiOiJXYWxrZXIiLCJicmFuZF9pZCI6MywiY2F0ZWdvcnlfaWQiOjMsImF0dHJpYnV0ZXMiOiJ7XCJtb25pdG9yX3R5cGVcIjpcIkxFRFwiLFwibW91bnRfdHlwZVwiOlwiUExcIixcInByb2Nlc3NvclwiOlwiRGlnaWMgRFYgSVwiLFwic2Nhbm5pbmdfc3lzdGVtXCI6XCJwcm9ncmVzc2l2ZVwiLFwic2Vuc29yX3R5cGVcIjpcIkNNT1NcIn0ifQ==","_airbyte_stream":"products","_airbyte_emitted_at":1666989683549}
2022-10-28T21:41:37,441+0100 [pulsar-timer-6-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [sample_topic] [first-subscription] [ce6e3] Prefetched messages: 0 --- Consume throughput received: 0.25 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.25 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
There are 15 messages, one for each row of the products table. The row in each message's _airbyte_data field is base64-encoded. If we take the first message above and decode it, we'll get the following:
{"id":2,"name":"Octoview","brand_id":1,"category_id":1,"attributes":"{\"ports\":{\"hdmi\":1,\"usb\":2},\"resolution\":\"1920 x 1080 pixels\",\"screen\":\"40 inch\",\"speakers\":{\"left\":\"10 watt\",\"right\":\"10 watt\"}}"}
The data matches the Octoview row that we previously stored. Note that the attributes column arrives as a JSON-encoded string rather than as a nested object.
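The decoding step can be sketched in Python using only the standard library. The content string below is copied from the first message in the consumer output above. The function name decode_record is hypothetical, and the second json.loads call assumes that attributes always arrives as a JSON-encoded string, as it does in this output.

```python
import base64
import json

# The "content" part of the first message printed by the consumer.
CONTENT = (
    '{"_airbyte_ab_id":"10a95b94-ca07-48c3-8655-04e1160684ca",'
    '"_airbyte_data":"'
    "eyJpZCI6MiwibmFtZSI6Ik9jdG92aWV3IiwiYnJhbmRfaWQiOjEsImNhdGVnb3J5X2lkIjoxLCJhdHRyaWJ1dGVzIjoi"
    "e1wicG9ydHNcIjp7XCJoZG1pXCI6MSxcInVzYlwiOjJ9LFwicmVzb2x1dGlvblwiOlwiMTkyMCB4IDEwODAgcGl4ZWxzXCIs"
    "XCJzY3JlZW5cIjpcIjQwIGluY2hcIixcInNwZWFrZXJzXCI6e1wibGVmdFwiOlwiMTAgd2F0dFwiLFwicmlnaHRcIjpcIjEwIHdhdHRcIn19In0="
    '","_airbyte_stream":"products","_airbyte_emitted_at":1666989683549}'
)


def decode_record(content: str) -> dict:
    """Decode one Airbyte message envelope into the original table row."""
    envelope = json.loads(content)
    # The row itself is base64-encoded JSON.
    record = json.loads(base64.b64decode(envelope["_airbyte_data"]))
    # The JSON column is a string inside the row, so parse it a second time.
    if isinstance(record.get("attributes"), str):
        record["attributes"] = json.loads(record["attributes"])
    return record


record = decode_record(CONTENT)
print(record["id"], record["name"], record["attributes"]["screen"])
```

Parsing attributes a second time gives a nested dictionary, which is usually more convenient downstream than the escaped string shown above.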
Summary
In this article, we have seen that we can easily use SingleStoreDB as a source in Airbyte. We successfully replicated the data from a SingleStoreDB Cloud database to Apache Pulsar. Further tests could include other destinations available in Airbyte.