Sophia Parafina

Data Engineering with DLT and REST

Building a Data Pipeline with DLT

In this example, a company provides IoT data through a near real-time API for monitoring and a historical API feed for analysis. Each serves a specific purpose, but you can manage both with a single extract, transform, load (ETL) solution. APIs are a convenient way to access data from vehicles and devices; however, an endpoint can be oversubscribed. One way to scale is to ingest the data into a database built for analytics. This article shows how to ingest data from a REST API into DuckDB, an in-process database built for online analytical processing (OLAP) and historical data analysis.

Requirements

To build this solution, you will need:

  • Python 3.10 or higher
  • DuckDB installed locally

Setup

It's good practice to create a dedicated environment for a Python project. Create a project directory, then create and activate a virtual environment.

mkdir rest_iot 
cd rest_iot
python3 -m venv venv
source venv/bin/activate

This example uses dlt (data load tool) to consume and ingest near real-time data from the API. dlt is a lightweight, production-ready extract, transform, load (ETL) Python library. To install dlt with the DuckDB extra:

pip install -U "dlt[duckdb]"

We can use dlt to create a new project.

dlt init iot duckdb

dlt creates several project files, including an example pipeline script, requirements.txt, and a .dlt directory with configuration files.

iot_pipeline.py
requirements.txt
.dlt/
 config.toml
 secrets.toml

The requirements.txt file lists the Python packages the project depends on; install them before running the pipeline.

pip install -r requirements.txt

The final step is to configure the API key. Add your API key to the .dlt/secrets.toml file:

rest_token = "rest_api_aaaaaaaaaaaaabbbbbbbbbbbbbbbbccccccccc"
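
If you prefer to keep secrets out of files (for example, in CI), dlt can also read them from environment variables; a top-level secret such as rest_token resolves from its upper-cased name. A minimal sketch of that assumption:

export REST_TOKEN="rest_api_aaaaaaaaaaaaabbbbbbbbbbbbbbbbccccccccc"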

Near Real-Time Data

We won't use the example code generated by dlt. Open iot_pipeline.py, delete the generated code, and replace it with the following:

import dlt
from dlt.sources.rest_api import rest_api_source
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth

# Define the REST source: base URL, bearer-token auth, and a paginator that
# follows the next-page link found at paging.next in the response body
nrt_source = rest_api_source({
    "client": {
        "base_url": "https://api.rest-iot.com/fleet/vehicles/stats/",
        "auth": BearerTokenAuth(token=dlt.secrets["rest_token"]),
        "paginator": {
            "type": "json_response",
            "next_url_path": "paging.next",
        },
    },
    "resources": [
        { 
            "name" : "feed",
            "endpoint" : {
                "params": {
                    "types": "gps",
                    "decorations": "obdOdometerMeters"
                },
            }
        },
    ],
})

# Create the pipeline: load into DuckDB, write to a dataset named "iot",
# and log progress to the console
pipeline = dlt.pipeline(
    pipeline_name="feed_pipeline",
    destination="duckdb",
    dataset_name="iot",
    progress="log",
)

# Run the pipeline: extract, normalize, and load the near real-time feed
load_info = pipeline.run(nrt_source)

The code defines a source, which includes the REST IoT endpoint and the API token. Within the source, the near real-time resource is the GPS feed, which is loaded into DuckDB. dlt handles pagination automatically, extracting data until there are no more pages.
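
The json_response paginator expects the next-page link inside the response body at the path given by next_url_path. A hypothetical response from the stats feed might look like the following; the fields under data are illustrative, not the vendor's actual schema:

{
  "data": [
    { "id": "281474", "gps": { "latitude": 29.42, "longitude": -98.49 } }
  ],
  "paging": {
    "next": "https://api.rest-iot.com/fleet/vehicles/stats/feed?after=abc123"
  }
}

When paging.next is missing or null, dlt stops requesting pages.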

The second part of the code creates a pipeline for extracting the data. It sets DuckDB as the destination and writes the data to the iot dataset.
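
pipeline.run() returns load info you can print to confirm what was loaded, and dlt records row counts in the pipeline trace. A small, optional addition at the end of the script, assuming the code above:

# Summary of the load: package status, destination, and dataset
print(load_info)

# Row counts per table from the normalize step of the last run
print(pipeline.last_trace.last_normalize_info)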

To extract and load the data, run the following:

python3 iot_pipeline.py

The pipeline logs progress to the console as it runs. We can check the output by querying DuckDB with the CLI.

duckdb

v1.1.3 19864453f7
Enter ".help" for usage hints.
Connected to a transient in-memory database.
Use ".open FILENAME" to reopen a persistent database.

Open the database file in the DuckDB CLI. You can use SQL to query the database.

.open iot.duckdb
SHOW TABLES;

SELECT * FROM feed;

SELECT * FROM feed_gps;

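If you would rather stay in Python, the duckdb package can open the same file and run identical queries. A minimal sketch, assuming the file name above and that dlt wrote the tables into a schema named after the iot dataset:

import duckdb

# Open the database file produced by the pipeline
con = duckdb.connect("iot.duckdb")

# List every table dlt created, then sample the GPS child table
print(con.sql("SHOW ALL TABLES"))
print(con.sql("SELECT * FROM iot.feed_gps LIMIT 10"))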

This example uses DuckDB, but dlt supports many destinations, including ClickHouse, Snowflake, Databricks, S3, and over 30 SQL databases. Developers can choose a data backend that meets their requirements.
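
Switching destinations is mostly a one-line change in the pipeline definition, plus credentials in .dlt/secrets.toml and the matching dlt extra. A sketch, assuming ClickHouse is the target instead of DuckDB:

pipeline = dlt.pipeline(
    pipeline_name="feed_pipeline",
    destination="clickhouse",  # was "duckdb"; credentials belong in .dlt/secrets.toml
    dataset_name="iot",
    progress="log",
)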

Historical Data

Retrieving historical data follows the same pattern: define a source and create a pipeline.

# Reusing the imports from the previous script (dlt, rest_api_source, BearerTokenAuth),
# define time-bounded query parameters for the historical engine-state data
params = {
    "types": "engineStates",
    "startTime": "2020-07-23T00:00:00Z",
    "endTime": "2020-07-24T00:00:00Z"
}

# Same client configuration as before, with a "history" resource that uses the params above
historical_source = rest_api_source({
    "client": {
        "base_url": "https://api.rest-iot.com/fleet/vehicles/stats/",
        "auth": BearerTokenAuth(token=dlt.secrets["rest_token"]),
        "paginator": {
            "type": "json_response",
            "next_url_path": "paging.next",
        },
    },
    "resources": [
        { 
            "name" : "history",
            "endpoint" : {
                "params": params,
            }
        },
    ],
})

# A separate pipeline that loads the historical data into the same "iot" dataset
pipeline = dlt.pipeline(
    pipeline_name="history_pipeline",
    destination="duckdb",
    dataset_name="iot",
    progress="log",
)

load_info = pipeline.run(historical_source)

You can work with the data in DuckDB programmatically, but analysts often use spreadsheets. Exporting the data to a CSV file is simple: in the DuckDB session, use the COPY command to write the data to a file.

COPY history TO 'history.csv' (HEADER, DELIMITER ',');
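
DuckDB's COPY command can also write other formats. If the data is headed into another analytics tool rather than a spreadsheet, Parquet preserves column types and compresses well:

COPY history TO 'history.parquet' (FORMAT PARQUET);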

What will you build?

This article demonstrates how to work with near real-time and historical data using the dlt package. Whether you need to scale data access across the enterprise or provide historical data for post-event analysis, you can use the same framework to deliver the data. In a future article, I'll demonstrate how to use dlt with a workflow orchestrator such as Apache Airflow or Dagster.
