Franck Pachot for Yugabyte

🐘🚀 Update/Insert/Soft-Delete from a JSON payload

A user described their "ETL" use case as follows:

  • a transaction table holds an ID (k1 int in my example) for which there are multiple records (k2 int for their key, and v1 int, v2 text for their values in my small example)
  • the application receives a payload, in JSON, describing the new data
  • new IDs will be INSERTed
  • existing IDs will be UPDATEd
  • missing IDs will be marked as deleted (deleted boolean)
  • I add a timestamp ts timestamptz to record the last change time

Their current application uses Python Pandas to retrieve the current records, compare them with the new payload, and write the result back to the database. With a distributed database like YugabyteDB, it is more efficient to process this in the database: it scales (we can connect to any node), runs in one server-side transaction (which makes transparent retries easier in case of clock skew), and reduces roundtrips (working on batches of rows). And with PostgreSQL compatibility, JSON processing and INSERT ... ON CONFLICT ... UPDATE can do the same with clean and simple SQL.

Example

Here is a simple example declaring my table:

create table demo (
 k1 int, k2 int, v1 int, v2 text
 , deleted boolean, ts timestamptz
 , primary key ( k1,k2 )
);

This works in PostgreSQL and YugabyteDB. With YugabyteDB the primary key defaults to lsm (k1 HASH, k2 ASC) which is good for my use case: distribute on the ID and colocate their records.
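If you prefer not to rely on the default, the sharding can be spelled out in the primary key. This is a minimal sketch, assuming YugabyteDB's YSQL syntax (PostgreSQL does not accept the HASH keyword here), to be used instead of the declaration above:

```sql
-- YugabyteDB only: explicit equivalent of the default primary key sharding
create table demo (
 k1 int, k2 int, v1 int, v2 text
 , deleted boolean, ts timestamptz
 , primary key ( k1 hash, k2 asc )
);
```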

I'll implement the whole logic in one SQL query. WITH clauses (aka CTE - Common Table Expressions) maintain the readability:

  • payload reads my JSON payload passed as $2 into records, thanks to jsonb_to_recordset, adding the ID $1 as k1
  • to_upsert formats the payload into the destination format, setting deleted to false and adding the timestamp
  • to_soft_delete retrieves the current records and compares them with the payload to format the missing ones as soft deletes, with deleted set to true
  • finally the union of to_upsert and to_soft_delete is merged with an INSERT ... ON CONFLICT ... UPDATE
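The first step can be tried on its own. This sketch runs the payload CTE as a standalone query, with a literal ID and a one-record JSON array standing in for the $1 and $2 parameters:

```sql
select 0 as k1, *
from jsonb_to_recordset(
 '[{"k2":1,"v1":1,"v2":"my first insert"}]'::jsonb
) as payload( k2 int, v1 int, v2 text );
-- returns one row: k1=0, k2=1, v1=1, v2='my first insert'
```

JSON keys that are not listed in the column definition are ignored, and listed columns that are missing from a JSON object come back as null.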

Here is the whole query as a prepared statement:

prepare etl (int, jsonb) as
with
payload as (
 select $1 as k1,* from jsonb_to_recordset($2) as payload( k2 int, v1 int, v2 text )
),
to_upsert as (
 select k1, k2, v1, v2, false as deleted, now() as ts from payload
),
to_soft_delete as (
 select k1, k2, v1, v2, true  as deleted, now() as ts from demo
 where  k1=$1
 and (k1, k2) not in ( select k1, k2 from payload)
)
insert into demo select * from to_upsert union select * from to_soft_delete
on conflict (k1, k2) do
update set v1=excluded.v1, v2=excluded.v2, deleted=excluded.deleted, ts=excluded.ts
;

This can be coded as a stored procedure, but a prepared statement avoids parsing and optimizing it on each call. It is easy to prepare it in the connection pool initialization command.
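For comparison, here is a sketch of the stored-function variant. The names etl_fn, p_k1 and p_payload are chosen for this illustration only, and the body is the same statement with the parameters named instead of $1 and $2:

```sql
-- Hypothetical wrapper: etl_fn, p_k1 and p_payload are names made up for this sketch
create function etl_fn(p_k1 int, p_payload jsonb) returns void
language sql as $body$
with
payload as (
 select p_k1 as k1,* from jsonb_to_recordset(p_payload) as payload( k2 int, v1 int, v2 text )
),
to_upsert as (
 select k1, k2, v1, v2, false as deleted, now() as ts from payload
),
to_soft_delete as (
 select k1, k2, v1, v2, true  as deleted, now() as ts from demo
 where  k1=p_k1
 and (k1, k2) not in ( select k1, k2 from payload)
)
insert into demo select * from to_upsert union select * from to_soft_delete
on conflict (k1, k2) do
update set v1=excluded.v1, v2=excluded.v2, deleted=excluded.deleted, ts=excluded.ts
$body$;

-- called with the same arguments as the prepared statement
select etl_fn(0, '[{"k2":1,"v1":1,"v2":"my update"}]'::jsonb);
```

The function needs no per-connection setup, which can be convenient when you cannot control the connection pool initialization.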

My table is empty. I'll insert a payload for k1=0 with 3 records:

execute etl(0, $$
[
{"k2":1,"v1":1,"v2":"my first insert"},
{"k2":2,"v1":1,"v2":"my first insert"},
{"k2":3,"v1":1,"v2":"my first insert"}
]
$$::jsonb);

This has inserted 3 rows:

yugabyte=# select * from demo;
 k1 | k2 | v1 |       v2        | deleted |              ts
----+----+----+-----------------+---------+-------------------------------
  0 |  1 |  1 | my first insert | f       | 2022-04-29 14:39:08.467744+00
  0 |  2 |  1 | my first insert | f       | 2022-04-29 14:39:08.467744+00
  0 |  3 |  1 | my first insert | f       | 2022-04-29 14:39:08.467744+00
(3 rows)

Now, on the same k1=0 the new payload updates 2 rows, removes the other and adds a new one:

execute etl(0,$$
[
{"k2":1,"v1":1,"v2":"my update"},
{"k2":2,"v1":1,"v2":"my update"},
{"k2":4,"v1":1,"v2":"my second insert"}
]
$$::jsonb);

Here is the result:

yugabyte=# select * from demo;
 k1 | k2 | v1 |        v2        | deleted |              ts
----+----+----+------------------+---------+-------------------------------
  0 |  1 |  1 | my update        | f       | 2022-04-29 14:41:00.109561+00
  0 |  2 |  1 | my update        | f       | 2022-04-29 14:41:00.109561+00
  0 |  3 |  1 | my first insert  | t       | 2022-04-29 14:41:00.109561+00
  0 |  4 |  1 | my second insert | f       | 2022-04-29 14:41:00.109561+00
(4 rows)

Performance

Always check the execution plan:

yugabyte=# explain (costs off, analyze) execute etl(0,$$
[
{"k2":1,"v1":1,"v2":"my update"},
{"k2":2,"v1":1,"v2":"my update"},
{"k2":4,"v1":1,"v2":"my second insert"}
]
$$::jsonb);
                                                 QUERY PLAN
------------------------------------------------------------------------------------------------------------
 Insert on demo (actual time=11.787..11.787 rows=0 loops=1)
   Conflict Resolution: UPDATE
   Conflict Arbiter Indexes: demo_pkey
   Tuples Inserted: 0
   Conflicting Tuples: 4
   CTE payload
     ->  Function Scan on jsonb_to_recordset payload (actual time=0.011..0.012 rows=3 loops=1)
   CTE to_upsert
     ->  CTE Scan on payload payload_1 (actual time=0.013..0.014 rows=3 loops=1)
   CTE to_soft_delete
     ->  Index Scan using demo_pkey on demo demo_1 (actual time=1.079..1.081 rows=1 loops=1)
           Index Cond: (k1 = 0)
           Filter: (NOT (hashed SubPlan 3))
           Rows Removed by Filter: 3
           SubPlan 3
             ->  CTE Scan on payload payload_2 (actual time=0.001..0.001 rows=3 loops=1)
   ->  HashAggregate (actual time=1.105..1.110 rows=4 loops=1)
         Group Key: to_upsert.k1, to_upsert.k2, to_upsert.v1, to_upsert.v2, to_upsert.deleted, to_upsert.ts
         ->  Append (actual time=0.014..1.099 rows=4 loops=1)
               ->  CTE Scan on to_upsert (actual time=0.014..0.016 rows=3 loops=1)
               ->  CTE Scan on to_soft_delete (actual time=1.080..1.082 rows=1 loops=1)
 Planning Time: 0.262 ms
 Execution Time: 11.864 ms
(23 rows)

This reads only one tablet, the one for Index Cond: (k1 = 0), with an Index Scan on the primary key, where all rows are colocated. This is where the (k1 HASH, k2 ASC) primary key is important.

Of course, this could be optimized, for example by not updating the values that are the same. But keeping the code simple is probably the best choice. Further optimization will depend on the precise use case.
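As an illustration of that optimization, here is a minimal sketch that adds a WHERE clause to the DO UPDATE part, using standard PostgreSQL syntax. The statement name etl_skip_unchanged is made up for this example, and the resulting plan is worth checking with explain analyze before relying on it:

```sql
-- Hypothetical variant of etl: rows whose values and deleted flag are unchanged are not rewritten
prepare etl_skip_unchanged (int, jsonb) as
with
payload as (
 select $1 as k1,* from jsonb_to_recordset($2) as payload( k2 int, v1 int, v2 text )
),
to_upsert as (
 select k1, k2, v1, v2, false as deleted, now() as ts from payload
),
to_soft_delete as (
 select k1, k2, v1, v2, true  as deleted, now() as ts from demo
 where  k1=$1
 and (k1, k2) not in ( select k1, k2 from payload)
)
insert into demo select * from to_upsert union select * from to_soft_delete
on conflict (k1, k2) do
update set v1=excluded.v1, v2=excluded.v2, deleted=excluded.deleted, ts=excluded.ts
-- compare the stored row with the incoming one, treating nulls as comparable values
where (demo.v1, demo.v2, demo.deleted)
 is distinct from (excluded.v1, excluded.v2, excluded.deleted)
;
```

Note that this changes the meaning of ts: it is then updated only when a row actually changes, not each time a payload mentions it.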

This query can also be easily generated from the Information Schema. You need the column definitions for the payload (k2 int, v1 int, v2 text), the column list (k1, k2, v1, v2), the key columns (k1, k2) and the set clauses (v1=excluded.v1, v2=excluded.v2).
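A sketch of such a generation query follows. It assumes the table is public.demo and that the deleted and ts columns are handled separately, as in the statement above. It builds the column list, the key columns and the set clauses as comma-separated strings:

```sql
select
 -- all payload-driven columns, in table order
 string_agg(c.column_name::text, ', ' order by c.ordinal_position) as column_list,
 -- primary key columns, in key order (nulls from the outer join are skipped)
 string_agg(k.column_name::text, ', ' order by k.ordinal_position) as key_columns,
 -- one "col=excluded.col" per non-key column
 string_agg(format('%1$I=excluded.%1$I', c.column_name::text), ', '
            order by c.ordinal_position)
  filter (where k.column_name is null) as set_clauses
from information_schema.columns c
left join (
 select kcu.table_schema, kcu.table_name, kcu.column_name, kcu.ordinal_position
 from information_schema.table_constraints tc
 join information_schema.key_column_usage kcu
  on  kcu.constraint_schema = tc.constraint_schema
  and kcu.constraint_name   = tc.constraint_name
  and kcu.table_schema      = tc.table_schema
  and kcu.table_name        = tc.table_name
 where tc.constraint_type = 'PRIMARY KEY'
) k
 on  k.table_schema = c.table_schema
 and k.table_name   = c.table_name
 and k.column_name  = c.column_name
where c.table_schema = 'public'
and   c.table_name   = 'demo'
and   c.column_name not in ('deleted', 'ts');
```

The three strings can then be substituted into a template of the statement with format() or in the application code.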
