If you need a queue in a database but have only PostgreSQL at hand rather than Oracle with its Advanced Queuing, you can simulate a queue with the simple recipe below.
To keep it generic, let's assume you need more than one queue (or may need a second one in the future).
1. Create a base queue table
CREATE TABLE base_queue
(
id serial NOT NULL,
status integer DEFAULT 0 NOT NULL,
try_count integer DEFAULT 0 NOT NULL,
max_tries integer DEFAULT 5 NOT NULL,
params json,
create_time timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL,
update_time timestamp,
priority integer DEFAULT 0 NOT NULL
);
Attributes:
- status - the status of the message in the queue: 0 is "unprocessed", -1 is "failed", 1 is "processed" (you can also use your own status numbers if you like)
- try_count - approximate number of processing attempts (why it is approximate is explained a bit later)
- max_tries - limit on attempts (optional, but sometimes useful because it lets you put messages with different retry limits into the same queue)
- params - your business payload in JSON format
- create_time - when the message was put into the queue
- update_time - when the message was last updated (both timestamps are useful for troubleshooting)
- priority - you may not need it, but it is better to have it just in case
2. Create your queue as follows
CREATE TABLE <YOUR QUEUE NAME> ()
INHERITS (base_queue);
That's all. After that you will have a table with the same structure as "base_queue".
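As a minimal sketch of this step, here is what a concrete queue could look like. The name "email_queue" and the index name are hypothetical and used only for illustration. The extra statements are optional additions, not part of the original recipe: in PostgreSQL, indexes are not inherited from a parent table, so any index you want has to be created on each queue table.

```sql
-- Hypothetical queue name, for illustration only.
CREATE TABLE email_queue ()
INHERITS (base_queue);

-- Optional: indexes are not inherited, so add them per queue table.
ALTER TABLE email_queue ADD PRIMARY KEY (id);

-- Optional: a partial index that covers only unprocessed messages.
CREATE INDEX email_queue_pending_idx
    ON email_queue (priority, id)
    WHERE status = 0;
```

The column defaults are inherited, so "id" in every queue table is filled from the sequence that belongs to "base_queue".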
3. Work with the queue
3.1. Put a message into the queue - just insert a record like
insert into <YOUR QUEUE NAME> (params) values (<YOUR BUSINESS PAYLOAD JSON>);
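For example, with a hypothetical queue table named "email_queue" and a made-up payload, the insert could look like the sketch below. The second statement shows how the other columns might be overridden for a single message. The values are assumptions chosen for illustration.

```sql
-- Default status, priority and retry limit.
INSERT INTO email_queue (params)
VALUES ('{"to": "user@example.com", "template": "welcome"}');

-- A message with its own priority and retry limit.
INSERT INTO email_queue (params, priority, max_tries)
VALUES ('{"to": "user@example.com", "template": "invoice"}', 10, 3);
```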
3.2. Get a message for processing from the queue
select * from <YOUR QUEUE NAME> where
status = 0
limit 1
for update skip locked;
Important: do not use the "AUTOCOMMIT" option. All this magic works only within a transaction, because the row lock taken by "for update" is held only until the transaction ends.
You may also extend the SQL with a priority or try_count condition if necessary.
- priority
...
and priority = <REQUIRED PRIORITY>
...
- try_count
...
and try_count < max_tries
...
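Putting the pieces together, a fetch inside an explicit transaction might look like the following sketch. The table name "email_queue" and the priority value 10 are hypothetical, and the "order by" clause is an added assumption (oldest message first) that the recipe itself does not require. The filter uses "try_count < max_tries" so that a message gets at most max_tries attempts.

```sql
BEGIN;

SELECT *
FROM email_queue
WHERE status = 0
  AND priority = 10            -- hypothetical required priority
  AND try_count < max_tries
ORDER BY id                    -- assumption: oldest message first
LIMIT 1
FOR UPDATE SKIP LOCKED;        -- rows locked by other workers are skipped

-- ... process the message here, then run one of the updates
-- from 3.4 or 3.5 inside the same transaction ...

COMMIT;
```

If the select returns no row, the queue is either empty or every matching message is currently locked by another worker.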
3.3. Failed processing - case 1 (exception and rollback)
In this case, when the transaction rolls back, your record in the queue becomes available for processing again, and "try_count" and "update_time" are not updated. This is why "try_count" is approximate.
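A minimal sketch of this case, again with the hypothetical table name "email_queue":

```sql
BEGIN;

SELECT id, try_count
FROM email_queue
WHERE status = 0
LIMIT 1
FOR UPDATE SKIP LOCKED;

-- Processing fails with an exception before any update is committed.
ROLLBACK;

-- The row lock is released and the row is exactly as it was before:
-- status, try_count and update_time are unchanged, so any worker
-- can pick the message up again.
```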
3.4. Failed processing - case 2 (failure detected by your code)
- update the record (to put back to queue)
update <YOUR QUEUE NAME> set
try_count = try_count+1,
update_time = CURRENT_TIMESTAMP
where id = <ID>;
- or, if you need to fail it (for example, when you have reached the maximum number of processing attempts)
update <YOUR QUEUE NAME> set
try_count = try_count+1,
update_time = CURRENT_TIMESTAMP,
status = -1
where id = <ID>;
- commit
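If you prefer to let the database decide between the two variants, they can be combined into one statement. This is only a sketch under assumptions: "email_queue" is a hypothetical table name, 42 stands in for the real id, and a message is treated as failed once the new try count reaches max_tries.

```sql
UPDATE email_queue SET
    try_count = try_count + 1,
    update_time = CURRENT_TIMESTAMP,
    -- Expressions on the right-hand side see the old row values,
    -- so try_count + 1 is the count after this attempt.
    status = CASE
                 WHEN try_count + 1 >= max_tries THEN -1  -- limit reached: fail
                 ELSE 0                                   -- stays in the queue
             END
WHERE id = 42;  -- 42 stands in for <ID>

COMMIT;
```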
3.5. Saving the successful processing state
- update the record
update <YOUR QUEUE NAME> set
try_count = try_count+1,
update_time = CURRENT_TIMESTAMP,
status = 1
where id = <ID>;
- commit
Housekeeping
According to your retention policy (if any):
- From time to time, remove records with status 1 or -1
- From time to time, remove records with try_count >= max_tries (if you filter by try_count when getting records from the queue)
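The cleanup could be sketched as two delete statements run on a schedule. The table name "email_queue" and the 30-day retention period are assumptions; adjust them to your own policy.

```sql
-- Assumption: finished messages are kept for 30 days.
-- update_time may be NULL, so fall back to create_time.
DELETE FROM email_queue
WHERE status IN (1, -1)
  AND COALESCE(update_time, create_time) < LOCALTIMESTAMP - INTERVAL '30 days';

-- Only relevant if the fetch query filters on try_count:
-- such messages will never be picked up again.
DELETE FROM email_queue
WHERE try_count >= max_tries;
```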