I. Introduction to Apache Doris
Apache Doris is a high-performance, real-time analytical database based on the MPP architecture. Its overall architecture is streamlined, consisting of only two system modules: FE and BE. The FE (Frontend) mainly handles request reception, query parsing, metadata management, and task scheduling, while the BE (Backend) mainly handles query execution and data storage. Apache Doris supports standard SQL and is fully compatible with the MySQL protocol, so data stored in Apache Doris can be accessed through the various client tools and BI software that support the MySQL protocol.
In typical data integration and processing pipelines, data is collected from sources such as transactional (TP) databases, user behavior logs, time-series data, and local files. This data is processed by data integration or ETL tools and then written into the real-time data warehouse, Apache Doris. Doris provides querying and analysis capabilities for downstream data applications, covering typical scenarios such as BI report analysis, OLAP multi-dimensional analysis, ad-hoc queries, and log search and analysis.
Flink-Doris-Connector is an integration solution between Apache Doris and Apache Flink for real-time data processing ETL scenarios. It leverages Flink's real-time computing capabilities to build an efficient data processing and analysis pipeline. The main use cases for Flink-Doris-Connector are divided into three types:
Scan: Typically used for data synchronization or joint analysis with other data sources.
Lookup Join: Joins data from real-time streams with dimension tables in Doris.
Real-time ETL: Uses Flink to clean data and then writes it into Doris in real-time.
II. Design and Implementation of Flink-Doris-Connector in Typical Scenarios
This section introduces the design and implementation of Flink-Doris-Connector in three scenarios: Scan, Lookup Join, and Write.
01 Scan Scenario
The Scan scenario refers to quickly extracting existing data from Doris. When reading large amounts of data from Doris, traditional JDBC methods may face performance bottlenecks. Therefore, the Flink-Doris-Connector utilizes the Doris Source to take full advantage of Doris's distributed architecture and Flink's parallel processing capabilities, achieving more efficient data synchronization.
Doris Source Reading Process:
The Job Manager sends a request for a query plan to the FE (Frontend), which returns the corresponding BE (Backend) and Tablet for the data to be queried.
Requests are distributed to different TaskManagers based on the different BEs.
Each TaskManager then reads the data of each Tablet directly from the corresponding BE.
By using this method, we can leverage Flink's distributed processing capabilities to improve the efficiency of the entire data synchronization process.
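The reading process above can be sketched roughly as follows. This is a minimal illustration, not the connector's actual code: the query plan is modeled as a plain dictionary from Tablet id to BE address, and the names `plan_partitions`, `assign_to_tasks`, and `tablets_per_partition` are hypothetical.

```python
from collections import defaultdict


def plan_partitions(tablet_to_be, tablets_per_partition=2):
    """Group Tablets by the BE that hosts them, then cut each group into read partitions."""
    by_backend = defaultdict(list)
    for tablet_id, backend in sorted(tablet_to_be.items()):
        by_backend[backend].append(tablet_id)

    partitions = []
    for backend, tablets in sorted(by_backend.items()):
        for start in range(0, len(tablets), tablets_per_partition):
            partitions.append((backend, tablets[start:start + tablets_per_partition]))
    return partitions


def assign_to_tasks(partitions, parallelism):
    """Spread the partitions round-robin over the available reader subtasks."""
    tasks = [[] for _ in range(parallelism)]
    for index, partition in enumerate(partitions):
        tasks[index % parallelism].append(partition)
    return tasks


# Hypothetical query plan as the FE might describe it: Tablet id -> BE address.
query_plan = {
    10001: "be1:9060", 10002: "be1:9060", 10003: "be1:9060",
    10004: "be2:9060", 10005: "be2:9060",
}
partitions = plan_partitions(query_plan)
tasks = assign_to_tasks(partitions, parallelism=2)
```

Because every partition names a single BE, each reader can pull its Tablets straight from that BE, and raising the parallelism spreads the partitions over more readers.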
02 Lookup Join Scenario
For scenarios where dimension tables are stored in Doris, real-time stream data can be joined with Doris dimension tables through Lookup Join.
Limitations of JDBC Connector
Doris supports the MySQL protocol, so it can directly use the JDBC Connector for Lookup Join. However, this method has certain limitations:
The Lookup Join in the JDBC Connector performs synchronous queries, which means each piece of data in the real-time stream must wait for the query result from Doris, increasing latency.
It only supports querying one record at a time. When the upstream data throughput is high, this can easily lead to performance bottlenecks and backpressure.
Optimizations in Flink-Doris-Connector
To address the limitations of the Lookup Join scenario, the Flink-Doris-Connector implements asynchronous Lookup Join and batch accumulation query optimizations:
Support for Asynchronous Lookup Join: Asynchronous Lookup Join means that data in the real-time stream does not need to explicitly wait for the query result of each record, significantly reducing latency.
Support for Batch Accumulation Query: Real-time stream data is appended to a queue. A background listener thread (Watcher) retrieves data from the queue and pushes it to a query execution worker thread pool. The worker threads concatenate the received batch of data into a single Union All query and send this query to Doris.
By implementing asynchronous Lookup Join and batch accumulation queries, the throughput of dimension table joins can be significantly improved when upstream data volume is large, ensuring efficient data reading and processing.
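A small sketch of the two optimizations, using `asyncio` in place of the connector's thread pool. Everything here is an assumption made for illustration: `BatchingLookup`, `build_union_all_query`, and `fake_run_query` are hypothetical names, the lookup key is assumed to be an integer column called `id`, and the Doris query is replaced by an in-memory dictionary.

```python
import asyncio
import re


def build_union_all_query(table, key_column, keys):
    """Concatenate one SELECT per key into a single UNION ALL statement."""
    selects = [
        f"SELECT * FROM {table} WHERE {key_column} = {int(key)}"
        for key in keys
    ]
    return " UNION ALL ".join(selects)


class BatchingLookup:
    def __init__(self, run_query, table, key_column, max_batch=3):
        self.run_query = run_query      # callable: sql -> list of row dicts
        self.table = table
        self.key_column = key_column
        self.max_batch = max_batch
        self.queue = asyncio.Queue()

    async def lookup(self, key):
        """Enqueue a key and wait for its row without blocking other records."""
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((key, future))
        return await future

    async def watcher(self):
        """Drain the queue into batches and resolve every waiting future."""
        while True:
            batch = [await self.queue.get()]
            while len(batch) < self.max_batch and not self.queue.empty():
                batch.append(self.queue.get_nowait())
            keys = [key for key, _ in batch]
            sql = build_union_all_query(self.table, self.key_column, keys)
            rows = {row[self.key_column]: row for row in self.run_query(sql)}
            for key, future in batch:
                future.set_result(rows.get(key))  # None when the key has no match


# In-memory stand-in for the Doris dimension table.
DIMENSION = {1: {"id": 1, "city": "Berlin"}, 2: {"id": 2, "city": "Lima"}}


def fake_run_query(sql):
    keys = [int(k) for k in re.findall(r"id = (\d+)", sql)]
    return [DIMENSION[k] for k in keys if k in DIMENSION]


async def main():
    lookup = BatchingLookup(fake_run_query, "dim_city", "id")
    watcher = asyncio.create_task(lookup.watcher())
    results = await asyncio.gather(lookup.lookup(1), lookup.lookup(2), lookup.lookup(9))
    watcher.cancel()
    return results


results = asyncio.run(main())
```

Each caller only awaits its own future, so records do not queue up behind one another, and the watcher turns whatever has accumulated into one round trip instead of one query per record.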
03 Real-time ETL Scenario
For real-time writing, the write operation of the Doris Sink is implemented using the Stream Load import method. Stream Load is one of the most common data import methods in Apache Doris, supporting the import of local files or data streams into Doris via the HTTP protocol. The main process is as follows:
After receiving the data, the Sink will initiate a long-lived Stream Load request. During the Checkpoint interval, it continuously sends the received data to Doris in chunks.
At the time of Checkpoint, the previously initiated Stream Load request is committed. Only after the commit is completed will the data become visible.
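To make the request concrete, the sketch below only assembles the pieces of a Stream Load HTTP request and does not send anything. The URL path and header names reflect the Doris Stream Load interface as I understand it and should be checked against the Doris documentation; the function name, host, port, database, table, and label are placeholders chosen for illustration.

```python
import base64


def build_stream_load_request(fe_host, http_port, database, table, user, password, label):
    """Describe a Stream Load request with two-phase commit enabled (nothing is sent)."""
    credentials = base64.b64encode(f"{user}:{password}".encode()).decode()
    return {
        "method": "PUT",
        "url": f"http://{fe_host}:{http_port}/api/{database}/{table}/_stream_load",
        "headers": {
            "Authorization": f"Basic {credentials}",
            "Expect": "100-continue",
            "label": label,                 # identifies this load for deduplication
            "format": "json",
            "read_json_by_line": "true",
            "two_phase_commit": "true",     # data stays invisible until committed
        },
    }


request = build_stream_load_request(
    fe_host="127.0.0.1", http_port=8030,
    database="ods", table="orders",
    user="root", password="",
    label="flink_orders_0_1",
)
```

In the process described above, the request body would be streamed in chunks for the whole Checkpoint interval, and the commit at Checkpoint time is what makes the loaded data visible.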
How to Ensure End-to-End Exactly-Once Semantics for Data Writing
To ensure end-to-end exactly-once semantics for data writing, let's take the example of synchronizing data from Kafka to Doris during the Checkpoint process:
Checkpoint Time: At checkpoint time, the Source will receive a Checkpoint Barrier.
Source Receives Barrier: After receiving the Barrier, the Source first takes a snapshot of its own state and then passes the Checkpoint Barrier down to the Sink.
Sink Receives Barrier: Upon receiving the Barrier, the Sink performs a Pre-commit operation. If successful, the data is fully written to Doris. Since this is a pre-commit, the data is not yet visible to users in Doris.
Save Pre-Commit Transaction ID: The transaction ID of the successfully pre-committed operation is saved to the state.
All Operator Checkpoints Complete: Once all operator checkpoints are completed, the Job Manager sends out a notification that the current Checkpoint has been successfully completed.
Final Commit by Sink: The Sink then commits the previously pre-committed transaction.
By using this two-phase commit process, end-to-end exactly-once semantics can be achieved, ensuring that each record is processed exactly once, even in the event of failures.
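The steps above can be modeled with a toy simulation. `FakeDoris` is an in-memory stand-in rather than a real client, and the method names on `TwoPhaseSink` are hypothetical, loosely mirroring Flink's checkpoint callbacks. The `restore` step, which commits the transaction recorded in the last completed checkpoint, is my assumption about how recovery would complete the protocol; the text above does not spell it out.

```python
class FakeDoris:
    """In-memory stand-in for Doris transactions."""

    def __init__(self):
        self.next_txn_id = 1
        self.precommitted = {}   # txn_id -> rows written but not yet visible
        self.visible = []        # rows that queries can see

    def precommit(self, rows):
        txn_id = self.next_txn_id
        self.next_txn_id += 1
        self.precommitted[txn_id] = list(rows)
        return txn_id

    def commit(self, txn_id):
        # Committing an already committed transaction is a no-op, so retries are safe.
        self.visible.extend(self.precommitted.pop(txn_id, []))


class TwoPhaseSink:
    def __init__(self, doris):
        self.doris = doris
        self.buffer = []
        self.pending_txn = None  # saved as part of the checkpointed state

    def write(self, row):
        self.buffer.append(row)

    def snapshot_state(self):
        """Barrier received: pre-commit and record the transaction id in state."""
        self.pending_txn = self.doris.precommit(self.buffer)
        self.buffer = []
        return {"pending_txn": self.pending_txn}

    def notify_checkpoint_complete(self):
        """All operators finished the checkpoint: make the data visible."""
        self.doris.commit(self.pending_txn)
        self.pending_txn = None

    def restore(self, state):
        """After a failure, finish the transaction stored in the checkpoint."""
        if state["pending_txn"] is not None:
            self.doris.commit(state["pending_txn"])


doris = FakeDoris()
sink = TwoPhaseSink(doris)

# Normal checkpoint: pre-commit first, commit only after the notification.
sink.write({"id": 1})
sink.write({"id": 2})
sink.snapshot_state()
visible_after_precommit = list(doris.visible)
sink.notify_checkpoint_complete()
visible_after_commit = list(doris.visible)

# Failure between pre-commit and commit: a restored sink completes the commit.
sink.write({"id": 3})
state = sink.snapshot_state()
recovered = TwoPhaseSink(doris)
recovered.restore(state)
```

The pre-committed rows stay invisible until the commit, and because the transaction id lives in the checkpointed state, a restarted job can finish the commit instead of writing the same data again.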
Real-Time Processing and Exactly-Once
As mentioned above, writes on the Doris Sink side are bound to Checkpoint, so the latency of writing data to Doris depends on the Checkpoint interval. Some users want data to be written in near real time but cannot run Checkpoints very frequently, because for some jobs overly frequent Checkpoints consume a lot of resources. To address this, Flink-Doris-Connector introduced a batching mechanism that balances write latency against resource consumption.
With batching, the Sink does not write each record to Doris individually as soon as it arrives from upstream. Instead, it first caches the data in memory and then submits the cached batch to Doris when the conditions set by the corresponding parameters are met. Combining batch writing with the primary key model in Doris ensures that data writing is idempotent.
The batching mechanism therefore meets users' need for real-time writes while avoiding the resource cost of frequent Checkpoints.
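A minimal sketch of the batching idea, assuming two flush conditions (a row-count threshold and a time interval) that are checked on every write. The class and parameter names are hypothetical, and the primary key model is imitated by a dictionary keyed on `id`, so replaying a batch overwrites rows instead of duplicating them.

```python
import time


class BatchingSink:
    def __init__(self, flush_fn, max_rows=3, interval_seconds=10.0, clock=time.monotonic):
        self.flush_fn = flush_fn
        self.max_rows = max_rows
        self.interval_seconds = interval_seconds
        self.clock = clock
        self.buffer = []
        self.last_flush = clock()

    def write(self, row):
        """Cache the row and flush once either threshold is reached."""
        self.buffer.append(row)
        too_many = len(self.buffer) >= self.max_rows
        too_old = self.clock() - self.last_flush >= self.interval_seconds
        if too_many or too_old:
            self.flush()

    def flush(self):
        if self.buffer:
            self.flush_fn(self.buffer)
            self.buffer = []
        self.last_flush = self.clock()


table = {}      # stand-in for a primary key table: id -> latest row
batches = []    # sizes of the batches that were flushed


def upsert_by_key(rows):
    batches.append(len(rows))
    for row in rows:
        table[row["id"]] = row


sink = BatchingSink(upsert_by_key, max_rows=2)
records = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]
for row in records:
    sink.write(row)
# Simulate a replay after a failure: the same records are delivered again.
for row in records:
    sink.write(row)
```

The replayed batch is written a second time, yet the table still holds one row per key, which is why batching paired with a primary key table can tolerate at-least-once delivery.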
III. Full Database Synchronization Solution Based on Flink CDC
The sections above covered the typical scenarios and implementation principles of Flink-Doris-Connector. Next, let's look at one of its important practical applications: full database synchronization. Whereas the previous sections focused on the underlying implementation, full database synchronization is about a specific usage scenario. Building on the capabilities introduced earlier, we will explore how to achieve efficient, automated synchronization from TP databases to Doris through Flink CDC.
01 Pain Points of Full Database Synchronization
During data migration, users usually hope to migrate data to Doris as soon as possible. However, when synchronizing TP databases, full database synchronization often faces the following challenges:
Table Creation:
- Rapid batch creation of existing tables: There are often thousands of tables in TP databases, with different structures. For existing tables, corresponding table structures need to be created one by one in Doris.
- Automatic creation and synchronization of new tables after the synchronization task is started: To ensure data integrity and real-time performance, the synchronization tool needs to monitor changes in the TP database in real time and automatically create and synchronize new tables in Doris.
Metadata Mapping: Convenient mapping of field metadata between upstream and downstream, including conversion of field types, corresponding modification of field names, etc.
Automatic DDL Synchronization: Operations such as adding or deleting columns will change the database structure, which in turn affects data synchronization. Therefore, the synchronization tool needs to be able to capture DDL in real-time and dynamically update the Doris table structure to ensure data accuracy and consistency.
Out-of-the-Box: Zero code, low threshold. An ideal synchronization tool can realize data migration and synchronization with simple configuration.
02 Implementing Full Database Synchronization Based on Flink CDC
For data extraction, Flink-Doris-Connector leverages the key capabilities of Flink CDC:
Incremental Snapshot Reading:
- Lock-free and concurrent reading: No matter how large the existing data is, reading can be sped up by increasing Flink's parallelism horizontally.
- Resumable reading: When the existing data is large, synchronization may be interrupted, and CDC can resume an interrupted task from where it stopped.
Rich Data Source Support: Flink CDC supports various databases such as MySQL, Oracle, and SQL Server.
Seamless integration with the existing Flink ecosystem, facilitating combination with existing Flink Sources and Sinks.
One-Click Table Creation and Automatic Metadata Mapping
Flink-Doris-Connector integrates Flink CDC, enabling users to synchronize a full database in a single operation. The main principle is that after the Flink CDC Source receives data from the upstream source, it splits the stream by table, and each table is written by its own Sink. The latest version of the Connector also supports synchronizing multiple tables with a single Sink, as well as creating and synchronizing newly added tables.
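The per-table splitting described above amounts to grouping change events by their table name. A minimal sketch, assuming each event is a dictionary with hypothetical `table` and `row` fields:

```python
from collections import defaultdict


def route_by_table(change_events):
    """Split one mixed CDC stream into per-table lists, one per downstream Sink."""
    per_table = defaultdict(list)
    for event in change_events:
        per_table[event["table"]].append(event["row"])
    return dict(per_table)


events = [
    {"table": "orders", "row": {"id": 1}},
    {"table": "users", "row": {"id": 7}},
    {"table": "orders", "row": {"id": 2}},
]
routed = route_by_table(events)
```

The order of events within each table is preserved, which matters when later changes to a row must overwrite earlier ones.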
After integrating the functions of Flink CDC, users only need to submit tasks through Flink-Doris-Connector to automatically create the required tables in Doris, without configuring explicit associations between upstream and downstream tables, thus realizing rapid data synchronization.
When the Flink task starts, the Flink-Doris-Connector automatically checks whether the corresponding Doris table exists. If it does not, the Connector creates the table automatically and routes data by table name so that multiple downstream tables can be written through the Sink; if the table already exists, the synchronization task starts directly.
This improvement not only simplifies the configuration process but also makes the creation and synchronization of new tables more convenient, thereby improving the overall efficiency of data processing.
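As a rough illustration of automatic table creation with metadata mapping, the sketch below generates Doris DDL for upstream tables that are missing downstream. The type mapping is a small illustrative subset and not the connector's full mapping; tripling `VARCHAR` lengths (MySQL counts characters, Doris counts bytes) and the fixed `BUCKETS 10` are assumptions, and all function names are hypothetical.

```python
# Illustrative subset of a MySQL -> Doris type mapping (assumed, not exhaustive).
MYSQL_TO_DORIS = {
    "INT": "INT",
    "BIGINT": "BIGINT",
    "DOUBLE": "DOUBLE",
    "DATE": "DATE",
    "DATETIME": "DATETIME",
    "TEXT": "STRING",
}


def map_type(mysql_type):
    base = mysql_type.upper()
    if base.startswith("VARCHAR("):
        length = int(base[len("VARCHAR("):-1])
        # Assume up to 3 bytes per character when converting to a byte length.
        return f"VARCHAR({length * 3})"
    return MYSQL_TO_DORIS[base]


def build_create_table(database, table, columns, key):
    """Build a CREATE TABLE statement; the key column is expected to come first."""
    column_sql = ", ".join(f"`{name}` {map_type(col_type)}" for name, col_type in columns)
    return (
        f"CREATE TABLE IF NOT EXISTS `{database}`.`{table}` ({column_sql}) "
        f"UNIQUE KEY(`{key}`) DISTRIBUTED BY HASH(`{key}`) BUCKETS 10"
    )


def ensure_tables(existing_tables, database, upstream_schemas):
    """Return the DDL for every upstream table that does not exist in Doris yet."""
    statements = []
    for table, (columns, key) in upstream_schemas.items():
        if table not in existing_tables:
            statements.append(build_create_table(database, table, columns, key))
    return statements


upstream = {
    "orders": ([("id", "BIGINT"), ("note", "VARCHAR(10)")], "id"),
    "users": ([("id", "INT"), ("created", "DATETIME")], "id"),
}
ddl = ensure_tables({"users"}, "ods", upstream)
```

Here `users` already exists in Doris, so only `orders` gets a generated statement; synchronization for `users` would start directly.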
Light Schema Change and Automatic DDL Synchronization
Before Apache Doris 1.2, Schema Change was quite cumbersome, requiring data columns to be added or modified manually. When the table structure of the upstream TP database changed, the data synchronization task had to be paused until the Schema Change in Doris completed, and then restarted. Light Schema Change, available since Apache Doris 1.2, handles column additions and deletions as a metadata change on the FE, as the following processes show.
Schema Change Process:
The client initiates a request to add or delete columns to the FE;
After receiving the request, the FE modifies the current metadata and persists the latest Schema;
The FE synchronizes the result of the Schema Change to the client;
Data Load Process:
When a subsequent import task is initiated, the FE sends the import task and the latest Schema information to the BE;
During data writing, each Rowset of the BE stores the Schema information of the current import;
Query Process:
The FE sends the query plan along with the latest Schema to the BE;
The BE executes the query plan using the latest Schema;
Compaction Process:
In the BE, compare the versions of the Rowsets involved in the merge;
Perform data merging according to the latest Schema Change information.
Tests have shown that Light Schema Change improves data synchronization performance by hundreds of times compared with the earlier Schema Change.
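The four processes above can be imitated with a toy model in which a schema change touches only metadata, each rowset remembers the schema version it was written with, and reads go through the latest schema. This is a deliberate simplification with hypothetical names; it does not reflect how Doris actually tracks columns internally.

```python
class ToyTable:
    def __init__(self, columns):
        self.schema_version = 0
        self.schemas = {0: dict(columns)}  # FE metadata: version -> {column: default}
        self.rowsets = []                  # BE data: (schema_version, rows)

    def add_column(self, name, default=None):
        """Metadata-only change: no existing rowset is rewritten."""
        latest = dict(self.schemas[self.schema_version])
        latest[name] = default
        self.schema_version += 1
        self.schemas[self.schema_version] = latest

    def drop_column(self, name):
        latest = dict(self.schemas[self.schema_version])
        del latest[name]
        self.schema_version += 1
        self.schemas[self.schema_version] = latest

    def load(self, rows):
        """Each rowset stores the schema version that was current at import time."""
        self.rowsets.append((self.schema_version, [dict(row) for row in rows]))

    def query(self):
        """Read every rowset through the latest schema, filling in defaults."""
        latest = self.schemas[self.schema_version]
        result = []
        for _, rows in self.rowsets:
            for row in rows:
                result.append({col: row.get(col, default) for col, default in latest.items()})
        return result


table = ToyTable({"id": None})
table.load([{"id": 1}])
table.add_column("city", default="unknown")
table.load([{"id": 2, "city": "Lima"}])
rows = table.query()
```

The first rowset is never rewritten: it keeps its old schema version, and the missing `city` value is supplied at read time, which is why the change itself only needs a metadata update.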
The combination of Light Schema Change and Flink-Doris-Connector enables automatic synchronization of DDL through Flink CDC, with the specific steps as follows:
The Source side captures the upstream Schema Change information and starts DDL change synchronization;
The Doris Sink side identifies and parses DDL operations (adding or deleting columns);
Table verification is performed to determine whether Light Schema Change can be carried out;
The Schema Change operation is initiated.
Based on this implementation, Doris can automatically obtain DDL statements and complete the Schema Change operation in milliseconds. When the table structure of the upstream TP database changes, there is no need to pause the data synchronization task.
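The parsing step on the Sink side could look something like the sketch below, which recognizes only simple single-column `ADD` and `DROP` statements and rewrites them as Doris `ALTER TABLE` statements. The regular expressions and the function name are hypothetical, type mapping is reduced to upper-casing, and anything else (indexes, constraints, multi-column changes) is ignored by returning `None`.

```python
import re

# Words that follow ADD/DROP in statements that are not column changes.
NOT_A_COLUMN = r"(?!(?:INDEX|KEY|CONSTRAINT|PRIMARY|UNIQUE|FOREIGN)\b)"

ADD_COLUMN = re.compile(
    r"ALTER\s+TABLE\s+`?(?P<table>\w+)`?\s+ADD\s+" + NOT_A_COLUMN +
    r"(?:COLUMN\s+)?`?(?P<column>\w+)`?\s+(?P<type>\w+(?:\(\d+\))?)",
    re.IGNORECASE,
)
DROP_COLUMN = re.compile(
    r"ALTER\s+TABLE\s+`?(?P<table>\w+)`?\s+DROP\s+" + NOT_A_COLUMN +
    r"(?:COLUMN\s+)?`?(?P<column>\w+)`?",
    re.IGNORECASE,
)


def to_doris_ddl(upstream_ddl, database):
    """Translate a simple upstream column change into a Doris ALTER TABLE statement."""
    statement = upstream_ddl.strip().rstrip(";")
    match = ADD_COLUMN.match(statement)
    if match:
        return (
            f"ALTER TABLE `{database}`.`{match['table']}` "
            f"ADD COLUMN `{match['column']}` {match['type'].upper()}"
        )
    match = DROP_COLUMN.match(statement)
    if match:
        return f"ALTER TABLE `{database}`.`{match['table']}` DROP COLUMN `{match['column']}`"
    return None  # not a column addition or deletion: leave it alone


added = to_doris_ddl("ALTER TABLE orders ADD COLUMN city VARCHAR(32);", "ods")
dropped = to_doris_ddl("alter table `orders` drop column city", "ods")
ignored = to_doris_ddl("ALTER TABLE orders ADD INDEX idx_city (city)", "ods")
```

Returning `None` for unrecognized statements corresponds to the verification step: only changes that can be handled as a Light Schema Change are forwarded.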
Out-of-the-Box: Example of MySQL Full Database Synchronization
For users, as long as they have a Flink client, they can submit a full database synchronization job through the operations shown in the above figure. It supports passing in Flink configurations, such as concurrency settings and Checkpoint intervals. It also allows using regular expressions to configure the tables that need to be synchronized. Meanwhile, the configurations of Flink CDC Source and Doris Sink can be directly passed through to the specific Connector. In this way, users can conveniently submit full database synchronization jobs.
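To illustrate how regular expressions can select the tables to synchronize, here is a small sketch. The `including` and `excluding` parameters are hypothetical names, and the choice to match the whole table name is an assumption rather than documented connector behavior.

```python
import re


def select_tables(all_tables, including=None, excluding=None):
    """Keep tables that match the include pattern and do not match the exclude pattern."""
    selected = []
    for table in all_tables:
        if including and not re.fullmatch(including, table):
            continue
        if excluding and re.fullmatch(excluding, table):
            continue
        selected.append(table)
    return selected


tables = ["orders", "order_items", "users", "tmp_orders", "audit_log"]
selected = select_tables(tables, including=".*order.*|users", excluding="tmp_.*")
```

A pattern with `|` alternatives lets one setting cover several naming schemes, while the exclude pattern removes temporary or scratch tables that would otherwise match.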
03 Core Advantages of Flink-Doris-Connector
The optimizations above address the user pain points described earlier:
Automatic table creation, including automatic creation of existing and incremental tables, eliminating the need for users to pre-create corresponding table structures in Doris.
Automatic mapping of upstream and downstream fields, eliminating the need to manually write matching rules between upstream and downstream fields, saving significant labor costs.
Seamless synchronization of column additions and deletions: Immediately receiving upstream DDL statements and automatically implementing millisecond-level schema changes in Doris, eliminating the need for service downtime and ensuring smooth data synchronization.
Out-of-the-box operation reduces learning costs and allows users to focus more on their business.
IV. Future Plans
Future capabilities planned for the Flink-Doris-Connector are as follows:
Support for real-time reads. Currently, the Doris source only scans data, reading from a bounded stream. CDC scenarios will be supported in the future, allowing Flink to stream data from Doris.
Multiple-table sinks. Currently, the Flink-Doris-Connector supports synchronizing multiple tables with a single sink, but the Stream Load import method only supports importing a single table. Therefore, when there are many tables, it is necessary to maintain a large number of Stream Load connections on the sink side. In the future, a single Stream Load connection will support writing to multiple tables.
For full-database synchronization, support will be provided for more upstream data sources, meeting more data synchronization scenarios.