Introduction
I'm Aki, an AWS Community Builder (@jitepengin).
Apache Iceberg is an open table format that has gained a lot of attention recently. It supports features such as schema evolution, ACID transactions, and time travel.
When managing tables based on snapshots, there are many scenarios in development, testing, and production where you want to track or manipulate snapshot histories.
As a learning project, I created a CLI tool called iceberg-navigator that lets you inspect snapshot history and details of Iceberg tables on AWS. In this article, I’ll introduce this tool.
Problems I Wanted to Solve
- Easily check the snapshot history of Iceberg tables stored in S3 via the command line.
- Trace snapshot parent-child relationships to understand version lineage.
Technologies and Libraries Used
- PyIceberg: Python library to work with Iceberg tables
- PyArrow: Dependency library for Iceberg schema handling (used indirectly)
- Click: To build the CLI interface
- NetworkX / Matplotlib: For visualizing snapshot parent-child relationships as a Directed Acyclic Graph (DAG)
Key Implementation Points
- Use PyIceberg to access the Iceberg catalog, tables, and snapshot metadata.
- Connect to the AWS Glue Iceberg REST endpoint via PyIceberg.
- Build a simple CLI interface with Click.
- Use NetworkX and Matplotlib to generate a snapshot lineage graph.
Source Code
The project is hosted here:
https://github.com/dataPenginPenguin/iceberg_navigator
How to Use the CLI Tool
AWS CLI Setup
Make sure you have configured AWS CLI with the proper credentials and region.
Install Required Libraries
pip install -r requirements.txt
List Snapshots
$ python -m iceberg_navigator list --table <dbname>.<tablename>
Example output:
| Snapshot ID | Timestamp | Operation | Parent Snapshot ID | Total Size (MB) | Record Count |
|---------------------|----------------------|------------------|----------------------|-------------------|----------------|
| 1533347322559466931 | 2025-05-22T02:10:24Z | Operation.APPEND | null | 13.48 | 729,732 |
| 1485371543345582290 | 2025-05-22T02:10:54Z | Operation.DELETE | 1533347322559466931 | 0.00 | 0 |
| 67848960317145716 | 2025-05-22T02:15:45Z | Operation.APPEND | 1485371543345582290 | 13.48 | 729,732 |
| 3920289554540444894 | 2025-05-22T02:38:46Z | Operation.DELETE | 67848960317145716 | 0.00 | 0 |
| 6369576239134108166 | 2025-05-22T02:41:51Z | Operation.APPEND | 3920289554540444894 | 13.48 | 729,732 |
| 6216935665394419954 | 2025-05-22T02:41:54Z | Operation.APPEND | 6369576239134108166 | 26.96 | 1,459,464 |
| 9058990433822511495 | 2025-05-22T02:42:28Z | Operation.APPEND | 6216935665394419954 | 40.44 | 2,189,196 |
| 5224576979788468429 | 2025-05-22T02:46:53Z | Operation.DELETE | 9058990433822511495 | 0.00 | 0 |
| 8997131439115911397 | 2025-05-22T02:47:21Z | Operation.APPEND | 5224576979788468429 | 13.48 | 729,732 |
| 4246095293733855575 | 2025-08-02T22:51:16Z | Operation.DELETE | 8997131439115911397 | 0.00 | 0 |
| 8106328257365313720 | 2025-08-04T07:50:14Z | Operation.APPEND | 6369576239134108166 | 13.48 | 729,733 |
...
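The Parent Snapshot ID column is enough to reconstruct a lineage by hand. As a minimal sketch (this is not part of the tool, and `trace_lineage` is a hypothetical helper), assuming rows shaped like the dictionaries returned by `list_snapshots`, you could walk the parent pointers from any snapshot back toward the root:

```python
from typing import Dict, List, Optional


def trace_lineage(snapshots: List[Dict[str, Optional[str]]], snapshot_id: str) -> List[str]:
    """Return snapshot IDs from `snapshot_id` back toward the root, newest first."""
    parent_of = {s["snapshot_id"]: s["parent_id"] for s in snapshots}
    lineage: List[str] = []
    seen = set()
    current: Optional[str] = snapshot_id
    # Stop at the root (no parent), at a parent that is not in the list
    # (for example an expired snapshot), or if a cycle is ever encountered.
    while current is not None and current in parent_of and current not in seen:
        lineage.append(current)
        seen.add(current)
        current = parent_of[current]
    return lineage


# A subset of the rows from the example output above.
rows = [
    {"snapshot_id": "3920289554540444894", "parent_id": "67848960317145716"},
    {"snapshot_id": "6369576239134108166", "parent_id": "3920289554540444894"},
    {"snapshot_id": "6216935665394419954", "parent_id": "6369576239134108166"},
    {"snapshot_id": "8106328257365313720", "parent_id": "6369576239134108166"},
]

print(trace_lineage(rows, "8106328257365313720"))
```

In the example table, 8106328257365313720 has 6369576239134108166 as its parent rather than the most recent snapshot, so 6216935665394419954 and its descendants do not appear in its lineage.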
Show Snapshot Details
$ python -m iceberg_navigator show <snapshot_id> --table <dbname>.<tablename>
Example output:
Table: yellow_tripdata
Snapshot ID: 8106328257365313720
Timestamp: 2025-08-04T07:50:14Z
Operation: Operation.APPEND
Parent Snapshot ID: 6369576239134108166
Manifest List: s3://your-bucket/warehouse/yellow_tripdata/metadata/snap-8106328257365313720-1-a4fb8059-7bf8-4254-b640-bf1fcbf100dd.avro
Schema:
1: vendorid: optional int
2: tpep_pickup_datetime: optional timestamp
3: tpep_dropoff_datetime: optional timestamp
4: passenger_count: optional long
5: trip_distance: optional double
6: ratecodeid: optional long
7: store_and_fwd_flag: optional string
8: pulocationid: optional int
9: dolocationid: optional int
10: payment_type: optional long
11: fare_amount: optional double
12: extra: optional double
13: mta_tax: optional double
14: tip_amount: optional double
15: tolls_amount: optional double
16: improvement_surcharge: optional double
17: total_amount: optional double
18: congestion_surcharge: optional double
19: airport_fee: optional double
Summary:
added-data-files: 1
total-equality-deletes: 0
added-records: 1
total-position-deletes: 0
added-files-size: 3046
total-delete-files: 0
total-files-size: 14138545
total-data-files: 2
total-records: 729733
Compare Snapshot
$ python -m iceberg_navigator compare <snapshot_id> --table <dbname>.<tablename>
Example output:
----------------------------------------
Parent Snapshot
----------------------------------------
ID: 6369576239134108166
File Size: 13.48 MB
Records: 729,732
----------------------------------------
Current Snapshot
----------------------------------------
ID: 6216935665394419954
File Size: 26.96 MB
Records: 1,459,464
========================================
Summary
========================================
Added Records: 729,732
Deleted Records: 0
Visualize Snapshot Lineage Graph
$ python -m iceberg_navigator graph --table <dbname>.<tablename>
DiGraph with 11 nodes and 10 edges
Snapshot graph saved to snapshot_graph.png
The generated image (snapshot_graph.png by default) is drawn with NetworkX and Matplotlib and shows the snapshot parent-child relationships as a DAG.
Project Directory Structure (Excerpt)
iceberg_navigator/
├── cli.py
├── __main__.py
├── aws/
│ ├── auth.py
│ └── glue.py
├── commands/
│ ├── compare.py
│ ├── list.py
│ ├── show.py
│ └── graph.py
├── utils/
│ └── display.py
Implementation Overview
Entry Point (__main__.py)
Defines Click commands:
import click

from iceberg_navigator.commands.list import list_snapshots
from iceberg_navigator.commands.show import show_snapshot
from iceberg_navigator.commands.graph import graph_snapshots
from iceberg_navigator.commands.compare import compare_snapshots


@click.group()
def cli():
    """Iceberg Navigator CLI"""
    pass


# Register each subcommand on the top-level group.
cli.add_command(list_snapshots)
cli.add_command(show_snapshot)
cli.add_command(graph_snapshots)
cli.add_command(compare_snapshots)

if __name__ == "__main__":
    cli()
Connecting to the AWS Glue Iceberg Catalog (glue.py)
Uses Glue REST Catalog API:
from urllib.parse import urlparse

from pyiceberg.catalog import load_catalog


class GlueCatalog:
    def __init__(self, profile_name=None, region_name=None, catalog_id="AwsDataCatalog"):
        import boto3

        # Fall back to the region configured for the AWS profile.
        if not region_name:
            session = boto3.Session(profile_name=profile_name)
            region_name = session.region_name
            if not region_name:
                raise ValueError(
                    "region_name could not be determined; pass it explicitly "
                    "or configure a default region"
                )
        self.region_name = region_name
        self.catalog_id = catalog_id
        session = boto3.Session(profile_name=profile_name, region_name=region_name)
        self.glue_client = session.client("glue", region_name=region_name)

    def _get_catalog(self):
        # REST catalog pointed at the Glue Iceberg endpoint, signed with SigV4.
        conf = {
            "type": "rest",
            "uri": f"https://glue.{self.region_name}.amazonaws.com/iceberg",
            "s3.region": self.region_name,
            "rest.sigv4-enabled": "true",
            "rest.signing-name": "glue",
            "rest.signing-region": self.region_name,
        }
        return load_catalog(**conf)

    def get_table_location(self, table_identifier: str) -> str:
        database, table = table_identifier.split(".", 1)
        resp = self.glue_client.get_table(DatabaseName=database, Name=table)
        return resp["Table"]["Parameters"]["metadata_location"]

    def list_snapshots(self, table_identifier: str):
        catalog = self._get_catalog()
        namespace, table_name = table_identifier.split(".", 1)
        table = catalog.load_table(f"{namespace}.{table_name}")

        snapshots = []
        for snap in table.snapshots():
            # Summary values are strings, so convert before doing arithmetic.
            total_bytes = int(snap.summary.get("total-files-size", 0)) if snap.summary else 0
            total_records = int(snap.summary.get("total-records", 0)) if snap.summary else 0
            snapshots.append({
                "snapshot_id": str(snap.snapshot_id),
                "timestamp": snap.timestamp_ms,
                "operation": snap.summary.get("operation") if snap.summary else None,
                "parent_id": str(snap.parent_snapshot_id) if snap.parent_snapshot_id else None,
                "total_size_mb": round((total_bytes) / (1024 * 1024), 2),
                "record_count": total_records
            })
        return snapshots

    def show_snapshot(self, table_identifier: str, snapshot_id: str):
        catalog = self._get_catalog()
        namespace, table_name = table_identifier.split(".", 1)
        table = catalog.load_table(f"{namespace}.{table_name}")

        snap = table.snapshot_by_id(int(snapshot_id))
        if not snap:
            return {"error": f"snapshot_id {snapshot_id} not found"}

        # Render each column as "<position>: <name>: <optional|required> <type>".
        schema_columns = []
        for idx, col in enumerate(table.schema().columns, start=1):
            requiredness = "optional" if col.optional else "required"
            schema_columns.append(f"{idx}: {col.name}: {requiredness} {col.field_type}")

        summary_dict = {}
        if snap.summary:
            summary_dict["operation"] = snap.summary.operation
            if hasattr(snap.summary, "additional_properties"):
                summary_dict.update(snap.summary.additional_properties)

        return {
            "table": table_name,
            "snapshot_id": str(snap.snapshot_id),
            "timestamp": snap.timestamp_ms,
            "operation": summary_dict.get("operation"),
            "parent_id": str(snap.parent_snapshot_id) if snap.parent_snapshot_id else None,
            "manifest_list": snap.manifest_list,
            "schema": schema_columns,
            "summary": summary_dict,
        }

    def compare_snapshots(self, table_identifier: str, snapshot_id: str):
        catalog = self._get_catalog()
        namespace, table_name = table_identifier.split(".", 1)
        table = catalog.load_table(f"{namespace}.{table_name}")

        current_snap = table.snapshot_by_id(int(snapshot_id))
        if not current_snap:
            return {"error": f"snapshot_id {snapshot_id} not found"}

        # The first snapshot of a table has no parent, so guard before looking it up
        # (calling int(None) would raise a TypeError).
        if current_snap.parent_snapshot_id is None:
            return {"error": f"snapshot_id {snapshot_id} has no parent snapshot"}
        parent_snap = table.snapshot_by_id(int(current_snap.parent_snapshot_id))
        if not parent_snap:
            return {"error": "parent snapshot not found"}

        current_summary_dict = {}
        if current_snap.summary:
            current_summary_dict["operation"] = current_snap.summary.operation
            if hasattr(current_snap.summary, "additional_properties"):
                current_summary_dict.update(current_snap.summary.additional_properties)

        parent_summary_dict = {}
        if parent_snap.summary:
            parent_summary_dict["operation"] = parent_snap.summary.operation
            if hasattr(parent_snap.summary, "additional_properties"):
                parent_summary_dict.update(parent_snap.summary.additional_properties)

        current_size = int(current_snap.summary.get("total-files-size", 0))
        current_records = int(current_snap.summary.get("total-records", 0))
        parent_size = int(parent_snap.summary.get("total-files-size", 0))
        parent_records = int(parent_snap.summary.get("total-records", 0))

        # Net change in total records; a snapshot that both adds and deletes rows
        # only shows the difference here.
        added = current_records - parent_records if current_records > parent_records else 0
        deleted = parent_records - current_records if parent_records > current_records else 0

        return {
            "current_snapshot_id": str(current_snap.snapshot_id),
            "current_size": current_size,
            "current_records": current_records,
            "parent_snapshot_id": str(parent_snap.snapshot_id),
            "parent_size": parent_size,
            "parent_records": parent_records,
            "added": added,
            "deleted": deleted,
        }
Snapshot List Command (list.py)
import click

from iceberg_navigator.aws.glue import GlueCatalog
from iceberg_navigator.utils.display import format_snapshots_table


@click.command("list")
@click.option("--table", required=True, help="Table identifier, e.g. db.table")
def list_snapshots(table):
    glue = GlueCatalog()
    snapshots = glue.list_snapshots(table)
    if not snapshots:
        click.echo("No snapshots found.")
        return

    table_str = format_snapshots_table(snapshots)
    click.echo(table_str)
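`format_snapshots_table` lives in utils/display.py, which is not reproduced in this article. As a rough sketch of the conversions it would need (the helper names below are hypothetical, not the tool's actual code), the epoch-millisecond `timestamp_ms` value can be turned into the ISO 8601 UTC string shown in the example output, and record counts can be given thousands separators:

```python
from datetime import datetime, timezone


def format_timestamp_ms(timestamp_ms: int) -> str:
    """Convert epoch milliseconds to an ISO 8601 UTC string with a trailing Z."""
    dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")


def format_count(n: int) -> str:
    """Render an integer with thousands separators."""
    return f"{n:,}"


print(format_timestamp_ms(1747879824000))  # 2025-05-22T02:10:24Z
print(format_count(729732))                # 729,732
```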
Snapshot Show Command (show.py)
import click

from iceberg_navigator.aws.glue import GlueCatalog
from iceberg_navigator.utils.display import show_snapshot_details


@click.command(name="show")
@click.argument("snapshot_id")
@click.option('--table', required=True, help="Table identifier, e.g. db.table")
def show_snapshot(table, snapshot_id):
    glue_catalog = GlueCatalog()
    snapshot = glue_catalog.show_snapshot(table, snapshot_id)
    if snapshot is None or "error" in snapshot:
        click.echo(f"Snapshot {snapshot_id} not found in table {table}.")
        return

    show_snapshot_details(snapshot)
Compare Snapshot Command (compare.py)
import click

from iceberg_navigator.aws.glue import GlueCatalog
from iceberg_navigator.utils.display import compare_snapshot


@click.command(name="compare")
@click.argument("snapshot_id")
@click.option('--table', required=True, help="Table identifier, e.g. db.table")
def compare_snapshots(table, snapshot_id):
    glue_catalog = GlueCatalog()
    comparison_result = glue_catalog.compare_snapshots(table, snapshot_id)
    if comparison_result is None or "error" in comparison_result:
        # Report the specific reason when one is returned
        # (missing snapshot vs. missing parent) instead of a generic message.
        default_msg = f"Snapshot {snapshot_id} not found in table {table}."
        click.echo((comparison_result or {}).get("error", default_msg))
        return

    compare_snapshot(comparison_result)
Snapshot Graph Command (graph.py)
import click

from iceberg_navigator.aws.glue import GlueCatalog
from iceberg_navigator.utils.display import build_snapshot_graph, draw_graph


@click.command("graph")
@click.option("--table", required=True, help="Table name (e.g., db.table)")
@click.option("--output", default="snapshot_graph.png", help="Output image filename")
def graph_snapshots(table: str, output: str):
    glue_catalog = GlueCatalog()
    snapshots = glue_catalog.list_snapshots(table)
    if not snapshots:
        click.echo(f"No snapshots found for table {table}")
        return

    G = build_snapshot_graph(snapshots)
    draw_graph(G, output)
    click.echo(f"Snapshot graph saved to {output}")


if __name__ == "__main__":
    graph_snapshots()
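`build_snapshot_graph` and `draw_graph` are defined in utils/display.py and are not shown here. The underlying idea is that every snapshot is a node and every parent pointer is an edge from parent to child. The sketch below illustrates this with plain dictionaries instead of NetworkX, so it is not the tool's implementation, and `build_edges` and `branch_points` are hypothetical names. It also makes it easy to spot snapshots with more than one child, which is what a branch in the lineage looks like:

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple


def build_edges(snapshots: List[Dict[str, Optional[str]]]) -> List[Tuple[str, str]]:
    """Return (parent_id, child_id) pairs for snapshots whose parent is in the list."""
    known = {s["snapshot_id"] for s in snapshots}
    return [
        (s["parent_id"], s["snapshot_id"])
        for s in snapshots
        if s["parent_id"] is not None and s["parent_id"] in known
    ]


def branch_points(edges: List[Tuple[str, str]]) -> Dict[str, List[str]]:
    """Return parents that have more than one child snapshot."""
    children: Dict[str, List[str]] = defaultdict(list)
    for parent, child in edges:
        children[parent].append(child)
    return {p: kids for p, kids in children.items() if len(kids) > 1}


# Three rows taken from the list example; the first row's parent is not
# included here, so it contributes no edge.
rows = [
    {"snapshot_id": "6369576239134108166", "parent_id": "3920289554540444894"},
    {"snapshot_id": "6216935665394419954", "parent_id": "6369576239134108166"},
    {"snapshot_id": "8106328257365313720", "parent_id": "6369576239134108166"},
]

edges = build_edges(rows)
print(len(rows), "nodes,", len(edges), "edges")
print(branch_points(edges))
```

Applied to all 11 snapshots from the list example, the same rule gives 10 edges, because only the first snapshot has no parent. That is consistent with the "DiGraph with 11 nodes and 10 edges" line printed by the graph command.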
Catalog Access Patterns
PyIceberg supports multiple catalog implementations. In AWS environments, two main approaches are used:
- RestCatalog: Access Iceberg metadata via Glue's Iceberg REST API
- GlueCatalog: Use the boto3 Glue client to fetch table info
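According to official AWS docs and recent trends, using Glue's REST endpoint via RestCatalog is the mainstream approach. This tool therefore uses PyIceberg's RestCatalog against Glue's Iceberg REST API, which keeps access standard and lightweight. Note that the GlueCatalog class in this tool's glue.py is a thin wrapper around that REST catalog (plus a boto3 Glue client), not PyIceberg's own GlueCatalog implementation.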
For more details, check out my article comparing catalog access patterns:
https://zenn.dev/penginpenguin/articles/e44880aaa2d5e3
PyIceberg Limitations
While PyIceberg is a powerful Python tool for working with Iceberg metadata, it currently has some limitations:
- Limited metadata operations like rollback: you cannot restore snapshots or perform a rollback directly.
- Partial functionality via REST Catalog: Glue's REST API is still evolving, so some Iceberg features may not be accessible (especially rollback-related ones).
- Diff and snapshot operations require custom logic: users must implement logic for diffing or complex history operations themselves.
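As one possible shape for such custom logic, the sketch below compares two snapshot summaries, assuming they have already been converted to plain dictionaries of strings (as `show_snapshot` does above). It reads the `added-records` and `deleted-records` summary keys when they are present and otherwise falls back to the difference in `total-records`. `diff_summaries` is a hypothetical helper, not part of PyIceberg or of this tool:

```python
from typing import Dict


def diff_summaries(parent: Dict[str, str], current: Dict[str, str]) -> Dict[str, int]:
    """Summarize record changes between a parent snapshot and its child."""
    parent_total = int(parent.get("total-records", 0))
    current_total = int(current.get("total-records", 0))
    net = current_total - parent_total

    # Prefer the per-snapshot counters written by the engine; if a counter is
    # missing, fall back to the net change in the running total.
    added = int(current.get("added-records", max(net, 0)))
    deleted = int(current.get("deleted-records", max(-net, 0)))
    return {"added": added, "deleted": deleted, "net": net}


# Values taken from the show/list examples earlier in the article.
parent = {"total-records": "729732"}
current = {"total-records": "729733", "added-records": "1"}

print(diff_summaries(parent, current))
```

The per-snapshot counters matter for operations that add and delete rows in the same commit, where the net change alone would hide part of what happened.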
Iceberg Table Rollback on AWS
As noted above, rollback is not supported with PyIceberg. Athena, often considered for Iceberg querying, does not currently provide snapshot rollback capabilities either.
To perform rollbacks, you need to use Glue or EMR-based tooling.
This CLI tool focuses on snapshot viewing via Glue REST Catalog but has potential to be extended in the future into a full metadata management tool including rollback.
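For reference, Iceberg's Spark integration provides a `rollback_to_snapshot` stored procedure, which is one way to roll back from a Spark job on Glue or EMR. The sketch below only builds the SQL statement from a snapshot ID found with this tool. The catalog name `glue_catalog` and the database name `mydb` are assumptions that depend on your own Spark configuration, and `build_rollback_sql` is a hypothetical helper:

```python
import re

# Deliberately strict patterns so that only simple identifiers and numeric IDs
# end up in the generated statement.
_IDENT = r"[A-Za-z_][A-Za-z0-9_]*"


def build_rollback_sql(table: str, snapshot_id: str, catalog: str = "glue_catalog") -> str:
    """Build a CALL statement for Iceberg's rollback_to_snapshot Spark procedure."""
    if not re.fullmatch(_IDENT, catalog):
        raise ValueError(f"unexpected catalog name: {catalog!r}")
    if not re.fullmatch(rf"{_IDENT}\.{_IDENT}", table):
        raise ValueError(f"expected <dbname>.<tablename>, got: {table!r}")
    if not re.fullmatch(r"\d+", snapshot_id):
        raise ValueError(f"snapshot_id must be numeric, got: {snapshot_id!r}")
    return f"CALL {catalog}.system.rollback_to_snapshot('{table}', {snapshot_id})"


print(build_rollback_sql("mydb.yellow_tripdata", "6369576239134108166"))
```

The resulting string would be passed to `spark.sql(...)` in a Spark session that has the Iceberg SQL extensions enabled and a catalog registered under the chosen name.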
Conclusion
I introduced the iceberg-navigator CLI tool that allows you to inspect snapshot history and details of Apache Iceberg tables on AWS.
Snapshot history is crucial for understanding how your data has changed and for knowing which snapshots you could roll back to.
With this tool, you can easily retrieve and inspect snapshot information to assist development and debugging.
This is a personal learning project, and the tool is still evolving, but I hope it serves as a useful example of working with Iceberg on AWS using PyIceberg.
If you're interested, please try it out and feel free to share your feedback!