DEV Community

MikeVV
MikeVV

Posted on • Edited on

Simple queue with PostgreSQL

If you need a queue in database but does not have in your hands Oracle with its' Advanced Queues and only PostgreSQL instead, you can simulate the queue with below simple recipe.

To make it more generic let's assume you need more than one queue (or may have need 2nd queue in the future)

1. Create a base queue table

CREATE TABLE base_queue
(
   id           serial      NOT NULL,
   status       integer     DEFAULT 0 NOT NULL,
   try_count    integer        DEFAULT 0 NOT NULL,
   max_tries    integer        DEFAULT 5 NOT NULL,
   params       json,
   create_time  timestamp   DEFAULT CURRENT_TIMESTAMP NOT NULL,
   update_time  timestamp,
   priority     integer     DEFAULT 0 NOT NULL
);
Enter fullscreen mode Exit fullscreen mode

attributes:

  • status - the status of the message in the queue, 0 is "unprocessed", -1 "failed", 1 "processed" (Also, you can use your own status numbers if you like)
  • try_count - approximate tries count (why it is approximate, will be explained a bit later)
  • max_tries - tries limiter (not necessary to use, but may be useful sometimes as allow you to put in queue messages with different retry count)
  • params - your business payload in json format
  • create_time - when you put message to queue
  • update_time - when your message was last updated (both timestamps useful for troubleshooting)
  • priority - may not need it, but better to have just in case

2. Create your queue as follows

CREATE TABLE <YOUR QUEUE NAME> ()
INHERITS (base_queue);
Enter fullscreen mode Exit fullscreen mode

Thats all. After that you will have a table with the same structure as "base_queue"

3. work with queue

3.1. put a message to queue - just insert a record like

insert into <YOUR QUEUE NAME> (params) values (<YOUR BUSINESS PAYLOAD JSON>);
Enter fullscreen mode Exit fullscreen mode

3.2. get a message for processing from queue

select * from <YOUR QUEUE NAME> where 
  status = 0 
limit 1 
for update skip locked;
Enter fullscreen mode Exit fullscreen mode

important aspect here: Do not use "AUTOCOMMIT" option. All this magic works only in the context of transaction.
You also may extend SQL with priority condition or try_count if it necessary.

  • priority
  ... 
  and priority = <REQUIRED PROIRITY>
  ... 
Enter fullscreen mode Exit fullscreen mode
  • try_count
  ... 
  and try_count <= max_tries
  ... 
Enter fullscreen mode Exit fullscreen mode

3.3. failed processing - case 1 (exception and rollback)

In this case on the transaction rollback you record in queue will again become available for processing, "try_count" and "update_time" will not be updated. this is why "try_count" is "approximate".

3.4. failed processing - case 2 (failing processing by your code)

  • update the record (to put back to queue)
update <YOUR QUEUE NAME> set 
  try_count = try_count+1,
  update_time = CURRENT_TIMESTAMP
where id = <ID>;
Enter fullscreen mode Exit fullscreen mode
  • or in case you need to fail it (for example then you reached max processing attempts)
update <YOUR QUEUE NAME> set 
  try_count = try_count+1,
  update_time = CURRENT_TIMESTAMP,
  status = -1
where id = <ID>;
Enter fullscreen mode Exit fullscreen mode
  • commit

3.5. saving sucessull processing state

  • update the record
update <YOUR QUEUE NAME> set 
  try_count = try_count+1,
  update_time = CURRENT_TIMESTAMP,
  status = 1
where id = <ID>;
Enter fullscreen mode Exit fullscreen mode
  • commit

House keeping

According to your retention policy (if any):

  1. From time to time need to remove records with status 1 or -1
  2. From time to time need to remove records with try_count >= max_tries (if you are using filters by try_count while getting records from queue)

Top comments (0)