Maximo Guerrero

Simple Queue in PostgreSQL

Sometimes you need a simple job queue to enable offline or deferred processing of data. This is not a pub/sub system, which is what a lot of people use queues for, but you could expand upon it.

I will be using Postgres as the database (you could apply the concepts in this post to other RDBMSs). This is not a SQL tutorial; I assume you know how to write and understand basic SQL (SELECT, INSERT, UPDATE, and DELETE).

Let's start with the definition of our table. One thing you will notice is that we have a column called jobData of type json. This is so that you can store the data needed to process a job in a structured way. Feel free to change it to TEXT or anything else.


CREATE TABLE public."jobQueue"
(
    "jobId" serial NOT NULL,
    "jobData" json NOT NULL,
    status character varying,
    added timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
    started timestamp without time zone,
    ended timestamp without time zone,
    CONSTRAINT "jobQueue_pkey" PRIMARY KEY ("jobId")
);
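
To put work on the queue, a producer only needs to insert a row. Here is a minimal sketch, assuming new jobs start with the status 'new' (the value the dequeue function filters on). The payload keys are made up for illustration.

```sql
-- Enqueue one job. The payload shape ("task", "userId") is hypothetical.
-- status has no default in the table definition, so it is set explicitly.
INSERT INTO public."jobQueue" ("jobData", status)
VALUES ('{"task": "send_welcome_email", "userId": 42}'::json, 'new')
RETURNING "jobId";
```

The added column fills itself in through its DEFAULT, so the producer does not need to supply a timestamp.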

Now the secret to using a database table as a queue is to lock the row for update while you're getting a job from the queue, so that no two workers can pick up the same job.
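
As a rough illustration of that locking, here is the core of the pattern run by hand inside a transaction. FOR UPDATE locks the selected rows until the transaction ends, and SKIP LOCKED (available since PostgreSQL 9.5) makes a second worker running the same query skip those rows instead of waiting for them. The batch size of 5 is arbitrary.

```sql
BEGIN;

-- Lock up to 5 waiting jobs, oldest first. Rows already locked by another
-- transaction are skipped rather than waited on.
SELECT "jobId"
  FROM public."jobQueue"
 WHERE status = 'new'
 ORDER BY added
 LIMIT 5
   FOR UPDATE SKIP LOCKED;

-- ... mark the returned rows as taken here, while the locks are still held ...

COMMIT;  -- releases the row locks
```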

Let's create a function that will take as an argument the number of jobs to pull off the queue.


CREATE OR REPLACE FUNCTION public."jobQueue_getJobs"(
    "_numJobsToGet" integer)
    RETURNS TABLE("jobId" integer, status character varying)
    LANGUAGE plpgsql
AS $BODY$
DECLARE
    _jobId integer;
BEGIN
    -- Lock up to _numJobsToGet 'new' rows, oldest first. SKIP LOCKED passes
    -- over rows that another transaction has already locked.
    FOR _jobId IN
        SELECT jq1."jobId"
          FROM public."jobQueue" jq1
         WHERE jq1.status = 'new'
           AND jq1.added < NOW()
         ORDER BY jq1.added
         LIMIT "_numJobsToGet"
           FOR UPDATE SKIP LOCKED
    LOOP
        -- The SET target must not be prefixed with the table alias.
        UPDATE public."jobQueue" jq2
           SET status = 'pickedup'
         WHERE jq2."jobId" = _jobId;

        -- Fill the output columns, then emit the row.
        "jobId" := _jobId;
        status := 'pickedup';
        RETURN NEXT;
    END LOOP;
    RETURN;
END
$BODY$;

So executing this function will return a list of jobIds that you can then use to select from and update the jobQueue table without any other process stepping on your jobs.
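
A worker might call the function like this. It is only a sketch: the batch size of 5 and the jobId in the second query are placeholders.

```sql
-- Claim up to 5 jobs. The function marks each claimed row as 'pickedup'.
SELECT "jobId", status
  FROM public."jobQueue_getJobs"(5);

-- Fetch the payload for one claimed job (the jobId 1 is a placeholder).
SELECT "jobData"
  FROM public."jobQueue"
 WHERE "jobId" = 1;
```

When the call runs outside an explicit transaction, it commits on its own, so the 'pickedup' status is visible to other workers as soon as the call returns.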

One nice thing about this process is that if a worker process dies and a job is stuck in limbo, the job's row is still in the table, so you can restart it without any data loss.
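
One possible way to do that restart is a periodic cleanup query. This is a sketch under two assumptions that are not part of the setup described here: workers set started when they begin a job, and any job still marked 'pickedup' after 15 minutes is considered abandoned.

```sql
-- Put abandoned jobs back on the queue.
-- Assumes workers set "started" when they begin a job; rows where "started"
-- is NULL are not matched. The 15-minute timeout is arbitrary.
UPDATE public."jobQueue"
   SET status = 'new',
       started = NULL
 WHERE status = 'pickedup'
   AND started < NOW() - INTERVAL '15 minutes';
```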

Once you're done processing the job, you can mark it as done, or remove it from the queue if you are space constrained.
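
Both options could look something like the following. The status value 'done' and the jobId are assumptions made for illustration.

```sql
-- Option 1: keep the row and record completion (the jobId 1 is a placeholder).
UPDATE public."jobQueue"
   SET status = 'done',
       ended = NOW()
 WHERE "jobId" = 1;

-- Option 2: remove finished jobs to reclaim space.
DELETE FROM public."jobQueue"
 WHERE status = 'done';
```

Keeping finished rows gives you a history of what ran and when, at the cost of a table that keeps growing.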

Top comments (2)

David Stone

Hi Maximo

Thanks for the great article. Can I ask what version of PG you are using, please, as I am struggling to get your code running (using PG v12.1)?

When I try to run it, it complains about the "jq2" aliasing. According to another article, table aliasing can't be done like this as part of an UPDATE statement in PG.

dba.stackexchange.com/questions/15...

I would really appreciate any help on this - many thanks

Maximo Guerrero

Sorry for the super late reply. I have been using this in production with version 11.22.