Eric Kahindi

Creating your own data lake (MINIO+TRINO+GRAFANA)

Welcome! The full article might be a bit long, so I'll break it into three parts.
For this one, we'll focus on MinIO (the backbone of your data lake).

Getting Started with MinIO: Your Private S3-Compatible Data Lake

MinIO is an open-source, high-performance object storage system that is fully compatible with the Amazon S3 API. Instead of storing data as files in directories, MinIO stores them as objects inside buckets. This makes it incredibly flexible and scalable, especially when working with large datasets.

MinIO can be the storage layer of a data lake.
On top of it, you connect tools like:

  • Apache Spark / Dask for big data processing
  • Presto/Trino / Athena for SQL queries
  • Grafana / Superset for visualization
  • Airflow for orchestration
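
Once the server is running (set up below), any S3 client can point at MinIO instead of AWS. As a minimal sketch (boto3 isn't part of this setup, so treat it as an optional extra and swap in your own credentials), this lists the buckets on a local MinIO instance:

import boto3

# point a standard S3 client at the local MinIO endpoint instead of AWS
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",      # MinIO's S3 API port
    aws_access_key_id="<your-user-name>",      # the root user you set when starting the server
    aws_secret_access_key="<your-password>",   # the root password you set when starting the server
)

# any regular S3 call now runs against MinIO
for bucket in s3.list_buckets()["Buckets"]:
    print(bucket["Name"])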

The setup

Download the server binary

wget https://dl.min.io/server/minio/release/linux-amd64/minio

Make it executable

chmod +x minio

Move the executable so that it can be run from anywhere

sudo mv minio /usr/local/bin

Export the root user and password (once installed, start from this step whenever you restart your server)

export MINIO_ROOT_USER=<your-user-name>

export MINIO_ROOT_PASSWORD=<your-password>

Start the server

minio server ~/minio-data --console-address ":9001"

One thing to note: MinIO exposes its S3 API at "localhost:9000", which is what your tools connect to; that's why we pass --console-address ":9001", so the web console gets its own port.
Go to "localhost:9001" for the web UI.
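
If you want to confirm the server is up without opening the browser, MinIO exposes a liveness endpoint on the API port. A quick sketch with requests, assuming the default ports above:

import requests

# MinIO's healthcheck liveness endpoint lives on the S3 API port (9000)
resp = requests.get("http://localhost:9000/minio/health/live")
print("MinIO is up" if resp.status_code == 200 else f"unexpected status: {resp.status_code}")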

Then log in with the credentials you exported to reach the browser console.
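
The DAG below writes into a bucket called "coin-vis-automated", which needs to exist before the first run. You can create it from the console, or script it; here's a minimal sketch with boto3 (my own choice here, any S3 client works):

import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="<your-user-name>",
    aws_secret_access_key="<your-password>",
)

# the ETL below expects this bucket to already exist
s3.create_bucket(Bucket="coin-vis-automated")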

Add data to the server

In this case, we can create an Airflow DAG that will load the data for us.
I'm using CoinGecko to get data on a few coins (get your API key first, add it to a .env file, then load it as I do below).
Then I transform the data with pandas to put it in the right shape,
and finally save it into MinIO as parquet files.

  • If you're not familiar with Airflow, you can just put the "coin_vis_etl" function in a separate file and run it directly (see the short sketch after the code)

  • Notice I'm using port 9000, not 9001

from airflow import DAG
from airflow.operators.python import PythonOperator
import requests
import pandas as pd
import pyarrow as pa
import pyarrow.fs
import pyarrow.parquet as pq
import os
from dotenv import load_dotenv
from datetime import datetime


def coin_vis_etl():
    load_dotenv()


    MY_API_KEY = os.getenv("MY_API_KEY")
    crypto_list = ['bitcoin', 'solana', 'ethereum', 'hyperliquid', 'binancecoin']

    for crypto in crypto_list:
        # fetch 30 days of market data from the CoinGecko API
        url = f"https://api.coingecko.com/api/v3/coins/{crypto}/market_chart?vs_currency=usd&days=30"
        headers = {
            'accept': 'application/json',
            'x-cg-demo-api-key': MY_API_KEY
            }
        response = requests.get(url, headers=headers)

        # define custom metadata (parquet metadata keys and values must be bytes)
        # use .encode() if you're adding a formatted string instead of a bytes literal
        custom_metadata = {
            b"source": b"coingecko API",
            b"coin_name": f"{crypto}".encode()
        }

        if response.status_code == 200:
            data = response.json()
            temp = pd.DataFrame(data['prices'], columns=[f"{crypto}_timestamps", f"{crypto}_prices"])

            # we can just add the columns since the timestamps are the same
            temp[f"{crypto}_market_caps"]= [x[1] for x in data["market_caps"]]
            temp[f"{crypto}_total_volumes"] = [x[1] for x in data["total_volumes"]]

            # convert epoch-millisecond timestamps to real datetimes
            temp[f"{crypto}_timestamps"] = pd.to_datetime(temp[f"{crypto}_timestamps"], unit='ms')

            # use pyarrow to convert the DataFrame into a Table before writing parquet
            table = pa.Table.from_pandas(temp)

            # merge the custom metadata with any existing schema metadata
            existing_metadata = table.schema.metadata or {}
            new_metadata = {**custom_metadata, **existing_metadata}
            table = table.replace_schema_metadata(new_metadata)

            # finally write the data to MinIO using pyarrow's S3 filesystem
            # (access_key/secret_key must match MINIO_ROOT_USER / MINIO_ROOT_PASSWORD)
            fs = pa.fs.S3FileSystem(
                access_key="eric",
                secret_key="eric1234",
                endpoint_override="http://localhost:9000"
            )

            pq.write_table(
                table, 
                f"coin-vis-automated/coin_vis_{crypto}.parquet",
                filesystem=fs
            )
            print(f'{crypto} is done')

        else:
            print(f"API error at {crypto}, {response.content}")


with DAG(
    dag_id='coin_vis_etl',
    start_date=datetime(2025, 9, 15),
    schedule_interval='@hourly',
    catchup=False
) as dag:
    runetl = PythonOperator(
        task_id='coin_vis_etl',
        python_callable=coin_vis_etl
    )

    runetl
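
As noted above, Airflow is optional. A minimal sketch of running the same ETL as a standalone script (assuming you saved the function as, say, coin_vis_standalone.py without the Airflow imports and DAG block):

# coin_vis_standalone.py -- hypothetical file name, Airflow-free version
if __name__ == "__main__":
    coin_vis_etl()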

And there you go, you've just created your own S3-compatible object storage and filled it with data.
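
If you want to double-check what landed in the bucket, you can read a file back through the same pyarrow S3 filesystem and inspect the custom metadata. A sketch assuming the bitcoin file from the DAG above was written:

import pyarrow.fs
import pyarrow.parquet as pq

fs = pyarrow.fs.S3FileSystem(
    access_key="eric",
    secret_key="eric1234",
    endpoint_override="http://localhost:9000"
)

# read the parquet file straight out of the MinIO bucket
table = pq.read_table("coin-vis-automated/coin_vis_bitcoin.parquet", filesystem=fs)
print(table.num_rows, "rows")
print(table.schema.metadata)   # should include b"source" and b"coin_name"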
