Author: xiaojing
Last Updated: 2026-03-11
Overview
In this tutorial, you will learn how to use Apache Gravitino with Apache Flink to build a simple streaming pipeline. You will create a Hive catalog and a Paimon catalog in Gravitino, define a Kafka-backed generic table in the Hive catalog, and then use Flink SQL (through the Gravitino Flink connector) to read Kafka data and write it into a Paimon table.
What you'll accomplish:
- Configure the Gravitino Flink connector in Flink
- Create Hive and Paimon catalogs in Gravitino
- Define a Kafka generic table in the Hive catalog
- Stream data from Kafka to Paimon using Flink SQL
Architecture overview: Kafka (source) → Flink SQL with the Gravitino Flink connector → Paimon (sink), with catalog metadata for both the Kafka-backed table and the Paimon table managed centrally by Gravitino.
Prerequisites
System Requirements:
- Linux or macOS
- JDK 17+ (required for the Gravitino server; this tutorial assumes JDK 17 or later)
- Apache Flink 1.18 (recommended for the Gravitino Flink connector)
Required Components:
- Gravitino server v1.2.0 or later (this tutorial requires features introduced after v1.1.0; see 02-setup-guide/README.md)
- Hive Metastore (for the Hive catalog)
- Apache Kafka broker (for the Kafka source table)
Suggested Versions:
- Apache Paimon connector JAR that matches your Flink version
Before proceeding, verify your Java and Flink installation:
${JAVA_HOME}/bin/java -version
${FLINK_HOME}/bin/flink --version
Step-by-Step Guide
Step 1: Set environment variables
These values are used throughout the tutorial. Adjust them for your environment:
export GRAVITINO_URI="http://localhost:8090"
export METALAKE_NAME="default_metalake"
export HIVE_METASTORE_URI="thrift://localhost:9083"
export PAIMON_WAREHOUSE="file:///tmp/paimon-warehouse"
export KAFKA_BROKERS="localhost:9092"
Step 2: Create Hive and Paimon catalogs in Gravitino
Create a Hive catalog and a Paimon catalog using the Gravitino REST API.
If you need to pass Hive-specific configs (for example hive-conf-dir), set them in catalog properties with the flink.bypass. prefix (for example flink.bypass.hive-conf-dir), which are forwarded to the Flink Hive connector.
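For example, a Hive catalog definition that forwards a Hive config directory to the Flink Hive connector would carry the property like this (the /opt/hive/conf path is illustrative; use your own hive-site.xml location):

```json
{
  "name": "hive_catalog",
  "type": "relational",
  "provider": "hive",
  "properties": {
    "metastore.uris": "thrift://localhost:9083",
    "flink.bypass.hive-conf-dir": "/opt/hive/conf"
  }
}
```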
# Create Hive catalog
curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \
-H "Content-Type: application/json" -d '{
"name": "hive_catalog",
"type": "relational",
"comment": "Hive catalog for Flink streaming",
"provider": "hive",
"properties": {
"metastore.uris": "'"$HIVE_METASTORE_URI"'"
}
}' ${GRAVITINO_URI}/api/metalakes/${METALAKE_NAME}/catalogs
# Create Paimon catalog (filesystem backend)
curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \
-H "Content-Type: application/json" -d '{
"name": "paimon_catalog",
"type": "relational",
"comment": "Paimon catalog for Flink streaming",
"provider": "lakehouse-paimon",
"properties": {
"catalog-backend": "filesystem",
"warehouse": "'"$PAIMON_WAREHOUSE"'"
}
}' ${GRAVITINO_URI}/api/metalakes/${METALAKE_NAME}/catalogs
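If you prefer scripting over curl, the same two requests can be built programmatically. The sketch below mirrors the payloads above and includes a small helper for posting them to the Gravitino REST API (the helper requires a running server; URIs here are the tutorial defaults and should be adjusted for your environment):

```python
import json

# Catalog definitions mirrored from the curl commands above.
hive_catalog = {
    "name": "hive_catalog",
    "type": "relational",
    "comment": "Hive catalog for Flink streaming",
    "provider": "hive",
    "properties": {"metastore.uris": "thrift://localhost:9083"},
}

paimon_catalog = {
    "name": "paimon_catalog",
    "type": "relational",
    "comment": "Paimon catalog for Flink streaming",
    "provider": "lakehouse-paimon",
    "properties": {
        "catalog-backend": "filesystem",
        "warehouse": "file:///tmp/paimon-warehouse",
    },
}

def create_catalog(base_uri, metalake, catalog):
    """POST a catalog definition to Gravitino (needs a running server)."""
    import urllib.request
    req = urllib.request.Request(
        f"{base_uri}/api/metalakes/{metalake}/catalogs",
        data=json.dumps(catalog).encode(),
        headers={
            "Accept": "application/vnd.gravitino.v1+json",
            "Content-Type": "application/json",
        },
    )
    return urllib.request.urlopen(req).read()

# Inspect the request bodies before sending them.
for catalog in (hive_catalog, paimon_catalog):
    print(json.dumps(catalog, indent=2))
```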
Step 3: Install required JARs in Flink
Place the following JARs in FLINK_HOME/lib so Flink SQL can load them:
- gravitino-flink-connector-runtime-1.18_2.12-<version>.jar
- paimon-flink-1.18-<version>.jar
- flink-sql-connector-kafka-<version>.jar
- Hive dependencies required by Flink HiveCatalog (same as the Flink-Hive integration)
Tip: The Kafka SQL connector is not included in the Flink binary distribution and must be added separately.
Step 4: Configure Flink to use the Gravitino catalog store
Edit FLINK_HOME/conf/flink-conf.yaml and add (replace with your values):
table.catalog-store.kind: gravitino
table.catalog-store.gravitino.gravitino.metalake: ${METALAKE_NAME}
table.catalog-store.gravitino.gravitino.uri: ${GRAVITINO_URI}
Restart Flink if it is running, then make sure a Flink cluster is reachable:
${FLINK_HOME}/bin/start-cluster.sh
curl -sS http://localhost:8081/overview
If curl fails with connection refused, the INSERT INTO ... SELECT in Step 7 will also fail, because the SQL client cannot submit jobs to the cluster.
Step 5: Create a Kafka generic table in the Hive catalog
Flink's HiveCatalog supports both Hive-compatible tables and generic tables. A table is generic by default in HiveCatalog unless you explicitly set 'connector' = 'hive' or use Hive dialect. Here we create a Kafka generic table so the metadata is stored in Hive Metastore, while the data is read from Kafka by Flink. If you want a Hive-compatible table, use Hive dialect or set 'connector' = 'hive'.
Start the Flink SQL client:
${FLINK_HOME}/bin/sql-client.sh
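With the catalog store configured in Step 4, the catalogs registered in Gravitino should be visible automatically. You can check before creating any tables:

```sql
SHOW CATALOGS;
-- Expect hive_catalog and paimon_catalog in the list,
-- alongside Flink's built-in default_catalog.
```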
In the SQL client, run the following statements:
-- Use the Hive catalog managed by Gravitino
USE CATALOG hive_catalog;
CREATE DATABASE IF NOT EXISTS streaming_db;
USE streaming_db;
-- Kafka source table stored as a generic table in Hive catalog
CREATE TABLE kafka_events (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092', -- the SQL client does not expand shell variables; use your $KAFKA_BROKERS value
'properties.group.id' = 'gravitino-flink-demo',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.ignore-parse-errors' = 'true'
);
Notes about generic tables:
- HiveCatalog supports Hive-compatible tables and generic tables. Hive-compatible tables are stored in a Hive-compatible way and can be queried from Hive.
- Generic tables are Flink-specific. Hive can see the metadata in Hive Metastore, but typically cannot interpret it, so querying from Hive is undefined behavior.
- If you want Hive-compatible tables with the default dialect, set 'connector' = 'hive'. If you use Hive dialect, the connector property is not required.
- In Gravitino, generic table schema and partition keys are stored in flink.* properties in Hive Metastore. If connector=hive, the table is treated as a Hive-compatible table with a native Hive schema.
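To make the contrast concrete, here is what a Hive-compatible table in the same database could look like with the default dialect; the table name and columns are illustrative, not part of the pipeline:

```sql
-- Hive-compatible table: stored with a native Hive schema,
-- so Hive itself can also query it.
CREATE TABLE hive_user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'hive'
);
```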
Step 6: Create a Paimon sink table
USE CATALOG paimon_catalog;
CREATE DATABASE IF NOT EXISTS streaming_db;
USE streaming_db;
CREATE TABLE paimon_user_behavior (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
ts TIMESTAMP_LTZ(3)
);
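The sink above is an append-only table. If you want upsert semantics instead (at most one row per user/item pair), Paimon also supports primary-keyed tables; a sketch, with an illustrative bucket count:

```sql
-- Hypothetical primary-keyed variant of the sink table.
CREATE TABLE paimon_user_behavior_pk (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING,
  ts TIMESTAMP_LTZ(3),
  PRIMARY KEY (user_id, item_id) NOT ENFORCED
) WITH (
  'bucket' = '4'
);
```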
Step 7: Stream data from Kafka to Paimon
SET 'execution.checkpointing.interval' = '10 s';
INSERT INTO paimon_catalog.streaming_db.paimon_user_behavior
SELECT user_id, item_id, behavior, ts
FROM hive_catalog.streaming_db.kafka_events;
If Kafka is receiving data in user_behavior, Flink will continuously write it to the Paimon table.
For streaming writes to Paimon, periodic checkpoints are required for commits.
Code Examples
Sample Kafka messages (JSON lines):
{"user_id": 1, "item_id": 1001, "behavior": "click"}
{"user_id": 2, "item_id": 1002, "behavior": "buy"}
Troubleshooting
- Catalogs not visible in Flink: Verify the table.catalog-store.* settings in flink-conf.yaml and that the Gravitino server is reachable.
- ClassNotFoundException: Ensure the Gravitino connector, Kafka connector, and Paimon JARs are present in FLINK_HOME/lib.
- java.net.ConnectException: Connection refused when running INSERT INTO: the Flink SQL client cannot reach the JobManager REST endpoint (default localhost:8081). Start the cluster with ${FLINK_HOME}/bin/start-cluster.sh and verify with curl http://localhost:8081/overview.
- Job is RUNNING but no new rows in Paimon: Ensure checkpoints are enabled in streaming mode (for example SET 'execution.checkpointing.interval' = '10 s';) and check checkpoint progress in the Flink Web UI or /jobs/<job-id>/checkpoints.
- Job is RUNNING but expected records are skipped after a rerun: Kafka offsets are tracked per properties.group.id. Use a new group id (for example gravitino-flink-demo-v2) when you want a fresh replay.
- Table not found: Use fully qualified names like hive_catalog.streaming_db.kafka_events and paimon_catalog.streaming_db.paimon_user_behavior.
Congratulations
You have successfully completed the Gravitino Flink streaming tutorial!
You now have a fully functional Flink streaming environment with Gravitino integration, including:
- A configured Gravitino Flink connector for unified catalog access
- Hive and Paimon catalogs registered in Gravitino and accessible from Flink SQL
- A working streaming pipeline that reads from Kafka and writes to Paimon
- Understanding of generic tables vs Hive-compatible tables in HiveCatalog
Your Flink environment is now ready to leverage Gravitino for unified metadata management across your streaming data ecosystem.
Further Reading
For more advanced configurations and detailed documentation:
- Review the Gravitino Flink Connector Documentation for advanced configuration options
- Learn about Apache Flink SQL for more query patterns
- Explore Apache Paimon with Flink for Paimon-specific features
Next Steps
- Explore Iceberg catalogs with Gravitino in 03-iceberg-catalog/README.md
- Try query federation with Trino in 06-trino-query/README.md
- Follow and star Apache Gravitino Repository
Apache Gravitino is rapidly evolving, and this tutorial is written against version 1.2.0. If you encounter issues, please refer to the official documentation or submit issues on GitHub.
