ShannonData.AI

ShannonBase HTAP Architecture Analysis: From MySQL Optimizer to Vectorized Execution Engine — Overview


This article is the overview of a series introducing ShannonBase's query optimization and execution. Subsequent articles will introduce each module in detail, systematically analyzing how ShannonBase implements its HTAP capabilities.

In the modern database field, HTAP (Hybrid Transactional/Analytical Processing) capability is a core metric for evaluating high-performance databases.

ShannonBase, through its Rapid engine (commonly referred to as IMCS — In-Memory Column Store), builds an efficient columnar vectorized execution flow on top of MySQL.

The HTAP execution flow of ShannonBase can be summarized in the following five key stages.

I. Plan Capture & Translation

ShannonBase does not operate independently of MySQL; instead, it hooks deeply into MySQL's optimizer workflow.

Intervention Timing: In the current version, ShannonBase's optimizer intervenes after MySQL's optimizer generates the original AccessPath. Future versions will move this intervention earlier, into MySQL's optimization process itself, to determine more quickly and accurately whether the current join order is optimal.

Translation Process: Once MySQL completes its optimization, the translate function in ShannonBase's Rapid engine optimizer is responsible for converting MySQL's optimized AccessPath tree into ShannonBase's internal QueryPlan.

Node Conversion: It converts MySQL AccessPath nodes into query nodes supported by Rapid. For example, a MySQL aggregation path is converted into ShannonBase’s Aggregate node, and a filter path is converted into a Filter node.

Fallback Mechanism: For complex operators that ShannonBase cannot yet handle, it encapsulates them using the MySQLNative class (see query_plan.h). This ensures the query can fall back to execution by the native MySQL engine, guaranteeing system robustness. The advantage of this approach is that it enables iterative development: the query plans with the greatest performance impact are handled first, without affecting overall execution.

Plan Optimizer::translate_access_path(OptimizeContext *ctx, THD *thd, AccessPath *path, const JOIN *join) {
  if (!path) return nullptr;
  switch (path->type) {
    case AccessPath::TABLE_SCAN:
    case AccessPath::INDEX_SCAN:
    case AccessPath::INDEX_RANGE_SCAN: {
      auto scan = std::make_unique<ScanTable>();
      scan->original_path = path;
      TABLE *table{nullptr};
      if (path->type == AccessPath::INDEX_SCAN) {
        table = path->index_scan().table;
        scan->scan_type = ScanTable::ScanType::INDEX_SCAN;
      } else if (path->type == AccessPath::INDEX_RANGE_SCAN) {
        const auto &irs = path->index_range_scan();
        if (irs.used_key_part != nullptr && irs.num_used_key_parts > 0 && irs.used_key_part[0].field != nullptr)
          table = irs.used_key_part[0].field->table;
        scan->scan_type = ScanTable::ScanType::INDEX_SCAN;
      } else {
        table = path->table_scan().table;
        scan->scan_type = ScanTable::ScanType::FULL_TABLE_SCAN;
      }
      assert(table);
      auto share = ShannonBase::shannon_loaded_tables->get(table->s->db.str, table->s->table_name.str);
      assert(share);  // the table must already be loaded into the Rapid engine
      auto table_id = share->m_tableid;
      scan->rpd_table = (share->is_partitioned) ? Imcs::Imcs::instance()->get_rpd_parttable(table_id)
                                                : Imcs::Imcs::instance()->get_rpd_table(table_id);
      assert(scan->rpd_table);
      scan->estimated_rows = path->num_output_rows();
      scan->source_table = table;
      return scan;
    } break;
    case AccessPath::HASH_JOIN: {
      auto hashjoin_node = std::make_unique<HashJoin>();
      hashjoin_node->original_path = path;
      auto &param = path->hash_join();
      // Recursively convert children
      hashjoin_node->children.push_back(translate_access_path(ctx, thd, param.outer, join));
      hashjoin_node->children.push_back(translate_access_path(ctx, thd, param.inner, join));
      // Extract Join Conditions
      if (param.join_predicate) {
        for (auto *cond : param.join_predicate->expr->equijoin_conditions) {
          hashjoin_node->join_conditions.push_back(cond);
        }
        // Handle other conditions...
      }
      hashjoin_node->allow_spill = param.allow_spill_to_disk;
      hashjoin_node->estimated_rows = path->num_output_rows();
      return hashjoin_node;
    } break;
    case AccessPath::NESTED_LOOP_JOIN: {
      auto nestloop_node = std::make_unique<NestLoopJoin>();
      nestloop_node->original_path = path;
      auto &param = path->nested_loop_join();
      // Recursively convert children
      nestloop_node->children.push_back(translate_access_path(ctx, thd, param.outer, join));
      nestloop_node->children.push_back(translate_access_path(ctx, thd, param.inner, join));
      nestloop_node->pfs_batch_mode = param.pfs_batch_mode;
      nestloop_node->already_expanded_predicates = param.already_expanded_predicates;
      // Extract Join Conditions
      nestloop_node->source_join_predicate = param.join_predicate;
      if (param.join_predicate) {
        for (auto *cond : param.join_predicate->expr->equijoin_conditions) {
          nestloop_node->join_conditions.push_back(cond);
        }
        // Handle other conditions...
      }
      nestloop_node->equijoin_predicates = param.equijoin_predicates;
      return nestloop_node;
    } break;
    case AccessPath::AGGREGATE: {
      auto agg = std::make_unique<LocalAgg>();
      agg->original_path = path;
      auto param = path->aggregate();
      agg->olap = param.olap;
      agg->children.push_back(translate_access_path(ctx, thd, param.child, join));
      fill_aggregate_info(agg.get(), join);
      return agg;
    } break;
    case AccessPath::LIMIT_OFFSET: {
      auto limit = std::make_unique<Limit>();
      limit->original_path = path;
      auto param = path->limit_offset();
      limit->limit = (param.limit - param.offset);  // MySQL's limit is (SQL LIMIT + SQL OFFSET), so subtract the offset back out
      limit->offset = param.offset;
      limit->count_all_rows = param.count_all_rows;
      limit->reject_multiple_rows = param.reject_multiple_rows;
      limit->send_records_override = (param.send_records_override) ? *param.send_records_override : 0;
      limit->children.push_back(translate_access_path(ctx, thd, param.child, join));
      return limit;
    } break;
    case AccessPath::FILTER: {
      auto filter = std::make_unique<Filter>();
      filter->original_path = path;
      auto param = path->filter();
      filter->condition = param.condition;
      filter->children.push_back(translate_access_path(ctx, thd, param.child, join));
      filter->predict = Optimizer::convert_item_to_predicate(thd, param.condition);
      return filter;
    } break;
    case AccessPath::SORT: {
      auto param = path->sort();
      // Only when it has a `LIMIT` clause can it be converted to `TopN`, e.g. `SELECT ... FROM t ORDER BY x LIMIT n`.
      if (param.limit > 0 && param.limit != HA_POS_ERROR) {
        auto topn = std::make_unique<TopN>();
        topn->original_path = path;
        topn->order = param.order;
        topn->limit = param.limit;
        topn->filesort = param.filesort;
        topn->children.push_back(translate_access_path(ctx, thd, param.child, join));
        topn->estimated_rows = std::min(topn->children[0]->estimated_rows, (ha_rows)param.limit);
        return topn;
      } else {
        // Without a `LIMIT` clause, keep it as a plain `ORDER BY` sort.
        auto sort = std::make_unique<Sort>();
        sort->original_path = path;
        sort->order = param.order;
        sort->filesort = param.filesort;
        sort->limit = HA_POS_ERROR;
        sort->children.push_back(translate_access_path(ctx, thd, param.child, join));
        sort->estimated_rows = sort->children[0]->estimated_rows;
        sort->remove_duplicates = param.remove_duplicates;
        sort->unwrap_rollup = param.unwrap_rollup;
        sort->force_sort_rowids = param.force_sort_rowids;
        sort->tables_to_get_rowid_for = param.tables_to_get_rowid_for;
        return sort;
      }
      assert(false);
      return nullptr;
    } break;
    case AccessPath::EQ_REF: {
      auto param = path->eq_ref();
      // Check if this is a dynamic join condition (not a constant lookup)
      bool dynamic_lookup{false};
      for (uint i = 0; i < param.ref->key_parts; i++) {
        Item *item = param.ref->items[i];
        if (!item) continue;
        // Check if this item references fields from other tables or is a non-constant expression
        if (!item->const_item()) {
          dynamic_lookup = true;
          break;
        }
      }
      if (!dynamic_lookup) {
        auto scan = std::make_unique<ScanTable>();
        scan->original_path = path;
        scan->source_table = param.table;
        scan->scan_type = ScanTable::ScanType::EQ_REF_SCAN;
        auto share = ShannonBase::shannon_loaded_tables->get(scan->source_table->s->db.str,
                                                             scan->source_table->s->table_name.str);
        assert(share);  // the table must already be loaded into the Rapid engine
        auto table_id = share->m_tableid;
        scan->rpd_table = (share->is_partitioned) ? Imcs::Imcs::instance()->get_rpd_parttable(table_id)
                                                  : Imcs::Imcs::instance()->get_rpd_table(table_id);
        assert(scan->rpd_table);
        // All key items are constants, so the lookup can be converted into a
        // static prune predicate and evaluated inside the Rapid scan.
        scan->prune_predicate = Optimizer::convert_item_to_predicate(thd, param.ref, param.table);
        return scan;
      } else {  // A dynamic join condition (e.g. a.id = b.id) cannot be converted to a static
                // predicate; it must be handled by the join executor, so fall back to MySQL.
        auto eq_ref = std::make_unique<MySQLNative>();
        eq_ref->original_path = path;
        return eq_ref;
      }
      assert(false);
      return nullptr;  // unreachable
    } break;
    default: {
      // If Rapid cannot handle this operator, wrap it in a fallback node.
      auto original = std::make_unique<MySQLNative>();
      original->original_path = path;
      original->estimated_rows = path->num_output_rows();
      // No need to translate further; it is already a MySQL AccessPath.
      return original;
    }
  }
}

II. Core Optimization Rules

Once the internal QueryPlan is generated, the Optimizer applies a series of optimization rules. The core optimizations include:


void Optimizer::AddDefaultRules() {
  // Be careful with the order of rules: they are applied in the order they are added.
  // Make predicates available
  m_optimize_rules.emplace_back(std::make_unique<PredicatePushDown>());
  // Use predicates for IMCU pruning
  m_optimize_rules.emplace_back(std::make_unique<StorageIndexPrune>());
  // After predicates clarify needed columns
  m_optimize_rules.emplace_back(std::make_unique<ProjectionPruning>());
  // Before aggregation changes structure
  m_optimize_rules.emplace_back(std::make_unique<TopNPushDown>());
  // aggregation push down to lower level operators
  m_optimize_rules.emplace_back(std::make_unique<AggregationPushDown>());
  // Re-run after structure changes
  m_optimize_rules.emplace_back(std::make_unique<ProjectionPruning>());
  // Final reordering with all optimizations
  m_optimize_rules.emplace_back(std::make_unique<JoinReOrder>());
  m_registered.store(true, std::memory_order_relaxed);
}
Plan Optimizer::Optimize(const OptimizeContext *context, const THD *thd, const JOIN *join) {
  if (!m_registered.load()) AddDefaultRules();
  if (m_optimize_rules.empty()) return nullptr;
  QueryPlan plan;
  plan.root = get_query_plan(const_cast<OptimizeContext *>(context), const_cast<THD *>(thd), const_cast<JOIN *>(join));
  for (auto &rule : m_optimize_rules) {
    Timer rule_timer;
    rule->apply(plan.root);
  }
  return std::move(plan.root);
}

Predicate PushDown: Pushes filter conditions as far down as possible towards the storage layer.
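
As a minimal illustration of the idea (using toy node types, not ShannonBase's actual classes), pushing a Filter into the Scan directly below it looks roughly like this:

```cpp
// Minimal sketch of predicate pushdown over a toy plan tree.
// PlanNode and its fields are illustrative, not ShannonBase's classes.
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct PlanNode {
  std::string kind;       // "Filter", "Project", "Scan", ...
  std::string predicate;  // non-empty for Filter, or for Scan after pushdown
  std::vector<std::unique_ptr<PlanNode>> children;
};

// Push a Filter that sits directly above a Scan down into the Scan itself,
// so the storage layer can evaluate the condition while reading.
std::unique_ptr<PlanNode> push_down(std::unique_ptr<PlanNode> node) {
  for (auto &child : node->children) child = push_down(std::move(child));
  if (node->kind == "Filter" && node->children.size() == 1 &&
      node->children[0]->kind == "Scan") {
    auto scan = std::move(node->children[0]);
    scan->predicate = node->predicate;  // the scan now filters at the source
    return scan;                        // the Filter node disappears
  }
  return node;
}
```

The same recursive walk generalizes to pushing predicates through projections and join sides when the referenced columns allow it.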

Storage Index Prune: This is key to HTAP performance improvement. With the Storage Index (see writable_access_path.inc), the engine can use statistical information (such as Max/Min) to exclude non-matching data blocks before reading them, significantly reducing I/O pressure.

Projection Pruning: A key advantage of columnar storage is reading only the required columns. The optimizer calculates projected_columns to ensure the execution engine loads only the relevant column data.
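
A hedged sketch of how such a projected-column set could be derived (the function and input shape are illustrative, not ShannonBase's actual API): collect every column id referenced by the query's expressions, deduplicated, so the scan loads only those columns.

```cpp
// Sketch: compute the set of column ids the scan must load.
#include <cassert>
#include <cstdint>
#include <set>
#include <vector>

std::vector<uint32_t> projected_columns(
    const std::vector<std::vector<uint32_t>> &exprs_column_refs) {
  std::set<uint32_t> cols;  // dedupe and keep a stable (sorted) order
  for (const auto &refs : exprs_column_refs)
    cols.insert(refs.begin(), refs.end());
  return {cols.begin(), cols.end()};
}
```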

Aggregation PushDown: When possible, aggregation operations are completed directly during the scan, avoiding the generation of large amounts of intermediate row data.
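
The effect can be sketched in a few lines (a toy SUM, not ShannonBase's aggregation code): the accumulator is updated per columnar batch during the scan, so no intermediate rows are ever materialized for the aggregate.

```cpp
// Sketch of aggregation pushed into the scan: SUM(col) is accumulated
// batch by batch while scanning the column.
#include <cassert>
#include <cstdint>
#include <vector>

int64_t sum_during_scan(const std::vector<std::vector<int64_t>> &batches) {
  int64_t acc = 0;
  for (const auto &batch : batches)    // one columnar batch at a time
    for (int64_t v : batch) acc += v;  // aggregate without emitting rows
  return acc;
}
```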

III. Vectorized AccessPath Reconstruction

Once the optimization is complete, the QueryPlan needs to be remapped back into an execution framework that MySQL can understand, but at this point, the path is already equipped with “vectorized” capabilities.
The RapidAccessPath structure is defined in access_path.h:

  ...
  bool m_vectorized{false}; 
  ...
};

The ToAccessPath method within it is responsible for completing this conversion and setting the m_vectorized flag to true. This marks that the query path is now ready to enter ShannonBase's high-performance execution channel.

IV. Vectorized Operator Instantiation (Iterator Creation)

This is the core part of HTAP execution. The PathGenerator class is responsible for instantiating the AccessPath into a concrete RowIterator. ShannonBase retains MySQL's execution engine. The advantage of this approach is that it maximizes the reuse of the original execution mechanisms, related concepts, data structures, and so on.

Vectorized Operator Library: Rapid implements a series of vectorized operators (currently completed operators include TableScanIterator, HashJoinIterator, AggregateIterator, etc., with more vectorized execution operators to be provided in the future).

Execution Mode Selection: If path->vectorized is true, the PathGenerator will prioritize creating ShannonBase's own fast operators.

Data Flow Direction: These operators no longer process data in the traditional MySQL way of “one row at a time” (Iterative Row-at-a-time), but rather process data in “one batch/vector at a time,” fully leveraging the acceleration of modern CPU SIMD instruction sets.
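
A rough sketch of the batch-at-a-time idea, independent of ShannonBase's actual operator code: the predicate is evaluated over a whole column chunk in a tight, branch-light loop that produces a selection vector, a shape compilers can readily auto-vectorize with SIMD.

```cpp
// Batch-at-a-time filter sketch: evaluate "v > threshold" over a column
// chunk and emit a selection vector of matching row indices.
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

size_t filter_batch(const int64_t *chunk, size_t n, int64_t threshold,
                    uint32_t *sel_out) {
  size_t k = 0;
  for (size_t i = 0; i < n; ++i) {
    sel_out[k] = static_cast<uint32_t>(i);
    k += (chunk[i] > threshold);  // branch-free advance of the output cursor
  }
  return k;  // number of qualifying rows in this batch
}
```

Downstream operators then consume only the selected indices, instead of re-checking the predicate one row at a time.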

unique_ptr_destroy_only<RowIterator> PathGenerator::CreateIteratorFromAccessPath(THD *thd, MEM_ROOT *mem_root,
                                                                                 OptimizeContext *context,
                                                                                 AccessPath *top_path, JOIN *top_join,
                                                                                 bool top_eligible_for_batch_mode) {
  unique_ptr_destroy_only<RowIterator> ret;
  Mem_root_array<IteratorToBeCreated> todo(mem_root);
  todo.push_back({top_path, top_join, top_eligible_for_batch_mode, &ret, {}});
  // The access path trees can be pretty deep, and the stack frames can be big
  // on certain compilers/setups, so instead of explicit recursion, we push jobs
  // onto a MEM_ROOT-backed stack. This uses a little more RAM (the MEM_ROOT
  // typically lives to the end of the query), but reduces the stack usage
  // greatly.
  //
  // The general rule is that if an iterator requires any children, it will push
  // jobs for their access paths at the end of the stack and then re-push
  // itself. When the children are instantiated and we get back to the original
  // iterator, we'll actually instantiate it. (We distinguish between the two
  // cases on basis of whether job.children has been allocated or not; the child
  // iterator's destination will point into this array. The child list needs
  // to be allocated in a way that doesn't move around if the TODO job list
  // is reallocated, which we do by means of allocating it directly on the
  // MEM_ROOT.)
  while (!todo.empty()) {
    IteratorToBeCreated job = todo.back();
    todo.pop_back();
    AccessPath *path = job.path;
    JOIN *join = job.join;
    bool eligible_for_batch_mode = job.eligible_for_batch_mode;
    if (job.join != nullptr) {
      assert(!job.join->needs_finalize);
    }
    unique_ptr_destroy_only<RowIterator> iterator;
    ha_rows *examined_rows = nullptr;
    if (path->count_examined_rows && join != nullptr) {
      examined_rows = &join->examined_rows;
    }
    switch (path->type) {
      case AccessPath::TABLE_SCAN: {
        const auto &param = path->table_scan();
        std::unique_ptr<Imcs::Predicate> predicate{nullptr};
        std::vector<uint32_t> projection;
        ha_rows limit{HA_POS_ERROR};
        ha_rows offset{0};
        bool use_storage_index{false};
        if (path->secondary_engine_data) {
          auto rapid_scan_param = static_cast<RapidScanParameters *>(path->secondary_engine_data);
          predicate = std::move(rapid_scan_param->prune_predicate);
          projection = std::move(rapid_scan_param->projected_columns);
          limit = rapid_scan_param->limit;
          offset = rapid_scan_param->offset;
          use_storage_index = true;
          rapid_scan_param->~RapidScanParameters();
#ifndef NDEBUG
          if (predicate) {
            DBUG_PRINT("rapid_optimizer",
                       ("TABLE_SCAN: Passing predicate to iterator: %s", predicate->to_string().c_str()));
          }
          if (!projection.empty()) {
            DBUG_PRINT("rapid_optimizer", ("TABLE_SCAN: Column projection enabled (%zu columns)", projection.size()));
          }
#endif
        }
        // Here param.table may be a temp table or an in-memory temp table.
        if (path->vectorized && param.table->s->table_category == enum_table_category::TABLE_CATEGORY_USER) {
          assert(param.table->s->is_secondary_engine());
          iterator = NewIterator<ShannonBase::Executor::VectorizedTableScanIterator>(
              thd, mem_root, param.table, path->num_output_rows(), examined_rows, std::move(predicate), projection,
              limit, offset, use_storage_index);
        } else
          iterator = NewIterator<TableScanIterator>(thd, mem_root, param.table, path->num_output_rows(), examined_rows);
        break;
      }
      ...
namespace ShannonBase {
namespace Executor {
/**
 * VectorizedTableScanIterator - A vectorized table scan iterator that processes data in batches
 *
 * This iterator implements a vectorized execution model for table scanning, where data is
 * processed in batches rather than row-by-row. It leverages columnar storage and SIMD
 * optimizations to improve cache locality and CPU efficiency.
 */
class VectorizedTableScanIterator : public TableRowIterator {
 public:
  VectorizedTableScanIterator(THD *thd, TABLE *table, double expected_rows, ha_rows *examined_rows,
                              std::unique_ptr<Imcs::Predicate> predicate = nullptr,
                              const std::vector<uint32_t> &projection = {}, ha_rows limit = HA_POS_ERROR,
                              ha_rows offset = 0, bool use_storage_index = false);
  bool Init() override;
  int Read() override;
  /**
   * Set a filter function to be applied during scanning
   * @param filter Filter function that returns true for rows to keep
   */
  void set_filter(filter_func_t filter) { m_filter = filter; }
  size_t GetCurrentBatchSize() const { return m_batch_size; }
 private:
  size_t EstimateRowSize() const;
  /**
   * Calculate the optimal batch size based on system characteristics
   * Considers cache line size, row size, and expected row count
   * @param expected_rows Estimated number of rows to process
   * @return Optimal batch size in number of rows
   */
  size_t CalculateOptimalBatchSize(double expected_rows);
  void CacheActiveFields();
  /**
   * Preallocate memory for column chunks to avoid runtime allocations
   * Sets up the columnar storage buffers for batch processing
   */
  void PreallocateColumnChunks();
  int ReadNextBatch();
  inline void ClearBatchData() {
    for (auto &chunk : m_col_chunks) chunk.clear();
  }
  void UpdatePerformanceMetrics(std::chrono::high_resolution_clock::time_point start_time);
  /**
   * Adapt batch size based on runtime performance characteristics
   * Implements dynamic batch size adjustment for optimal performance
   */
  void AdaptBatchSize();
  /**
   * Populate the current row from batch data to MySQL row format
   * @return 0 on success, error code on failure
   */
  int PopulateCurrentRow();
  /**
   * Process field data from column chunk to MySQL field format
   * Dispatches to specialized handlers based on field type
   * @param field MySQL field structure
   * @param col_chunk Column chunk containing the data
   * @param rowid Row index within the current batch
   */
  inline void ProcessFieldData(Field *field, const ShannonBase::Executor::ColumnChunk &col_chunk, size_t rowid) {
    if (Utils::Util::is_string(field->type()) || Utils::Util::is_blob(field->type())) {
      ProcessStringField(field, col_chunk, rowid);
    } else {
      ProcessNumericField(field, col_chunk, rowid);
    }
  }
  ...
};

int VectorizedTableScanIterator::Read() {
  int result{ShannonBase::SHANNON_SUCCESS};
  if (m_batch_exhausted && !m_eof_reached) {
    result = ReadNextBatch();
    if (result) {
      if (result == HA_ERR_END_OF_FILE && m_curr_batch_size == 0) return HandleError(HA_ERR_END_OF_FILE);
      if (result != HA_ERR_END_OF_FILE) return HandleError(result);
    }
  }
  if (m_curr_row_in_batch >= m_curr_batch_size) {
    if (m_eof_reached) {
      return HandleError(HA_ERR_END_OF_FILE);
    } else {  // read the next batch.
      m_batch_exhausted = true;
      return Read();
    }
  }
  // fill up the data to table->field
  result = PopulateCurrentRow();
  if (result) return HandleError(result);
  // move to the next row.
  m_curr_row_in_batch++;
  m_metrics.total_rows++;
  if (m_curr_row_in_batch >= m_curr_batch_size) {
    m_batch_exhausted = true;
    if (!m_eof_reached) m_curr_row_in_batch = 0;
  }
  return result;
}
int VectorizedTableScanIterator::ReadNextBatch() {
  auto batch_start = std::chrono::high_resolution_clock::now();
  ClearBatchData();
  size_t read_cnt = 0;
  int result = m_cursor->next(m_batch_size, m_col_chunks, read_cnt);
  if (result != 0) {
    if (result == HA_ERR_END_OF_FILE) {
      m_eof_reached = true;
      if (read_cnt) {
        m_batch_exhausted = false;
        m_metrics.total_batches++;
        UpdatePerformanceMetrics(batch_start);
      }
      m_curr_batch_size = read_cnt;
      m_curr_row_in_batch = 0;
      return HA_ERR_END_OF_FILE;
    }
    if (++m_metrics.error_count > 10) return HA_ERR_GENERIC;
    if (result == HA_ERR_RECORD_DELETED && !thd()->killed) return ReadNextBatch();
    return result;
  }
  if (read_cnt == 0) {  // no data read, therefore set to EOF.
    m_eof_reached = true;
    return HA_ERR_END_OF_FILE;
  }
  m_curr_batch_size = read_cnt;
  m_curr_row_in_batch = 0;
  m_batch_exhausted = false;
  m_metrics.total_batches++;
  UpdatePerformanceMetrics(batch_start);
  AdaptBatchSize();
  return ShannonBase::SHANNON_SUCCESS;
}

V. Storage Engine Integration

Finally, the execution flow interfaces with the storage layer through ha_shannon_rapid.cc.

Handler Management: The ha_rapid plugin opens tables via the open interface. If it is a partitioned table, it acquires rpd_parttable.

Columnar Scan: Execution operators interact with the In-Memory Columnar Units (IMCU) in memory via the RapidCursor.

HTAP Synchronization Mechanism: The SelfLoadManager and Populator mentioned in the code are responsible for loading, and streaming in real time, row-based data from InnoDB into the Rapid engine's columnar storage, ensuring that HTAP queries see the latest transactional data.
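
Conceptually, the propagation step applies a row-change log from the transactional engine to the in-memory column store. The sketch below uses illustrative types (Change, Column), not ShannonBase's actual Populator interfaces:

```cpp
// Conceptual sketch: apply a row-change log to one column of an
// in-memory column store, keeping analytical reads fresh.
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <vector>

enum class Op { INSERT, UPDATE, DELETE };

struct Change {
  Op op;
  uint64_t rowid;
  int64_t value;  // new value for INSERT/UPDATE; ignored for DELETE
};

// One column of the store, keyed by row id (a real IMCS uses packed
// column chunks plus delete bitmaps, not a hash map).
using Column = std::unordered_map<uint64_t, int64_t>;

void populate(Column &col, const std::vector<Change> &log) {
  for (const auto &c : log) {
    switch (c.op) {
      case Op::INSERT:
      case Op::UPDATE:
        col[c.rowid] = c.value;
        break;
      case Op::DELETE:
        col.erase(c.rowid);
        break;
    }
  }
}
```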

Conclusion

ShannonBase’s HTAP execution flow follows a model of “deep integration, smooth enhancement.” It retains MySQL’s consistent external interface, but internally achieves its capabilities through:

Intelligent Translation Layer: Seamlessly translating MySQL plans.

High-Performance Optimization Rules: Implementing index pruning and predicate pushdown.

Vectorized Execution Engine: Leveraging columnar storage and batch (vector) processing to achieve a leap in analytical performance.

This design allows users to directly gain the ability to handle large-scale analytical tasks without modifying business SQL, truly realizing the integration of OLTP and OLAP.

ShannonBase is designed for data + AI
• MySQL-compatible SQL engine
• Built-in Vector & ML capabilities
• HTAP / IMCS for AI-native workloads
Star the repo
🧩 Submit PRs
🐞 Open Issues
💬 Join Discussion
