Apache Doris

Build a federated query solution with Apache Doris, Apache Flink, and Apache Hudi

The Multi-Catalog feature of Apache Doris is designed to facilitate integration with external data catalogs, enhancing Doris' capabilities in data lake analytics and federated data queries.

In older versions of Doris, user data was organized in a two-tiered structure: database and table. Thus, connections to external catalogs could only be made at the database or table level. For example, users could create a mapping to a table in an external catalog via create external table, or to a database via create external database. If there were large numbers of databases or tables in the external catalog, users had to create mappings to them one by one, which was a heavy workload. A sketch of the old per-table mapping is shown below.
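The sketch uses hypothetical database, table, and column names against the old ENGINE=HIVE external table syntax, so it is illustrative only:

-- Old approach: one CREATE EXTERNAL TABLE statement per external table (hypothetical names)
CREATE EXTERNAL TABLE example_db.hive_orders (
  order_id BIGINT,
  user_id BIGINT,
  amount VARCHAR(20)
) ENGINE = HIVE
PROPERTIES (
  'hive.metastore.uris' = 'thrift://127.0.0.1:9083',
  'database' = 'orders_db',
  'table' = 'orders'
);
-- Every additional table in the external catalog required another statement like this one.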

With the advent of Multi-Catalog, Doris now has a new three-tiered metadata hierarchy (catalog -> database -> table), which means users can connect to external data at the catalog level. The currently supported external catalogs include:

  • Apache Hive
  • Apache Iceberg
  • Apache Hudi
  • Elasticsearch
  • JDBC
  • Apache Paimon (Incubating)

Multi-Catalog serves as an additional, enhanced method of connecting to external tables, and it makes it easy for users to run federated queries across multiple catalogs, as sketched below.
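The sketch joins an internal Doris table with a table in an external catalog via the three-part catalog.database.table naming; apart from the built-in internal catalog, all names are hypothetical:

-- Federated join across catalogs ('internal' is Doris' built-in catalog; other names are hypothetical)
SELECT o.order_id, u.name
FROM internal.sales_db.orders AS o
JOIN hive_catalog.crm_db.users AS u ON o.user_id = u.id;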

This tutorial demonstrates how to build an integrated real-time data lake and warehouse and run federated query analysis with Flink + Hudi + Doris, focusing primarily on how to use Doris and Hudi together. The entire tutorial environment is set up on a pseudo-distributed environment.

Environment

The demonstration environment for this tutorial is as follows:

  • CentOS 7
  • Apache Doris 2.0.2
  • Hadoop 3.3.3
  • Hive 3.1.3
  • Flink 1.17.1
  • Apache Hudi 0.14
  • JDK 1.8.0_311

Installation

1. Download Flink 1.17.1:

Decompress and install: tar zxf flink-1.17.1-bin-scala_2.12.tgz

2. Download dependencies for Flink and Hudi:

wget https://repo1.maven.org/maven2/org/apache/flink/flink-table-planner_2.12/1.17.1/flink-table-planner_2.12-1.17.1.jar
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-hive-sync-bundle/0.14.0/hudi-hive-sync-bundle-0.14.0.jar
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink1.17-bundle/0.14.0/hudi-flink1.17-bundle-0.14.0.jar
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-hadoop-mr-bundle/0.14.0/hudi-hadoop-mr-bundle-0.14.0.jar
Move these dependencies to the flink-1.17.1/lib directory and remove or replace the existing flink-table-planner-loader-1.17.1.jar.


Create and Write Data to Hudi Table

Start Flink:

bin/start-cluster.sh

Start the Flink SQL client:

./bin/sql-client.sh embedded shell

Set the result mode to tableau so that results are displayed directly:

set sql-client.execution.result-mode=tableau;

Start the Hive Metastore and HiveServer2:

nohup ./bin/hive --service hiveserver2 >/dev/null 2>&1  &
nohup ./bin/hive --service metastore >/dev/null 2>&1  &


Create a Hudi table, using the Hive Metastore service (HMS) to store the Hudi metadata:

CREATE TABLE table1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://localhost:9000/user/hive/warehouse/demo.db',
  'table.type' = 'COPY_ON_WRITE',
  'hive_sync.enable' = 'true',
  'hive_sync.table' = 'hudi_hive',
  'hive_sync.db' = 'demo',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://192.168.31.54:9083'
);
  1. 'table.type'='COPY_ON_WRITE' -- Table type. With MERGE_ON_READ, data is not visible in Hive until compaction generates Parquet files (a MERGE_ON_READ sketch follows this list).
  2. 'hive_sync.enable'='true' -- Required: enable Hive synchronization.
  3. 'hive_sync.table'='${hive_table}' -- Required: name of the Hive table to be created.
  4. 'hive_sync.db'='${hive_db}' -- Required: name of the Hive database to be created.
  5. 'hive_sync.mode'='hms' -- Required: set the Hive sync mode to HMS; the default is JDBC.
  6. 'hive_sync.metastore.uris'='thrift://ip:9083' -- Required: Hive Metastore URI used for synchronization.
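If you want to try the MERGE_ON_READ type mentioned in item 1, the same table defined as MOR would look roughly like the sketch below; the path and Hive sync names are changed only to avoid clashing with table1, and data becomes visible in Hive only after compaction produces Parquet files:

CREATE TABLE table1_mor(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://localhost:9000/user/hive/warehouse/demo.db/table1_mor',
  'table.type' = 'MERGE_ON_READ',
  'hive_sync.enable' = 'true',
  'hive_sync.table' = 'hudi_hive_mor',
  'hive_sync.db' = 'demo',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://192.168.31.54:9083'
);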

Write data to the Hudi table:

INSERT INTO table1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
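Note that because uuid is the primary key, the Hudi Flink connector writes in upsert mode by default, so writing a row with an existing key updates it rather than adding a duplicate. A small sketch (the new age value is arbitrary):

-- Upsert sketch: id1 already exists, so this updates Danny's row instead of inserting a new one
INSERT INTO table1 VALUES
  ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:09','par1');
-- A subsequent SELECT should show id1 with age 27 and no duplicate key.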


Query the Hudi table using Flink:

select * from table1;

The tableau output lists the eight rows inserted above.

You can verify that the data file is present on HDFS, and in the Hive client, you can see the table:

hive> use demo;
OK
Time taken: 0.027 seconds
hive> show tables;
OK
hudi_hive


Doris on Hudi

Doris interacts with Hudi data in a very simple way: you only need to create a catalog, with no need to write the full table creation statements that were required before. In addition, when tables or fields are added or removed in the Hudi data source, Doris can detect the changes automatically through configuration or by manually refreshing the catalog (a refresh sketch follows the catalog setup below).

Now, let's create a catalog in Doris to access data from external Hudi tables:

CREATE CATALOG hudi PROPERTIES (
    'type'='hms',
    'hive.metastore.uris' = 'thrift://192.168.31.54:9083'
);

Here, the metadata of Hudi is stored using HMS (Hive MetaStore). During creation, you only need to specify the two pieces of information above. If your HDFS is highly available, you should add NameNode HA information:

'hadoop.username' = 'hive',
'dfs.nameservices'='your-nameservice',
'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007',
'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007',
'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
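Put together, a catalog definition for an HA HDFS setup might look like the following sketch, reusing the placeholder nameservice and NameNode addresses from the snippet above:

CREATE CATALOG hudi_ha PROPERTIES (
    'type' = 'hms',
    'hive.metastore.uris' = 'thrift://192.168.31.54:9083',
    'hadoop.username' = 'hive',
    'dfs.nameservices' = 'your-nameservice',
    'dfs.ha.namenodes.your-nameservice' = 'nn1,nn2',
    'dfs.namenode.rpc-address.your-nameservice.nn1' = '172.21.0.2:4007',
    'dfs.namenode.rpc-address.your-nameservice.nn2' = '172.21.0.3:4007',
    'dfs.client.failover.proxy.provider.your-nameservice' = 'org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
);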

Refer to the Doris documentation for more details.
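To pick up metadata changes manually, a single statement on the catalog created above is enough, a minimal sketch:

-- Re-syncs the external metadata so newly added or dropped Hudi tables and columns become visible in Doris
REFRESH CATALOG hudi;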

After the catalog is created successfully, you can browse and query the Hudi tables directly through it.

Switch to the Hudi catalog:

switch hudi;
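To verify the mapping and query the Hudi table, you can list the synced database and table and run a query — a minimal sketch, assuming the demo database and the hudi_hive table synced earlier:

show databases;           -- the demo database synced from the Hive Metastore should appear
use demo;
show tables;              -- hudi_hive should be listed
select * from hudi_hive;  -- reads the Hudi data directly through the catalog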

Move data from the Hudi table to Doris:

First, create a Doris table:

CREATE TABLE doris_hudi(
  uuid VARCHAR(20) ,
  name VARCHAR(10),
  age INT,
  ts datetime(3),
  `partition` VARCHAR(20)
)
UNIQUE KEY(`uuid`)
DISTRIBUTED BY HASH(`uuid`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"enable_unique_key_merge_on_write" = "true"
);

Then, use the INSERT SELECT statement to migrate data from Hudi to Doris:

insert into doris_hudi select uuid,name,age,ts,`partition` from hudi.demo.hudi_hive;

Query the Doris table:

mysql> select * from doris_hudi;
+------+---------+------+-------------------------+-----------+
| uuid | name    | age  | ts                      | partition |
+------+---------+------+-------------------------+-----------+
| id1  | Danny   |   23 | 1970-01-01 08:00:01.000 | par1      |
| id2  | Stephen |   33 | 1970-01-01 08:00:02.000 | par1      |
| id3  | Julian  |   53 | 1970-01-01 08:00:03.000 | par2      |
| id4  | Fabian  |   31 | 1970-01-01 08:00:04.000 | par2      |
| id5  | Sophia  |   18 | 1970-01-01 08:00:05.000 | par3      |
| id6  | Emma    |   20 | 1970-01-01 08:00:06.000 | par3      |
| id7  | Bob     |   44 | 1970-01-01 08:00:07.000 | par4      |
| id8  | Han     |   56 | 1970-01-01 08:00:08.000 | par4      |
+------+---------+------+-------------------------+-----------+
8 rows in set (0.02 sec)

You can also use CTAS (CREATE TABLE AS SELECT) to migrate Hudi data to Doris, letting Doris handle table creation automatically:

create table doris_hudi_01
PROPERTIES("replication_num" = "1")  as  
select uuid,name,age,ts,`partition` from hudi.demo.hudi_hive;

In conclusion, using Doris as a unified data lake and warehouse, together with its federated query capabilities, is a simple and efficient experience. Give it a try and boost your data analysis performance!
