DEV Community

Cover image for Realtime Analysis of Cryptocurrency Prices Using dbt, Materialize, Redpanda & Metabase
Nazli Ander
Nazli Ander

Posted on • Updated on

Realtime Analysis of Cryptocurrency Prices Using dbt, Materialize, Redpanda & Metabase

Materialize organized an Online Hack Day a while ago. And they provided a structured streaming setup using dbt (ETL framework), Redpanda (queue), Materialize (database) and Metabase (visualization).

The initial setup was using flight data with OpenSky API to aggregate flight information. I re-purposed the structured streaming setup to use cryptocurrency data in a real-time dashboard.

Without changing much of the setup, it was a great comfort to create a new producer via CoinRanking. Then I created two financial queries in dbt to answer the following questions in real-time:

  1. What is the difference of marketcap within the last 20 minutes per crypto currency?
  2. What are the most deviating cryptocurrencies within the last 20 minutes?
  3. What are the average prices per cryptocurrency within the last 20 minutes?

I connected the queries (materialized views in Materialize) to Metabase, to visualize my answers. Here is a screenshot from the resulting example dashboard in Metabase:

Metabase Dashboard

In this small memorial write-up, I will try to summarize the pipeline for this analysis. I aim to show the easiness of creating a real-time financial analysis with it.

Pipeline Summary

The pipeline that was created by Materialize had the following chronological order, and I re-purposed the same structure by changing the producer:

  1. Ingest real-time data with a Python producer into RedPanda
  2. Create a source from RedPanda in Materialize using dbt (raw data ingestion)
  3. Create a staging view in Materialize to type-cast JSON fields (staging ingested data)
  4. Create a materialized view(s) in Materialize to produce real-time windowed aggregations
  5. Use Metabase to visualize data The technologies relate to each other as the following diagram suggests:

Pipeline Setup Summary

The setup is initialized with a very diligent docker-compose file, created by @morsapaes. When we docker-compose up, we have the following services running on your local machine:

IMAGE                                               COMMAND                  PORTS                                                                NAMES
sample_project_data-generator                       "python -u ./crypto.…"                                                                        data-generator
sample_project_dbt                                  "/bin/bash"              0.0.0.0:8002->8080/tcp                                               dbt
metabase/metabase                                   "/app/run_metabase.sh"   0.0.0.0:3030->3000/tcp                                               metabase
docker.vectorized.io/vectorized/redpanda:v21.11.3   "/entrypoint.sh 'red…"   0.0.0.0:8081-8082->8081-8082/tcp, 0.0.0.0:9092->9092/tcp, 9644/tcp   redpanda
materialize/materialized:v0.20.0                    "tini -- materialize…"   0.0.0.0:6875->6875/tcp                                               materialized
Enter fullscreen mode Exit fullscreen mode

CoinRanking Producer in a Nutshell

To fetch real-time CoinRanking data into RedPanda, we need a small script. The script needs to read data from CoinRanking within periods. Then it needs to flush the read data into the RedPanda instance. Since RedPanda is Kafka compatible, that is quite straight forward with Python Kafka client.

Producer script basically reads data from the CoinRanking API, using a token. Then it picks up the required fields from the API data requested. Lastly it ingests the list of coin information into RedPanda within every 10 seconds. Here is the compact script:

import logging
import requests
import schedule
from kafka import KafkaProducer

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)-7s %(levelname)-1s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    handlers=[
        logging.StreamHandler(),
    ],
)
COIN_PAGE = "https://api.coinranking.com/v2/coins"
CRYPTO_TOPIC = "crypto"

TOKEN = os.getenv(
    "TOKEN",
    ".",
)
HEADERS = {"Authorization": f"access_token {TOKEN}"}
FREQUENCY_INGESTION = 10


def produce_list_of_coin_dict_into_queue(list_of_dict: list) -> None:
    producer = KafkaProducer(bootstrap_servers="redpanda:9092")
    for coin_with_model in list_of_dict:
        try:
            producer.send(
                topic=CRYPTO_TOPIC,
                value=dumps(coin_with_model).encode("utf-8"),
                key=uuid.uuid4().hex.encode("utf-8"),
            )
        except Exception as e:
            logging.error(f"The problem is: {e}!")
    producer.flush()


def get_json_api(page: str) -> tuple:
    get_request = requests.get(page, headers=HEADERS)
    assert get_request.status_code == 200, "Request not successful"
    return get_request.json(), get_request.status_code


def get_coin_model(coin: dict) -> dict:
    try:
        return {
            "uuid": coin.get("uuid"),
            "name": coin.get("name"),
            "symbol": coin.get("symbol"),
            "btc_price": coin.get("btcPrice"),
            "last_24h_volume": coin.get("24hVolume"),
            "marketcap": coin.get("marketCap"),
            "price": coin.get("price"),
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }
    except Exception as e:
        logging.error(f"Exception: {e}")
        return {}


def coins_producer() -> None:
    all_coins, _ = get_json_api(COIN_PAGE)
    coins_with_model = get_all_coins_with_model(all_coins)
    produce_list_of_coin_dict_into_queue(coins_with_model)


if __name__ == "__main__":
    coins_producer()
    schedule.every(FREQUENCY_INGESTION).seconds.do(coins_producer)
    while True:
        schedule.run_pending()
        time.sleep(1)
Enter fullscreen mode Exit fullscreen mode

By using this producer, we regularly ingest crypto data into RedPanda with a topic called crypto. We expect to asynchronously consume the ingested data from the topic crypto and use in our analyses.

RedPanda Source in Materialize

The project uses dbt adapter for Materialize. The adapter (dbt-materialize) is a Python package available on PyPI. This package allows us to use SQL + Jinja statements to efficiently transform streaming data and continuously update our data.

According to Materialize, the connected data sources are called source. Creating a source in Materialize is possible by introducing a queue connection with a simple DDL statement.

Since we make use of dbt, this is as easy as using the following lines:

{{
    config(
        materialized='source',
        tags=['crypto']
    )
}}

{% set source_name %}
    {{ mz_generate_name('rc_coins') }}
{% endset %}

CREATE SOURCE {{ source_name }}
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'crypto'
  KEY FORMAT BYTES
  VALUE FORMAT BYTES
ENVELOPE UPSERT;
Enter fullscreen mode Exit fullscreen mode

After defining the source, ingested data will be recognized by the database. However, we need to convert the string to the database encoding and JSON. This is required for column mapping, as our data is stored in JSON object literals. In the end we can map the column values by accessing and casting the object values:

{{
    config(
        materialized='view',
        tags=['crypto']
    )
}}

WITH converted_casted AS (
    SELECT 
        CAST(CONVERT_FROM(data, 'utf8') AS jsonb) AS data
    FROM {{ ref('rc_coins') }}
)
SELECT
    (data->>'uuid')::string as uuid,
    (data->>'name')::string as name,
    (data->>'symbol')::string as symbol,
    (data->>'btc_price')::numeric as btc_price,
    (data->>'last_24h_volume')::numeric as last_24h_volume,
    (data->>'marketcap')::numeric as marketcap,
    (data->>'price')::double as price,
    (data->>'timestamp')::timestamp as timestamp
FROM converted_casted
Enter fullscreen mode Exit fullscreen mode

Materialized Views in Materialize

One of the great features of Materialize is their materialized view. This is a streaming computation of a SELECT query in incrementally updated materialized views.

As in every structured streaming framework, to provide a valid analysis we can benefit from defining a set of assumptions for windowing and lateness of data (grace periods). Currently, Materialize makes use of mz_logical_timestamp() function to define windowing and grace periods of data. The mz_logical_timestamp() function represents the current timestamp in milliseconds at the time of the query execution.

The questions require us to keep track of the last 20 minutes of data. Theoretically we need to analyze data in sliding windows. Following the nice explanation in Materialize documentation, using the mz_logical_timestamp() we can compute the answering queries of our crypto analysis for the past 20 minutes.

{{
    config(
        materialized ='materializedview',
        tags=['crypto']
    )
}}

{# 20 mins #}
{% set slide_threshold = '1200000' %}

WITH with_casted_insertion AS (

    SELECT *, extract(epoch from timestamp) * 1000 AS inserted_at
    FROM {{ ref('stg_crypto') }}

)

SELECT * FROM with_casted_insertion
WHERE TRUE
    AND mz_logical_timestamp() >= inserted_at
    AND mz_logical_timestamp() < inserted_at + {{ slide_threshold }}
Enter fullscreen mode Exit fullscreen mode

In the producer we did not provide an insertion date in milliseconds. By using SQL statements we can convert the field timestamp into milliseconds and call this as inserted_at. This new field can be our benchmark for windowing.

The mz_logical_timestamp() and its usage is quite complicated. I suggest reading the nice documentation page from Materialize to see other use cases and their explanations.

On top of the materialized view created above we can write our analytics queries to expose in a dashboarding tool, such as Metabase. The analytics queries would use dbt functionalities as well, materialized as materializedview. You can find the resulting queries in the repository.

Resulting Lineage and Last Words

After obtaining the analytics queries, all we need to do is to run only once the crypto tagged models from dbt:

dbt run -m tag:crypto
Enter fullscreen mode Exit fullscreen mode

The output from dbt will be as:

19:01:19  Found 10 models, 0 tests, 0 snapshots, 0 analyses, 180 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
19:01:19
19:01:19  Concurrency: 1 threads (target='dev')
19:01:19
19:01:19  1 of 5 START source model public.rc_coins....................................... [RUN]
19:01:19  1 of 5 OK created source model public.rc_coins.................................. [CREATE SOURCE in 0.08s]
19:01:19  2 of 5 START view model public.stg_crypto....................................... [RUN]
19:01:19  2 of 5 OK created view model public.stg_crypto.................................. [CREATE VIEW in 0.05s]
19:01:19  3 of 5 START materializedview model public.fct_crypto_sliding_window............ [RUN]
19:01:19  3 of 5 OK created materializedview model public.fct_crypto_sliding_window....... [CREATE VIEW in 0.06s]
19:01:19  4 of 5 START materializedview model public.marketcap_changes.................... [RUN]
19:01:19  4 of 5 OK created materializedview model public.marketcap_changes............... [CREATE VIEW in 0.06s]
19:01:19  5 of 5 START materializedview model public.volatile_cryptos..................... [RUN]
19:01:20  5 of 5 OK created materializedview model public.volatile_cryptos................ [CREATE VIEW in 0.05s]
19:01:20
19:01:20  Finished running 1 source model, 1 view model, 3 materializedview models in 0.38s.
19:01:20
19:01:20  Completed successfully
19:01:20
19:01:20  Done. PASS=5 WARN=0 ERROR=0 SKIP=0 TOTAL=5
Enter fullscreen mode Exit fullscreen mode

The resulting dbt lineage diagram would be as follows:

dbt Lineage

Real-time analytics is used in getting immediate insights for financial data, news sources, social media reactions and healthcare. In most of the cases SQL is enough for obtaining basic insights in such systems. Thus, SQL dbt in combination with Materialize improve the efficiency of developing real-time analytics pipelines. The setup that was provided in the Hackday was a great example of this.

The biggest challenge after this point would be the maintenance of these pipelines. Luckily, there is sufficient information on integrating both RedPanda and Materialize with monitoring tools. As a next reading tip, I would nerdly suggest their documentation pages:

  1. RedPanda to Prometheus metrics
  2. Materialize monitoring documentation page

Top comments (1)

Collapse
 
lesterscramjetorg profile image
Lester @ Scramjet.org

Interesting. the question is whether it couldn't be easier.... in few minutes.
How about such a solution?
https://www.youtube.com/watch?v=BPLKPVVyHNY&list=PLxkW0N98bb3LJ97_ip8WuCKp0WYB2b3zb&index=3