DEV Community

loading...

Asynchronous Notifications in Postgres

citizen428 profile image Michael Kohl Originally published at citizen428.net Updated on ・4 min read

I'm fascinated by Postgres: the more I learn about it, the more I realize how much I still don't know. Recently I discovered its asynchronous communication capabilities, which apparently have been around for a long time ¯\(ツ)

Let's look at the two most interesting commands related to this topic, NOTIFY and LISTEN. Here's what the documentation has to say on them:

NOTIFY provides a simple interprocess communication mechanism for a collection of processes accessing the same PostgreSQL database. A payload string can be sent along with the notification, and higher-level mechanisms for passing structured data can be built by using tables in the database to pass additional data from notifier to listener(s).

Whenever the command NOTIFY channel is invoked, either by this session or another one connected to the same database, all the sessions currently listening on that notification channel are notified, and each will in turn notify its connected client application.

LISTEN registers the current session as a listener on the notification channel named channel. If the current session is already registered as a listener for this notification channel, nothing is done.

Sounds like publish-subscribe on the database level, interesting! I learn best by trying things out and writing some code, so let's dive in.

Setting up Postgres for notifications

For testing purposes, let's create an overly simplified orders table, that except for the primary key also contains an email address to identify the person who placed the order and a bigint field to store the total order amount in cents:

CREATE TABLE orders (
  id SERIAL PRIMARY KEY,  
  email TEXT NOT NULL,
  total BIGINT NOT NULL
);
Enter fullscreen mode Exit fullscreen mode

Next we need to define a function which returns a trigger:

CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$
  DECLARE 
    record RECORD;  
    payload JSON;
  BEGIN
    IF (TG_OP = 'DELETE') THEN
      record = OLD;
    ELSE
      record = NEW;
    END IF;

    payload = json_build_object('table', TG_TABLE_NAME, 
                                'action', TG_OP, 
                                'data', row_to_json(record));

    PERFORM pg_notify('events', payload::text);

    RETURN NULL; 
  END;    
$$ LANGUAGE plpgsql;
Enter fullscreen mode Exit fullscreen mode

The above is pretty straightforward:

  1. Declare some variables for later use.
  2. Switch on the TG_OP special variable to decide which version of the row we want to serialize.
  3. Use json_build_object and row_to_json to generate the notification payload.
  4. Use pg_notify to broadcast a message on the events channel.
  5. Return NULL since this is an AFTER trigger.

Now we can create a notify_order_event trigger, which will call this function after we perform a CRUD operation on the orders table:

CREATE TRIGGER notify_order_event
AFTER INSERT OR UPDATE OR DELETE ON orders
  FOR EACH ROW EXECUTE PROCEDURE notify_event();
Enter fullscreen mode Exit fullscreen mode

With this in place we should now be able to receive events. Let's inform Postgres that we're interested in notifications on the events channel:

LISTEN events;
Enter fullscreen mode Exit fullscreen mode

Now whenever we insert, update or delete a record we will receive a notification:

INSERT into orders (email, total) VALUES ('test@example.com', 10000);
INSERT 0 1
Asynchronous notification "events" with payload "{"table" : "orders", "action" : "INSERT", "data" : {"id":1,"email":"test@example.com","total":10000}}" received from server process with PID 5315.
Enter fullscreen mode Exit fullscreen mode

Great, we just received our first asynchronous notification, though admittedly that's not particularly useful within the same psql session, so let's add another listener.

Listening from another process

For the following example we'll once again use Jeremy Evan's excellent Sequel gem:

require 'sequel'

DB = Sequel.connect('postgres://user@localhost/notify-test')

puts 'Listening for DB events...'
DB.listen(:events, loop: true) do |_channel, _pid, payload| 
  puts payload
end
Enter fullscreen mode Exit fullscreen mode

The above code first connects to the database and then uses Sequel::Postgres::Database#listen to listen for events in a loop.

If we start this script and insert a record in our database the JSON payload will get output to the console:

→ ruby test.rb
Listening for DB events...
{"table" : "orders", "action" : "INSERT", "data" : {"id":2,"email":"test@example.com","total":10000}}
Enter fullscreen mode Exit fullscreen mode

Nice, but still not terribly useful. Wouldn't it be nice if we could forward this notification to our web browser to build a dashboard or something? Enter WebSockets.

Connecting a frontend

Since this is just a simple demo and not a full-fledged app I felt like Rails + ActionCable would be overkill, so I decided to just write a very simple Rack app with Faye instead:

require 'faye/websocket'
require 'sequel'

DB = Sequel.connect('postgres://user@localhost/notify-test')

App = lambda do |env|
  ws = Faye::WebSocket.new(env)

  DB.listen(:events, loop: true) do |_channel, _pid, payload|
    ws.send(payload)
    ws.rack_response
  end
end
Enter fullscreen mode Exit fullscreen mode

Like every Rack "app" this is just an object supporting call, in our case a lambda. In there we create a websocket which we use to forward the messages we receive from Postgres to the browser. To receive them there, all we need to do is instantiate a Websocket with an onmessage handler that logs the payload to the console:

socket = new WebSocket("ws://localhost:9292")
socket.onmessage = (event) => console.log(event.data)
Enter fullscreen mode Exit fullscreen mode

Whenever we create, modify or delete a new record in our orders table now, we'll be notified in the browser:

Chrome console

Summary

Postgres' LISTEN and NOTIFY offer easy interprocess communication via a publish-subscribe pattern. There are many potential use cases for this mechanism, from logging over ETL to realtime dashboards.

Discussion (6)

pic
Editor guide
Collapse
rhymes profile image
rhymes

Postgresql is full of gems, I still have to explore foreign data wrappers for example.

I wonder how much toll this feature takes on the database in case of an app with heavy writing load. I wouldn't want the db to perform worse because it has a queue of payloads to send back to the app

Collapse
citizen428 profile image
Michael Kohl Author • Edited

Good point, though probably a bridge I'd not try to cross before hitting that heavy write load. Over the years I saw too many people complicate their architecture before there was a need a for it, so my current approach nowadays is generally "simplest thing that can possibly work". It's way easier to add more layers/tech later than to rip out something that was added at the very beginning of a project.

Collapse
rhymes profile image
rhymes

Over the years I saw too many people complicate their architecture before there was a need a for it, so my current approach nowadays is generally "simplest thing that can possibly work". It's way easier to add more layers/tech later than to rip out something that was added at the very beginning of a project.

Yeah, I'm definitely guilty of over engineering in the early days but I think I developed an enzyme that reacts everytime it gets in contact with over engineered stuff sounding an alarm :D

But I still try to keep an eye on the "future direction"

Thread Thread
citizen428 profile image
Michael Kohl Author

I'm sure we've all been guilty of this at one point or another. I worked a lot with early stage startups, where this is extra important, because if you spend too much time on the wrong thing there is no future. That said, I also do keep the future direction in mind and generally outline the rationale for doing it the current way plus possible future alternatives in comments.

Collapse
citizen428 profile image
Michael Kohl Author

For example two applications already connected to the same DB.