First, here is the project's GitHub repository:
LakeSoul: https://github.com/lakesoul-io/LakeSoul
1. Introduction
LakeSoul is a cloud-native Lakehouse framework developed by the DMetaSoul team. It supports scalable metadata management, ACID transactions, efficient and flexible upsert operations, schema evolution, and unified streaming and batch processing. The LakeSoul Flink CDC Sink supports whole-database synchronization from MySQL to LakeSoul, including automatic table creation, automatic schema changes, and exactly-once semantics.
This article introduces the functional advantages of Flink CDC and walks through LakeSoul Flink CDC whole-database synchronization. The tutorial demonstrates synchronizing a MySQL database to LakeSoul end to end, including automatic table creation and DDL changes.
2. Flink CDC
2.1 CDC Introduction
CDC stands for Change Data Capture: it monitors and captures incremental change records from a source database and synchronizes them to one or more data destinations (sinks). The changes that can be captured include INSERT, UPDATE, and DELETE operations on rows or tables. These changes are recorded in full and written to message middleware in the order in which they occur.
LakeSoul provides its own specification for expressing CDC semantics. A CDC op column, set through a table property, indicates the operation type of each row, and later merges apply rows automatically according to that operation. CDC data can be imported into LakeSoul after conversion by Debezium, Canal, Flink, and similar tools.
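To make the op-column idea concrete, here is a minimal Python sketch of merging change rows by operation type. It is an illustration only, not LakeSoul's merge implementation: the column name `op` and the values `insert`, `update`, and `delete` are assumptions chosen for this example.

```python
from typing import Dict, Iterable, List

# Hypothetical op-column name, chosen for illustration only.
OP_COLUMN = "op"


def merge_cdc(rows: Iterable[dict], key: str = "id") -> List[dict]:
    """Replay change rows in arrival order and keep the latest state per key."""
    state: Dict[object, dict] = {}
    for row in rows:
        op = row[OP_COLUMN]
        if op in ("insert", "update"):
            # Upsert: the newest row for a key replaces the previous one.
            state[row[key]] = {k: v for k, v in row.items() if k != OP_COLUMN}
        elif op == "delete":
            # Delete: drop the key if it is present.
            state.pop(row[key], None)
        else:
            raise ValueError(f"unknown op: {op!r}")
    return list(state.values())


changes = [
    {"id": 1, "name": "Bob", "op": "insert"},
    {"id": 1, "name": "Peter", "op": "update"},
    {"id": 2, "name": "Alice", "op": "insert"},
    {"id": 1, "name": "Peter", "op": "delete"},
]
print(merge_cdc(changes))  # [{'id': 2, 'name': 'Alice'}]
```

The point of the sketch is that the op column travels with each row, so the merged view can be computed later from the change rows alone.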
2.2 Why use Flink CDC
Before adopting Flink CDC, LakeSoul initially imported CDC data via Debezium:
The overall process can be divided into the following steps:
- Connect MySQL and Kafka
- Create Debezium CDC synchronization task
- Use Spark Streaming, consume Kafka data and synchronize updates to LakeSoul
As you can see, the pipeline for importing CDC data into LakeSoul via Debezium is relatively long and requires many components. Debezium implements CDC on top of Kafka Connect, whereas LakeSoul now uses the Flink CDC module, which skips the Debezium and Kafka hop and uses the Flink CDC connectors to subscribe directly to changes in upstream data sources.
The flink-cdc-connectors component developed by the Flink community is a source component that can read both full snapshots and incremental change data directly from MySQL, PostgreSQL, and other databases. It can capture all changes that occur in one or more tables. Each change record usually carries the row image before and after the change. flink-cdc-connectors can be used directly in Flink in unconstrained mode, without middleware such as Kafka to transport the data.
Flink CDC reads the binlog directly from the database for downstream computation and analysis. Internally, the flink-cdc-connectors component embeds a set of Debezium and Kafka components, but this detail is hidden from users.
As shown in the figure below, CDC data is transformed by Flink and imported into LakeSoul without passing through Kafka, which simplifies the overall architecture:
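As a rough illustration of the before and after row images, the Python sketch below classifies a change record and lists the columns that changed. The `before` and `after` field names follow the shape of Debezium's change-event envelope; the helper functions themselves are hypothetical and not part of flink-cdc-connectors.

```python
from typing import Optional


def classify_change(before: Optional[dict], after: Optional[dict]) -> str:
    """Infer the operation type from the before and after row images."""
    if before is None and after is None:
        raise ValueError("a change record needs at least one row image")
    if before is None:
        return "insert"  # no previous image: the row is new
    if after is None:
        return "delete"  # no new image: the row was removed
    return "update"


def changed_columns(before: dict, after: dict) -> dict:
    """Return {column: (old, new)} for every column whose value differs."""
    return {
        col: (before.get(col), after.get(col))
        for col in sorted(set(before) | set(after))
        if before.get(col) != after.get(col)
    }


update_event = {
    "before": {"id": 1, "name": "Bob", "type": 10},
    "after": {"id": 1, "name": "Peter", "type": 10},
}
print(classify_change(update_event["before"], update_event["after"]))  # update
print(changed_columns(update_event["before"], update_event["after"]))  # {'name': ('Bob', 'Peter')}
```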
After the LakeSoul Flink job starts, the initialization phase reads all tables in the configured MySQL database (excluding tables that do not need to be synchronized). For each table, the job first checks whether it exists in LakeSoul and, if not, automatically creates a LakeSoul table with the same schema as its MySQL counterpart. After initialization, the CDC streams of all tables are read and written to the corresponding LakeSoul tables as upserts. If a DDL schema change occurs on a MySQL table during synchronization, the change is also applied to the corresponding LakeSoul table. The following is a complete tutorial on how to use LakeSoul Flink CDC whole-database synchronization.
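The initialization and schema-change behaviour described above can be sketched in a few lines of Python. This is a simplified model that uses in-memory dictionaries in place of MySQL and the LakeSoul catalog; the function names and the `scratch` table are hypothetical, and the real job is considerably more involved.

```python
from typing import Dict, Iterable, List

Schema = Dict[str, str]  # column name -> SQL type


def init_tables(source: Dict[str, Schema], lake: Dict[str, Schema],
                excluded: Iterable[str] = ()) -> List[str]:
    """Create every non-excluded source table that is missing in the lake."""
    skip = set(excluded)
    created = []
    for table, schema in source.items():
        if table in skip or table in lake:
            continue
        lake[table] = dict(schema)  # same schema as the source table
        created.append(table)
    return created


def apply_ddl(lake: Dict[str, Schema], table: str, new_schema: Schema) -> None:
    """Propagate a source-side schema change to the lake table."""
    lake[table] = dict(new_schema)


source = {
    "mysql_test_1": {"id": "INT", "name": "VARCHAR(255)", "type": "SMALLINT"},
    "scratch": {"k": "INT"},  # hypothetical table excluded from sync
}
lake: Dict[str, Schema] = {}
print(init_tables(source, lake, excluded=["scratch"]))  # ['mysql_test_1']
apply_ddl(lake, "mysql_test_1", {**source["mysql_test_1"], "new_col": "FLOAT"})
print(list(lake["mysql_test_1"]))  # ['id', 'name', 'type', 'new_col']
```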
3. LakeSoul Flink CDC Whole Database Synchronization
3.1 Prepare the environment
3.1.1 Start a local MySQL database
It is recommended to use the MySQL Docker image to quickly start a MySQL database instance:
cd docker/lakesoul-docker-compose-env/
docker run --name lakesoul-test-mysql -e MYSQL_ROOT_PASSWORD=root -e MYSQL_DATABASE=test_cdc -p 3306:3306 -d mysql:8
3.1.2 Configuring LakeSoul Meta DB and Spark Environment
1. Start A Local PostgreSQL DB
The quickest way to start a pg DB is via docker container:
docker run -d --name lakesoul-test-pg -p 5432:5432 -e POSTGRES_USER=lakesoul_test -e POSTGRES_PASSWORD=lakesoul_test -e POSTGRES_DB=lakesoul_test swr.cn-north-4.myhuaweicloud.com/dmetasoul-repo/postgres:14.5
Initialize the PG database: copy meta_init.sql into the Docker container, then enter the container. You can look up the container ID with the docker ps command:
docker cp script/meta_init.sql <container_id>:script/meta_init.sql
docker exec -it <container_id> /bin/bash
Execute in the Docker container:
PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -f script/meta_init.sql
2. Install an Apache Spark environment
You can download a Spark distribution from https://spark.apache.org/downloads.html; please choose Spark 3.3.0 or above. Note that the official Apache Spark package does not include the hadoop-cloud component. We provide a Spark package with the Hadoop cloud dependencies, which you can download from https://dmetasoul-bucket.obs.cn-southwest-2.myhuaweicloud.com/releases/spark/spark-3.3.2-bin-hadoop-3.3.5.tgz.
After unpacking the Spark package, download the LakeSoul distribution jar from https://github.com/meta-soul/LakeSoul/releases and put it into the jars directory of your Spark environment.
wget https://dmetasoul-bucket.obs.cn-southwest-2.myhuaweicloud.com/releases/spark/spark-3.3.2-bin-hadoop-3.3.5.tgz
tar xf spark-3.3.2-bin-hadoop-3.3.5.tgz
export SPARK_HOME=${PWD}/spark-3.3.2-bin-dmetasoul
wget https://github.com/meta-soul/LakeSoul/releases/download/v2.2.0/lakesoul-spark-2.2.0-spark-3.3.jar -P $SPARK_HOME/jars
For production deployment on Hadoop, it is recommended to use the Spark release without bundled Hadoop: https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-without-hadoop.tgz
Refer to https://spark.apache.org/docs/latest/hadoop-provided.html on how to setup hadoop classpath.
Since 2.1.0, LakeSoul packages all its dependencies into a single jar via the Maven Shade plugin. Before that, all jars were packaged into one tar.gz file.
3. Adding PG database configuration for LakeSoul
By default, LakeSoul connects to a local PG database with the following configuration:
lakesoul.pg.driver=com.lakesoul.shaded.org.postgresql.Driver
lakesoul.pg.url=jdbc:postgresql://127.0.0.1:5432/lakesoul_test?stringtype=unspecified
lakesoul.pg.username=lakesoul_test
lakesoul.pg.password=lakesoul_test
To customize the PG database configuration, add an environment variable lakesoul_home that points to a configuration file. For example, if the configuration file is located at /opt/soft/pg.property, set this environment variable before the program starts:
export lakesoul_home=/opt/soft/pg.property
Users can customize the database configuration in this file, and the customized PG database configuration will then take effect in Spark jobs.
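As a small convenience, the Python sketch below renders the four configuration keys listed above for a non-default PostgreSQL host. The function name and the host `pg.example.internal` are hypothetical; only the property keys, the driver class, and the URL format come from the configuration shown earlier.

```python
def render_pg_properties(host: str, port: int, database: str,
                         user: str, password: str) -> str:
    """Render the LakeSoul PG settings as properties-file text."""
    url = f"jdbc:postgresql://{host}:{port}/{database}?stringtype=unspecified"
    lines = [
        "lakesoul.pg.driver=com.lakesoul.shaded.org.postgresql.Driver",
        f"lakesoul.pg.url={url}",
        f"lakesoul.pg.username={user}",
        f"lakesoul.pg.password={password}",
    ]
    return "\n".join(lines) + "\n"


# Hypothetical host name, for illustration only.
print(render_pg_properties("pg.example.internal", 5432,
                           "lakesoul_test", "lakesoul_test", "lakesoul_test"), end="")
```

The rendered text could be saved to the file that lakesoul_home points to, for example /opt/soft/pg.property.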
4. Starting the Spark environment
Start a spark-sql interactive SQL command-line environment:
$SPARK_HOME/bin/spark-sql --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog --conf spark.sql.defaultCatalog=lakesoul --conf spark.sql.warehouse.dir=/tmp/lakesoul --conf spark.dmetasoul.lakesoul.snapshot.cache.expire.seconds=10
This command starts a local Spark job with two additional options:
- spark.sql.warehouse.dir=/tmp/lakesoul: the default table storage location in Spark SQL must be the same directory as the Flink job's output directory.
- spark.dmetasoul.lakesoul.snapshot.cache.expire.seconds=10: LakeSoul caches metadata in Spark, so a short cache expiration time makes it easier to query the latest data.
After starting the Spark SQL command line, you can execute:
SHOW DATABASES;
SHOW TABLES IN default;
You can see that LakeSoul currently has only one database, default, and that it contains no tables.
3.1.3 Create a table in MySQL in advance and write data
- Install mycli
pip install mycli
- Start mycli and connect to the MySQL database
mycli mysql://root@localhost:3306/test_cdc -p root
- Create table and write data
CREATE TABLE mysql_test_1 (id INT PRIMARY KEY, name VARCHAR(255), type SMALLINT);
INSERT INTO mysql_test_1 VALUES (1, 'Bob', 10);
SELECT * FROM mysql_test_1;
3.2 Start the sync job
3.2.1 Start a local Flink Cluster
You can download Flink 1.14.5 from the Flink download page.
Unzip the downloaded Flink installation package:
tar xf flink-1.14.5-bin-scala_2.12.tgz
export FLINK_HOME=${PWD}/flink-1.14.5
Then start a local Flink Cluster:
$FLINK_HOME/bin/start-cluster.sh
You can open http://localhost:8081 to check whether the local Flink cluster has started normally.
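The same check can be scripted against Flink's REST API, which serves a cluster summary at /overview on the same port as the web UI. The Python sketch below is a minimal example under that assumption; the helper names are hypothetical, and the readiness rule (at least one TaskManager and one slot) is a choice made for this tutorial's single-node setup.

```python
import json
from urllib.request import urlopen


def fetch_overview(base_url: str = "http://localhost:8081") -> dict:
    """Fetch the cluster summary from the Flink REST endpoint /overview."""
    with urlopen(f"{base_url}/overview", timeout=5) as resp:
        return json.load(resp)


def cluster_is_ready(overview: dict) -> bool:
    """Treat the cluster as ready once a TaskManager with slots has registered."""
    return overview.get("taskmanagers", 0) > 0 and overview.get("slots-total", 0) > 0


# Sample payload with the fields this check relies on.
sample = {"taskmanagers": 1, "slots-total": 1, "slots-available": 1}
print(cluster_is_ready(sample))  # True
```

With the local cluster running, `cluster_is_ready(fetch_overview())` performs the live check.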
3.2.2 Submit LakeSoul Flink CDC Sink job
Submit a LakeSoul Flink CDC Sink job to the Flink cluster started above:
./bin/flink run -ys 1 -yjm 1G -ytm 2G \
-c org.apache.flink.lakesoul.entry.MysqlCdc \
lakesoul-flink-2.2.0-flink-1.14.jar \
--source_db.host localhost \
--source_db.port 3306 \
--source_db.db_name test_cdc \
--source_db.user root \
--source_db.password root \
--source.parallelism 1 \
--sink.parallelism 1 \
--warehouse_path file:/tmp/lakesoul \
--flink.checkpoint file:/tmp/flink/chk \
--flink.savepoint file:/tmp/flink/svp \
--job.checkpoint_interval 10000 \
--server_time_zone UTC
The lakesoul-flink jar can be downloaded from the GitHub Release page.
Refer to LakeSoul Flink CDC Synchronization of Entire MySQL Database for detailed usage of the Flink job.
On the Flink job page at http://localhost:8081, click Running Job to check whether the LakeSoul job is already in the Running state.
You can click to enter the job page, and you should see that one data record has been synchronized:
3.2.3 Use Spark SQL to read the synchronized data in the LakeSoul table
Execute in Spark SQL Shell:
SHOW DATABASES;
SHOW TABLES IN test_cdc;
DESC test_cdc.mysql_test_1;
SELECT * FROM test_cdc.mysql_test_1;
The results of these statements show that a test_cdc database and a mysql_test_1 table were created automatically in LakeSoul. The table's fields and primary key are the same as in MySQL, with one additional rowKinds column.
3.2.4 Observe the synchronization situation after executing Update in MySQL
Perform the update in mycli:
UPDATE mysql_test_1 SET name='Peter' WHERE id=1;
Then read again in LakeSoul:
SELECT * FROM test_cdc.mysql_test_1;
You can see that the latest data has been read:
3.2.5 Observe the synchronization after executing DDL in MySQL, and read new and old data
Modify the structure of the table in mycli:
ALTER TABLE mysql_test_1 ADD COLUMN new_col FLOAT;
This adds a new column at the end of the table, with a default value of null. Verify the result in mycli:
At this point, the table structure has been synchronized in LakeSoul, and we can view the table structure in spark-sql:
DESC test_cdc.mysql_test_1;
At this time, read data from LakeSoul, and the new column is also null:
SELECT * FROM test_cdc.mysql_test_1;
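Why rows written before the DDL change show null in the new column can be illustrated with a short Python sketch: when old rows are read under the evolved schema, any column they lack is filled with None. This models the observable behaviour only and is not how LakeSoul reads its files; `read_with_schema` is a hypothetical helper.

```python
from typing import Dict, List


def read_with_schema(rows: List[dict], columns: List[str]) -> List[Dict[str, object]]:
    """Project rows onto the current schema, filling absent columns with None."""
    return [{col: row.get(col) for col in columns} for row in rows]


# A row written before new_col was added.
old_rows = [{"id": 1, "name": "Peter", "type": 10}]
current_schema = ["id", "name", "type", "new_col"]
print(read_with_schema(old_rows, current_schema))
# [{'id': 1, 'name': 'Peter', 'type': 10, 'new_col': None}]
```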
Insert a new piece of data into MySQL:
INSERT INTO mysql_test_1 VALUES (2, 'Alice', 20, 9.9);
Read again from LakeSoul:
Delete a piece of data from MySQL:
DELETE FROM mysql_test_1 WHERE id=1;
Read from LakeSoul:
You can see that LakeSoul reads the synchronized result every time, which is exactly the same as in MySQL.
3.2.6 Observe the synchronization after creating a new table in MySQL
Create a new table in MySQL with a different schema from the previous table:
CREATE TABLE mysql_test_2 (name VARCHAR(100) PRIMARY KEY, phone_no VARCHAR(20));
Insert a row into the new MySQL table:
INSERT INTO mysql_test_2 VALUES ('Bob', '10010');
LakeSoul also successfully synchronized and read the data of the new table:
4. Conclusion
LakeSoul Flink CDC supports the following:
- Synchronizing whole databases from MySQL data sources to LakeSoul.
- Automatically synchronizing schema changes (DDL) to LakeSoul.
- Automatically detecting new tables in the upstream database at runtime and creating the corresponding tables in LakeSoul.
- Exactly-once semantics: even if a Flink job fails, data is neither lost nor duplicated.
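One common way to get an exactly-once effect at a sink is to make commits idempotent per checkpoint, so that a batch replayed after a failure is not applied twice. The Python sketch below illustrates that general idea only; it is an assumption made for illustration, not a description of how the LakeSoul Flink sink is implemented, and the class name is hypothetical.

```python
from typing import List, Set


class IdempotentCommitter:
    """Apply each checkpoint's rows at most once, even if the commit is retried."""

    def __init__(self) -> None:
        self.committed_ids: Set[int] = set()
        self.table: List[dict] = []

    def commit(self, checkpoint_id: int, rows: List[dict]) -> bool:
        if checkpoint_id in self.committed_ids:
            return False  # replay after a failure: already applied, skip
        self.table.extend(rows)
        self.committed_ids.add(checkpoint_id)
        return True


committer = IdempotentCommitter()
batch = [{"id": 2, "name": "Alice"}]
print(committer.commit(1, batch))  # True
print(committer.commit(1, batch))  # False
print(len(committer.table))        # 1
```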