Aki for AWS Community Builders

Building a CLI Tool to Visualize AWS Iceberg Table Snapshot History

Introduction

I'm Aki, an AWS Community Builder (@jitepengin).

Apache Iceberg is an open table format that has been gaining attention recently. It supports features such as schema evolution, ACID transactions, and time travel.

Because Iceberg manages table state as a series of snapshots, there are many situations in development, testing, and production where you want to track or manipulate snapshot history.

As a learning project, I created a CLI tool called iceberg-navigator that lets you inspect snapshot history and details of Iceberg tables on AWS. In this article, I’ll introduce this tool.

Problems I Wanted to Solve

  • Easily check the snapshot history of Iceberg tables stored in S3 via the command line.
  • Trace snapshot parent-child relationships to understand version lineage.

Technologies and Libraries Used

  • PyIceberg: Python library to work with Iceberg tables
  • PyArrow: Dependency library for Iceberg schema handling (used indirectly)
  • Click: To build the CLI interface
  • NetworkX / Matplotlib: For visualizing snapshot parent-child relationships as a Directed Acyclic Graph (DAG)

Key Implementation Points

  • Use PyIceberg to access the Iceberg catalog, tables, and snapshot metadata.
  • Connect to the AWS Glue Iceberg REST endpoint via PyIceberg.
  • Build a simple CLI interface with Click.
  • Use NetworkX and Matplotlib to generate a snapshot lineage graph.

Source Code

The project is hosted here:

https://github.com/dataPenginPenguin/iceberg_navigator

How to Use the CLI Tool

AWS CLI Setup

Make sure you have configured the AWS CLI with the proper credentials and region.

Install Required Libraries

pip install -r requirements.txt

List Snapshots

$ python -m iceberg_navigator list --table <dbname>.<tablename>

Example output:

| Snapshot ID         | Timestamp            | Operation        | Parent Snapshot ID   |   Total Size (MB) |   Record Count |
|---------------------|----------------------|------------------|----------------------|-------------------|----------------|
| 1533347322559466931 | 2025-05-22T02:10:24Z | Operation.APPEND | null                 |             13.48 |        729,732 |
| 1485371543345582290 | 2025-05-22T02:10:54Z | Operation.DELETE | 1533347322559466931  |              0.00 |              0 |
| 67848960317145716   | 2025-05-22T02:15:45Z | Operation.APPEND | 1485371543345582290  |             13.48 |        729,732 |
| 3920289554540444894 | 2025-05-22T02:38:46Z | Operation.DELETE | 67848960317145716    |              0.00 |              0 |
| 6369576239134108166 | 2025-05-22T02:41:51Z | Operation.APPEND | 3920289554540444894  |             13.48 |        729,732 |
| 6216935665394419954 | 2025-05-22T02:41:54Z | Operation.APPEND | 6369576239134108166  |             26.96 |      1,459,464 |
| 9058990433822511495 | 2025-05-22T02:42:28Z | Operation.APPEND | 6216935665394419954  |             40.44 |      2,189,196 |
| 5224576979788468429 | 2025-05-22T02:46:53Z | Operation.DELETE | 9058990433822511495  |              0.00 |              0 |
| 8997131439115911397 | 2025-05-22T02:47:21Z | Operation.APPEND | 5224576979788468429  |             13.48 |        729,732 |
| 4246095293733855575 | 2025-08-02T22:51:16Z | Operation.DELETE | 8997131439115911397  |              0.00 |              0 |
| 8106328257365313720 | 2025-08-04T07:50:14Z | Operation.APPEND | 6369576239134108166  |             13.48 |        729,733 |
...

Show Snapshot Details

$ python -m iceberg_navigator show <Snapshot ID> --table <dbname>.<tablename>


Example output:

Table: yellow_tripdata

Snapshot ID: 8106328257365313720
Timestamp: 2025-08-04T07:50:14Z
Operation: Operation.APPEND
Parent Snapshot ID: 6369576239134108166
Manifest List: s3://your-bucket/warehouse/yellow_tripdata/metadata/snap-8106328257365313720-1-a4fb8059-7bf8-4254-b640-bf1fcbf100dd.avro

Schema:
  1: vendorid: optional int
  2: tpep_pickup_datetime: optional timestamp
  3: tpep_dropoff_datetime: optional timestamp
  4: passenger_count: optional long
  5: trip_distance: optional double
  6: ratecodeid: optional long
  7: store_and_fwd_flag: optional string
  8: pulocationid: optional int
  9: dolocationid: optional int
  10: payment_type: optional long
  11: fare_amount: optional double
  12: extra: optional double
  13: mta_tax: optional double
  14: tip_amount: optional double
  15: tolls_amount: optional double
  16: improvement_surcharge: optional double
  17: total_amount: optional double
  18: congestion_surcharge: optional double
  19: airport_fee: optional double

Summary:
  added-data-files: 1
  total-equality-deletes: 0
  added-records: 1
  total-position-deletes: 0
  added-files-size: 3046
  total-delete-files: 0
  total-files-size: 14138545
  total-data-files: 2
  total-records: 729733

Compare Snapshot

This command compares the given snapshot with its parent snapshot.

$ python -m iceberg_navigator compare <Snapshot ID> --table <dbname>.<tablename>

Example output:

----------------------------------------
Parent Snapshot
----------------------------------------
ID:         6369576239134108166
File Size:  13.48 MB
Records:    729,732

----------------------------------------
Current Snapshot
----------------------------------------
ID:         6216935665394419954
File Size:  26.96 MB
Records:    1,459,464

========================================
Summary
========================================
Added Records:   729,732
Deleted Records: 0
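The summary figures follow directly from the two snapshots' total-records values. Below is a minimal sketch of that arithmetic using the numbers from the example output. The function name net_record_change is hypothetical and is not part of the tool. Because it only compares totals, a commit that both adds and deletes rows would show just the net change.

```python
def net_record_change(parent_records: int, current_records: int) -> tuple[int, int]:
    """Return (added, deleted) as the net difference between two record totals."""
    diff = current_records - parent_records
    added = max(diff, 0)      # net growth, or 0 if the table shrank
    deleted = max(-diff, 0)   # net shrinkage, or 0 if the table grew
    return added, deleted


# Totals taken from the example compare output above
added, deleted = net_record_change(729_732, 1_459_464)
print(f"Added Records:   {added:,}")
print(f"Deleted Records: {deleted:,}")
```

If exact per-commit counts matter, the snapshot summary's own added-records value (visible in the show output) is likely a better source than the difference of totals.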

Visualize Snapshot Lineage Graph

$ python -m iceberg_navigator graph --table <dbname>.<tablename>

DiGraph with 11 nodes and 10 edges
Snapshot graph saved to snapshot_graph.png


The graph is drawn with NetworkX and Matplotlib and shows parent-child relationships as a DAG. By default it is saved to snapshot_graph.png.

Project Directory Structure (Excerpt)

iceberg_navigator/
├── cli.py
├── __main__.py
├── aws/
│   ├── auth.py
│   └── glue.py
├── commands/
│   ├── compare.py
│   ├── list.py
│   ├── show.py
│   └── graph.py
├── utils/
│   └── display.py

Implementation Overview

Entry Point (__main__.py)

Defines Click commands:

import click
from iceberg_navigator.commands.list import list_snapshots
from iceberg_navigator.commands.show import show_snapshot
from iceberg_navigator.commands.graph import graph_snapshots
from iceberg_navigator.commands.compare import compare_snapshots

@click.group()
def cli():
    """Iceberg Navigator CLI"""
    pass

cli.add_command(list_snapshots)
cli.add_command(show_snapshot)
cli.add_command(graph_snapshots)
cli.add_command(compare_snapshots)

if __name__ == "__main__":
    cli()


Connecting to the AWS Glue Iceberg Catalog (glue.py)

Uses the Glue Iceberg REST Catalog API through PyIceberg's load_catalog:

from urllib.parse import urlparse
from pyiceberg.catalog import load_catalog

class GlueCatalog:
    def __init__(self, profile_name=None, region_name=None, catalog_id="AwsDataCatalog"):
        import boto3
        if not region_name:
            session = boto3.Session(profile_name=profile_name)
            region_name = session.region_name
            if not region_name:
                raise ValueError("region_name Error")
        self.region_name = region_name
        self.catalog_id = catalog_id

        session = boto3.Session(profile_name=profile_name, region_name=region_name)
        self.glue_client = session.client("glue", region_name=region_name)

    def _get_catalog(self):
        conf = {
            "type": "rest",
            "uri": f"https://glue.{self.region_name}.amazonaws.com/iceberg",
            "s3.region": self.region_name,
            "rest.sigv4-enabled": "true",
            "rest.signing-name": "glue",
            "rest.signing-region": self.region_name,
        }
        return load_catalog(**conf)

    def get_table_location(self, table_identifier: str) -> str:
        database, table = table_identifier.split(".", 1)
        resp = self.glue_client.get_table(DatabaseName=database, Name=table)
        return resp["Table"]["Parameters"]["metadata_location"]

    def list_snapshots(self, table_identifier: str):
        catalog = self._get_catalog()
        namespace, table_name = table_identifier.split(".", 1)
        table = catalog.load_table(f"{namespace}.{table_name}")

        snapshots = []
        for snap in table.snapshots():
            total_bytes = int(snap.summary.get("total-files-size", 0)) if snap.summary else 0
            total_records = int(snap.summary.get("total-records", 0)) if snap.summary else 0

            snapshots.append({
                "snapshot_id": str(snap.snapshot_id),
                "timestamp": snap.timestamp_ms,
                "operation": snap.summary.get("operation") if snap.summary else None,
                "parent_id": str(snap.parent_snapshot_id) if snap.parent_snapshot_id else None,
                "total_size_mb": round((total_bytes) / (1024 * 1024), 2),
                "record_count": total_records
            })

        return snapshots

    def show_snapshot(self, table_identifier: str, snapshot_id: str):
        catalog = self._get_catalog()
        namespace, table_name = table_identifier.split(".", 1)
        table = catalog.load_table(f"{namespace}.{table_name}")

        snap = table.snapshot_by_id(int(snapshot_id))
        if not snap:
            return {"error": f"snapshot_id {snapshot_id} not found"}

        schema_columns = []
        for idx, col in enumerate(table.schema().columns, start=1):
            requiredness = "optional" if col.optional else "required"
            schema_columns.append(f"{idx}: {col.name}: {requiredness} {col.field_type}")

        summary_dict = {}
        if snap.summary:
            summary_dict["operation"] = snap.summary.operation
            if hasattr(snap.summary, "additional_properties"):
                summary_dict.update(snap.summary.additional_properties)


        return {
            "table": table_name,
            "snapshot_id": str(snap.snapshot_id),
            "timestamp": snap.timestamp_ms,
            "operation": summary_dict.get("operation"),
            "parent_id": str(snap.parent_snapshot_id) if snap.parent_snapshot_id else None,
            "manifest_list": snap.manifest_list,
            "schema": schema_columns,
            "summary": summary_dict,
        }

    def compare_snapshots(self, table_identifier: str, snapshot_id: str):
        catalog = self._get_catalog()
        namespace, table_name = table_identifier.split(".", 1)
        table = catalog.load_table(f"{namespace}.{table_name}")

        current_snap = table.snapshot_by_id(int(snapshot_id))
        if not current_snap:
            return {"error": f"snapshot_id {snapshot_id} not found"}

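        # The first snapshot of a table has no parent, so guard before the lookup
        if current_snap.parent_snapshot_id is None:
            return {"error": f"snapshot_id {snapshot_id} has no parent snapshot"}

        parent_snap = table.snapshot_by_id(current_snap.parent_snapshot_id)
        if not parent_snap:
            return {"error": "parent snapshot not found"}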

        current_summary_dict = {}
        if current_snap.summary:
            current_summary_dict["operation"] = current_snap.summary.operation
            if hasattr(current_snap.summary, "additional_properties"):
                current_summary_dict.update(current_snap.summary.additional_properties)

        parent_summary_dict = {}
        if parent_snap.summary:
            parent_summary_dict["operation"] = parent_snap.summary.operation
            if hasattr(parent_snap.summary, "additional_properties"):
                parent_summary_dict.update(parent_snap.summary.additional_properties)


        current_size = int(current_snap.summary.get("total-files-size", 0))
        current_records = int(current_snap.summary.get("total-records", 0))

        parent_size = int(parent_snap.summary.get("total-files-size", 0))
        parent_records = int(parent_snap.summary.get("total-records", 0))

        added = current_records - parent_records if current_records > parent_records else 0
        deleted = parent_records - current_records if parent_records > current_records else 0

        return {
                "current_snapshot_id": str(current_snap.snapshot_id),
                "current_size": current_size,
                "current_records": current_records,
                "parent_snapshot_id": str(parent_snap.snapshot_id),
                "parent_size": parent_size,
                "parent_records": parent_records,
                "added": added,
                "deleted": deleted,
            }

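Every method above splits the --table value with split(".", 1), which fails with a fairly opaque unpacking error when the dot is missing. As a rough sketch, a small validating helper could give a clearer message. The name parse_table_identifier is hypothetical and does not appear in the code shown here, and mydb is only a placeholder database name.

```python
def parse_table_identifier(table_identifier: str) -> tuple[str, str]:
    """Split 'db.table' into (namespace, table_name), failing with a clear message."""
    # partition() splits on the first dot, like split(".", 1)
    namespace, sep, table_name = table_identifier.partition(".")
    if not sep or not namespace or not table_name:
        raise ValueError(
            f"Expected <dbname>.<tablename>, got {table_identifier!r}"
        )
    return namespace, table_name


# "mydb" is a placeholder database name
print(parse_table_identifier("mydb.yellow_tripdata"))
```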

Snapshot List Command (list.py)

import click
from iceberg_navigator.aws.glue import GlueCatalog
from iceberg_navigator.utils.display import format_snapshots_table

@click.command("list")
@click.option("--table", required=True, help="Table identifier, e.g. db.table")
def list_snapshots(table):

    glue = GlueCatalog()
    snapshots = glue.list_snapshots(table)
    if not snapshots:
        click.echo("No snapshots found.")
        return

    table_str = format_snapshots_table(snapshots)
    click.echo(table_str)

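The format_snapshots_table helper in display.py is not shown in this article. The sketch below is only an assumption about what the per-row formatting could look like, based on the dictionaries returned by list_snapshots and the example output. The names format_timestamp and format_row are hypothetical, and the real implementation may well use a table library instead.

```python
from datetime import datetime, timezone


def format_timestamp(timestamp_ms: int) -> str:
    """Convert epoch milliseconds (Iceberg's timestamp_ms) to an ISO 8601 UTC string."""
    dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")


def format_row(snapshot: dict) -> str:
    """Render one dict from list_snapshots() as a pipe-delimited row."""
    cells = [
        snapshot["snapshot_id"],
        format_timestamp(snapshot["timestamp"]),
        str(snapshot["operation"]),
        snapshot["parent_id"] or "null",       # root snapshots have no parent
        f"{snapshot['total_size_mb']:.2f}",
        f"{snapshot['record_count']:,}",       # thousands separators
    ]
    return "| " + " | ".join(cells) + " |"


# Values taken from the example output; the epoch value is derived from the shown timestamp
example = {
    "snapshot_id": "8106328257365313720",
    "timestamp": int(datetime(2025, 8, 4, 7, 50, 14, tzinfo=timezone.utc).timestamp() * 1000),
    "operation": "Operation.APPEND",
    "parent_id": "6369576239134108166",
    "total_size_mb": round(14138545 / (1024 * 1024), 2),  # total-files-size in bytes
    "record_count": 729733,
}
print(format_row(example))
```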

Snapshot Show Command (show.py)

import click
from iceberg_navigator.aws.glue import GlueCatalog
from iceberg_navigator.utils.display import show_snapshot_details

@click.command(name="show")
@click.argument("snapshot_id")
@click.option('--table', required=True, help="Table identifier, e.g. db.table")
def show_snapshot(table, snapshot_id):
    glue_catalog = GlueCatalog()
    snapshot = glue_catalog.show_snapshot(table, snapshot_id)
    if snapshot is None or "error" in snapshot:
        click.echo(f"Snapshot {snapshot_id} not found in table {table}.")
        return

    show_snapshot_details(snapshot)

Compare Snapshot Command (compare.py)

import click
from iceberg_navigator.aws.glue import GlueCatalog
from iceberg_navigator.utils.display import compare_snapshot

@click.command(name="compare")
@click.argument("snapshot_id")
@click.option('--table', required=True, help="Table identifier, e.g. db.table")
def compare_snapshots(table, snapshot_id):
    glue_catalog = GlueCatalog()
    comparison_result = glue_catalog.compare_snapshots(table, snapshot_id)

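    if comparison_result is None or "error" in comparison_result:
        # Report the specific reason, e.g. an unknown ID or a missing parent snapshot
        reason = (comparison_result or {}).get("error", f"snapshot_id {snapshot_id} not found")
        click.echo(f"Cannot compare in table {table}: {reason}")
        return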

    compare_snapshot(comparison_result)

Snapshot Graph Command (graph.py)

import click
from iceberg_navigator.aws.glue import GlueCatalog
from iceberg_navigator.utils.display import build_snapshot_graph, draw_graph

@click.command("graph")
@click.option("--table", required=True, help="Table name (e.g., db.table)")
@click.option("--output", default="snapshot_graph.png", help="Output image filename")
def graph_snapshots(table: str, output: str):
    glue_catalog = GlueCatalog()
    snapshots = glue_catalog.list_snapshots(table)
    if not snapshots:
        click.echo(f"No snapshots found for table {table}")
        return

    G = build_snapshot_graph(snapshots)
    draw_graph(G, output)
    click.echo(f"Snapshot graph saved to {output}")

if __name__ == "__main__":
    graph_snapshots()
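build_snapshot_graph and draw_graph live in display.py and are not shown here. To illustrate the underlying idea, the sketch below derives the parent-to-child edges from the list_snapshots output. It deliberately uses a plain dictionary instead of NetworkX so it runs without extra dependencies, and the function names are hypothetical. The ID pairs are copied from the example list output.

```python
from collections import defaultdict


def build_children_map(snapshots: list[dict]) -> dict[str, list[str]]:
    """Map each parent snapshot ID to the IDs of its direct children."""
    children = defaultdict(list)
    for snap in snapshots:
        if snap["parent_id"] is not None:
            children[snap["parent_id"]].append(snap["snapshot_id"])
    return dict(children)


def find_branch_points(children: dict[str, list[str]]) -> list[str]:
    """Return snapshot IDs with more than one child, i.e. where the lineage forks."""
    return [parent for parent, kids in children.items() if len(kids) > 1]


# (snapshot_id, parent_id) pairs from the example `list` output
pairs = [
    ("1533347322559466931", None),
    ("1485371543345582290", "1533347322559466931"),
    ("67848960317145716", "1485371543345582290"),
    ("3920289554540444894", "67848960317145716"),
    ("6369576239134108166", "3920289554540444894"),
    ("6216935665394419954", "6369576239134108166"),
    ("9058990433822511495", "6216935665394419954"),
    ("5224576979788468429", "9058990433822511495"),
    ("8997131439115911397", "5224576979788468429"),
    ("4246095293733855575", "8997131439115911397"),
    ("8106328257365313720", "6369576239134108166"),
]
snapshots = [{"snapshot_id": s, "parent_id": p} for s, p in pairs]

children = build_children_map(snapshots)
edge_count = sum(len(kids) for kids in children.values())
print(f"{len(snapshots)} nodes, {edge_count} edges")
print("Branch points:", find_branch_points(children))
```

The node and edge counts match the "DiGraph with 11 nodes and 10 edges" line printed by the graph command, and the single branch point is the snapshot that 8106328257365313720 was written on top of.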

Catalog Access Patterns

PyIceberg supports multiple catalog implementations. In AWS environments, two main approaches are used:

  • RestCatalog: Access Iceberg metadata via Glue's Iceberg REST API

  • GlueCatalog: Use boto3 Glue client to fetch table info

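According to the official AWS documentation and recent trends, using Glue's REST endpoint via RestCatalog is the mainstream approach. This tool takes that route: its wrapper class is named GlueCatalog, but it calls PyIceberg's load_catalog with type "rest" against Glue's Iceberg REST API. The boto3 Glue client is only used in get_table_location to read metadata_location.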

For more details, check out my article comparing catalog access patterns:
https://zenn.dev/penginpenguin/articles/e44880aaa2d5e3
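To make the difference between the two patterns concrete, here is a sketch of the catalog properties each one might use. The REST properties mirror the conf dictionary in glue.py. The property names for the boto3-backed Glue catalog ("type": "glue", "glue.region") are my assumption of PyIceberg's configuration keys and should be checked against the PyIceberg documentation. The function names and the us-east-1 region are placeholders.

```python
def rest_catalog_conf(region: str) -> dict[str, str]:
    """Properties for the REST catalog against Glue's Iceberg endpoint (as in glue.py)."""
    return {
        "type": "rest",
        "uri": f"https://glue.{region}.amazonaws.com/iceberg",
        "s3.region": region,
        "rest.sigv4-enabled": "true",   # sign requests with AWS SigV4
        "rest.signing-name": "glue",
        "rest.signing-region": region,
    }


def glue_catalog_conf(region: str) -> dict[str, str]:
    """Properties for the boto3-backed Glue catalog (assumed property names)."""
    return {
        "type": "glue",
        "glue.region": region,
        "s3.region": region,
    }


# With PyIceberg installed, either dict could be passed on, for example:
#   from pyiceberg.catalog import load_catalog
#   catalog = load_catalog(**rest_catalog_conf("us-east-1"))
for name, conf in [("rest", rest_catalog_conf("us-east-1")),
                   ("glue", glue_catalog_conf("us-east-1"))]:
    print(name, "->", conf["type"], sorted(conf))
```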

PyIceberg Limitations

While PyIceberg is a powerful Python tool for working with Iceberg metadata, it currently has some limitations:

  • Limited metadata operations like rollback
    Cannot restore snapshots or perform rollback directly.

  • Partial functionality via REST Catalog
    Glue's REST API is still evolving, so some Iceberg features may not be accessible (especially rollback-related).

  • Diff and snapshot operations require custom logic
    Users must implement logic for diffing or complex history operations themselves.

Iceberg Table Rollback on AWS

As noted above, rollback is not supported with PyIceberg. Athena, often considered for Iceberg querying, does not currently provide snapshot rollback capabilities either.

To perform a rollback, you need Spark-based tooling on Glue or EMR, for example Iceberg's rollback_to_snapshot Spark procedure.

This CLI tool focuses on snapshot viewing via Glue REST Catalog but has potential to be extended in the future into a full metadata management tool including rollback.

Conclusion

I introduced the iceberg-navigator CLI tool that allows you to inspect snapshot history and details of Apache Iceberg tables on AWS.

Snapshot history is crucial for understanding how data has changed and for keeping a table in a state you can roll back.

With this tool, you can easily retrieve and inspect snapshot information to assist development and debugging.

This is a personal learning project, and the tool is still evolving, but I hope it serves as a useful example of AWS Iceberg usage and PyIceberg application.

If you're interested, please try it out and feel free to share your feedback!
