DEV Community

Cover image for Batching and Push-Downs to Distribute with High-Performance reads and writes

Batching and Push-Downs to Distribute with High-Performance reads and writes

It wouldn't be efficient to read all rows scanned by a query and replicate each write synchronously to the quorum, especially in a geo-distributed cluster where the node latency is higher than a millisecond. The two-layer architecture still allows efficient communication. Let's consider two examples: one for reading and one for writing.

Batching the read requests

The PostgreSQL code is primarily used in the query layer but is also available in the storage layer. In this setup, some SQL processing can be offloaded to distributed storage. When it's possible to reduce the number of rows to fetch from the storage, the query layer can push down the filtering conditions (WHERE clause), aggregations (GROUP BY), or other row limiting functions (FETCH FIRST ROWS ONLY or LIMIT) with the read operation.
With this information, the table or index scan becomes a smart scan that can filter while reading (visible in the execution plan as a Storage Filter) and calculate partial aggregates before returning the result to the query layer. This reduces the network calls and their size for read operations. Thanks to using PostgreSQL, the User Defined Functions (UDFs) are inlined and can be pushed down so that they are fully distributed.

The following example illustrates how PostgreSQL inlines the User Defined Function (UDF), while YugabyteDB enhancement pushes it down as a Storage Filter.

  • During the query execution, 10014 rows were scanned without being sent over the network. 458 rows from the outer table were returned in the order of the index, eliminating the need for a sort operation.
  • The pagination limit was also pushed down, resulting in the return of 5 rows in a single read request.
  • The PostgreSQL nested loop join technique was employed to push down the join filter. YugabyteDB enhancement further optimized the join by transforming the join condition into an array, allowing for a batched nested loop to reduce the number of loops.
yugabyte=# create function earnings( sal int, comm int)
 returns int as $UDF$
 select sal + case when comm is not null then comm else 0 end
$UDF$ language SQL ;

CREATE FUNCTION

yugabyte=# explain (analyze, dist, costs off, summary off)
            select * from dept
             join emp using(deptno)
             where earnings(sal,comm)>42
             and dept.loc != 'DALLAS'
              order by emp.empno fetch first 5 rows only
;
                                              QUERY PLAN
------------------------------------------------------------------------------------------------------
 Limit (actual time=1.332..8.805 rows=5 loops=1)
   ->  Result (actual time=1.331..8.802 rows=5 loops=1)
         ->  YB Batched Nested Loop Join (actual time=1.330..8.799 rows=5 loops=1)
               Join Filter: (dept.deptno = emp.deptno)
               Sort Keys: emp.empno
               ->  Index Scan using pk_emp on emp (actual time=0.643..6.561 rows=458 loops=1)
                     Storage Filter: ((sal + CASE WHEN (comm IS NOT NULL) THEN comm ELSE 0 END) > 42)
                     Storage Table Read Requests: 2
                     Storage Table Read Execution Time: 6.149 ms
                     Storage Table Rows Scanned: 10014
               ->  Index Scan using pk_dept on dept (actual time=0.664..0.666 rows=2 loops=2)
                     Index Cond: (deptno = ANY (ARRAY[emp.deptno, $1, $2, ..., $1023]))
                     Storage Filter: (loc <> 'DALLAS'::text)
                     Storage Table Read Requests: 1
                     Storage Table Read Execution Time: 0.604 ms
                     Storage Table Rows Scanned: 2

Enter fullscreen mode Exit fullscreen mode

As a result, this query required only two read requests, minimizing the impact of network latency and resulting in a response time in the single digit milliseconds.

Buffering the write requests

Instead of mixing synchronous and asynchronous writes, YugabyteDB replicates all writes synchronously to the Raft quorum. However, the SQL write operations are batched into one Raft write, so the latency doesn't impact the response time for each row. The writes from multiple inserts, deletes, and updates are buffered to be sent to the tablet leaders. They are flushed only when necessary, typically when the SQL transaction commits or requires acknowledgment of the write before a read or response to the application.

In the example below, you can see how YugabyteDB decreases the number of raft consensus syncs to the quorum. 2678 rows were updated, which translates to 2678 write requests. However, these requests were buffered and sent to the tablets in the distributed storage in just one flush request:

yugabyte=# explain (analyze, dist, costs off, summary on)
with
u as (
 update emp set comm=10 where deptno in (10,20,30) and sal>32
)
select;
                                        QUERY PLAN
-------------------------------------------------------------------------------------------
 Result (actual time=0.000..0.001 rows=1 loops=1)
   CTE u
     ->  Update on emp (actual time=51.740..51.740 rows=0 loops=1)
           ->  Seq Scan on emp (actual time=6.809..42.497 rows=2678 loops=1)
                 Storage Filter: ((sal > 32) AND (deptno = ANY ('{10,20,30}'::integer[])))
                 Storage Table Read Requests: 3
                 Storage Table Read Execution Time: 12.474 ms
                 Storage Table Rows Scanned: 10014
                 Storage Table Write Requests: 2678
                 Storage Flush Requests: 1
                 Storage Flush Execution Time: 26.780 ms
 Planning Time: 0.083 ms
 Execution Time: 101.509 ms
 Storage Read Requests: 3
 Storage Read Execution Time: 12.474 ms
 Storage Rows Scanned: 10014
 Storage Write Requests: 2678
 Catalog Read Requests: 0
 Catalog Write Requests: 0
 Storage Flush Requests: 2
 Storage Flush Execution Time: 75.286 ms
 Storage Execution Time: 87.759 ms
 Peak Memory Usage: 72 kB
Enter fullscreen mode Exit fullscreen mode

Some databases do not sync the Raft log before committing, but the application must roll back and retry the transactions in case of failure. To maintain high availability, YugabyteDB minimizes transaction cancellations as much as possible.


In the Oracle patent for their Raft implementation, we can read a strange and incorrect claim about YugabyteDB:
Image description
This is wrong. The current post explains that writes in YugabytDB are buffered to be replicated in sync, and it has nothing to do with single-key. The previous post describes how changes are applied to the follower, and the next post will explain what is optimized for single-key transactions.


💡Use standard SQL to allow efficient buffering

As the batching possibilities depend on the transaction flow, it is better to use all SQL features, such as multi-value inserts, Common Table Expressions, or returning clauses, to define a declarative query that the database can batch rather than row-by-row procedural code or equivalent.

Here is an example.
Instead of the PostgreSQL ON CONFLICT insert that is not SQL standard:

yugabyte=# explain (analyze, dist, costs off, summary off)
 insert into emp(ename,deptno)
 values ('SMITH',20),('ALLEN',30),('WARD',30),('JONES',20),('MARTIN',30),('BLAKE',30),('CLARK',10),
        ('SCOTT',20),('KING',10),('TURNER',30),('ADAMS',20),('JAMES',30),('FORD',20),('MILLER',10),
        ('ZEUS',10),('HERA',20),('POSEIDON',30),('DEMETER',10),('ATHENA',20),
        ('APOLLO',30),('ARTEMIS',10),('ARES',20),('APHRODITE',30),('HEPHAESTUS',10),
        ('HERMES',20),('HESTIA',30),('DIONYSUS',10),('PERSEPHONE',20),('HADES',30)
on conflict (ename) do nothing;
                                 QUERY PLAN
----------------------------------------------------------------------------
 Insert on emp (actual time=61.445..61.445 rows=0 loops=1)
   Conflict Resolution: NOTHING
   Conflict Arbiter Indexes: emp_ename_deptno_idx
   Tuples Inserted: 15
   Conflicting Tuples: 14
   ->  Values Scan on "*VALUES*" (actual time=5.600..5.765 rows=29 loops=1)
         Storage Table Read Requests: 14
         Storage Table Read Execution Time: 4.378 ms
         Storage Table Rows Scanned: 14
         Storage Index Read Requests: 29
         Storage Index Read Execution Time: 16.673 ms
         Storage Index Rows Scanned: 14
         Storage Table Write Requests: 15
         Storage Index Write Requests: 30
         Storage Flush Requests: 14
         Storage Flush Execution Time: 29.065 ms
 Planning Time: 3.053 ms
 Execution Time: 63.266 ms
 Storage Read Requests: 43
 Storage Read Execution Time: 21.052 ms
 Storage Rows Scanned: 28
 Storage Write Requests: 45
 Catalog Read Requests: 14
 Catalog Read Execution Time: 12.376 ms
 Catalog Write Requests: 0
 Storage Flush Requests: 15
 Storage Flush Execution Time: 30.781 ms
 Storage Execution Time: 64.209 ms
 Peak Memory Usage: 73 kB
Enter fullscreen mode Exit fullscreen mode

and shows 15 Flush Requests for 45 Write Requests,

you can use the SQL standard Common Table Expression and anti-join:

yugabyte=# explain (analyze, dist, costs off, summary on)
 with new(ename,deptno) as (
 values ('SMITH',20),('ALLEN',30),('WARD',30),('JONES',20),('MARTIN',30),('BLAKE',30),('CLARK',10),
        ('SCOTT',20),('KING',10),('TURNER',30),('ADAMS',20),('JAMES',30),('FORD',20),('MILLER',10),
        ('ZEUS',10),('HERA',20),('POSEIDON',30),('DEMETER',10),('ATHENA',20),
        ('APOLLO',30),('ARTEMIS',10),('ARES',20),('APHRODITE',30),('HEPHAESTUS',10),
        ('HERMES',20),('HESTIA',30),('DIONYSUS',10),('PERSEPHONE',20),('HADES',30)
) insert into emp(ename,deptno)
 select ename,deptno from new
 where not exists ( select from emp where emp.ename=new.ename)
;

                                                   QUERY PLAN
----------------------------------------------------------------------------------------------------------------
 Insert on emp (actual time=7.297..7.297 rows=0 loops=1)
   CTE new
     ->  Values Scan on "*VALUES*" (actual time=0.001..0.009 rows=29 loops=1)
   ->  YB Batched Nested Loop Anti Join (actual time=6.914..6.940 rows=15 loops=1)
         Join Filter: (emp_1.ename = new.ename)
         ->  CTE Scan on new (actual time=0.004..0.020 rows=29 loops=1)
         ->  Index Only Scan using emp_ename_deptno_idx on emp emp_1 (actual time=1.795..1.802 rows=14 loops=1)
               Index Cond: (ename = ANY (ARRAY[new.ename, $3, $4, ..., $1025]))
               Heap Fetches: 0
               Storage Index Read Requests: 1
               Storage Index Read Execution Time: 0.736 ms
               Storage Index Rows Scanned: 14
 Planning Time: 5.445 ms
 Execution Time: 17.147 ms
 Storage Read Requests: 1
 Storage Read Execution Time: 0.736 ms
 Storage Rows Scanned: 14
 Storage Write Requests: 45
 Catalog Read Requests: 16
 Catalog Read Execution Time: 11.434 ms
 Catalog Write Requests: 0
 Storage Flush Requests: 1
 Storage Flush Execution Time: 9.477 ms
 Storage Execution Time: 21.647 ms
 Peak Memory Usage: 512 kB
Enter fullscreen mode Exit fullscreen mode

which does only one Flush Requests for 45 Write Requests.

Being PostgreSQL-compatible makes it easy, given PostgreSQL's extensive SQL features, and being a fork rather than an extension allows YugabyteDB optimizations in any part of the code.

Top comments (0)