Intro
Nowadays we, as developers, love to build message-driven, (micro-)service-oriented applications. If you have chosen PostgreSQL as the data storage engine for your services, there is good news: it supports asynchronous events via LISTEN and NOTIFY, which lets us turn the RDBMS into an event-based engine.
For example, you may emit NOTIFY events from table triggers, and your application layer may LISTEN to them, apply the required business logic on top and deliver the changes to some API layer or front-end application layer. You can also use the database purely as an event bus for your application, or combine both approaches.
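As a rough illustration of the trigger-based approach, the sketch below builds the SQL for a row-level trigger that publishes changes through PostgreSQL's built-in pg_notify() function. The helper name buildNotifyTriggerSql, the orders table and the payload shape are assumptions made for this example; they are not part of @imqueue/pg-pubsub.

```typescript
// Hypothetical helper: returns SQL that makes a table publish its row changes.
// Only pg_notify(), json_build_object(), row_to_json() and the TG_* variables
// come from PostgreSQL itself; everything else is illustrative.
const SAFE_NAME = /^[A-Za-z_][A-Za-z0-9_]*$/;

function buildNotifyTriggerSql(table: string, channel: string): string {
    // Names are interpolated into SQL, so accept plain identifiers only.
    if (!SAFE_NAME.test(table) || !SAFE_NAME.test(channel)) {
        throw new Error('Unsafe table or channel name');
    }

    return `
CREATE OR REPLACE FUNCTION ${table}_notify() RETURNS trigger AS $$
BEGIN
    -- NOTIFY payloads are plain text, so the row is serialized to JSON.
    PERFORM pg_notify('${channel}', json_build_object(
        'table', TG_TABLE_NAME,
        'operation', TG_OP,
        'row', row_to_json(NEW)
    )::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- EXECUTE FUNCTION needs PostgreSQL 11+; older versions use EXECUTE PROCEDURE.
CREATE TRIGGER ${table}_notify_trigger
AFTER INSERT OR UPDATE ON ${table}
FOR EACH ROW EXECUTE FUNCTION ${table}_notify();
`;
}

const triggerSql = buildNotifyTriggerSql('orders', 'HelloChannel');
```

The resulting string can be run once as a migration. Note that pg_notify() takes the channel name as a literal string, whereas an unquoted LISTEN HelloChannel folds the name to lower case, so both sides have to agree on the exact name.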
Here we will review how to use this awesome database feature in Node/TypeScript service-based applications using the @imqueue/pg-pubsub module.
Why Using Yet Another Module?
If you decide to use LISTEN/NOTIFY in your app, there are several issues to be solved:
- These commands are connection-specific, so you need to run them on a dedicated connection. Running them through a connection pool is problematic, because a LISTEN registration belongs to the session that issued it, while a pool may hand out a different connection for each query.
- You may need extra work in the connection implementation for reliability: if the connection is lost or throws an error, you usually want some re-connection mechanism.
- When using pub/sub at the application level, you may need to filter messages within a process. For example, if process 1 and process 2 listen to the same event channel, we may want to make sure that when process 1 emits a message, it does not handle that message itself.
- In some architectures you may run several similar processes at scale that act as channel listeners and deliver caught events to an application layer on top, for example an API gateway layer. If all of them catch the same message and try to deliver it to the upper layer, you may run into a duplicated data delivery problem.
- Graceful shutdown support. You will definitely need it when implementing inter-process locking.
- The problem with the dozens of existing modules is that they solve only a limited subset of the problems described above and usually introduce yet another one: they hide the database driver from the end user, so they are really hard to extend without patching or other ugly hacks.
So, if you are going to build your solution on a bare database driver or some existing third-party solution, you will have to solve these problems on your own. That is why we came up with the idea of building a module that solves all of these issues out of the box.
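To make the re-connection concern from the list above a bit more concrete, here is a minimal sketch of an exponential back-off schedule that a hand-rolled listener might use between connection attempts. The function name and the default values are assumptions for illustration only; this is not a description of how @imqueue/pg-pubsub works internally.

```typescript
// Hypothetical helper: delay in milliseconds before reconnect attempt N (0-based).
function reconnectDelay(attempt: number, baseMs: number = 500, maxMs: number = 30000): number {
    // Double the delay after every failed attempt, but never exceed maxMs.
    return Math.min(maxMs, baseMs * Math.pow(2, attempt));
}

// Delays for the 1st, 2nd, 3rd, 4th and 11th attempt.
const schedule = [0, 1, 2, 3, 10].map(attempt => reconnectDelay(attempt));
// schedule is [500, 1000, 2000, 4000, 30000]
```

Capping the delay keeps a long outage from pushing the next attempt arbitrarily far into the future.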
Example Scenarios
Let's see, by example, how we can use @imqueue/pg-pubsub in different scenarios.
NOTE: We will emulate periodic notifications using setInterval timers. In a real app, of course, these could be real events triggered at either the application or the database level.
You may copy and paste the code below, play with it and run it in several different processes to observe the behavior. Or you may clone the ready-to-launch examples from the repository.
The examples listen and notify on a single channel, HelloChannel. When we talk about "Listen All" or "Publish All", we mean all running processes, not all channels.
1. Listen All, Publish All
This is probably the most common case. In this scenario all running processes listen and notify on HelloChannel and handle all caught messages, even those emitted by the same process:
import { PgPubSub } from '@imqueue/pg-pubsub';
import Timer = NodeJS.Timer;

let timer: Timer;
const NOTIFY_DELAY = 2000;
const CHANNEL = 'HelloChannel';

const pubSub = new PgPubSub({
    connectionString: 'postgres://postgres@localhost:5432/postgres',
    singleListener: false,
});

pubSub.on('listen', channel => console.info('Listening to ' + channel + '...'));
pubSub.on('connect', async () => {
    console.info('Database connected!');
    await pubSub.listen(CHANNEL);
    // Emulate periodic events by notifying the channel every NOTIFY_DELAY ms
    timer = setInterval(async () => {
        await pubSub.notify(CHANNEL, { hello: { from: process.pid } });
    }, NOTIFY_DELAY);
});
pubSub.on('notify', channel => console.log(channel + ' notified'));
pubSub.on('end', () => console.warn('Connection closed!'));
// Print every message payload received on the channel
pubSub.channels.on(CHANNEL, console.log);
pubSub.connect().catch(err => console.error('Connection error:', err));
2. Listen All Filtered, Notify All
In this scenario all running processes listen and notify on HelloChannel, but self-emitted messages are not handled. Most of the code stays the same; you only need to change the PgPubSub instantiation options to this:
const pubSub = new PgPubSub({
    connectionString: 'postgres://postgres@localhost:5432/postgres',
    singleListener: false,
    filtered: true,
});
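The effect of filtering can be pictured with a small in-memory model that involves no database at all. How the library itself recognizes its own messages is not covered in this article, so the senderId field and the FakeChannel and FakeProcess classes below are purely hypothetical stand-ins.

```typescript
// In-memory stand-in for a database channel; no PostgreSQL involved.
type Message = { senderId: number; payload: unknown };

class FakeChannel {
    private subscribers: Array<(message: Message) => void> = [];

    subscribe(subscriber: (message: Message) => void): void {
        this.subscribers.push(subscriber);
    }

    publish(message: Message): void {
        // Like NOTIFY, delivery goes to every listener, the sender included.
        for (const subscriber of this.subscribers) {
            subscriber(message);
        }
    }
}

class FakeProcess {
    readonly received: unknown[] = [];

    constructor(
        private readonly id: number,
        private readonly channel: FakeChannel,
        filtered: boolean,
    ) {
        channel.subscribe(message => {
            // With filtering on, drop messages this process emitted itself.
            if (filtered && message.senderId === this.id) {
                return;
            }
            this.received.push(message.payload);
        });
    }

    notify(payload: unknown): void {
        this.channel.publish({ senderId: this.id, payload });
    }
}

const channel = new FakeChannel();
const first = new FakeProcess(1, channel, true);
const second = new FakeProcess(2, channel, true);

first.notify('hello from 1');
second.notify('hello from 2');
// first.received is ['hello from 2'], second.received is ['hello from 1']
```

Passing false as the third constructor argument models the unfiltered scenario, in which each process also receives its own messages.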
3. Listen Single, Notify All
In this scenario all running processes notify HelloChannel, but only one process listens to it until it is gracefully shut down. When that happens, another live process becomes the listener. The code stays the same, but you need to change the PgPubSub instantiation options to this:
const pubSub = new PgPubSub({
    connectionString: 'postgres://postgres@localhost:5432/postgres',
});
or, more explicitly, to this:
const pubSub = new PgPubSub({
    connectionString: 'postgres://postgres@localhost:5432/postgres',
    singleListener: true,
    filtered: false,
});
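The hand-over of the listener role can be sketched with an in-memory lock. This is only a model of the behavior described above, with the hypothetical FakeLock class standing in for the database-backed inter-process lock; it makes no claim about the library's actual implementation.

```typescript
// In-memory stand-in for an inter-process lock; the real one lives in the database.
class FakeLock {
    private owner: number | null = null;
    private waiting: number[] = [];

    // Returns true if the caller holds the listener role, otherwise queues it.
    acquire(processId: number): boolean {
        if (this.owner === null) {
            this.owner = processId;
            return true;
        }
        if (this.owner !== processId && this.waiting.indexOf(processId) === -1) {
            this.waiting.push(processId);
        }
        return this.owner === processId;
    }

    // Called on graceful shutdown: hands the listener role to the next process.
    release(processId: number): number | null {
        if (this.owner !== processId) {
            // A waiting process is leaving; the current listener stays.
            this.waiting = this.waiting.filter(id => id !== processId);
            return this.owner;
        }
        const next = this.waiting.shift();
        this.owner = next === undefined ? null : next;
        return this.owner;
    }

    get listener(): number | null {
        return this.owner;
    }
}

const lock = new FakeLock();
const firstIsListener = lock.acquire(101);  // true: 101 is now the only listener
const secondIsListener = lock.acquire(102); // false: 102 can notify but waits to listen
const newListener = lock.release(101);      // 101 shuts down gracefully, 102 takes over
```

The sketch also hints at why graceful shutdown matters here: the role only moves on when the current listener releases it.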
4. Listen Single Filtered, Notify All
This scenario is almost the same as the previous one, except that self-emitted messages are not handled by the listener process:
const pubSub = new PgPubSub({
    connectionString: 'postgres://postgres@localhost:5432/postgres',
    singleListener: true,
    filtered: true,
});
API
@imqueue/pg-pubsub does not hide any underlying objects but keeps them public, so whenever you need to inject or extend the behavior of the attached PostgreSQL client object, you can easily do it via pubSub.pgClient. It also lets you inject an existing client object instead of constructing a new one: just pass it as the pgClient option instead of connectionString:
new PgPubSub({ pgClient: existingAppClient });
The inter-process lock is, by nature, implemented on top of the same LISTEN/NOTIFY features and uses the same shared connection, so it does not require any additional technologies or computing resources, such as additional network connections.
You can read the full library API documentation here.
Hope this is helpful, and happy coding!