Minwook Je

Introduction to Apache Iceberg using MinIO

Introduction

Apache Iceberg is one of three popular open table formats (OTFs), alongside Apache Hudi (originally from Uber) and Delta Lake (from Databricks).

In this post, we will cover:

  1. The Iceberg specification
  2. A hands-on with Docker Compose
  3. The metadata Iceberg produces

What is an OTF?

Turn files into tables
An Open Table Format is a specification for organizing a collection of files that contain the same kind of information so that they can be presented as a single table.

The implication is that we want all of these files to be viewable and updatable as if they were a single entity: the table. We can interact with this collection of files the same way we would interact with a table in a database (a small sketch follows).

Various parties must implement this specification to produce usable software.
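
As a concrete illustration, here is a minimal PySpark sketch of that idea. It assumes a Spark session already wired to an Iceberg catalog, and it uses the climate.weather table we will create in the hands-on section below.

# The many immutable Parquet files behind an Iceberg table are queried
# and updated as one logical table (a sketch; assumes a Spark session
# configured with an Iceberg catalog, as in the hands-on below).
spark.sql("SELECT count(*) FROM climate.weather").show()

# Row-level changes also go through the table abstraction, even though
# the data actually lives in many files in object storage.
spark.sql("DELETE FROM climate.weather WHERE temp IS NULL")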

The Apache Iceberg specification

To implement the Apache Iceberg specification, we need three things:

  1. A catalog: keeps track of all the metadata files
  2. A processing engine: e.g., a query engine
  3. Scalable storage: e.g., object storage

The processing engine is the compute node that ties everything together. We issue commands to it for the following (a configuration sketch showing how the three pieces connect comes after this list):

  • Creating tables
  • Inserting data into tables
  • Querying tables
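
For illustration, here is a rough PySpark sketch (not taken from the post's source code) of how these three pieces might be wired together from the engine's side. The catalog name, service hostnames, port numbers, and warehouse path are assumptions modeled on a typical Docker Compose setup like the one used below; adjust them to your environment. The Iceberg Spark runtime jars and S3 credentials are assumed to be provided by the image or environment.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("iceberg-intro")
    # Enable Iceberg's SQL extensions and register a catalog named "demo"
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    # 1. Catalog: a REST catalog service keeps track of the metadata files
    .config("spark.sql.catalog.demo.type", "rest")
    .config("spark.sql.catalog.demo.uri", "http://rest:8181")
    # 3. Scalable storage: MinIO, exposed through the S3 API
    .config("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.demo.warehouse", "s3://warehouse/")
    .config("spark.sql.catalog.demo.s3.endpoint", "http://minio:9000")
    # 2. Processing engine: this Spark session is the compute node
    .getOrCreate()
)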

Logical Diagram

  • The REST catalog uses MinIO for storing metadata.

Iceberg's Data Architecture

  • Catalog: The processing engine connects to the catalog to get the list of all tables. For each table, the catalog keeps track of its metadata files (most importantly, the current one).
  • Metadata file: Holds the table's schema, partition spec, and snapshots (including previous schemas and partition specs).
  • Manifest lists: Each one represents a snapshot itself (s1, s2, ...) and lists the manifest files that make up that snapshot.
  • Manifest files
    1. Point to one or more data files (Parquet files).
    2. Important for efficient query execution:
    3. they carry column-level information,
    4. e.g., max/min values for each data file.
  • Data files: Typically in Parquet format.
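
Once the hands-on table below has data in it, each of these layers can be inspected directly from Spark through Iceberg's metadata tables. A small sketch (assumes the climate.weather table created later in this post, after at least one write has produced a snapshot):

# Snapshots, each pointing to a manifest list
spark.sql("SELECT snapshot_id, manifest_list FROM climate.weather.snapshots").show(truncate=False)

# Manifest files referenced by the table's snapshots
spark.sql("SELECT path, added_data_files_count FROM climate.weather.manifests").show(truncate=False)

# Data files, along with the record counts and statistics used for pruning
spark.sql("SELECT file_path, record_count FROM climate.weather.files").show(truncate=False)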

Hands-on

source code

$ docker compose up
$ docker exec -it spark-iceberg spark-sql

Create db

❯ docker exec -it spark-iceberg spark-sql

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/16 08:01:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/09/16 08:01:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark Web UI available at http://e5356708c7af:4041
Spark master: local[*], Application Id: local-1758009676374
spark-sql ()> CREATE DATABASE IF NOT EXISTS climate;
Time taken: 0.537 seconds
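
The same statement can also be issued from the Jupyter notebook used later in this post (a minimal sketch; assumes its preconfigured spark session):

# Equivalent to the spark-sql statement above, run from PySpark
spark.sql("CREATE DATABASE IF NOT EXISTS climate")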

Create table

spark-sql ()> CREATE TABLE IF NOT EXISTS climate.weather (
            >     datetime              timestamp,
            >     temp                  double,
            >     lat                   double,
            >     long                  double,
            >     cloud_coverage        string,
            >     precip                double,
            >     wind_speed            double
            > )
            > USING iceberg
            > PARTITIONED BY (days(datetime))
            > ;
Time taken: 0.911 seconds

Creating the table writes an initial metadata file for climate.weather to MinIO (under the table's location, s3://warehouse/climate/weather). Note that current-snapshot-id is -1 and snapshots is empty, because no data has been written yet:

{
    "format-version": 2,
    "table-uuid": "195f0af4-6ff1-4ea1-8def-73c85fa2d483",
    "location": "s3://warehouse/climate/weather",
    "last-sequence-number": 0,
    "last-updated-ms": 1758009757603,
    "last-column-id": 7,
    "current-schema-id": 0,
    "schemas": [
        {
            "type": "struct",
            "schema-id": 0,
            "fields": [
                {
                    "id": 1,
                    "name": "datetime",
                    "required": false,
                    "type": "timestamptz"
                },
                {
                    "id": 2,
                    "name": "temp",
                    "required": false,
                    "type": "double"
                },
                {
                    "id": 3,
                    "name": "lat",
                    "required": false,
                    "type": "double"
                },
                {
                    "id": 4,
                    "name": "long",
                    "required": false,
                    "type": "double"
                },
                {
                    "id": 5,
                    "name": "cloud_coverage",
                    "required": false,
                    "type": "string"
                },
                {
                    "id": 6,
                    "name": "precip",
                    "required": false,
                    "type": "double"
                },
                {
                    "id": 7,
                    "name": "wind_speed",
                    "required": false,
                    "type": "double"
                }
            ]
        }
    ],
    "default-spec-id": 0,
    "partition-specs": [
        {
            "spec-id": 0,
            "fields": [
                {
                    "name": "datetime_day",
                    "transform": "day",
                    "source-id": 1,
                    "field-id": 1000
                }
            ]
        }
    ],
    "last-partition-id": 1000,
    "default-sort-order-id": 0,
    "sort-orders": [
        {
            "order-id": 0,
            "fields": []
        }
    ],
    "properties": {
        "owner": "root",
        "write.parquet.compression-codec": "zstd"
    },
    "current-snapshot-id": -1,
    "refs": {},
    "snapshots": [],
    "statistics": [],
    "partition-statistics": [],
    "snapshot-log": [],
    "metadata-log": []
}
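
If you want to see this file in the bucket itself, one way (a sketch using the MinIO Python SDK; the endpoint and the placeholder credentials are assumptions, so use the values from your docker-compose file) is to list the table's metadata prefix:

from minio import Minio

# Endpoint and credentials are placeholders for whatever your compose file defines
client = Minio("localhost:9000", access_key="<access-key>", secret_key="<secret-key>", secure=False)

# The table location s3://warehouse/climate/weather maps to the "warehouse" bucket
for obj in client.list_objects("warehouse", prefix="climate/weather/metadata/", recursive=True):
    print(obj.object_name)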

Adding Data

Let's add data from a Jupyter notebook (using PySpark):


from datetime import datetime

# Reuse the table's schema so the tuples below line up with the column types
schema = spark.table("climate.weather").schema

data = [
    (datetime(2023, 8, 16), 76.2, 40.951908, -74.075272, "Partially sunny", 0.0, 3.5),
    (datetime(2023, 8, 17), 82.5, 40.951908, -74.075272, "Sunny", 0.0, 1.2),
    (datetime(2023, 8, 18), 70.9, 40.951908, -74.075272, "Cloudy", 0.5, 5.2),
]

df = spark.createDataFrame(data, schema)
# Append the rows to the Iceberg table, creating the table's first snapshot
df.writeTo("climate.weather").append()
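
To confirm the append worked, we can read the table back from the same session. A quick check (the date filter below also benefits from the days(datetime) partitioning, since Iceberg can prune to the matching day partitions):

# Read the rows back through the table abstraction
spark.sql("""
    SELECT datetime, temp, cloud_coverage
    FROM climate.weather
    WHERE datetime >= '2023-08-17'
""").show()

# The append created the table's first snapshot
spark.sql("SELECT committed_at, operation FROM climate.weather.snapshots").show(truncate=False)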
