Building a Data Pipeline with DLT
In this example, a company provides IoT data through a near real-time API for monitoring and a historical API feed for analysis. Each serves a specific purpose, but you can use a single ETL (extract, transform, load) solution to manage both. APIs are a convenient way to access data from vehicles and devices. However, an endpoint can be oversubscribed. One way to scale is to ingest the data into an analytical database and serve queries from there. This article shows how to ingest data from a REST API into DuckDB, an in-process database built for online analytical processing (OLAP) and historical data analysis.
Requirements
To build this solution, you will need:
- Python 3.10 or higher
- The DuckDB CLI installed locally
Setup
It's good practice to create a dedicated environment for a Python project. Create a directory and an environment and activate the environment.
mkdir rest_iot
cd rest_iot
python3 -m venv venv
source venv/bin/activate
This example uses the data load tool (dlt) to consume and ingest near real-time data from the API. dlt is a lightweight, production-ready extract, transform, and load (ETL) Python library. To install dlt:
pip install -U "dlt[duckdb]"
We can use dlt to create a new project.
dlt init iot duckdb
dlt creates several project files, including a code example, requirements.txt, and a directory with configuration files.
iot_pipeline.py
requirements.txt
.dlt/
    config.toml
    secrets.toml
The requirements.txt file lists the Python packages the project requires. Install them before running the pipeline.
pip install -r requirements.txt
The final step is to configure the API key. Add your API key to the .dlt/secrets.toml file:
rest_token = "rest_api_aaaaaaaaaaaaabbbbbbbbbbbbbbbbccccccccc"
Near Real-Time Data
We won't use the example code generated by dlt. Open iot_pipeline.py, delete its contents, and paste in the following code.
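import dlt
from dlt.sources.rest_api import rest_api_source
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth

# Source: the REST IoT API, authenticated with the token from .dlt/secrets.toml.
nrt_source = rest_api_source({
    "client": {
        "base_url": "https://api.rest-iot.com/fleet/vehicles/stats/",
        "auth": BearerTokenAuth(token=dlt.secrets["rest_token"]),
        # Follow the next-page URL found at paging.next in each JSON response.
        "paginator": {
            "type": "json_response",
            "next_url_path": "paging.next",
        },
    },
    "resources": [
        {
            # No explicit path is set, so the resource name is used as the endpoint path.
            "name": "feed",
            "endpoint": {
                "params": {
                    "types": "gps",
                    "decorations": "obdOdometerMeters",
                },
            },
        },
    ],
})

# Pipeline: load the source into a local DuckDB file under the iot dataset.
pipeline = dlt.pipeline(
    pipeline_name="feed_pipeline",
    destination="duckdb",
    dataset_name="iot",
    progress="log",
)

load_info = pipeline.run(nrt_source)
print(load_info)  # summary of what was loaded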
The code defines a source, which includes the REST IoT endpoint and the API token. Within the source, the near real-time resource is feed, which requests the gps statistics and is loaded into DuckDB. dlt handles pagination automatically, extracting data until there are no more pages.
The second part of the code creates a pipeline for extracting data. It defines DuckDB as the destination and writes the data to the iot dataset, which dlt creates as a schema inside the feed_pipeline.duckdb file.
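To make the pagination step concrete, here is a minimal standard-library sketch of what following a next-page link at paging.next amounts to. It is an illustration only, not dlt's implementation: the helper names (get_by_path, iter_pages), the example.invalid URLs, and the "data" key in the fake responses are all assumptions made for the example.

```python
from typing import Callable, Iterator, Optional


def get_by_path(payload: dict, dotted_path: str) -> Optional[str]:
    """Return the value at a dotted path such as 'paging.next', or None."""
    current = payload
    for key in dotted_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def iter_pages(fetch: Callable[[str], dict], first_url: str,
               next_url_path: str = "paging.next") -> Iterator[dict]:
    """Yield each page, following the next-page URL until it is missing."""
    url: Optional[str] = first_url
    while url:
        page = fetch(url)
        yield page
        url = get_by_path(page, next_url_path)


# Hypothetical two-page response set standing in for the real API.
FAKE_PAGES = {
    "https://example.invalid/feed": {
        "data": [{"id": 1}, {"id": 2}],
        "paging": {"next": "https://example.invalid/feed?page=2"},
    },
    "https://example.invalid/feed?page=2": {
        "data": [{"id": 3}],
        "paging": {},  # no next link, so iteration stops here
    },
}

records = [
    row
    for page in iter_pages(FAKE_PAGES.__getitem__, "https://example.invalid/feed")
    for row in page["data"]
]
print(len(records))
```

In the pipeline itself none of this is needed; the paginator block in the source configuration asks dlt to do the equivalent work.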
To extract and load the data, run the following:
python3 iot_pipeline.py
The pipeline outputs the log to the console as the program runs. We can check the output by querying duckdb with the CLI.
duckdb
v1.1.3 19864453f7
Enter ".help" for usage hints.
Connected to a transient in-memory database.
Use ".open FILENAME" to reopen a persistent database.
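Open the database file in the DuckDB CLI. dlt names the file after the pipeline and places the tables in a schema named after the dataset, so qualify table names with iot. Nested lists are stored in child tables named with a double underscore. You can use SQL to query the database.
.open feed_pipeline.duckdb
SHOW ALL TABLES;
SELECT * FROM iot.feed;
SELECT * FROM iot.feed__gps;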
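The load produces two tables because dlt stores a list-valued field in a child table named after the parent table and the field, joined by a double underscore. The sketch below shows the idea in plain Python, assuming each vehicle record carries a list-valued gps field. It is a simplified illustration rather than dlt's normalizer: the function split_nested, the link columns (_row_id, _parent_row_id, _list_idx), and the sample records are hypothetical.

```python
from typing import Any


def split_nested(rows: list[dict[str, Any]], parent_table: str,
                 list_field: str) -> dict[str, list[dict[str, Any]]]:
    """Split rows into a parent table and a child table for one list field."""
    parents: list[dict[str, Any]] = []
    children: list[dict[str, Any]] = []
    for parent_id, row in enumerate(rows):
        # Scalar fields stay on the parent row.
        parent = {key: value for key, value in row.items() if key != list_field}
        parent["_row_id"] = parent_id  # hypothetical key column
        parents.append(parent)
        # Each list item becomes a child row that points back at its parent.
        for position, item in enumerate(row.get(list_field, [])):
            children.append({**item, "_parent_row_id": parent_id, "_list_idx": position})
    return {parent_table: parents, f"{parent_table}__{list_field}": children}


# Hypothetical records; the real API's field names may differ.
sample = [
    {"id": "v1", "name": "Truck 1", "gps": [{"latitude": 1.0}, {"latitude": 1.1}]},
    {"id": "v2", "name": "Truck 2", "gps": [{"latitude": 2.0}]},
]
tables = split_nested(sample, "feed", "gps")
for name, table_rows in tables.items():
    print(name, len(table_rows))
```

Joining the child rows back to their parent on the link column recovers the original nesting, which is how you would query a vehicle together with its GPS readings.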
This example uses DuckDB, but dlt supports many destinations, including ClickHouse, Snowflake, Databricks, S3, and over 30 SQL databases. Developers can choose a data backend that meets their requirements.
Historical Data
Retrieving historical data follows the same pattern, i.e., define a source and create a pipeline.
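# These imports are needed if this code lives in its own file.
import dlt
from dlt.sources.rest_api import rest_api_source
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth

# Query parameters: the statistic type and the time window to retrieve.
params = {
    "types": "engineStates",
    "startTime": "2020-07-23T00:00:00Z",
    "endTime": "2020-07-24T00:00:00Z",
}

historical_source = rest_api_source({
    "client": {
        "base_url": "https://api.rest-iot.com/fleet/vehicles/stats/",
        "auth": BearerTokenAuth(token=dlt.secrets["rest_token"]),
        "paginator": {
            "type": "json_response",
            "next_url_path": "paging.next",
        },
    },
    "resources": [
        {
            "name": "history",
            "endpoint": {
                "params": params,
            },
        },
    ],
})

# A separate pipeline, so the data lands in history_pipeline.duckdb.
pipeline = dlt.pipeline(
    pipeline_name="history_pipeline",
    destination="duckdb",
    dataset_name="iot",
    progress="log",
)

load_info = pipeline.run(historical_source)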
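The startTime and endTime values above are hard-coded for a single day. If you need a longer range, one option is to generate one parameter set per day and run the source once for each. The following sketch builds those parameter sets with the standard library only; the helper names (iso_z, daily_windows) are made up for this example, and whether the API prefers day-sized windows is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator


def iso_z(moment: datetime) -> str:
    """Format a timezone-aware datetime as UTC, e.g. 2020-07-23T00:00:00Z."""
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def daily_windows(start: datetime, end: datetime,
                  stat_type: str = "engineStates") -> Iterator[dict]:
    """Yield one params dict per day covering the range from start up to end."""
    cursor = start
    while cursor < end:
        # Clamp the last window so it never runs past the requested end.
        upper = min(cursor + timedelta(days=1), end)
        yield {
            "types": stat_type,
            "startTime": iso_z(cursor),
            "endTime": iso_z(upper),
        }
        cursor = upper


windows = list(daily_windows(
    datetime(2020, 7, 23, tzinfo=timezone.utc),
    datetime(2020, 7, 25, tzinfo=timezone.utc),
))
for window in windows:
    print(window["startTime"], "->", window["endTime"])
```

Each dictionary has the same shape as the params dictionary used in the historical source, so it can be passed to the endpoint configuration unchanged.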
You can programmatically work with the data in DuckDB, but analysts often use spreadsheets. Exporting the data to a CSV file is simple. In a DuckDB session opened on history_pipeline.duckdb, use the COPY command to write the table to a file.
COPY iot.history TO 'history.csv' (HEADER, DELIMITER ',');
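Before handing the file to an analyst, it can be worth checking that the export has the columns and row count you expect. This is a small sketch using only Python's csv module; the function name summarize_csv is hypothetical, and the column names depend entirely on what the API returned.

```python
import csv
from pathlib import Path


def summarize_csv(path: str) -> tuple[list[str], int]:
    """Return the header columns and the number of data rows in a CSV file."""
    with Path(path).open(newline="", encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        row_count = sum(1 for _ in reader)  # reading rows also loads the header
        return list(reader.fieldnames or []), row_count


# Only runs the check if the export from the COPY step is present.
if Path("history.csv").exists():
    columns, row_count = summarize_csv("history.csv")
    print(f"{row_count} rows, columns: {columns}")
```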
What will you build?
This article demonstrates how to work with near real-time and historical data using the dlt package. Whether you need to scale data access across the enterprise or provide historical data for post-event analysis, you can use the same framework to provide customer data. In a future article, I'll demonstrate how to use dlt with a workflow orchestrator such as Apache Airflow or Dagster.