The Nested Loop join is attractive in Distributed SQL databases because it can push down the join condition rather than reading all rows from distributed storage and joining them in the SQL layer. However, using the same Nested Loop as in PostgreSQL results in one remote read per outer row, which can negate the benefit of this optimization. YugabyteDB introduced the Batched Nested Loop join, where multiple outer rows are read and pushed down as a batch to reduce the number of loops. With a batch size of 1000 rows, a read latency of typically 1 millisecond within a single region amortizes to about 1 microsecond per row.
The batching uses another YugabyteDB feature, the loose index scan, which can read multiple ranges in a single Index Scan, typically from an IN() list of values or an =ANY() array. Where a regular Nested Loop pushes down a condition like Index Cond: (id = outer.id), the batched version pushes down Index Cond: (id = ANY (ARRAY[outer.id, $1, $2, ..., $1023])).
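To make the pushed-down condition concrete, here is what the same kind of predicate looks like when written by hand. This is only an illustration on a hypothetical table t, not one of the demo tables used below:
create table t (id bigint primary key, val text);
-- one Index Scan can serve a whole list of values
select * from t where id = any (array[1, 2, 3]);
-- equivalent form with an IN() list
select * from t where id in (1, 2, 3);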
Many joins use equality predicates between columns of the outer table and columns of the inner table, like the example above where Index Cond: (id = outer.id) is transformed into Index Cond: (id = ANY (ARRAY[outer.id, $1, $2, ..., $1023])). If an expression is applied to the outer column, it is simply pushed down into the array: for example, Index Cond: (id = upper(outer.id)) is transformed into Index Cond: (id = ANY (ARRAY[upper(outer.id), $1, $2, ..., $1023])). If there are multiple columns, they are combined with ROW() and pushed down: Index Cond: ((a = outer.a) AND (b = outer.b)) is transformed into Index Cond: (ROW(a, b) = ANY (ARRAY[ROW(outer.a, outer.b), ROW($1, $1025), ROW($2, $1026), ..., ROW($1023, $2047)])) if there's an index on (a,b).
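The multi-column form corresponds to a row-value comparison that can also be written by hand. A minimal sketch, again on a hypothetical table rather than the demo tables below:
create table t2 (a text, b int, c float, primary key (a, b));
-- a row-value IN() list lets a single scan match several (a, b) pairs
select * from t2 where (a, b) in (('x', 1), ('y', 2));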
I'll run some examples with the following tables and settings:
drop table if exists demo1, demo2;
create table demo1 (a text, b int, c float, primary key(a,b));
create table demo2 (a text, b int, c float, primary key(a,b));
insert into demo1 select generate_series(1,1000), 1, 0.5;
insert into demo2 select generate_series(1,1000), 1, random();
set yb_bnl_batch_size=1024;
set enable_hashjoin=off;
set enable_mergejoin=off;
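As a sanity check, the session settings can be verified with standard SHOW commands before running the examples:
show yb_bnl_batch_size; -- expect 1024
show enable_hashjoin;   -- expect off
show enable_mergejoin;  -- expect off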
A join with equality on one column is batched, joining 1000 rows in one Read Request:
yugabyte=# explain (costs off, analyze, dist)
select demo1.a demo1_a, demo2.a demo2_a, demo1.b demo1_b, demo2.b demo2_b, demo1.c demo1_c, demo2.c demo2_c
from demo1 join demo2 on demo1.a=demo2.a
;
QUERY PLAN
-----------------------------------------------------------------------------------------
 YB Batched Nested Loop Join (actual time=8.902..10.020 rows=1000 loops=1)
   Join Filter: (demo1.a = demo2.a)
   ->  Seq Scan on demo1 (actual time=0.645..2.002 rows=1000 loops=1)
         Storage Table Read Requests: 3
         Storage Table Read Execution Time: 1.679 ms
   ->  Index Scan using demo2_pkey on demo2 (actual time=5.822..6.208 rows=1000 loops=1)
         Index Cond: (a = ANY (ARRAY[demo1.a, $1, $2, ..., $1023]))
         Storage Table Read Requests: 1
         Storage Table Read Execution Time: 3.615 ms
 Planning Time: 6.185 ms
 Execution Time: 10.911 ms
 Storage Read Requests: 4
 Storage Read Execution Time: 5.294 ms
It is the same with multiple columns:
yugabyte=# explain (costs off, analyze, dist)
select demo1.a demo1_a, demo2.a demo2_a, demo1.b demo1_b, demo2.b demo2_b, demo1.c demo1_c, demo2.c demo2_c
from demo1 join demo2 on demo1.a=demo2.a and demo1.b=demo2.b
;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------
 YB Batched Nested Loop Join (actual time=10.082..11.274 rows=1000 loops=1)
   Join Filter: ((demo1.a = demo2.a) AND (demo1.b = demo2.b))
   ->  Seq Scan on demo1 (actual time=0.673..1.930 rows=1000 loops=1)
         Storage Table Read Requests: 3
         Storage Table Read Execution Time: 1.596 ms
   ->  Index Scan using demo2_pkey on demo2 (actual time=7.438..7.829 rows=1000 loops=1)
         Index Cond: (ROW(a, b) = ANY (ARRAY[ROW(demo1.a, demo1.b), ROW($1, $1025), ROW($2, $1026), ..., ROW($1023, $2047)]))
         Storage Table Read Requests: 1
         Storage Table Read Execution Time: 3.004 ms
 Planning Time: 3.105 ms
 Execution Time: 13.535 ms
 Storage Read Requests: 4
 Storage Read Execution Time: 4.601 ms
However, if I add a join predicate that is not batchable, like an inequality between columns from the outer and inner table:
yugabyte=# explain (costs off, analyze, dist)
select demo1.a demo1_a, demo2.a demo2_a, demo1.b demo1_b, demo2.b demo2_b, demo1.c demo1_c, demo2.c demo2_c
from demo1 join demo2 on demo1.a=demo2.a and demo1.b=demo2.b
and demo2.c>demo1.c
;
QUERY PLAN
-----------------------------------------------------------------------------------------
 Nested Loop (actual time=1.252..469.154 rows=501 loops=1)
   ->  Seq Scan on demo1 (actual time=0.678..1.663 rows=1000 loops=1)
         Storage Table Read Requests: 3
         Storage Table Read Execution Time: 0.522 ms
   ->  Index Scan using demo2_pkey on demo2 (actual time=0.451..0.451 rows=1 loops=1000)
         Index Cond: ((a = demo1.a) AND (b = demo1.b))
         Filter: (c > demo1.c)
         Rows Removed by Filter: 0
         Storage Table Read Requests: 1
         Storage Table Read Execution Time: 0.404 ms
 Planning Time: 1.926 ms
 Execution Time: 469.524 ms
 Storage Read Requests: 1003
 Storage Read Execution Time: 404.518 ms
The join is not batched because of the additional filter that uses columns from both tables: Filter: (c > demo1.c). This results in a thousand loops and a much higher latency because of a thousand read requests.
What to do when a condition is not batchable?
I can split the predicates into batchable and unbatchable ones, run the batchable join, project all columns needed for the additional filtering, and apply the remaining predicates on the join result. Of course, it reads more rows from the storage, but doing so with fewer read requests is still advantageous:
yugabyte=# explain (costs off, analyze, dist)
with batchable as (
select demo1.a demo1_a, demo2.a demo2_a, demo1.b demo1_b, demo2.b demo2_b, demo1.c demo1_c, demo2.c demo2_c
from demo1 join demo2 on demo1.a=demo2.a and demo1.b=demo2.b
)
select * from batchable where demo2_c>demo1_c
;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------
 CTE Scan on batchable (actual time=9.841..11.570 rows=501 loops=1)
   Filter: (demo2_c > demo1_c)
   Rows Removed by Filter: 499
   CTE batchable
     ->  YB Batched Nested Loop Join (actual time=9.836..11.056 rows=1000 loops=1)
           Join Filter: ((demo1.a = demo2.a) AND (demo1.b = demo2.b))
           ->  Seq Scan on demo1 (actual time=0.746..1.895 rows=1000 loops=1)
                 Storage Table Read Requests: 3
                 Storage Table Read Execution Time: 1.527 ms
           ->  Index Scan using demo2_pkey on demo2 (actual time=7.284..7.667 rows=1000 loops=1)
                 Index Cond: (ROW(a, b) = ANY (ARRAY[ROW(demo1.a, demo1.b), ROW($1, $1025), ROW($2, $1026), ..., ROW($1023, $2047)]))
                 Storage Table Read Requests: 1
                 Storage Table Read Execution Time: 3.583 ms
 Planning Time: 1.882 ms
 Execution Time: 12.645 ms
 Storage Read Requests: 4
 Storage Read Execution Time: 5.110 ms
Note that I rely on the fact that the Common Table Expression (CTE) in the WITH clause is materialized. That is the behavior of PostgreSQL 11, which is the version used by the YugabyteDB release I'm running here (2.19.3), but in later versions the MATERIALIZED keyword may be necessary to avoid inlining. In case of doubt, check the execution plan.
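For reference, here is the same query with the explicit keyword. MATERIALIZED is PostgreSQL 12+ syntax, so this form applies only once YugabyteDB is based on a newer PostgreSQL version:
with batchable as materialized (
select demo1.a demo1_a, demo2.a demo2_a, demo1.b demo1_b, demo2.b demo2_b, demo1.c demo1_c, demo2.c demo2_c
from demo1 join demo2 on demo1.a=demo2.a and demo1.b=demo2.b
)
select * from batchable where demo2_c>demo1_c
;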
If you want to know more about batchable and unbatchable conditions, you can ask the YugabyteDB documentation. Yes, literally "ask", because a GenAI bot enriches the answers with information from git issues and commits.
If you want a more comprehensive list of examples of batchable and unbatchable conditions, the best place to look is probably the regression tests: src/test/regress/expected/yb_join_batching.out