
Franck Pachot for YugabyteDB Distributed PostgreSQL Database


Scalable range sharding to avoid hotspots on indexes

TL;DR: To distribute the writes, you can add a prefix to your range indexes, and YugabyteDB's Index Skip Scan will still use them to scale the reads. This prefix can be a low-cardinality column from the table, or a hash modulo calculated on the indexed column.


When defining the sharding method in YugabyteDB you have the choice between:

  • Hash sharding. A hash function is applied to the HASH part of the key (by default the first column). This hash code (which you can calculate yourself with yb_hash_code()) is an integer from 0 to 65535 (0x0000 to 0xFFFF in hexadecimal). It determines the tablet where the table row (for the primary key) or the index entry (for secondary indexes) will be stored. Each tablet holds a range of hash values, for example a table split into 3 tablets will have those ranges: hash_split: [0x0000, 0x5554], hash_split: [0x5555, 0xAAA9] and hash_split: [0xAAAA, 0xFFFF]. Within each tablet, this hash code is added to the keys as a prefix so that the rows are sorted on the hash code before the key value.
    Hash sharding is perfect for distributing the rows to multiple tablets, then to multiple locations within a tablet, and reading them back with point queries (equality predicates on all hash columns). Typically, you use it for columns that don't have a significant order: a UUID, a text with no alphabetical-order semantics (like a generated code), an integer with no arithmetic (like a sequence).

  • Range sharding. For columns that will be queried by range (greater than, less than, BETWEEN, LIKE with a known prefix), or that you will fetch with an ORDER BY, you don't want to apply a hash function but store them in order, ascending or descending, clustered. Typically you use range sharding for timestamps, alphabetical text, integers with arithmetic semantics, floats, and numerics where the ordering is significant. This is not the default, except with colocated tables that go into one tablet only. For non-colocated tables, you define range sharding by mentioning ASC or DESC in the index or primary key definition.
    Contrary to hash sharding, range-sharded tables are not immediately split into multiple tablets because the range of values is unknown when the table is created. When you already know the data that you will load, you can define the split values in the CREATE TABLE statement to pre-split it. In all cases, auto-splitting will split a tablet when it grows, to keep the tablet size under control. For example, an index on first names may see range: [<start>, DocKey([], ["Jeff"])), range: [DocKey([], ["Jeff"]), DocKey([], ["Martin"])) and range: [DocKey([], ["Martin"]), <end>) if the volume of rows between those names is balanced. Range sharding allows index scans for point queries as well as range queries (see the DDL sketch after this list).
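
To make those two options concrete, here is a minimal DDL sketch. The table names (events_by_id, events_by_time) and the split points are hypothetical, not part of the pgbench schema used later in this post:

-- hash sharding: the first key column is hashed, the table is immediately distributed
create table events_by_id (
 id uuid,
 payload text,
 primary key (id hash)
) split into 3 tablets
;

-- range sharding: the key is stored ordered, optionally pre-split
-- when the value distribution is known in advance
create table events_by_time (
 created_at timestamp,
 id uuid,
 payload text,
 primary key (created_at asc, id asc)
) split at values ( ('2023-01-01'), ('2023-07-01') )
;

-- the hash code used by hash sharding can be computed explicitly
select yb_hash_code('some value'::text); -- an integer from 0 to 65535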

From a read point of view, the choice is easy. When you have only point queries, hash-sharding is great to distribute immediately without additional concern. When you may have range queries, you define range sharding.

From a write point of view, hash sharding is optimal for throughput because concurrent inserts go to multiple tablets. Range sharding may add pressure on some hotspots when the concurrent inserts have the same or close values for the indexed column.

Those hotspots are encountered when a number is generated from a sequence, when a timestamp takes its value from the current time, or simply when a value becomes popular (think of thousands of users 'liking' a new post, at the same time, from a popular influencer). For this case, and after this long introduction, I'll introduce a third option:

  • Prefixed Range Sharding. The idea here is to use range sharding, because we expect range queries, but to prefix the key with a low-cardinality hash value and apply range sharding to the whole key. The goal is still to distribute the writes, but to a few ranges only. I'll explain this with an example.

Pgbench example

YugabyteDB is compatible with PostgreSQL. I create the PostgreSQL PgBench tables:

pgbench --initialize --init-steps=dtpG 


This initializes the tables by (d)eleting them if they already exist, creating the (t)ables with (p)rimary keys, and (G)enerating some rows. You can ignore the message about FILLFACTOR, which is used in PostgreSQL to mitigate bloat issues: there's no bloat with YugabyteDB's MVCC implementation. For the same reason, I didn't include the (v)acuum initialization step, because YugabyteDB doesn't need vacuum.

I insert 20 million rows into the pgbench_history table by running the 'simple' PgBench workload:

pgbench --client=20 --transactions=1000000 --protocol=prepared --builtin=simple --no-vacuum 


If your pgbench client is far from the nodes you connect to, you can generate fewer rows and then increase the volume with an insert-as-select, because PgBench is a row-by-row application.
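
For example, here is a minimal sketch of that approach (assuming you only need more volume, not realistic values): each execution doubles pgbench_history, shifting mtime so that the copies don't pile up on the same timestamps.

insert into pgbench_history (tid, bid, aid, delta, mtime, filler)
select tid, bid, aid, delta, mtime - interval '1 hour', filler
from pgbench_history
;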

I'll create a first index that illustrates my point. pgbench_history has an mtime column that is a timestamp:

yugabyte=# \d pgbench_history

                    Table "public.pgbench_history"
 Column |            Type             | Collation | Nullable | Default
--------+-----------------------------+-----------+----------+---------
 tid    | integer                     |           |          |
 bid    | integer                     |           |          |
 aid    | integer                     |           |          |
 delta  | integer                     |           |          |
 mtime  | timestamp without time zone |           |          |
 filler | character(22)               |           |          |

This column is set to the transaction's CURRENT_TIMESTAMP:

yugabyte=# \! pgbench --show-script=simple
-- simple-update: <builtin: simple update>
\set aid random(1, 100000 * :scale)
\set bid random(1, 1 * :scale)
\set tid random(1, 10 * :scale)
\set delta random(-5000, 5000)
BEGIN;
UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;
SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
END;

That's exactly the kind of column where you want to distribute the writes but read by range.

You can also note that the tid column has values from 1 to 10. I'll use that later, in one alternative solution.
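
As a quick sanity check of that low cardinality (this is an ad-hoc verification query, it scans the whole table):

select count(distinct tid) as distinct_tids, min(tid), max(tid)
from pgbench_history
;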

The application behavior for this mtime column has two properties:

  • the values inserted by concurrent transactions are very close, which may create a hotspot on a range index
  • they are slightly different, by a few milliseconds, so that a hash function can avoid hotspots

Indexing and Access patterns

Nobody will run a point query for a specific timestamp (given its millisecond precision). This means that a HASH index on mtime is useless.

This mtime column will typically be used by two kinds of range queries:

  • find the transactions on a range of time, for example counting the transactions in the last hour:
 select count(*) from pgbench_history 
 where mtime > $1
;
  • find the last transactions, a typical Top-n query with an ORDER BY and LIMIT:
 select *
 from pgbench_history
 order by mtime desc limit $1
;

In this table, I have 20 million rows and 4096 were recently inserted. I want the queries to be scalable, not depending on the size of the table but only on the size of the expected result. In terms of time complexity, I want O(1) rather than O(N). This, in a SQL database, is achieved by creating an index that stores the requested values clustered and ordered, where we can directly seek to the start of the specified range and read the next rows from there.

Simple range index

The following range index can serve those queries:

create index pgbench_history_mtime
on pgbench_history ( mtime desc )
;

The execution plans look good:

yugabyte=# explain (analyze, dist, verbose, costs off, summary off)
 select count(*) from pgbench_history
 where mtime > timestamp '2023-10-8 16:00:00'
;

                                                        QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
 Aggregate (actual time=9.907..9.907 rows=1 loops=1)
   Output: count(*)
   ->  Index Only Scan using pgbench_history_mtime on public.pgbench_history (actual time=2.593..9.674 rows=4096 loops=1)
         Output: mtime
         Index Cond: (pgbench_history.mtime > '2023-10-08 16:00:00'::timestamp without time zone)
         Heap Fetches: 0
         Storage Index Read Requests: 4
         Storage Index Read Execution Time: 8.773 ms
(8 rows)

yugabyte=# explain (analyze, dist, verbose, costs off, summary off)
 select *
 from pgbench_history
 order by mtime desc limit 5
;

                                                    QUERY PLAN
------------------------------------------------------------------------------------------------------------------
 Limit (actual time=2.870..2.875 rows=5 loops=1)
   Output: tid, bid, aid, delta, mtime, filler
   ->  Index Scan using pgbench_history_mtime on public.pgbench_history (actual time=2.869..2.872 rows=5 loops=1)
         Output: tid, bid, aid, delta, mtime, filler
         Storage Table Read Requests: 1
         Storage Table Read Execution Time: 1.327 ms
         Storage Index Read Requests: 1
         Storage Index Read Execution Time: 1.384 ms
(8 rows)

yugabyte=# drop index pgbench_history_mtime;
DROP INDEX

This is fast and scalable: an Index Scan with the WHERE clause as the Index Cond, and no additional Sort to fetch the Top-n rows. With the dist option, explain analyze shows the read requests: 4 to get the 4096 rows from a range (rows are fetched in batches) and 1 to get the Top-n rows. With this, the response time stays in single-digit milliseconds.

However, as mentioned earlier, this index will create a hotspot where all concurrent inserts write their index entries into the same tablet. I remove this index for this reason (drop index pgbench_history_mtime) and look for other solutions.

Here is a screenshot of my YugabyteDB 3-node cluster with pgbench running with such index:
Not all operations are well distributed. The node that holds the range for the current time receives more write operations.

Hash Index

With hash-sharding, the writes would be scalable, but the two queries cannot use the index efficiently, for two reasons:

  • the range of interest is spread over 65536 places. In each, the entries are ordered on mtime, but reading them all would require 65536 read requests, which can take minutes when one read takes 1 millisecond
  • with hash sharding, the tablets are read in no specific order, and a Top-n query will require a Sort operation.

Here are the execution plans:

yugabyte=# create index pgbench_history_mtime
 on pgbench_history ( mtime hash );
CREATE INDEX

yugabyte=# explain (analyze, dist, verbose, costs off, summary off)
 select count(*) from pgbench_history
 where mtime > timestamp '2023-10-8 16:00:00'
;

                                              QUERY PLAN
-----------------------------------------------------------------------------------------------------
 Finalize Aggregate (actual time=9614.158..9614.159 rows=1 loops=1)
   Output: count(*)
   ->  Seq Scan on public.pgbench_history (actual time=9614.142..9614.147 rows=6 loops=1)
         Output: tid, bid, aid, delta, mtime, filler
         Remote Filter: (pgbench_history.mtime > '2023-10-08 16:00:00'::timestamp without time zone)
         Partial Aggregate: true
         Storage Table Read Requests: 1
         Storage Table Read Execution Time: 9614.028 ms
(8 rows)

yugabyte=# explain (analyze, dist, verbose, costs off, summary off)
 select *
 from pgbench_history
 order by mtime desc limit 5
;

                                              QUERY PLAN
------------------------------------------------------------------------------------------------------
 Limit (actual time=132874.603..132874.607 rows=5 loops=1)
   Output: tid, bid, aid, delta, mtime, filler
   ->  Sort (actual time=132874.602..132874.603 rows=5 loops=1)
         Output: tid, bid, aid, delta, mtime, filler
         Sort Key: pgbench_history.mtime DESC
         Sort Method: top-N heapsort  Memory: 25kB
         ->  Seq Scan on public.pgbench_history (actual time=5.044..130002.114 rows=20000000 loops=1)
               Output: tid, bid, aid, delta, mtime, filler
               Storage Table Read Requests: 19533
               Storage Table Read Execution Time: 125982.337 ms
(10 rows)

yugabyte=# drop index pgbench_history_mtime;
DROP INDEX

The range query is not so bad, thanks to the expression pushdown offloading the filter to the storage (Remote Filter), but the Top-n query reads all rows (rows=20000000) to be ordered by the PostgreSQL backend (Sort) before being able to return the interesting ones. We need a better solution.

Prefixed Range Sharding

Let's combine the advantages of hash and range sharding with a range index that is prefixed by a low cardinality hash value.

I'll use the tid column, which has a low cardinality (values 1 to 10) and will distribute the concurrent inserts coming from multiple tellers.

create index pgbench_history_tid_mtime
 on pgbench_history( tid asc , mtime desc )
 split at values ( (2),(3),(4),(5),(6),(7),(8),(9),(10) )
;

While still distributing the inserts, this avoids the two problems of hash-sharding:

  • There are only 10 ranges to read rather than 65536. Even with 1 millisecond calls, this query stays around 10 milliseconds
  • The query planner knows that each range scan will preserve the order, even when split into multiple tablets, and can skip the Sort operation and return the first rows as they are fetched from the storage.

To distribute the inserts to multiple tablets, I explicitly pre-split to one tablet per prefix value. When the table grows, further auto-splitting may happen automatically.
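
If you want to verify the pre-split, YugabyteDB exposes the tablet count through the yb_table_properties() YSQL function. Here is a quick check (assuming this function is available in your version; the returned columns may differ):

select num_tablets
from yb_table_properties('pgbench_history_tid_mtime'::regclass)
;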

Range query with a table's column prefix

The range query doesn't need any change to use this index efficiently:

yugabyte=# explain (analyze, dist, verbose, costs off, summary off)
 select count(*) from pgbench_history
 where mtime > timestamp '2023-10-8 16:00:00'
;

                                                          QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------
 Aggregate (actual time=15.898..15.899 rows=1 loops=1)
   Output: count(*)
   ->  Index Only Scan using pgbench_history_tid_mtime on public.pgbench_history (actual time=0.963..15.614 rows=4096 loops=1)
         Output: tid, mtime
         Index Cond: (pgbench_history.mtime > '2023-10-08 16:00:00'::timestamp without time zone)
         Heap Fetches: 0
         Storage Index Read Requests: 10
         Storage Index Read Execution Time: 14.200 ms
(8 rows)

The number of read requests is exactly the number of tablets. There's only one seek into each tablet to get the rows within the requested range.

Top-n query with a table's column prefix

The Top-n query is not directly pushed down (at least in this version, YugabyteDB 2.19.2), but I can leverage the distinct pushdown to get the values of the index prefix, and then join them to the table to get a Top-n range scan for each bucket:

yugabyte=# explain (costs off, summary off, analyze, dist)
 with buckets as (
 -- get distinct value for the first column in the index
 select distinct tid from pgbench_history
 -- workaround for https://github.com/yugabyte/yugabyte-db/issues/16771
 where tid >= 0
) select rows.*
 -- for each distinct value
 from buckets
 -- access by the index to the top-n rows
 , lateral(
  select * from pgbench_history
  where tid = buckets.tid
  order by mtime desc limit 5
 ) rows
;

                                                                                                                                                                                                      QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------
 Nested Loop (actual time=3.580..21.877 rows=50 loops=1)
   CTE buckets
     ->  Unique (actual time=0.637..0.821 rows=10 loops=1)
           ->  Index Only Scan using pgbench_history_tid_mtime on pgbench_history pgbench_history_1 (actual time=0.636..0.811 rows=10 loops=1)
                 Index Cond: (tid >= 0)
                 Heap Fetches: 0
                 Storage Index Read Requests: 10
                 Storage Index Read Execution Time: 0.507 ms
   ->  CTE Scan on buckets (actual time=0.639..0.830 rows=10 loops=1)
   ->  Limit (actual time=2.099..2.102 rows=5 loops=10)
         ->  Index Scan using pgbench_history_tid_mtime on pgbench_history (actual time=2.086..2.088 rows=5 loops=10)
               Index Cond: (tid = buckets.tid)
               Storage Table Read Requests: 1
               Storage Table Read Execution Time: 1.370 ms
               Storage Index Read Requests: 1
               Storage Index Read Execution Time: 0.665 ms
(16 rows)

Again, the number of read requests is limited to the number of tablets to get the distinct values, and the number of nested loops is limited by the low cardinality of this column.

Modulo-Prefixed Range Sharding

The previous index was using a low-cardinality column that already exists in the table. When there's no such column, there's no need to create one, because you can index a virtual column, as an expression.

I create such an expression by applying a hash function (yb_hash_code()) and reducing the cardinality by taking a modulo (%8 reduces it to 8 values, from 0 to 7). As my index is on mtime, I create an index that adds the (yb_hash_code(mtime)%8) expression in front of it:

create index pgbench_history_mod_mtime
 on pgbench_history( (yb_hash_code(mtime)%8) asc , mtime desc )
 split at values ( (1),(2),(3),(4),(5),(6),(7) )
;
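
To check how evenly this expression spreads the rows across the 8 buckets, here is a quick sanity-check aggregation (it reads all rows, so it's for verification only):

select yb_hash_code(mtime)%8 as bucket, count(*)
from pgbench_history
group by 1
order by 1
;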

This is different from hash-sharding because the prefix (yb_hash_code(mtime)%8) asc is limited to 8 different values, and ordered with range sharding. This avoids the two issues of hash-sharding:

  • There are only 8 ranges to read rather than 65536.
  • The query planner knows that each range scan will preserve the order.

To distribute the inserts, I've explicitly pre-split to one tablet per prefix value. When the table grows, further auto-splitting may happen automatically.

Range query with modulo-expression index prefix

The range query doesn't need any change to use this index efficiently:

yugabyte=# explain (analyze, dist, verbose, costs off, summary off)
 select count(*) from pgbench_history
 where mtime > timestamp '2023-10-8 16:00:00'
;

                                                          QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------
 Aggregate (actual time=11.310..11.310 rows=1 loops=1)
   Output: count(*)
   ->  Index Only Scan using pgbench_history_mod_mtime on public.pgbench_history (actual time=2.143..11.046 rows=4096 loops=1)
         Output: ((yb_hash_code(mtime) % 8)), mtime
         Index Cond: (pgbench_history.mtime > '2023-10-08 16:00:00'::timestamp without time zone)
         Heap Fetches: 0
         Storage Index Read Requests: 8
         Storage Index Read Execution Time: 9.876 ms
(8 rows)

Top-n query with modulo-expression index prefix

The Top-n query must explicitly get all the distinct values and read each range with a nested loop, using a correlated LATERAL query:

yugabyte=# explain (costs off, summary off, analyze, dist)
 with buckets as (
 -- get distinct value for the first column in the index
 select distinct ((yb_hash_code(mtime) % 8)) mod from pgbench_history
 -- workaround for https://github.com/yugabyte/yugabyte-db/issues/16771
 where ((yb_hash_code(mtime) % 8)) >= 0
) select rows.*
 -- for each distinct value
 from buckets
 -- access by the index to the top-n rows
 , lateral(
  select * from pgbench_history
  where ((yb_hash_code(mtime) % 8)) = buckets.mod
  order by mtime desc limit 5
 ) rows
;

                                                                  QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------
 Nested Loop (actual time=3.812..16.741 rows=40 loops=1)
   CTE buckets
     ->  Unique (actual time=1.327..1.456 rows=8 loops=1)
           ->  Index Only Scan using pgbench_history_mod_mtime on pgbench_history pgbench_history_1 (actual time=1.326..1.447 rows=8 loops=1)
                 Index Cond: (((yb_hash_code(mtime) % 8)) >= 0)
                 Heap Fetches: 0
                 Storage Index Read Requests: 8
                 Storage Index Read Execution Time: 1.203 ms
   ->  CTE Scan on buckets (actual time=1.329..1.464 rows=8 loops=1)
   ->  Limit (actual time=1.904..1.907 rows=5 loops=8)
         ->  Index Scan using pgbench_history_mod_mtime on pgbench_history (actual time=1.891..1.893 rows=5 loops=8)
               Index Cond: ((yb_hash_code(mtime) % 8) = buckets.mod)
               Storage Table Read Requests: 1
               Storage Table Read Execution Time: 1.126 ms
               Storage Index Read Requests: 1

The query must use exactly the same expression that was used in the index ((yb_hash_code(mtime) % 8)) to benefit from the distinct pushdown. Then there's one read request per tablet to get the distinct values, and one more read request per value to get the rows.

Here, the query returns 40 rows (8 x 5), and you need an additional order by mtime desc limit 5 to sort them, but this is fast given the number of rows. Don't forget that this relies on the distinct pushdown to get all values for (yb_hash_code(mtime) % 8), but you know those values, so you will probably end up with a query like:

yugabyte=# explain (costs off)
 with buckets(mod) as (
 select generate_series(0,7)
) select rows.*
 -- for each distinct value
 from buckets
 -- access by the index to the top-n rows
 , lateral(
  select * from pgbench_history
  where ((yb_hash_code(mtime) % 8)) = buckets.mod
  order by mtime desc limit 5
 ) rows
order by mtime desc limit 5
;
                                      QUERY PLAN
--------------------------------------------------------------------
 Limit
  CTE buckets
   -> ProjectSet
      -> Result
  -> Sort
     Sort Key: pgbench_history.mtime DESC
     -> Nested Loop
        -> CTE Scan on buckets
        -> Limit
           -> Index Scan using pgbench_history_mod_mtime on pgbench_history
              Index Cond: ((yb_hash_code(mtime) % 8) = buckets.mod)


Scalable writes with no hotspots

To verify the scalability of writes, I've run the PgBench inserts and checked the read and write operations on the 3 nodes of my YugabyteDB cluster:
This was with the two indexes (on tid and on (yb_hash_code(mtime) % 8)). Of course, you don't need both of them. Either you use a column from the table that you know is well distributed and has a low cardinality, or you build one with a hash value and a modulo.

This is now scalable: adding more nodes will automatically redistribute the tablets and increase the throughput, and the range queries will still be fast even as the tables grow.

To summarize

YugabyteDB provides all the building blocks for scalable reads and writes, to optimize the infinite variety of use cases that can run on a SQL database:

  • the necessary optimizations in the DocDB distributed storage to avoid too many remote calls, like the skip scan and distinct pushdowns
  • all PostgreSQL features in the YSQL query layer to build efficient queries on top of the distributed transactions, like Common Table Expressions and lateral joins

Some queries do not need any transformation; others may benefit from being rewritten to leverage the optimized access paths. If you need transparent rewrites for those, YugabyteDB is open source and you can contribute by opening issues or pull requests.
