In modern database systems, data volume keeps growing rapidly, and loading data efficiently from primary storage (such as InnoDB) into secondary engines (such as in-memory analytical engines or columnar engines) has become a critical performance challenge. Traditional serial loading often becomes a system bottleneck on large tables, especially partitioned tables. This article examines a parallel data loading scheme for partitioned tables, covering its design philosophy, its implementation details, and the benefits it brings.
I. Why Parallel Loading? Drawbacks of the Traditional Approach
Traditional data loading typically employs a single-threaded sequential scan (Serial Scan). A worker thread reads each partition of the source data table in order, processes rows one by one, and then writes them to the target engine. The drawbacks of this model become glaringly obvious as data scales:
Wasted Hardware Resources: Modern servers are commonly equipped with multi-core CPUs. The single-threaded model utilizes only one core, leaving a significant amount of CPU resources idle and failing to harness the hardware’s full potential.
Significant Performance Bottleneck: The loading speed is entirely limited by the processing power of a single core and the throughput of a single I/O stream. When data volume reaches the terabyte level, the loading process can take hours, severely impacting data availability and business real-time requirements.
Poor Scalability: Even if the server is upgraded with more CPU cores, the performance of serial loading cannot be improved. This architecture lacks horizontal scalability and cannot cope with future, faster data growth demands.
Disregard for Partitioned Table Advantages: Partitioned tables physically split data into independent, individually manageable data blocks. This provides a natural foundation for parallel processing. However, traditional serial loading methods completely ignore this advantage, still treating the table as a single, massive entity.
Therefore, to break the single-point performance bottleneck, fully utilize system resources, and embrace the architectural advantages brought by data partitioning, adopting a parallel loading solution is imperative.
II. Key Architectural Design Points for Parallel Loading
Our goal is to decompose the massive monolithic task of data loading into a set of smaller tasks that can be executed in parallel. Based on this concept, we have designed the following core points:
Task Granularity: A single partition within a partitioned table is treated as the smallest unit of work for parallel processing. Loading the data of each partition is considered an independent task, making task division clear and natural.
unsigned int num_threads = std::thread::hardware_concurrency() * 0.8;
if (num_threads == 0) num_threads = SHANNON_PARTS_PARALLEL;
Concurrency Model: A thread pool model is introduced. The system creates a group of worker threads sized from the hardware configuration (e.g., std::thread::hardware_concurrency()). These threads claim pending partition tasks from a shared task list with an atomic fetch-and-add, which gives dynamic load balancing: a thread that finishes a small partition early simply claims the next pending one.
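The claiming scheme described above can be sketched in isolation. This is a minimal illustration under stated assumptions, not the engine's actual code: DemoTask and run_workers are hypothetical names, and the "work" is reduced to recording which worker claimed each task.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical stand-in for a partition task; only the fields needed here.
struct DemoTask {
  unsigned part_id{0};
  int claimed_by{-1};  // index of the worker that processed this task
};

// Runs num_threads workers over tasks and returns how many tasks each worker claimed.
std::vector<std::size_t> run_workers(std::vector<DemoTask> &tasks, unsigned num_threads) {
  assert(num_threads > 0);
  std::atomic<std::size_t> next_task{0};
  std::vector<std::size_t> claimed(num_threads, 0);
  std::vector<std::thread> workers;
  workers.reserve(num_threads);
  for (unsigned w = 0; w < num_threads; ++w) {
    workers.emplace_back([&, w] {
      for (;;) {
        // fetch_add returns the previous value, so each index is handed out exactly once.
        const std::size_t idx = next_task.fetch_add(1, std::memory_order_relaxed);
        if (idx >= tasks.size()) break;
        tasks[idx].claimed_by = static_cast<int>(w);
        ++claimed[w];  // each worker writes only its own slot
      }
    });
  }
  for (auto &t : workers) t.join();
  return claimed;
}
```

Because no partition is assigned to a thread in advance, a worker that draws several small partitions simply claims more tasks than one that draws a large partition. The per-worker counts returned here make that visible.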
Resource Isolation & Context Management: This is the most critical and complex aspect of the parallel design. In a database kernel environment, threads that run in parallel can easily cause race conditions and corrupt each other's state, so each worker thread must be given a completely isolated execution context. As the code below shows, this includes a private THD, a private TABLE object opened from the shared TABLE_SHARE, and a cloned ha_innopart handler:
bool PartitionLoadThreadContext::initialize(const Rapid_load_context *context) {
// Create THD
m_thd = new THD;
if (!m_thd) return true;
m_thd->set_new_thread_id();
m_thd->thread_stack = (char *)this;
m_thd->set_command(COM_DAEMON);
m_thd->security_context()->skip_grants();
m_thd->system_thread = NON_SYSTEM_THREAD;
m_thd->store_globals();
m_thd->lex->sql_command = SQLCOM_SELECT;
// Open table from source table share.
TABLE_SHARE *share = context->m_table->s;
m_table = (TABLE *)m_thd->mem_root->Alloc(sizeof(TABLE));
if (!m_table) return true;
// Get a copy of the source TABLE object from its table share. This TABLE is used for fetching data from the partitions.
// A new handler is cloned later so that each thread has its own cursor. The caller [mysql_secodary_load_unload] holds
// the reference count on the share, so we do not need to worry about it being released here.
if (open_table_from_share(m_thd, share, share->path.str, 0, SKIP_NEW_HANDLER, 0, m_table, false, nullptr)) {
return true;
}
m_table->in_use = m_thd;
m_table->alias_name_used = context->m_table->alias_name_used;
m_table->read_set = context->m_table->read_set;
m_table->write_set = context->m_table->write_set;
return false;
}
bool PartitionLoadThreadContext::clone_handler(ha_innopart *file, const Rapid_load_context *context,
std::mutex &clone_mutex) {
std::lock_guard<std::mutex> lock(clone_mutex);
THD *original_thd = context->m_table->in_use;
context->m_table->in_use = m_thd;
m_handler = static_cast<ha_innopart *>(file->clone(context->m_table->s->normalized_path.str, m_thd->mem_root));
context->m_table->in_use = original_thd;
if (!m_handler) return true;
m_handler->change_table_ptr(m_table, m_table->s);
m_table->file = m_handler;
// Note: ha_open() is not needed because:
// 1. ha_innopart::clone() inherits the open state from the source handler
// 2. change_table_ptr() updates internal pointers while preserving the open state
// 3. Partition-level operations (rnd_init_in_part/rnd_next_in_part) work directly
return false;
}
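Robust Error Handling Mechanism: In a parallel system, the failure of any single task can affect the whole. We therefore designed a “Fast-Fail” mechanism, in which a failure in one partition task causes the remaining work to stop early rather than run to completion.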
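One common way to build a fail-fast pool is a shared atomic error slot that every worker checks before claiming more work. The sketch below shows that pattern only. It is an assumption about how such a mechanism can look, not the article's implementation, and run_fail_fast and its parameters are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Runs task(i) for i in [0, num_tasks) on num_threads workers; task returns 0 on success.
// Returns the first non-zero error code recorded, or 0 if every task succeeded.
// tasks_started counts how many tasks were actually begun.
int run_fail_fast(std::size_t num_tasks, unsigned num_threads,
                  const std::function<int(std::size_t)> &task,
                  std::atomic<std::size_t> &tasks_started) {
  assert(num_threads > 0);
  std::atomic<std::size_t> next{0};
  std::atomic<int> first_error{0};
  std::vector<std::thread> workers;
  workers.reserve(num_threads);
  for (unsigned w = 0; w < num_threads; ++w) {
    workers.emplace_back([&] {
      // Stop claiming work as soon as any worker has recorded an error.
      while (first_error.load() == 0) {
        const std::size_t idx = next.fetch_add(1);
        if (idx >= num_tasks) break;
        tasks_started.fetch_add(1);
        const int rc = task(idx);
        if (rc != 0) {
          int expected = 0;
          // Only the first failure is kept; later failures do not overwrite it.
          first_error.compare_exchange_strong(expected, rc);
          break;
        }
      }
    });
  }
  for (auto &t : workers) t.join();
  return first_error.load();
}
```

Tasks already in flight when the error is recorded still run to their own end in this sketch. Interrupting them mid-scan would need an additional check inside the per-row loop.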
Safe Resource Management: Utilize C++’s RAII (Resource Acquisition Is Initialization) idiom to ensure correct resource release. For example, the PartitionLoadHandlerLock class acquires the external lock for InnoDB in its constructor and automatically releases it in its destructor. This ensures the lock is always released, regardless of whether an exception is thrown during execution, thus avoiding deadlocks.
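The RAII pattern described here can be illustrated without any MySQL dependencies. In this sketch DemoHandler and DemoHandlerLock are hypothetical stand-ins for the real handler and for PartitionLoadHandlerLock; the only behaviour modelled is "lock in the constructor, unlock in the destructor".

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical stand-in for a storage handler; it only tracks lock depth.
struct DemoHandler {
  int lock_depth{0};
  int external_lock(bool acquire) {
    lock_depth += acquire ? 1 : -1;
    return 0;  // 0 means success in this sketch
  }
};

// RAII guard: acquires the lock in the constructor and releases it in the destructor.
class DemoHandlerLock {
 public:
  explicit DemoHandlerLock(DemoHandler &h) : m_handler(h), m_locked(h.external_lock(true) == 0) {}
  ~DemoHandlerLock() {
    if (m_locked) m_handler.external_lock(false);
  }
  DemoHandlerLock(const DemoHandlerLock &) = delete;
  DemoHandlerLock &operator=(const DemoHandlerLock &) = delete;
  bool locked() const { return m_locked; }

 private:
  DemoHandler &m_handler;
  bool m_locked;
};

// Returns true if the work failed; the lock is released on every exit path.
bool scan_with_lock(DemoHandler &h, bool fail) {
  try {
    DemoHandlerLock guard(h);
    assert(h.lock_depth == 1);
    if (fail) throw std::runtime_error("simulated scan failure");
    return false;
  } catch (const std::runtime_error &) {
    return true;  // guard was already destroyed during stack unwinding
  }
}
```

Deleting the copy operations matters in a guard like this: a copied guard would release the same lock twice.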
III. Introduction to Core Implementation Details
Based on the above design, let’s look at the specific implementation in the code:
Task Creation and Distribution: The load_innodbpart_parallel function first iterates through context->m_extra_info.m_partition_infos, creating a partition_load_task_t task structure for each partition and storing it in a tasks vector. An atomic counter std::atomic_size_t task_idx serves as the shared task index, allowing all worker threads to safely claim tasks.
unsigned int num_threads = std::thread::hardware_concurrency() * 0.8;
if (num_threads == 0) num_threads = SHANNON_PARTS_PARALLEL;
std::vector<partition_load_task_t> tasks;
tasks.reserve(context->m_extra_info.m_partition_infos.size());
for (auto &[part_name, part_id] : context->m_extra_info.m_partition_infos) {
partition_load_task_t task;
task.part_id = part_id;
task.part_key = part_name + "#" + std::to_string(part_id);
task.result = ShannonBase::SHANNON_SUCCESS;
task.rows_loaded = 0;
tasks.push_back(std::move(task));
}
Thread Pool and Work Unit: A temporary thread pool is created via std::vector<std::thread> workers_pool. The core logic executed by each thread is encapsulated in the worker_func lambda expression.
Thread Context Initialization (PartitionLoadThreadContext): At the beginning of worker_func, each thread creates an instance of PartitionLoadThreadContext. Within its initialize() and clone_handler() methods, the crucial operations of creating an independent THD and cloning the ha_innopart handler are performed, ensuring that the execution environments of threads do not interfere with each other.
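The per-thread context idea can be shown with a small, dependency-free sketch. DemoThreadContext and load_all are hypothetical names, and the row buffer stands in for the THD, TABLE and handler that the real context owns. The point is only that each worker constructs its own context and touches nothing shared except the task index and the final counter.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical per-worker context: everything a worker mutates lives here.
struct DemoThreadContext {
  std::vector<unsigned char> row_buffer;  // private scratch buffer
  std::size_t rows_seen{0};
  bool initialize(std::size_t row_size) {
    row_buffer.assign(row_size, 0);
    return false;  // false means success, matching the convention used in this article's code
  }
};

// Each worker builds its own context, then processes partitions claimed from a shared index.
// rows_per_partition[i] is the simulated row count of partition i.
std::size_t load_all(const std::vector<std::size_t> &rows_per_partition, unsigned num_threads) {
  assert(num_threads > 0);
  std::atomic<std::size_t> next{0};
  std::atomic<std::size_t> total_rows{0};
  std::vector<std::thread> pool;
  pool.reserve(num_threads);
  for (unsigned w = 0; w < num_threads; ++w) {
    pool.emplace_back([&] {
      DemoThreadContext ctx;  // lives on the worker's own stack and is never shared
      if (ctx.initialize(64)) return;
      for (std::size_t idx = next.fetch_add(1); idx < rows_per_partition.size();
           idx = next.fetch_add(1)) {
        for (std::size_t r = 0; r < rows_per_partition[idx]; ++r) {
          ctx.row_buffer[0] = static_cast<unsigned char>(r);  // touches private state only
          ++ctx.rows_seen;
        }
      }
      total_rows.fetch_add(ctx.rows_seen);  // publish once per worker
    });
  }
  for (auto &t : pool) t.join();
  return total_rows.load();
}
```

Publishing the row count once per worker, rather than once per row, keeps contention on the shared counter negligible.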
Single Partition Loading Logic (load_one_partition): This lambda function is the actual data mover. It receives a partition task and executes the following steps:
auto load_one_partition = [&](partition_load_task_t &task,
ha_innopart *task_handler) -> int { // Lambda: load a partition.
int result{ShannonBase::SHANNON_SUCCESS};
task.rows_loaded = 0;
if (task_handler == nullptr) {
std::lock_guard<std::mutex> lock(error_mutex);
task.error_msg = "Handler clone is null for partition " + std::to_string(task.part_id);
task.result = HA_ERR_GENERIC;
return HA_ERR_GENERIC;
}
Rapid_load_context::extra_info_t::m_active_part_key = task.part_key;
....
if (task_handler->inited == handler::NONE && task_handler->rnd_init_in_part(task.part_id, true)) {
....
}
part_initialized = true;
int tmp{HA_ERR_GENERIC};
std::unique_ptr<uchar[]> rec_buff = std::make_unique<uchar[]>(context->m_table->s->rec_buff_length);
memset(rec_buff.get(), 0, context->m_table->s->rec_buff_length);
while ((tmp = task_handler->rnd_next_in_part(task.part_id, rec_buff.get())) != HA_ERR_END_OF_FILE) {
if (tmp == HA_ERR_KEY_NOT_FOUND) break;
....
auto partition_ptr = part_tb_ptr->get_partition(task.part_key);
...
// parttable is shared_ptr/unique_ptr to PartTable
if (partition_ptr->write(context, rec_buff.get(), context->m_table->s->reclength, col_offsets.data(),
context->m_table->s->fields, null_byte_offsets.data(), null_bitmasks.data())) {
std::lock_guard<std::mutex> lock(error_mutex);
task.error_msg = "load data from " + sch_name + "." + table_name + " to imcs failed";
task.result = HA_ERR_GENERIC;
return HA_ERR_GENERIC;
}
memset(rec_buff.get(), 0, context->m_table->s->rec_buff_length);
task.rows_loaded++;
if (tmp == HA_ERR_RECORD_DELETED && !context->m_thd->killed) continue;
}
task.result = ShannonBase::SHANNON_SUCCESS;
return result;
};
Lock and Transaction Management (PartitionLoadHandlerLock): Within worker_func, the instantiation of PartitionLoadHandlerLock automatically performs the ha_external_lock(m_thd, F_RDLCK) operation, applying the necessary lock for data reading. When worker_func ends, the lock is automatically released via its destructor.
IV. Summary
The partition-based parallel loading scheme introduced in this article turns a time-consuming serial task into a parallel workflow. Through its choice of task granularity, thread model, resource isolation, and error handling, the implementation improves data loading performance on multi-core hardware while preserving stability and safety inside the database environment.
In our experience, this architecture scales well: as the number of CPU cores and table partitions increases, loading throughput can grow close to linearly, provided that the work is spread evenly across partitions and I/O does not become the limit. It is an example of combining modern C++ parallel programming techniques with database kernel knowledge to build an efficient and reliable solution for importing large volumes of data.