Franck Pachot for YugabyteDB Distributed PostgreSQL Database

UPSERT some columns in YugabyteDB (ON CONFLICT DO UPDATE)

You may have a set of rows to merge to an existing table with the following logic:

  • if the primary key exists, then update the columns
  • if it doesn't exist, then insert the row

In SQL there's a MERGE command, and it came to PostgreSQL in PG15.

Here I describe a few alternatives for earlier versions, and for YugabyteDB 2.17, which is PostgreSQL 11 compatible.
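
For comparison, on PostgreSQL 15 or later the same logic could be written with MERGE. This is only a sketch: it assumes a target table demo(k, v0, v1) and a source demo_new(k, v1), like the ones created in the next section, and it does not run on a PG11-compatible version such as YugabyteDB 2.17.

```sql
-- PostgreSQL 15+ only (sketch): update v1 when the key exists, insert otherwise
merge into demo
using demo_new on demo.k = demo_new.k
when matched then
 update set v1 = demo_new.v1
when not matched then
 insert (k, v1) values (demo_new.k, demo_new.v1);
```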

Example table

I'm creating the following five-row table. Of course, you can test with a larger table. Because I look at the execution plan, I know how it scales, and I prefer to provide a small test case that is easy to reproduce.

My goal is to have at most one read request and one write request per batch of rows.

drop table if exists demo;
create table demo(k bigint, v0 int, v1 int, PRIMARY KEY(k));
insert into demo (k, v0, v1) 
 select i, 0, 1 from generate_series(1,5) i;
create index demo_v1 on demo(v1) include (k);

yugabyte=> select * from demo order by 1;
 k | v0 | v1
---+----+----
 1 |  0 |  1
 2 |  0 |  1
 3 |  0 |  1
 4 |  0 |  1
 5 |  0 |  1
(5 rows)

Now I want to merge the following new rows which I put in a view for simplicity:

create view demo_new(k, v1) as
 select 2*i, 2 from generate_series(1,5) i;

yugabyte=> select * from demo_new;
 k  | v1
----+----
  2 |  2
  4 |  2
  6 |  2
  8 |  2
 10 |  2
(5 rows)

My goal is to get the following as the new state for the demo table:

yugabyte=>
 select k, old_v0 as v0, coalesce(new_v1, old_v1) as v1 from
 ( select k, v1 as new_v1           from demo_new ) as new_values
 natural full join
 ( select k, v1 as old_v1, v0 as old_v0 from demo ) as old_values
 order by k
;

 k  | v0 | v1
----+----+----
  1 |  0 |  1
  2 |  0 |  2
  3 |  0 |  1
  4 |  0 |  2
  5 |  0 |  1
  6 |    |  2
  8 |    |  2
 10 |    |  2
(8 rows)


⚠️ undocumented setting yb_enable_upsert_mode

I mentioned this yb_enable_upsert_mode in previous blogs with the warning that it should be used only for bulk loads with no secondary indexes.

begin transaction;
set local yb_enable_upsert_mode = true;
insert into demo (k, v1) select * from demo_new;
select * from demo order by k;
explain select k,v1 from demo where v1=1 order by k;
select k,v1 from demo where v1=1 order by k;
rollback;

The result is:

INSERT 0 5

yugabyte=> select * from demo order by k;

 k  | v0 | v1
----+----+----
  1 |  0 |  1
  2 |    |  2
  3 |  0 |  1
  4 |    |  2
  5 |  0 |  1
  6 |    |  2
  8 |    |  2
 10 |    |  2
(8 rows)

yugabyte=> select k,v1 from demo where v1=1 order by k;

 k | v1
---+----
 1 |  1
 2 |  1
 3 |  1
 4 |  1
 5 |  1
(5 rows)


First, it doesn't keep the values that we don't want to update, like v0 here, because it writes a new version of the whole row. That may be OK for your use case. However, the secondary index is corrupted: the entries for the old values were not deleted and are still visible from an Index Only Scan. Upsert mode works by appending the new versions to the LSM-Tree without adding a tombstone for the old values.

This is the reason why this parameter is not documented and should be used only for the special case it was built for. Of course, for these cases, it is the fastest. The idea is to provide the same performance as NoSQL for some special use cases.

Before using it, please check #12272 to see if it is documented.

INSERT ... ON CONFLICT

The traditional way in PostgreSQL is to try to INSERT the rows and detect the Duplicate Key exception to do an UPDATE:

begin transaction;
--explain (analyze, dist, costs off)
insert into demo (k, v1) select * from demo_new
 on conflict (k) do update set v1=excluded.v1;
select * from demo order by k;
rollback;

This provides the expected result:

yugabyte=> select * from demo order by k;

 k  | v0 | v1
----+----+----
  1 |  0 |  1
  2 |  0 |  2
  3 |  0 |  1
  4 |  0 |  2
  5 |  0 |  1
  6 |    |  2
  8 |    |  2
 10 |    |  2
(8 rows)

yugabyte=> select k,v1 from demo where v1=1 order by k;
 k | v1
---+----
 1 |  1
 3 |  1
 5 |  1
(3 rows)

However, from a performance point of view, as of YugabyteDB 2.17, this does a write request for each row:

yugabyte=> explain (analyze, dist, costs off)
           insert into demo (k, v1) select * from demo_new
           on conflict (k) do update set v1=excluded.v1;

                                     QUERY PLAN
------------------------------------------------------------------------------------
 Insert on demo (actual time=12.184..12.184 rows=0 loops=1)
   Conflict Resolution: UPDATE
   Conflict Arbiter Indexes: demo_pkey
   Tuples Inserted: 3
   Conflicting Tuples: 2
   ->  Function Scan on generate_series i (actual time=0.014..0.020 rows=5 loops=1)
 Planning Time: 0.065 ms
 Execution Time: 14.141 ms
 Storage Read Requests: 0
 Storage Write Requests: 5
 Storage Execution Time: 8.000 ms
 Peak Memory Usage: 44 kB
(12 rows)

The Storage Write Requests: 5 line shows the number of writes, and each of them is a network call. This doesn't scale with more rows. In a Distributed SQL database, you cannot avoid the latency between nodes, so operations must be batched.
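
To check how the number of write requests grows with the input, one option is to run the same statement on a larger set of keys and compare the Storage Write Requests line. The sketch below assumes the demo table created above and generates an arbitrary 1000 keys inline instead of using the demo_new view. Because explain analyze actually executes the statement, it is wrapped in a transaction that is rolled back.

```sql
begin transaction;
-- same ON CONFLICT statement as above, with 1000 input rows instead of 5
explain (analyze, dist, costs off)
insert into demo (k, v1)
 select 2*i, 2 from generate_series(1,1000) i
 on conflict (k) do update set v1=excluded.v1;
rollback;
```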

WITH ... UPDATE ... RETURNING ... INSERT ... NOT IN

Because YugabyteDB is PostgreSQL compatible, the power of SQL with Common Table Expressions (CTE) helps to build a statement composed of multiple operations. Here is how we can UPDATE, get the updated keys with RETURNING, and INSERT the remaining ones:

begin transaction;
set yb_bnl_batch_size=1000;
explain (analyze, dist, costs off)
with
upsert(k,v1) as (
 select * from demo_new
),
updated(k) as (
 update demo set v1=upsert.v1
 from upsert where demo.k=upsert.k
 returning demo.k
)
insert into demo(k,v1) select upsert.k, upsert.v1 from
  upsert where k not in (select k from updated)
;
select * from demo order by k;
rollback;

This gets the result we want:

yugabyte=> select * from demo order by k;

 k  | v0 | v1
----+----+----
  1 |  0 |  1
  2 |  0 |  2
  3 |  0 |  1
  4 |  0 |  2
  5 |  0 |  1
  6 |    |  2
  8 |    |  2
 10 |    |  2
(8 rows)

yugabyte=> select k,v1 from demo where v1=1 order by k;
 k | v1
---+----
 1 |  1
 3 |  1
 5 |  1
(3 rows)

With default statistics, the query planner chooses a Nested Loop, which results in 5 Read Requests for the UPDATE and 3 Write Requests for the INSERT.

Hash Join and Merge Join can be used, but the best option is to enable the YugabyteDB Batched Nested Loop with yb_bnl_batch_size=1000. The execution plan then becomes:

                                                         QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------
 Insert on demo (actual time=0.984..0.984 rows=0 loops=1)
   CTE upsert
     ->  Function Scan on generate_series i (actual time=0.013..0.014 rows=5 loops=1)
   CTE updated
     ->  Update on demo demo_1 (actual time=0.858..0.888 rows=2 loops=1)
           ->  YB Batched Nested Loop Join (actual time=0.818..0.835 rows=2 loops=1)
                 Join Filter: (demo_1.k = upsert_1.k)
                 ->  CTE Scan on upsert upsert_1 (actual time=0.003..0.005 rows=5 loops=1)
                 ->  Index Scan using demo_pkey on demo demo_1 (actual time=0.736..0.749 rows=2 loops=1)
                       Index Cond: (k = ANY (ARRAY[(upsert_1.k)::bigint, ($3)::bigint, ($4)::bigint, ..., ($1001)::bigint]))
                       Storage Index Read Requests: 1
                       Storage Index Execution Time: 0.000 ms
   ->  CTE Scan on upsert (actual time=0.928..0.930 rows=3 loops=1)
         Filter: (NOT (hashed SubPlan 3))
         Rows Removed by Filter: 2
         SubPlan 3
           ->  CTE Scan on updated (actual time=0.859..0.890 rows=2 loops=1)
 Planning Time: 2.040 ms
 Execution Time: 6.655 ms
 Storage Read Requests: 1
 Storage Write Requests: 1
 Storage Execution Time: 4.000 ms
 Peak Memory Usage: 4564 kB
(23 rows)

This is only one Read Request and one Write Request, which is optimal and scales because 1024 rows can be batched by default.

Note that the feedback from the statement is the one from the last operation: INSERT 0 3

If you want more detail about the number of updates and inserts, the WITH clause is powerful:

with
upsert(k,v1) as (
 select * from demo_new
),
updated(k) as (
 update demo set v1=upsert.v1
 from upsert where demo.k=upsert.k
 returning demo.k
),
inserted(k) as (
 insert into demo(k,v1) select upsert.k, upsert.v1 from
  upsert where k not in (select k from updated)
  returning k
) select * from 
 (select count(*) as updated from updated) as update_count,
 (select count(*) as inserted from inserted) as insert_count
;

This returns:

 updated | inserted
---------+----------
       2 |        3
(1 row)

The technique I used here in YugabyteDB is compatible with PostgreSQL and is still a good alternative to MERGE, which returns only the total number of rows without the detail of inserted and updated.

Deduplication

If your input has duplicates, this query will fail because it cannot determine which value to update. In this case, deduplicate the input first. In the query above, replacing the upsert CTE (the with upsert as (...) part) with the following keeps only the smallest v1 for each key:

with deduplication(k, v1) as (
 select k, v1
 , row_number() over (partition by k order by v1) > 1 as "duplicate?"
 from demo_new
),
upsert(k,v1) as (
 select * from deduplication where not "duplicate?"
),
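
As an alternative sketch, PostgreSQL's DISTINCT ON can express the same rule (one row per k, the one with the smallest v1) in a single query. This is offered as an equivalent form and is not the formulation used above. It assumes the same demo_new view and can serve as the body of the upsert CTE.

```sql
-- DISTINCT ON (k) keeps the first row of each k group in ORDER BY order,
-- so ordering by k, v1 keeps the row with the smallest v1 for each key
select distinct on (k) k, v1
from demo_new
order by k, v1;
```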

Doing nothing is not faster

This behavior (one write request per row) applies to every ON CONFLICT variant until its processing is improved. For ON CONFLICT DO NOTHING, it is better to replace it with an INSERT ... SELECT that filters out the existing keys with NOT IN.
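
A minimal sketch of that replacement, assuming the same demo table and demo_new view as above: instead of insert ... on conflict (k) do nothing, first read the keys that already exist with a join, then insert only the others. The CTE names new_rows and existing are illustrative, and the structure mirrors the UPDATE ... RETURNING statement shown earlier.

```sql
begin transaction;
set local yb_bnl_batch_size=1000;
with
new_rows(k,v1) as (
 select * from demo_new
),
existing(k) as (
 -- keys from the input that are already present in the table
 select demo.k from demo, new_rows where demo.k=new_rows.k
)
insert into demo(k,v1) select new_rows.k, new_rows.v1 from
  new_rows where k not in (select k from existing)
;
select * from demo order by k;
rollback;
```

Unlike ON CONFLICT DO NOTHING, this form is not expected to absorb a duplicate key raised by a concurrent insert of the same key, so it fits batch loads where that cannot happen or where a retry is acceptable.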
