DMetaSoul

Quick use of CDC: A new demo from LakeSoul makes it easier to set up the environment

Change Data Capture (CDC) is a database-oriented technology for capturing data changes in a database, used for data synchronization, data distribution, and data collection. CDC implementations generally fall into two types: query-based and log-based. Query-based CDC is offline: a scheduled job queries the source table and synchronizes it to other systems, so it cannot guarantee consistency or real-time delivery, since the data may change several times between queries. [LakeSoul](https://github.com/meta-soul/LakeSoul)'s CDC support is of the log-based type, which consumes database logs to guarantee both consistency and real-time delivery.
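
To make the difference concrete, here is a toy Scala sketch (purely illustrative, not LakeSoul code). Three changes hit the same row within one polling interval; a query-based sync only observes the final table state, while a log-based consumer receives every event:

```scala
// A change log with three events against the same primary key.
case class ChangeEvent(op: String, id: Int, value: Option[String])

val log = Seq(
  ChangeEvent("insert", 1, Some("a")),
  ChangeEvent("update", 1, Some("b")),
  ChangeEvent("delete", 1, None)
)

// What a scheduled query sees: only the state after all three changes.
val snapshot = log.foldLeft(Map.empty[Int, String]) {
  case (state, ChangeEvent("insert" | "update", id, Some(v))) => state + (id -> v)
  case (state, ChangeEvent("delete", id, _))                  => state - id
  case (state, _)                                             => state
}

println(snapshot)  // Map() -- the insert and update were never observed
println(log.size)  // 3     -- the log preserves every intermediate change
```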

A few days ago, **[LakeSoul](https://github.com/meta-soul/LakeSoul) uploaded a CDC demo to GitHub. Insert, delete, and update operations from relational databases such as MySQL and Oracle can be ingested into LakeSoul through CDC and stored in real-time. The pipeline is: MySQL -> Debezium -> Kafka -> Spark Streaming -> LakeSoul. Once the complete pipeline is built, the system can insert, delete, and update data in real-time and return the latest data when queried.** [Upsert](https://github.com/meta-soul/LakeSoul/wiki/03.-Usage-Doc#311-code-examples) is required when writing the changes.
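
In LakeSoul, applying CDC events means each micro-batch of changes is merged into the table by primary key with `upsert` rather than appended. A minimal sketch of what that looks like (assuming a LakeSoul table already exists at the demo path `/opt/spark/cdctest` used below):

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable

// A batch of parsed CDC rows: the table's columns plus an `op` column.
val batch = Seq(
  (1L, 1L, "hello world", "insert"),
  (2L, 1L, "updated row", "update")
).toDF("id", "rangeid", "value", "op")

// Merge by the hash (primary) key `id` instead of appending duplicates.
LakeSoulTable.forPath("/opt/spark/cdctest").upsert(batch)
```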

Let's walk through the demo below, or check it out on [LakeSoul](https://github.com/meta-soul/LakeSoul/tree/main/examples/cdc_ingestion_debezium).

There are two ways of CDC ingestion for LakeSoul: 1) write the CDC stream into Kafka and use Spark Streaming to transform and write it into LakeSoul (already supported); 2) use Flink CDC to write directly into LakeSoul.

In this demo, the LakeSoul team demonstrated the first way. They set up a MySQL instance, used scripts to generate DB modifications, used Debezium to sync them into Kafka, and then ingested the stream into LakeSoul.

1. Setup MySQL
1.1 Create database and table

```sql
CREATE DATABASE cdc;
-- create the table inside the cdc database
USE cdc;
CREATE TABLE test(
  id int primary key,
  rangeid int,
  value varchar(100)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
```

1.2 Use the CDC benchmark generator

We provide a MySQL data generator for testing and benchmarking CDC sync. The generator is located under the directory `examples/cdc_ingestion_debezium/MysqlBenchmark`.

1. Modify `mysqlcdc.conf` as needed:

```ini
user=user name of mysql
passwd=password of mysql
host=host of mysql
port=port of mysql
```

2. Insert data into the table:

```bash
# Inside () are comments of parameters; remove them before execution
bash MysqlCdcBenchmark.sh insert cdc(db name) test(table name) 10(lines to insert) 1(thread number)
```

3.Update data into table

bash MysqlCdcBenchmark.sh  update  cdc test id(primary key) value(column to update) 10(lines to update) 
Enter fullscreen mode Exit fullscreen mode

4. Delete data from the table:

```bash
bash MysqlCdcBenchmark.sh delete cdc test 10(lines to delete)
```

2. Setup Kafka (Ignore this step if you already have Kafka running)
2.1 Install Kafka via K8s

```bash
kubectl create -f install/cluster-operator -n my-cluster-operator-namespace
kubectl apply -f examples/kafka/kafka-persistent-single.yaml
```

3. Setup Debezium (Ignore if you already have it)
3.1 Install Debezium

To quickly set up a running container of Debezium on K8s:

```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: dbz-pod-claim
spec:
  accessModes:
    - ReadWriteOnce
  # replace with the actual StorageClass in your cluster
  storageClassName: 
  resources:
    requests:
      storage: 10Gi
---
apiVersion: v1
kind: Pod
metadata:
  name: dbz-pod
  namespace: dmetasoul
spec:
  restartPolicy: Never
  containers:
  - name: dbs
    image: debezium/connect:latest
    env:
      - name: BOOTSTRAP_SERVERS
        # replace with the actual Kafka host
        value: ${kafka_host}:9092
      - name: GROUP_ID
        value: "1"
      - name: CONFIG_STORAGE_TOPIC
        value: my_connect_configs
      - name: OFFSET_STORAGE_TOPIC
        value: my_connect_offsets
      - name: STATUS_STORAGE_TOPIC
        value: my_connect_statuses
    resources:
      requests:
        cpu: 500m
        memory: 4Gi
      limits:
        cpu: 4
        memory: 8Gi
    volumeMounts:
      - mountPath: "/kafka/data"
        name: dbz-pv-storage

  volumes:
    - name: dbz-pv-storage
      persistentVolumeClaim:
        claimName: dbz-pod-claim
```

Then apply this YAML file:

```bash
kubectl apply -f pod.yaml
```

3.2 Setup Debezium sync task

```bash
# remember to replace {dbzhost} with the actual Debezium deployment IP address
# replace database parameters accordingly
curl -X POST http://{dbzhost}:8083/connectors/ -H 'Cache-Control: no-cache' -H 'Content-Type: application/json' -d '{
    "name": "cdc",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "tasks.max": "1",
        "database.hostname": "mysqlhost",
        "database.port": "mysqlport",
        "database.user": "mysqluser",
        "database.password": "mysqlpassword",
        "database.server.id": "1",
        "database.server.name": "cdcserver",
        "database.include.list": "cdc",
        "database.history.kafka.bootstrap.servers": "kafkahost:9092",
        "database.history.kafka.topic": "schema-changes.cdc",
        "decimal.handling.mode": "double",
        "table.include.list":"cdc.test" 
    }
}'
```

Then check whether the sync task has been successfully created:

```bash
curl -H "Accept:application/json" -X GET http://dbzhost:8083/connectors/
```
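
Optionally, once the benchmark has generated some changes, you can peek at the Debezium topic from the Spark shell (set up in section 4.2 below) to confirm that change events are flowing. This is just a sanity-check sketch, not part of the original demo; it assumes the Kafka host `kafkahost:9092` and the topic `cdcserver.cdc.test`, since Debezium names topics `<database.server.name>.<database>.<table>`:

```scala
// Batch-read a few raw Debezium change events from Kafka.
// Requires the spark-sql-kafka package (see section 4.2).
val probe = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafkahost:9092")
  .option("subscribe", "cdcserver.cdc.test")
  .option("startingOffsets", "earliest")
  .load()

probe.selectExpr("CAST(value AS STRING) AS json").show(5, false)
```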

You can delete the sync task after testing is finished:

```bash
curl -i -X DELETE http://dbzhost:8083/connectors/cdc
```

4. Start Spark Streaming Sink to LakeSoul
4.1 Setup

Please refer to the Quick Start on how to set up the LakeSoul and Spark environment.

4.2 Start Spark Shell
The Spark shell needs to be started with Kafka dependencies:

```bash
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
  --conf spark.dmetasoul.lakesoul.meta.host=localhost \
  --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension \
  --conf spark.dmetasoul.lakesoul.meta.database.name=test_lakesoul_meta \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog
```

4.3 Create a LakeSoul Table
We'll create a LakeSoul table called MysqlCdcTest, which will sync with the MySQL table we just set up. The LakeSoul table also has a primary key `id`, and we need an extra field `op` to represent CDC operations, plus a table property `lakesoul_cdc_change_column` pointing at the `op` field.

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable

val path = "/opt/spark/cdctest"
val data = Seq((1L, 1L, "hello world", "insert")).toDF("id", "rangeid", "value", "op")
LakeSoulTable.createTable(data, path)
  .shortTableName("cdc")
  .hashPartitions("id")
  .hashBucketNum(2)
  .rangePartitions("rangeid")
  .tableProperty("lakesoul_cdc_change_column" -> "op")
  .create()
```
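
Optionally, verify the new table's schema from the same shell (a quick check, not part of the original demo):

```scala
// Should list the id, rangeid, value and op columns.
LakeSoulTable.forPath(path).toDF.printSchema()
```
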
4.4 Start Spark Streaming to sync Debezium CDC data into LakeSoul

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
// Needed for from_json and when ($ comes from spark.implicits, auto-imported in spark-shell)
import org.apache.spark.sql.functions._

val path = "/opt/spark/cdctest"
val lakeSoulTable = LakeSoulTable.forPath(path)
var strList = List.empty[String]

// js1 is just fake data to help generate the schema
val js1 = """{
          |  "before": {
          |    "id": 2,
          |    "rangeid": 2,
          |    "value": "sms"
          |  },
          |  "after": {
          |    "id": 2,
          |    "rangeid": 2,
          |    "value": "sms"
          |  },
          |  "source": {
          |    "version": "1.8.0.Final",
          |    "connector": "mysql",
          |    "name": "cdcserver",
          |    "ts_ms": 1644461444000,
          |    "snapshot": "false",
          |    "db": "cdc",
          |    "sequence": null,
          |    "table": "sms",
          |    "server_id": 529210004,
          |    "gtid": "de525a81-57f6-11ec-9b60-fa163e692542:1621099",
          |    "file": "binlog.000033",
          |    "pos": 54831329,
          |    "row": 0,
          |    "thread": null,
          |    "query": null
          |  },
          |  "op": "c",
          |  "ts_ms": 1644461444777,
          |  "transaction": null
          |}""".stripMargin
strList = strList :+ js1
val rddData = spark.sparkContext.parallelize(strList)
val resultDF = spark.read.json(rddData)
val sche = resultDF.schema

// Specify Kafka settings
val kfdf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafkahost:9092")
  .option("subscribe", "cdcserver.cdc.test")
  .option("startingOffsets", "latest")
  .load()

// Parse the Debezium CDC JSON and transform the `op` field into one of
// 'insert', 'update', 'delete' for LakeSoul
val kfdfdata = kfdf
  .selectExpr("CAST(value AS STRING) as value")
  .withColumn("payload", from_json($"value", sche))
  .filter("value is not null")
  .drop("value")
  .select("payload.after", "payload.before", "payload.op")
  .withColumn(
    "op",
    when($"op" === "c", "insert")
      .when($"op" === "u", "update")
      .when($"op" === "d", "delete")
      .otherwise("unknown")
  )
  .withColumn(
    "data",
    when($"op" === "insert" || $"op" === "update", $"after")
      .when($"op" === "delete", $"before")
  )
  .drop($"after")
  .drop($"before")
  .select("data.*", "op")

// Upsert into LakeSoul in micro-batches
kfdfdata.writeStream
  .foreachBatch { (batchDF: DataFrame, _: Long) =>
    {
      lakeSoulTable.upsert(batchDF)
      batchDF.show
    }
  }
  .start()
  .awaitTermination()
```
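
As an optional debugging step, you can route the same parsed stream to Spark's console sink instead of LakeSoul, to verify the Debezium parsing and `op` mapping before upserting (a sketch assuming the `kfdfdata` stream defined above):

```scala
// Print each transformed micro-batch instead of upserting it.
kfdfdata.writeStream
  .format("console")
  .option("truncate", "false")
  .start()
  .awaitTermination()
```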

4.5 Read from LakeSoul to view synchronized data:

```scala
import com.dmetasoul.lakesoul.tables.LakeSoulTable

val path = "/opt/spark/cdctest"
val lakeSoulTable = LakeSoulTable.forPath(path)
lakeSoulTable.toDF.select("*").show()
```

This is a very detailed demo that helps you quickly set up a CDC environment. Next, I will compare open-source CDC solutions such as Flink CDC, LakeSoul CDC, Debezium, DataX, and Kettle.
