Modern applications need long-running workflows that survive crashes, restarts, and failures, and they often rely on external orchestration platforms for that. pg_durable brings durable workflow orchestration directly into PostgreSQL: workflows are defined in SQL and reliably executed, monitored, inspected, and recovered after disruptions.
This tool is particularly useful for ETL processes, data pipelines, background tasks, scheduled jobs, long‑running business procedures, and internal workflow management.
The best way to understand what it can do is to try it, so I built it and ran it locally.
Installing pg_durable
I installed Rust to compile the extension:
curl https://sh.rustup.rs -sSf | sh
. "$HOME/.cargo/env"
I installed the PostgreSQL build dependencies:
sudo dnf install -y openssl-devel pkg-config libicu-devel readline-devel flex
I installed pgrx for PostgreSQL 17:
cargo install cargo-pgrx --version 0.16.1 --locked
cargo pgrx init --pg17=download
export PG_VERSION=pg17
I enabled the extension preload library:
sed -e "/shared_preload_libraries/s/^.*$/shared_preload_libraries='pg_durable'/" \
-i ~/.pgrx/data-17/postgresql.conf
I declared the superuser that I'll create to run the background worker:
cat >> ~/.pgrx/data-17/postgresql.conf <<<"pg_durable.worker_role='superworker'"
I cloned pg_durable from the Microsoft repository:
git clone https://github.com/microsoft/pg_durable.git
cd pg_durable
I built the extension, started PostgreSQL, and got a psql prompt:
cargo pgrx run
I checked the pg_durable configuration, connected to the database set in pg_durable.database (postgres), created the role set in pg_durable.worker_role (superworker), and installed the extension:
pg_durable=# \dconfig pg_durable.*
List of configuration parameters
Parameter | Value
---------------------------------------+-------------
pg_durable.database | postgres
pg_durable.enable_superuser_instances | off
pg_durable.execution_acquire_timeout | 30
pg_durable.max_duroxide_connections | 10
pg_durable.max_management_connections | 6
pg_durable.max_user_connections | 10
pg_durable.worker_role | superworker
(7 rows)
\c postgres
CREATE ROLE superworker SUPERUSER LOGIN;
CREATE EXTENSION pg_durable;
My PostgreSQL instance is now ready to use durable functions.
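A quick sanity check can confirm the setup. This is a sketch that uses only standard PostgreSQL commands and catalog views, with the names configured above. It checks that the library is preloaded, the extension is installed, and the worker role exists as a superuser that can log in.

```sql
-- The preload list should contain pg_durable (set in postgresql.conf above)
SHOW shared_preload_libraries;

-- The extension should be listed in the current database
SELECT extname, extversion
FROM pg_extension
WHERE extname = 'pg_durable';

-- The worker role should exist, be a superuser, and be allowed to log in
SELECT rolname, rolsuper, rolcanlogin
FROM pg_roles
WHERE rolname = 'superworker';
```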
Hello World
I create a user that will execute workflows and grant access to pg_durable:
CREATE USER franck;
GRANT CREATE ON DATABASE postgres TO franck;
SELECT df.grant_usage('franck');
I reconnect as that user:
postgres=# \c postgres franck
You are now connected to the database "postgres" as user "franck".
postgres=>
I start a durable workflow:
postgres=> SELECT df.start(
$sql$ SELECT 'Hello, durable world!' AS message $sql$
);
start
----------
393f09e0
(1 row)
Because SQL statements can contain single quotes, I used PostgreSQL dollar quoting: the string is delimited by $$, with an optional tag between the two dollar signs (here, $sql$).
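As a minimal illustration of dollar quoting in plain PostgreSQL, independent of pg_durable: neither form below needs single quotes to be doubled, and the tagged form can even contain $$ in its body.

```sql
SELECT $$It's a string with 'single quotes' inside$$     AS untagged,
       $sql$ SELECT 'a $$ inside a tagged string' $sql$  AS tagged;
```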
One second later, it is completed:
postgres=> SELECT df.status('393f09e0');
status
-----------
completed
(1 row)
The result can be fetched as a document with a list of rows:
postgres=> SELECT df.result('393f09e0');
result
------------------------------------------------------------------
{"rows": [{"message": "Hello, durable world!"}], "row_count": 1}
(1 row)
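Because the result is a JSON document, individual columns can be extracted with PostgreSQL's JSON functions. The sketch below works on a literal copy of the document shown above, cast to jsonb, so it makes no assumption about the exact return type of df.result().

```sql
-- Literal copy of the result document from the output above
WITH doc(result) AS (
  VALUES ('{"rows": [{"message": "Hello, durable world!"}], "row_count": 1}'::jsonb)
)
SELECT (doc.result ->> 'row_count')::int AS row_count,
       r.message
FROM doc
CROSS JOIN LATERAL jsonb_to_recordset(doc.result -> 'rows') AS r(message text);
```

It returns one row per element of the "rows" array, with the JSON keys mapped to typed columns.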
I used the same "Hello, durable world!" example as in the HorizonDB durable functions documentation, because this is where pg_durable comes from: Microsoft provides it to PostgreSQL as an open-source extension.
Here is more information about this execution:
postgres=> \x
Expanded display is on.
postgres=> SELECT * FROM df.list_instances();
-[ RECORD 1 ]---+-------------------------------------------------------------
instance_id | 393f09e0
label |
function_name | pg_durable::orchestration::execute-function-graph
status | completed
execution_count | 1
output | {"rows":[{"message":"Hello, durable world!"}],"row_count":1}
postgres=> \x
Expanded display is off.
postgres=>
The execution graph is basic, as there's a single operation:
postgres=> SELECT df.explain('393f09e0');
explain
------------------------------------------------------------------------
Instance: 393f09e0 +
Status: ✓ Completed +
Output: {"rows":[{"message":"Hello, durable world!"}],"row_count":1}+
+
SQL: SELECT 'Hello, durable world!' AS message ✓
(1 row)
There is a single node:
postgres=> \x
Expanded display is on.
postgres=> SELECT * from df.instance_nodes('393f09e0');
-[ RECORD 1 ]+-----------------------------------------------------------------
execution_id | 1
node_id | 60e433bb
node_type | SQL
query | SELECT 'Hello, durable world!' AS message
result_name |
left_node |
right_node |
status | completed
result | {"rows": [{"message": "Hello, durable world!"}], "row_count": 1}
updated_at | 2026-06-04 16:36:38.594906+00
postgres=> \x
Expanded display is off.
It was executed once:
postgres=> SELECT * from df.instance_executions('393f09e0');
execution_id | status | event_count | duration_ms | output
--------------+-----------+-------------+-------------+--------------------------------------------------------------
1 | Completed | 14 | 692 | {"rows":[{"message":"Hello, durable world!"}],"row_count":1}
(1 row)
Here are the global metrics:
postgres=> SELECT * FROM df.metrics();
total_instances | running_instances | completed_instances | failed_instances | total_executions | total_events
-----------------+-------------------+---------------------+------------------+------------------+--------------
1 | 0 | 1 | 0 | 1 | 14
(1 row)
This may look like plain asynchronous database access (ADBA), but it can do more: it can define complex workflows with a declarative syntax. In the next examples, I'll use \gset in psql to store the instance ID in a variable.
Beyond a Single Query
While a simple SQL statement demonstrates the basics, pg_durable becomes much more powerful when orchestrating multiple operations.
The SQL workflow DSL supports:
- Variable capture: |=>
- Sequential execution: ~>
- Parallel execution and wait: &
- Parallel execution and race: |
- Conditional branching: ?> and !>
- Infinite loops: @>
Here are simple table-free examples demonstrating pg_durable's key features. Let's start with a sequential execution (~>):
postgres=> SELECT df.start(
'SELECT now() as step1' |=> 't1'
~> 'SELECT pg_sleep(5)'
~> 'SELECT now() as step2' |=> 't2'
~> 'SELECT now() as step3' |=> 't3',
'sequential-timing'
) as i \gset
-- Check the results after five seconds
postgres=> SELECT df.result(:'i');
result
---------------------------------------------------------------------------
{"rows": [{"step3": "2026-06-04T21:33:59.939785+00:00"}], "row_count": 1}
(1 row)
postgres=> SELECT node_type, result_name, result, node_id, left_node, right_node
FROM df.instance_nodes(:'i');
node_type | result_name | result | node_id | left_node | right_node
-----------+-------------+---------------------------------------------------------------------------+----------+-----------+------------
THEN | t3 | {"rows": [{"step3": "2026-06-04T21:33:59.939785+00:00"}], "row_count": 1} | 49c9c8b4 | 48fcec43 | 9f818758
THEN | t2 | {"rows": [{"step2": "2026-06-04T21:33:59.507283+00:00"}], "row_count": 1} | 48fcec43 | 348c1948 | c78f2200
THEN | | {"rows": [{"pg_sleep": null}], "row_count": 1} | 348c1948 | c7317ea8 | 200f8a52
SQL | t1 | {"rows": [{"step1": "2026-06-04T21:33:53.746030+00:00"}], "row_count": 1} | c7317ea8 | |
SQL | | {"rows": [{"pg_sleep": null}], "row_count": 1} | 200f8a52 | |
SQL | | {"rows": [{"step2": "2026-06-04T21:33:59.507283+00:00"}], "row_count": 1} | c78f2200 | |
SQL | | {"rows": [{"step3": "2026-06-04T21:33:59.939785+00:00"}], "row_count": 1} | 9f818758 | |
(7 rows)
I displayed the timestamps to show that steps 2 and 3 run a little more than 5 seconds after step 1, because of the pg_sleep(5) in the sequence.
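The gaps between steps can be computed directly from the timestamps in the output. This plain PostgreSQL query uses literal copies of the three values shown above:

```sql
SELECT '2026-06-04T21:33:59.507283+00:00'::timestamptz
     - '2026-06-04T21:33:53.746030+00:00'::timestamptz AS step1_to_step2,
       '2026-06-04T21:33:59.939785+00:00'::timestamptz
     - '2026-06-04T21:33:59.507283+00:00'::timestamptz AS step2_to_step3;
```

The first interval is about 5.76 seconds (the sleep plus orchestration overhead) and the second about 0.43 seconds.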
The node_type column indicates the type of operation each node represents in the function graph. Here, THEN denotes sequential execution (run the left child, then the right child), and SQL is a leaf node with no children that executes a SQL query.
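To see how left_node and right_node link the nodes into a tree, here is a sketch that walks the graph with a recursive CTE. It runs on a literal copy of the node IDs from the output above, so it needs nothing beyond standard PostgreSQL. The depth and path columns are helpers introduced for this illustration only.

```sql
WITH RECURSIVE nodes(node_type, result_name, node_id, left_node, right_node) AS (
  -- Copied from the df.instance_nodes() output above
  VALUES
    ('THEN', 't3', '49c9c8b4', '48fcec43', '9f818758'),
    ('THEN', 't2', '48fcec43', '348c1948', 'c78f2200'),
    ('THEN', NULL, '348c1948', 'c7317ea8', '200f8a52'),
    ('SQL',  't1', 'c7317ea8', NULL,       NULL),
    ('SQL',  NULL, '200f8a52', NULL,       NULL),
    ('SQL',  NULL, 'c78f2200', NULL,       NULL),
    ('SQL',  NULL, '9f818758', NULL,       NULL)
),
tree AS (
  -- Root: the only node that is not the child of another node
  SELECT n.node_type, n.result_name, n.node_id, n.left_node, n.right_node,
         0 AS depth, '0'::text AS path
  FROM nodes n
  WHERE NOT EXISTS (
    SELECT 1 FROM nodes p WHERE n.node_id IN (p.left_node, p.right_node)
  )
  UNION ALL
  -- Children: append 1 to the path for a left child, 2 for a right child
  SELECT c.node_type, c.result_name, c.node_id, c.left_node, c.right_node,
         t.depth + 1,
         t.path || CASE WHEN c.node_id = t.left_node THEN '1' ELSE '2' END
  FROM tree t
  JOIN nodes c ON c.node_id IN (t.left_node, t.right_node)
)
SELECT repeat('  ', depth) || node_type AS node, node_id, result_name
FROM tree
ORDER BY path;
```

Ordering by path lists each parent before its left subtree and then its right subtree, so the indentation shows the nesting of the THEN nodes around the SQL leaves.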
The complete list of valid node types in pg_durable is:
| node_type | Description | Operator/Function |
|---|---|---|
| SQL | Execute SQL query | Plain string or df.sql() |
| THEN | Sequential execution | ~> operator or df.seq() |
| IF | Conditional branch | ?> !> operators or df.if() |
| JOIN | Parallel execution (wait all) | & operator or df.join() |
| RACE | Parallel execution (first wins) | \| operator or df.race() |
| LOOP | Infinite loop | @> operator or df.loop() |
| BREAK | Exit loop | df.break() |
| SLEEP | Pause execution | df.sleep() |
| WAIT_SCHEDULE | Wait for cron schedule | df.wait_for_schedule() |
| HTTP | Make HTTP request | df.http() |
| SIGNAL | Wait for external signal | df.wait_for_signal() |
We can execute steps in parallel (&):
postgres=> SELECT df.start(
('SELECT now() as branch1' |=> 'b1' ~> 'SELECT pg_sleep(20)')
&
('SELECT now() as branch2' |=> 'b2' ~> 'SELECT pg_sleep(10)')
~> 'SELECT now() as after_join' |=> 'final',
'parallel-sleep'
) as i \gset
postgres=> SELECT node_type, result_name, result, node_id, left_node, right_node
FROM df.instance_nodes(:'i');
node_type | result_name | result | node_id | left_node | right_node
-----------+-------------+--------------------------------------------------------------------------------------------------+----------+-----------+------------
SQL | b1 | {"rows": [{"branch1": "2026-06-04T21:53:47.870539+00:00"}], "row_count": 1} | 9d479c96 | |
SQL | b2 | {"rows": [{"branch2": "2026-06-04T21:53:47.870606+00:00"}], "row_count": 1} | cb844cd8 | |
SQL | | {"rows": [{"pg_sleep": null}], "row_count": 1} | ad8ec143 | |
THEN | | {"rows": [{"pg_sleep": null}], "row_count": 1} | 1ff3adc8 | cb844cd8 | ad8ec143
SQL | | {"rows": [{"pg_sleep": null}], "row_count": 1} | 9db57c15 | |
THEN | | {"rows": [{"pg_sleep": null}], "row_count": 1} | 5e4d8069 | 9d479c96 | 9db57c15
JOIN | | [{"rows": [{"pg_sleep": null}], "row_count": 1}, {"rows": [{"pg_sleep": null}], "row_count": 1}] | 6f95c84d | 5e4d8069 | 1ff3adc8
SQL | | {"rows": [{"after_join": "2026-06-04T21:54:08.752247+00:00"}], "row_count": 1} | d4be28af | |
THEN | final | {"rows": [{"after_join": "2026-06-04T21:54:08.752247+00:00"}], "row_count": 1} | df59e250 | 6f95c84d | d4be28af
(9 rows)
Here, b1 and b2 started at the same time, in parallel. One branch completed after 20 seconds and the other after 10 seconds, so the final step started about 21 seconds after the branches did, and the total duration was about 22 seconds. It would have been more than 30 seconds if executed serially:
postgres=> SELECT * from df.instance_executions(:'i');
execution_id | status | event_count | duration_ms | output
--------------+-----------+-------------+-------------+----------------------------------------------------------------------------
1 | Completed | 26 | 22107 | {"rows":[{"after_join":"2026-06-04T21:54:08.752247+00:00"}],"row_count":1}
(1 row)
A join is used when we need the results of all branches: the next step waits for all of them. If we want to continue as soon as one branch returns a result, we can run a race, where the first one wins (|):
postgres=> SELECT df.start(
'SELECT pg_sleep(30)' | 'SELECT pg_sleep(90)' ~> 'SELECT ''fast'' as winner',
'race-test'
) as i \gset
postgres=> select usename,backend_start,query_start,wait_event,state,query,application_name
from pg_stat_activity where usename=user and pid!=pg_backend_pid()
\watch 10
Fri 05 Jun 2026 04:08:30 AM GMT (every 10s)
usename | backend_start | query_start | wait_event | state | query | application_name
---------+-------------------------------+-------------------------------+------------+--------+---------------------+------------------
franck | 2026-06-05 04:08:29.873785+00 | 2026-06-05 04:08:29.877368+00 | PgSleep | active | SELECT pg_sleep(30) |
franck | 2026-06-05 04:08:29.875738+00 | 2026-06-05 04:08:29.879296+00 | PgSleep | active | SELECT pg_sleep(90) |
(2 rows)
Fri 05 Jun 2026 04:08:40 AM GMT (every 10s)
usename | backend_start | query_start | wait_event | state | query | application_name
---------+-------------------------------+-------------------------------+------------+--------+---------------------+------------------
franck | 2026-06-05 04:08:29.873785+00 | 2026-06-05 04:08:29.877368+00 | PgSleep | active | SELECT pg_sleep(30) |
franck | 2026-06-05 04:08:29.875738+00 | 2026-06-05 04:08:29.879296+00 | PgSleep | active | SELECT pg_sleep(90) |
(2 rows)
Fri 05 Jun 2026 04:08:50 AM GMT (every 10s)
usename | backend_start | query_start | wait_event | state | query | application_name
---------+-------------------------------+-------------------------------+------------+--------+---------------------+------------------
franck | 2026-06-05 04:08:29.873785+00 | 2026-06-05 04:08:29.877368+00 | PgSleep | active | SELECT pg_sleep(30) |
franck | 2026-06-05 04:08:29.875738+00 | 2026-06-05 04:08:29.879296+00 | PgSleep | active | SELECT pg_sleep(90) |
(2 rows)
Fri 05 Jun 2026 04:09:00 AM GMT (every 10s)
usename | backend_start | query_start | wait_event | state | query | application_name
---------+-------------------------------+-------------------------------+------------+--------+---------------------+------------------
franck | 2026-06-05 04:08:29.875738+00 | 2026-06-05 04:08:29.879296+00 | PgSleep | active | SELECT pg_sleep(90) |
(1 row)
Fri 05 Jun 2026 04:09:10 AM GMT (every 10s)
usename | backend_start | query_start | wait_event | state | query | application_name
---------+-------------------------------+-------------------------------+------------+--------+---------------------+------------------
franck | 2026-06-05 04:08:29.875738+00 | 2026-06-05 04:08:29.879296+00 | PgSleep | active | SELECT pg_sleep(90) |
(1 row)
Fri 05 Jun 2026 04:09:20 AM GMT (every 10s)
usename | backend_start | query_start | wait_event | state | query | application_name
---------+-------------------------------+-------------------------------+------------+--------+---------------------+------------------
franck | 2026-06-05 04:08:29.875738+00 | 2026-06-05 04:08:29.879296+00 | PgSleep | active | SELECT pg_sleep(90) |
(1 row)
Fri 05 Jun 2026 04:09:30 AM GMT (every 10s)
usename | backend_start | query_start | wait_event | state | query | application_name
---------+-------------------------------+-------------------------------+------------+--------+---------------------+------------------
franck | 2026-06-05 04:08:29.875738+00 | 2026-06-05 04:08:29.879296+00 | PgSleep | active | SELECT pg_sleep(90) |
(1 row)
Fri 05 Jun 2026 04:09:40 AM GMT (every 10s)
usename | backend_start | query_start | wait_event | state | query | application_name
---------+-------------------------------+-------------------------------+------------+--------+---------------------+------------------
franck | 2026-06-05 04:08:29.875738+00 | 2026-06-05 04:08:29.879296+00 | PgSleep | active | SELECT pg_sleep(90) |
(1 row)
Fri 05 Jun 2026 04:09:50 AM GMT (every 10s)
usename | backend_start | query_start | wait_event | state | query | application_name
---------+-------------------------------+-------------------------------+------------+--------+---------------------+------------------
franck | 2026-06-05 04:08:29.875738+00 | 2026-06-05 04:08:29.879296+00 | PgSleep | active | SELECT pg_sleep(90) |
(1 row)
Fri 05 Jun 2026 04:10:00 AM GMT (every 10s)
usename | backend_start | query_start | wait_event | state | query | application_name
---------+---------------+-------------+------------+-------+-------+------------------
(0 rows)
Fri 05 Jun 2026 04:10:10 AM GMT (every 10s)
usename | backend_start | query_start | wait_event | state | query | application_name
---------+---------------+-------------+------------+-------+-------+------------------
(0 rows)
postgres=> SELECT * from df.instance_executions(:'i');
execution_id | status | event_count | duration_ms | output
--------------+-----------+-------------+-------------+--------------------------------------------
1 | Completed | 27 | 31414 | {"rows":[{"winner":"fast"}],"row_count":1}
(1 row)
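Both operations run, but only the result of the fastest one is used. In this run, the pg_stat_activity samples show that the losing branch was not cancelled: pg_sleep(90) kept running to its end, long after the workflow had completed in about 31 seconds. The trade-off is that both branches consume resources, so a race is best suited to lightweight operations where responsiveness outweighs the cost. It can be used for timeout scenarios, competing API calls, or waiting for either a signal or a deadline.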
We can add conditional branches with if...then (?>) and else (!>):
postgres=> SELECT df.start(
'SELECT 42 = 6*7 as condition' |=> 'cond'
?> 'SELECT ''Branch A'' as result'
!> 'SELECT ''Branch B'' as result',
'conditional-test'
) as i \gset
postgres=> SELECT df.result(:'i');
result
----------------------------------------------------
{"rows": [{"result": "Branch A"}], "row_count": 1}
(1 row)
postgres=> SELECT node_type, result_name, result, node_id, left_node, right_node
FROM df.instance_nodes(:'i');
node_type | result_name | result | node_id | left_node | right_node
-----------+-------------+----------------------------------------------------+----------+-----------+------------
SQL | cond | {"rows": [{"condition": true}], "row_count": 1} | 03784caa | |
SQL | | {"rows": [{"result": "Branch A"}], "row_count": 1} | d99e0e6c | |
IF | | {"rows": [{"result": "Branch A"}], "row_count": 1} | a764d565 | d99e0e6c | 72edfb05
SQL | | | 72edfb05 | |
(4 rows)
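To spot the branch that was not taken, one option is to filter the node list on a missing result. This is only a sketch: it assumes that the blank result displayed above for node 72edfb05 is a SQL NULL, which the psql output alone does not prove.

```sql
-- Nodes of the instance that produced no result (assumed to be NULL)
SELECT node_id, node_type, status
FROM df.instance_nodes(:'i')
WHERE result IS NULL;
```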
A result can be captured into a named variable (|=>) and reused in later steps with a $ prefix ($x):
postgres=> SELECT df.start(
'SELECT 42 as answer' |=> 'x'
~> 'SELECT $x * 2 as doubled'
~> 'SELECT $x + 10 as plus_ten',
'variable-test'
) as i \gset
postgres=> SELECT df.result(:'i');
result
----------------------------------------------
{"rows": [{"plus_ten": 52}], "row_count": 1}
(1 row)
postgres=> SELECT node_type, result_name, result, node_id, left_node, right_node
FROM df.instance_nodes(:'i');
node_type | result_name | result | node_id | left_node | right_node
-----------+-------------+----------------------------------------------+----------+-----------+------------
SQL | | {"rows": [{"doubled": 84}], "row_count": 1} | 4afc6779 | |
SQL | | {"rows": [{"plus_ten": 52}], "row_count": 1} | 4d2146fe | |
THEN | | {"rows": [{"plus_ten": 52}], "row_count": 1} | 4dd00824 | 63f0a5eb | 4d2146fe
THEN | | {"rows": [{"doubled": 84}], "row_count": 1} | 63f0a5eb | c2dbd3d3 | 4afc6779
SQL | x | {"rows": [{"answer": 42}], "row_count": 1} | c2dbd3d3 | |
(5 rows)
The workflow result is the result of the last step in the sequence, while each node keeps its own intermediate result.
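To list only the captured variables of an instance, the node list can be filtered on result_name. This sketch assumes that result_name is a text column. The comparison with '' is false for an empty string and unknown for NULL, so it drops unnamed nodes in both cases.

```sql
-- Named results only: unnamed nodes (NULL or '') are filtered out
SELECT result_name, result
FROM df.instance_nodes(:'i')
WHERE result_name <> '';
```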
It's also possible to loop (@>) and wait for a signal or a specific time (using a cron syntax):
SELECT df.start(
@> (
'SELECT datname, usename, application_name, wait_event_type, wait_event, state, query FROM pg_stat_activity'
~> df.wait_for_schedule('*/1 * * * *')
),
'pg-stat-activity-sampler'
) as i \gset
This reads pg_stat_activity every minute.
I can query the last 30 executions:
SELECT count(*) AS sessions, r.query, r.wait_event, COUNT(DISTINCT updated_at) AS samples
FROM df.instance_nodes(:'i',30) n
CROSS JOIN LATERAL jsonb_to_recordset(n.result::jsonb -> 'rows') AS r ( datname text, usename text, application_name text, wait_event_type text, wait_event text, state text, query text)
WHERE n.node_type = 'SQL' AND r.query IS NOT NULL
GROUP BY r.query, r.wait_event
ORDER BY samples DESC, r.query, r.wait_event;
I can schedule a purge at midnight to keep only one week of samples:
SELECT df.start(
@> (
df.wait_for_schedule('0 0 * * *')
~> 'SELECT * FROM df.prune_executions(
instance_id => ''pg-stat-activity-sampler'',
completed_before => NOW() - INTERVAL ''7 days''
)'
),
'pg-stat-activity-pruner'
);
With a couple of simple commands, I have implemented a basic form of active session history monitoring.
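For readers less familiar with cron syntax: the five fields are minute, hour, day of month, month, and day of week. Assuming pg_durable follows these usual five-field semantics, '*/1 * * * *' fires at every minute boundary and '0 0 * * *' at every midnight. The plain PostgreSQL expressions below compute those two boundaries from the current time in the session time zone. They only illustrate the schedules and say nothing about which time zone the extension uses.

```sql
SELECT date_trunc('minute', now()) + interval '1 minute' AS next_minute_tick,  -- '*/1 * * * *'
       date_trunc('day', now())    + interval '1 day'    AS next_midnight;     -- '0 0 * * *'
```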
Here is a final test to show that each step runs in its own transaction:
postgres=> SELECT df.start(
-- Sequential execution showing different txid/session_id
'SELECT user, pg_backend_pid() as session_id, txid_current() as txid, ''step1'' as step' |=> 's1'
~> 'SELECT user, pg_backend_pid() as session_id, txid_current() as txid, ''step2'' as step'
~>
-- Race showing parallel execution with different connections
(
'SELECT user, pg_backend_pid() as session_id, txid_current() as txid, ''race_branch1'' as branch'
|
'SELECT user, pg_backend_pid() as session_id, txid_current() as txid, ''race_branch2'' as branch'
)
~> 'SELECT user, pg_backend_pid() as session_id, txid_current() as txid, ''final'' as step',
'txid-demo'
) as i \gset
postgres=> SELECT node_type, result_name, result FROM
df.instance_nodes(:'i') WHERE node_type='SQL';
node_type | result_name | result
-----------+-------------+----------------------------------------------------------------------------------------------------------------
SQL | s1 | {"rows": [{"step": "step1", "txid": 25769, "user": "franck", "session_id": 1840182}], "row_count": 1}
SQL | | {"rows": [{"step": "step2", "txid": 25784, "user": "franck", "session_id": 1840183}], "row_count": 1}
SQL | | {"rows": [{"txid": 25819, "user": "franck", "branch": "race_branch1", "session_id": 1840185}], "row_count": 1}
SQL | | {"rows": [{"txid": 25820, "user": "franck", "branch": "race_branch2", "session_id": 1840186}], "row_count": 1}
SQL | | {"rows": [{"step": "final", "txid": 25859, "user": "franck", "session_id": 1840187}], "row_count": 1}
(5 rows)
The main purpose of durable functions is to execute steps as if they were independent operations, while adding orchestration, persistence, and resilience. They run with the credentials of the user who defined the workflow, and each operation gets its own session and its own transaction.
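For comparison, here are the same two functions run by hand in a single psql session, using only standard PostgreSQL. Within one session, pg_backend_pid() never changes, and within one transaction, txid_current() keeps returning the same value. That is why distinct values for both in the output above point to a separate session and transaction per step.

```sql
BEGIN;
SELECT pg_backend_pid() AS session_id, txid_current() AS txid;
SELECT pg_backend_pid() AS session_id, txid_current() AS txid;  -- same session_id, same txid
COMMIT;

-- Autocommit: same session_id as before, but a new transaction and a new txid
SELECT pg_backend_pid() AS session_id, txid_current() AS txid;
```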
Conclusion
With pg_durable, workflows become a natural extension of PostgreSQL—another “just use PostgreSQL” use case—defined in SQL, executed close to the data, with the database's resilience and persistence, and managed with the same tools you already use.
What makes it compelling is that this isn’t a prototype. It comes from real production needs: HorizonDB durable functions, now open-sourced and contributed back to PostgreSQL. It’s a clear example of Microsoft investing in the ecosystem and sharing what was built for its managed services.
If your applications already rely on PostgreSQL, this approach lets you add durable orchestration without introducing new infrastructure—just more capability where your data already lives. Try it, experiment with it, and if it clicks, give the project a star and help it grow.