10x Query Performance Improvement: The Design and Implementation of the New Unique Key Model

In business scenarios of real-time data warehouses, providing good support for real-time data updates is an extremely important capability. For example, in scenarios such as database synchronization (CDC), e-commerce transaction orders, advertising effect delivery, and marketing business reports, when facing changes in upstream data, it is usually necessary to quickly capture change records and promptly modify single or multiple rows of data. This ensures that business analysts and related analysis platforms can quickly grasp the latest progress and improve the timeliness of business decisions.

For OLAP databases, which have traditionally been weak at data updates, how to better implement real-time update capabilities has become a key to winning fierce competition in today's environment where data timeliness requirements are increasingly strong and the application scope of real-time data warehouse businesses is expanding.

In the past, Apache Doris mainly implemented real-time data Upserts through the Unique Key data model. Due to its underlying LSM Tree-like structure, it provides strong support for high-frequency writes of large datasets. However, its Merge-on-Read update mode has become a bottleneck restricting Apache Doris' real-time update capabilities, which may cause query jitters when dealing with concurrent reading and writing of real-time data.

Based on this, in the Apache Doris 1.2.0 version, we introduced a new data update method - Merge-On-Write - for the Unique Key model, striving to balance real-time updates and efficient queries. This article will detail the design, implementation and effects of the new primary key model.

Implementation of the Original Unique Key Model

Users familiar with Apache Doris' history may know that Doris' initial design was inspired by Google Mesa, and it only had Duplicate Key and Aggregate Key models at first. The Unique Key model was added later based on user needs during Doris' development. However, the demand for real-time updates was not so strong at that time, so the implementation of Unique Key was relatively simple - it was just a wrapper around the Aggregate Key model, without in-depth optimization for real-time update requirements.

Specifically, the implementation of the Unique Key model is just a special case of the Aggregate Key model: if you use the Aggregate Key model and set the aggregation type of all non-key columns to REPLACE, you get exactly the same behavior. As shown in the following figure, when describing example_tbl, a Unique Key table, the aggregation type in the last column shows that it is equivalent to an Aggregate Key table in which all non-key columns use the REPLACE aggregation type.

Image: Original Unique-Key-Aggregate-Key

Both the Unique Key and Aggregate Key data models adopt the Merge-On-Read implementation. That is, when data is imported, it is first written to a new Rowset, and no deduplication is performed at write time. Only when a query is initiated is a multi-way merge sort performed: rows with duplicate keys are grouped together and aggregated, keys with higher versions overwrite those with lower versions, and only the record with the highest version is returned to the user.

The following figure is a simplified representation of the execution process of the Unique Key model:

Image: Performance Improvement - Simplified Unique-Key
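
To make the merge semantics concrete, here is a minimal C++ sketch (not Doris' actual reader code) of how Merge-on-Read resolves duplicates: rows from all Rowsets are merged in key order, and for each key only the record carrying the highest version survives. The Row struct and container types are illustrative only.

// Minimal sketch of Merge-on-Read deduplication; Row and the containers
// below are illustrative, not Doris' internal structures.
#include <cstdint>
#include <map>
#include <string>
#include <vector>

struct Row {
    std::string key;    // Unique Key column(s), flattened into one string here
    int64_t version;    // version of the import that produced this row
    std::string value;  // non-key columns, also flattened for brevity
};

std::vector<Row> merge_on_read(const std::vector<std::vector<Row>>& rowsets) {
    // An ordered map stands in for the multi-way merge sort on the key.
    std::map<std::string, Row> latest;
    for (const auto& rowset : rowsets) {
        for (const auto& row : rowset) {
            auto it = latest.find(row.key);
            // A higher version overwrites a lower one for the same key.
            if (it == latest.end() || it->second.version < row.version) {
                latest[row.key] = row;
            }
        }
    }
    std::vector<Row> result;
    for (const auto& [key, row] : latest) result.push_back(row);
    return result;
}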

Although their implementation methods are relatively consistent, the usage scenarios of the Unique Key and Aggregate Key data models are significantly different:

When users create a table with the Aggregate Key model, they have a very clear understanding of the aggregation query conditions - aggregating according to the columns specified by the Aggregate Key, and the aggregate functions on the Value columns are the main aggregation methods (COUNT/SUM/MAX/MIN, etc.) used by users. For example, using user_id as the Aggregate Key and summing the number of visits and duration to calculate UV and user usage duration.

However, the main function of the Key in the Unique Key data model is to ensure uniqueness, not to serve as an aggregation Key. For example, in the order scenario, data synchronized from TP databases through Flink CDC uses the order ID as the Unique Key for deduplication. However, during queries, filtering, aggregation and analysis are usually performed on certain Value columns (such as order status, order amount, order time consumption, order placement time, etc.).

Shortcomings

As can be seen from the above, when users query using the Unique Key model, they actually perform two aggregation operations. The first is to aggregate all data by Key according to the Unique Key to remove duplicate Keys; the second is to aggregate according to the actual aggregation conditions required by the query. These two aggregation operations lead to serious efficiency issues and low query performance:

  1. Data deduplication requires expensive multi-way merge sorting, and full Key comparison consumes a lot of CPU computing resources.

  2. Effective data pruning cannot be performed, which introduces a large amount of additional data IO. For example, suppose a data partition has 10 million rows, of which only 1,000 satisfy the filtering conditions. The rich indexes of an OLAP system are designed to filter out exactly those 1,000 rows efficiently, but since it is impossible to tell whether a given row in a specific file is still valid, these indexes cannot be used. The system must first perform a full merge sort and deduplication, and only then apply the filter to the rows confirmed to be valid. This results in roughly 10,000-fold IO amplification (a rough estimate; the actual amplification is more complicated to calculate).

Scheme Research and Selection

In order to solve the problems existing in the original Unique Key model and better meet the needs of business scenarios, we decided to optimize the Unique Key model and conducted a detailed research on optimization schemes for read and write efficiency issues.

There have been many industry explorations on solutions to the above problems. There are three representative types:

  1. Delete + Insert: That is, when writing data, find the overwritten key through a primary key index and mark it as deleted. A representative system is Microsoft's SQL Server.

  2. Delta Store: Divide data into base data and delta data. Each primary key in the base data is guaranteed to be unique. All updates are recorded in the Delta Store. During queries, the base data and delta data are merged. At the same time, background merge threads regularly merge the delta data and base data. A representative system is Apache Kudu.

  3. Copy-on-Write: When updating data, directly copy the original data row, update it, and write it to a new file. This method is widely used in data lakes, with representative systems such as Apache Hudi and Delta Lake.

The implementation mechanisms and comparisons of these three schemes are as follows:

Delete + Insert (i.e., Merge-on-Write)

A representative example is the scheme proposed in the paper "Real-Time Analytical Processing with SQL Server", published by the SQL Server team at VLDB 2015. In short, the paper proposes that when data is written, old data is marked for deletion (using a data structure called a Delete Bitmap) and new data is recorded in a Delta Store. During queries, the Base data, the Delete Bitmap, and the data in the Delta Store are merged to obtain the latest result. The overall scheme is shown in the following figure and will not be elaborated here due to space limitations.

Image: Performance Improvement - Merge-on-Write

The advantage of this scheme is that any valid primary key exists only in one place (either in Base Data or Delta Store), which avoids a large amount of merge sorting consumption during queries. At the same time, various rich columnar indexes in the Base data remain valid.

Delta Store

A representative system using the Delta Store method is Apache Kudu. In Kudu, data is divided into Base Data and Delta Data. The primary keys in the Base Data are all unique. Any modification to the Base data will be first written to the Delta Store (marking the corresponding relationship with the Base Data through row numbers, which can avoid sorting during merging). Different from the Base + Delta of SQL Server mentioned earlier, Kudu does not mark deletions, so data with the same primary key will exist in two places. Therefore, during queries, the data from Base and Delta must be merged to obtain the latest result. Kudu's scheme is shown in the following figure:

Image: Performance Improvement - Delta-Store

Kudu's scheme can also avoid the high cost caused by merge sorting when reading data. However, since data with the same primary key can exist in multiple places, it is difficult to ensure the accuracy of indexes and cannot perform efficient predicate pushdown. Indexes and predicate pushdown are important means for analytical databases to optimize performance, so this shortcoming has a significant impact on performance.

Copy-On-Write

Since Apache Doris is positioned as a real-time analytical database, the Copy-On-Write scheme has too high a cost for real-time updates and is not suitable for Doris.

Scheme Comparison

The following table compares the various schemes. Among them, Merge-On-Read is the default implementation of the Unique Key model, i.e., the implementation before version 1.2, and Merge-On-Write is the Delete + Insert scheme described earlier.

Image: Performance Improvement - Scheme Comparison

As can be seen from the above, Merge-On-Write trades moderate write costs for lower read costs, well supports predicate pushdown and non-key column index filtering, and has good effects on query performance optimization. After comprehensive comparison, we chose Merge-On-Write as the final optimization scheme.

Design and Implementation of the New Scheme

In short, the processing flow of Merge-On-Write is:

  1. For each Key, find its position in the Base data (rowsetid + segmentid + row number).

  2. If the Key exists, mark the corresponding row of data as deleted. The information of marked deletion is recorded in the Delete Bitmap, and each Segment has a corresponding Delete Bitmap.

  3. Write the updated data to a new Rowset, complete the transaction, and make the new data visible (able to be queried).

  4. During queries, read the Delete Bitmap, filter out the rows marked as deleted, and only return valid data.
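
A hedged pseudocode sketch of these four steps is shown below. RowLocation, lookup_row_location(), and write_new_rowset() are illustrative names invented for this sketch, not Doris' actual internal API; the Delete Bitmap layout matches the std::map definition given later in this article.

#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <tuple>
#include <vector>
#include "roaring.hh"   // CRoaring C++ header (path may vary by installation)

using RowsetId = uint32_t;   // simplified; Doris' RowsetId is a richer type
using SegmentId = uint32_t;
using Version = uint64_t;
using BitmapKey = std::tuple<RowsetId, SegmentId, Version>;

struct Row { std::string key; std::string value; };
struct RowLocation { RowsetId rowset_id; SegmentId segment_id; uint32_t row_id; };

std::map<BitmapKey, roaring::Roaring> delete_bitmap;

// Stand-ins for the primary key lookup and the rowset writer described later.
std::optional<RowLocation> lookup_row_location(const std::string& key);
void write_new_rowset(const std::vector<Row>& rows, Version version);

void upsert(const std::vector<Row>& rows, Version version) {
    for (const auto& row : rows) {
        // Step 1: locate the key in the existing Base data, if present.
        if (auto loc = lookup_row_location(row.key)) {
            // Step 2: mark the old row as deleted in that Segment's Delete Bitmap.
            delete_bitmap[BitmapKey{loc->rowset_id, loc->segment_id, version}].add(loc->row_id);
        }
    }
    // Step 3: write the new rows into a new Rowset and publish the version.
    write_new_rowset(rows, version);
    // Step 4 happens at query time: readers apply the Delete Bitmap to skip stale rows.
}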

Key Issues

To design a Merge-On-Write scheme suitable for Doris, the following key issues need to be focused on solving:

  1. How to efficiently locate whether there is old data that needs to be marked for deletion during import?

  2. How to efficiently store the information of marked deletion?

  3. How to efficiently use the marked deletion information to filter data during the query phase?

  4. Can multi-version support be realized?

  5. How to avoid transaction conflicts in concurrent imports and write conflicts between imports and Compaction?

  6. Is the additional memory consumption introduced by the scheme reasonable?

  7. Is the write performance degradation caused by write costs within an acceptable range?

Based on the above key issues, we implemented a series of optimization measures to address them. They are introduced in detail below.

Primary Key Index

Doris is a columnar storage system designed for large-scale analytics and originally had no primary key index. Therefore, in order to quickly determine whether a primary key is being overwritten and to locate the row number of the row to overwrite, a primary key index had to be added to Doris.

We have taken the following optimization measures:

  • Maintain a primary key index for each Segment. The primary key index is implemented using a scheme similar to RocksDB Partitioned Index. This scheme can achieve very high query QPS, and the file-based index scheme can also save memory usage.

  • Maintain a Bloom Filter corresponding to the primary key index for each Segment. The primary key index will only be queried when the Bloom Filter hits.

  • Record a primary key range [min-key, max-key] for each Segment.

  • Maintain a pure in-memory interval tree, constructed using the primary key ranges of all Segments. When querying a primary key, there is no need to traverse all Segments. The interval tree can be used to locate the Segments that may contain the primary key, greatly reducing the amount of indexes that need to be queried.

  • For all hit Segments, query them in descending order of version. In Doris, a higher version means newer data, so if a primary key is found in the index of a higher-version Segment, there is no need to continue querying lower-version Segments.

The flow of querying a single primary key is shown in the following figure:

Image: Performance Improvement - Primary Key Index
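
The sketch below illustrates that lookup path. SegmentMeta, bloom_may_contain(), pk_index_lookup(), and the interval-tree pre-filtering assumed for the candidate list are hypothetical stand-ins for this sketch, not Doris' actual types or functions.

#include <algorithm>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

struct SegmentMeta {
    int64_t version;                 // import version of the Segment
    std::string min_key, max_key;    // primary key range recorded per Segment
    // Bloom Filter and partitioned primary key index omitted for brevity.
};

// Hypothetical stand-ins for the per-Segment Bloom Filter and PK index probes.
bool bloom_may_contain(const SegmentMeta& seg, const std::string& key);
std::optional<uint32_t> pk_index_lookup(const SegmentMeta& seg, const std::string& key);

// `candidates` is assumed to come from the in-memory interval tree, i.e. only
// Segments whose [min_key, max_key] range may contain the key.
std::optional<uint32_t> find_row(const std::string& key, std::vector<SegmentMeta*> candidates) {
    // Visit candidate Segments from newest to oldest; the first hit wins,
    // because a higher version always holds the newer data.
    std::sort(candidates.begin(), candidates.end(),
              [](const SegmentMeta* a, const SegmentMeta* b) { return a->version > b->version; });
    for (const SegmentMeta* seg : candidates) {
        if (key < seg->min_key || key > seg->max_key) continue;  // range pruning
        if (!bloom_may_contain(*seg, key)) continue;             // cheap negative check
        if (auto row_id = pk_index_lookup(*seg, key)) return row_id;
    }
    return std::nullopt;  // the key does not exist in any visible Segment
}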

Delete Bitmap

Delete Bitmap adopts a multi-version recording method, as shown in the following figure:

Image: Performance Improvement - Delete-Bitmap

The Segment file in the figure was generated by the import of version 5 and contains the data imported into this Tablet at version 5.

The import of version 6 includes the update of primary key B, so the second row will be marked as deleted in the Bitmap, and the modification of this Segment by the import of version 6 will be recorded in the DeleteBitmap.

The import of version 7 includes the update of primary key A, which will also generate a Bitmap corresponding to the version; similarly, the import of version 8 will also generate a corresponding Bitmap.

All Delete Bitmaps are stored in a large Map. Each import will serialize the latest Delete Bitmap into RocksDB. The key definitions are as follows:

using SegmentId = uint32_t;
using Version = uint64_t;
using BitmapKey = std::tuple<RowsetId, SegmentId, Version>;
std::map<BitmapKey, roaring::Roaring> delete_bitmap;

Each Segment in each Rowset will record multiple versions of Bitmaps. A Bitmap with Version x means the modification of the current Segment by the import of version x.

Advantages of multi-version Delete Bitmap:

  • It can well support multi-version queries. For example, after the import of version 7 is completed, a query on this table starts to execute and will use Version 7. Even if the query takes a long time and the import of version 8 is completed during the query execution, there is no need to worry about reading the data of version 8 (or missing the data deleted by version 8).

  • It can well support complex Schema Changes. In Doris, complex Schema Changes (such as type conversion) require double writing first, and at the same time convert historical data before a certain version and then delete the old version of data. Multi-version Delete Bitmap can well support the current Schema Change implementation.

  • It can support multi-version requirements during data copying and replica repair.

However, multi-version Delete Bitmaps also come at a cost. In the example above, to read the data of version 8, the three Bitmaps of v6, v7 and v8 must be merged into one complete Bitmap, which is then used to filter the Segment data. Real-time, high-frequency imports can easily generate a large number of Bitmaps, and the CPU cost of RoaringBitmap union operations is high. To minimize the impact of these union operations, we added an LRUCache to the DeleteBitmap to record the most recently merged Bitmaps.
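
As a concrete illustration of that union, the following sketch computes the effective Delete Bitmap a reader at a given version would apply to one Segment, assuming the std::map layout shown above; in Doris the merged result is additionally cached in the LRUCache, which this sketch omits.

#include <cstdint>
#include <map>
#include <tuple>
#include "roaring.hh"   // CRoaring C++ header (path may vary by installation)

using RowsetId = uint32_t;   // simplified for the sketch
using SegmentId = uint32_t;
using Version = uint64_t;
using BitmapKey = std::tuple<RowsetId, SegmentId, Version>;

// Union all Bitmap versions of one Segment up to (and including) query_version.
roaring::Roaring effective_delete_bitmap(const std::map<BitmapKey, roaring::Roaring>& delete_bitmap,
                                         RowsetId rowset, SegmentId segment, Version query_version) {
    roaring::Roaring merged;
    // Keys are ordered by (rowset, segment, version), so this range covers
    // exactly the versions belonging to the requested Segment, e.g. v6, v7, v8.
    auto it = delete_bitmap.lower_bound(BitmapKey{rowset, segment, 0});
    auto end = delete_bitmap.upper_bound(BitmapKey{rowset, segment, query_version});
    for (; it != end; ++it) merged |= it->second;
    return merged;
}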

Write Flow

When writing data, the primary key index of each Segment will be created first, and then the Delete Bitmap will be updated. The establishment of the primary key index is relatively simple and will not be described in detail due to space limitations. The focus is on introducing the more complex Delete Bitmap update flow:

Image: Performance Improvement - Write Flow

  • DeltaWriter will first flush the data to the disk.

  • In the Publish phase, batch point queries are performed on all Keys, and the Bitmaps corresponding to the overwritten Keys are updated. In the figure above, the version of the newly written Rowset is 8, which modifies data in 3 Rowsets, so 3 Bitmap modification records are generated.

  • Updating the Bitmap in the Publish phase ensures that no new visible Rowsets will appear during the batch point query of Keys and Bitmap update, ensuring the correctness of Bitmap update.

  • If a Segment is not modified, there will be no Bitmap record corresponding to the version. For example, Segment1 of Rowset1 has no Bitmap corresponding to Version 8.

Read Flow

The reading flow of Bitmap is shown in the following figure. It can be seen from the figure:

Image: Performance Improvement - Read Flow

  • A Query requesting version 7 will only see the data corresponding to version 7.

  • When reading the data of Rowset5, the Bitmaps generated by the modifications of v6 and v7 to it are merged to obtain the complete DeleteBitmap corresponding to Version 7, which is used to filter the data.

  • In the example in the figure, the import of version 8 overwrites a piece of data in Segment2 of Rowset1, but the Query requesting version 7 can still read this piece of data.

In high-frequency import scenarios, there may be a large number of versions of Bitmaps. Merging these Bitmaps itself may also consume a lot of CPU computing resources. Therefore, we introduced an LRUCache, and each version of Bitmap only needs to be merged once.
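
Once the merged Bitmap for a Segment is available (as in the earlier sketch), filtering at read time is just a membership test per row id. The helper below is a simplified illustration of that step, not Doris' actual reader code.

#include <cstdint>
#include <vector>
#include "roaring.hh"   // CRoaring C++ header (path may vary by installation)

// Return the row ids of a Segment that are still visible, i.e. not marked
// deleted in the merged Delete Bitmap for the requested query version.
std::vector<uint32_t> visible_rows(uint32_t num_rows, const roaring::Roaring& deleted) {
    std::vector<uint32_t> out;
    out.reserve(num_rows);
    for (uint32_t row_id = 0; row_id < num_rows; ++row_id) {
        if (!deleted.contains(row_id)) out.push_back(row_id);  // keep only non-deleted rows
    }
    return out;
}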

Handling of Compaction

Normal Compaction Flow

  • When Compaction reads data, it obtains the version Vx of the Rowsets being processed and automatically filters out the rows marked as deleted through the Delete Bitmap (see the Read Flow section above).

  • After Compaction is completed, all DeleteBitmaps on the source Rowset that are less than or equal to version Vx can be cleaned up.

Handling of Compaction and Write Conflicts

  • During the execution of Compaction, a new import task may be submitted; assume its version is Vy. If the write corresponding to Vy modifies a Rowset in the Compaction source, those deletions are recorded in the version-Vy entry of that Rowset's DeleteBitmap.

  • After Compaction is completed, check all DeleteBitmaps on this Rowset that are greater than Vx, and update the row numbers in them to the Segment row numbers in the newly generated Rowset.

As shown in the following figure, Compaction selects three Rowsets [0-5], [6-6], [7-7]. During the Compaction process, the import of Version8 is successfully executed. In the Compaction Commit phase, it is necessary to process the new Bitmap generated by the data import of Version8.

Image: Performance Improvement - Compaction
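
The sketch below illustrates that fix-up step: Bitmaps with a version greater than Vx still reference row numbers in the old source Segments, so every set bit is translated into the Rowset produced by Compaction through a row-id mapping built during the merge. RowIdConversion and the map-based layout here are illustrative assumptions, not Doris' actual structures.

#include <cstdint>
#include <map>
#include <tuple>
#include <utility>
#include "roaring.hh"   // CRoaring C++ header (path may vary by installation)

using RowsetId = uint32_t;   // simplified for the sketch
using SegmentId = uint32_t;
using Version = uint64_t;
using BitmapKey = std::tuple<RowsetId, SegmentId, Version>;

struct NewLocation { SegmentId segment_id; uint32_t row_id; };
// (old segment id, old row id) -> location in the Rowset produced by Compaction.
using RowIdConversion = std::map<std::pair<SegmentId, uint32_t>, NewLocation>;

void remap_bitmaps_after_compaction(std::map<BitmapKey, roaring::Roaring>& delete_bitmap,
                                    RowsetId source_rowset, RowsetId output_rowset,
                                    Version vx, const RowIdConversion& conv) {
    std::map<BitmapKey, roaring::Roaring> translated;
    for (const auto& [key, bitmap] : delete_bitmap) {
        const auto& [rowset, segment, version] = key;
        if (rowset != source_rowset) continue;
        if (version <= vx) continue;  // already folded into the Compaction output; can be cleaned up
        for (uint32_t old_row_id : bitmap) {  // roaring::Roaring iterates over the set row ids
            auto it = conv.find({segment, old_row_id});
            if (it != conv.end()) {
                translated[BitmapKey{output_rowset, it->second.segment_id, version}]
                    .add(it->second.row_id);
            }
        }
    }
    for (const auto& [key, bitmap] : translated) delete_bitmap[key] |= bitmap;
}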

Write Performance Optimization

In the initial design, DeltaWriter did not perform point queries or Delete Bitmap updates during the data writing phase, but did all of this in the Publish phase. This guarantees that all data from earlier versions is visible when the Delete Bitmap is updated, ensuring its correctness. However, in actual high-frequency import tests, we found that serially point-querying and updating the Delete Bitmap for the full data of each Rowset in the Publish phase caused a significant drop in import throughput.

Therefore, in the final design, we split the Delete Bitmap update into two phases: the first phase can be executed in parallel and only finds and marks deletions against the versions visible at that time; the second phase must be executed serially and handles the newly published Rowsets that the first phase may have missed. The amount of incremental data to process in the second phase is very small, so the impact on overall throughput is very limited.
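
A hedged sketch of that two-phase update is shown below. visible_rowsets(), mark_deletes_against(), and the publish lock are illustrative stand-ins for this sketch rather than Doris' actual functions; the mutex simply models the requirement that the second phase runs serially.

#include <cstdint>
#include <mutex>
#include <vector>

using Version = uint64_t;
struct RowsetMeta { Version version; /* other metadata omitted */ };

// Stand-ins: list the Rowsets visible up to a version, and run the point
// lookups that mark overwritten keys as deleted against those Rowsets.
std::vector<RowsetMeta> visible_rowsets(Version up_to);
void mark_deletes_against(const std::vector<RowsetMeta>& targets);

std::mutex publish_mutex;  // phase two must run serially

void update_delete_bitmap(Version seen_at_write_time, Version publish_version) {
    // Phase 1 (parallel across concurrent loads): handle every Rowset that was
    // already visible while this load was still writing its data.
    mark_deletes_against(visible_rowsets(seen_at_write_time));

    // Phase 2 (serial, at publish): only Rowsets that became visible after the
    // first phase still need to be checked, a small incremental amount of data.
    std::lock_guard<std::mutex> guard(publish_mutex);
    std::vector<RowsetMeta> missed;
    for (const auto& rs : visible_rowsets(publish_version - 1)) {
        if (rs.version > seen_at_write_time) missed.push_back(rs);
    }
    mark_deletes_against(missed);
}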

Optimization Effects

The new Merge-On-Write implementation marks old data as deleted during writing, which can always ensure that valid primary keys only appear in one file (that is, the uniqueness of primary keys is ensured during writing). There is no need to deduplicate primary keys through merge sorting during reading. For high-frequency writing scenarios, this greatly reduces the additional consumption during query execution.

In addition, the new version implementation can also support predicate pushdown and make good use of Doris' rich indexes. Sufficient data pruning can be performed at the data IO level, greatly reducing the amount of data read and computed. Therefore, there is a significant performance improvement in queries in many scenarios.

It should be noted that if users use the Unique Key model in low-frequency batch update scenarios, the query improvement from the Merge-On-Write implementation may not be obvious, because with low-frequency batch updates, Doris' Compaction mechanism can usually compact the data into a good state quickly (that is, Compaction completes the primary key deduplication), avoiding the deduplication cost at query time.

Optimization Effects on Aggregation Analysis

We conducted tests using the lineitem table, which has the largest data volume in TPC-H (scale factor 100). To simulate a scenario of multiple continuous writes, the data was split into 100 parts and imported 3 times over. Then count(*) queries were run, with the following comparison:

Image: Optimization - Aggregation Analysis

We compared the scenarios with and without cache. Without cache, where loading data from disk dominates the cost, overall performance improves by about 4x. With cache, which excludes the disk read overhead, the computing efficiency of the new implementation improves by more than 20x.

The effect of Sum is similar, and will not be listed due to space limitations.

SSB Flat

In addition to simple Count and Sum, we also tested the SSB-Flat dataset. The optimization effect on the 100G dataset (divided into 10 parts and imported multiple times to simulate data update scenarios) is shown in the following figure:

Explanation of test results:

  • Under a typical 32C64GB configuration, the total time for all queries to complete is 4.5 seconds with the new implementation versus 126.4 seconds with the old implementation, a difference of nearly 30x. Further analysis showed that when the queries ran against the old implementation, all 32 CPU cores were fully loaded, so we used a higher-spec machine to measure the old implementation's query time with sufficient computing resources.

  • Under a 64C128GB configuration, the total time of the old implementation is 49.9s, using at most about 48 cores. Even with sufficient computing resources, the old implementation is still about 12x slower than the new one.

It can be seen that the new version implementation not only greatly improves the query speed, but also significantly reduces CPU consumption.

Impact on Data Import

The new Merge-On-Write implementation mainly targets query performance and, as shown above, achieves good results there. However, these gains come from doing extra work at write time, so the new Merge-On-Write implementation slows down data import to a small extent. Thanks to concurrency and the pipeline effect between multiple import batches, the overall throughput does not drop significantly.

Usage Method

In version 1.2, Merge-on-Write is a new feature and is disabled by default. Users can enable it by adding the following property when creating a table:

"enable_unique_key_merge_on_write" = "true"

In addition, the new Merge-on-Write data update mode differs from the old Merge-on-Read implementation, so an existing Unique Key table cannot be switched to it by adding the property via ALTER TABLE; the mode can only be specified when creating a new table. Users who need to migrate an old table can create a new table and run insert into new_table select * from old_table.
