Todd McNeal

Posted on • Originally published at reflect.run

Implementing a Task Queue in SQL

Introduction

Software systems often periodically execute collections of similar or identical tasks. Whether it's computing daily account analytics, or running background computations asynchronously like GitHub Actions, it's common to structure and complete this computation using a queue of tasks and the concept of a generic "worker" which executes the task. In the naive approach, a single worker sequentially executes each task, avoiding locks or other coordination.

Two downsides to the single worker approach are:

  • Tasks that are ready to execute will be delayed from executing until prior tasks complete (head-of-line blocking).
  • Independent tasks, which could logically run in parallel, are executed one after another, so the overall wall-clock execution time is longer than necessary.

The alternative to the single worker is to use a pool of workers, each pulling a task from the queue when it is ready to execute one. In exchange for the reduced queueing delay and the reduced overall wall-clock execution time, the programmer must manage the complexity of assigning and executing tasks. Some cloud services, such as Amazon Simple Queue Service (SQS), offer a managed abstraction for queueing tasks and assigning them to workers. However, they can be difficult to debug or can impose undesirable properties, such as SQS's default policy of at-least-once delivery (rather than exactly-once). Lastly, it might be the case that you just don't want the third-party dependency!

This article describes how to implement a task queue in SQL with the following properties:

  • A task is assigned to at most one worker at a time.
  • Once completed, a task is never assigned again.
  • When a task reaches a configured maximum execution time without completing, it will be assigned again to a worker.

Let's jump in!

Design

A queue provides some elasticity in your system when your message (or, in our case, task) volume varies or is unpredictable. It also allows you to smooth a computational workload that would otherwise be bursty, by using a fixed set of resources to consume the queue and process the messages.

Let's consider some concrete examples to inform the design of our SQL task queue:

  • Daily Analytics - your analytics application displays usage metrics broken down by day.
    Since you want these analytics available to your users by 8AM local time, you queue up one analytics job for each account every night starting at 12:01AM.

  • Scheduled Reminders - your online game relies on in-app ads and upgrade notifications to drive revenue, but the logic for deciding which content to trigger for the user is dynamic. So, you queue up all the notification content for each account and in real-time consume and filter the desired content.

  • Real-Time Events - your financial application receives millions of hits to its API throughout the day. In order to keep your endpoints fast, you queue events immediately after receiving them and process them later.

Exactly once delivery

The distinguishing property in any queueing use case is whether or not the task is idempotent. That is, can the same task be executed multiple times without adverse effects? If so, then the queue can deliver the same message to the worker pool more than once, and the internal queueing coordination and locking complexity is reduced. Of course, if the tasks are not idempotent, or you simply don't want to waste compute capacity on duplicate work, the worker itself can use a lock to ensure a message is only processed once in an at-least-once queueing system.

For our use case, we're interested in exactly-once delivery of each message (i.e., exactly-once execution of a task). Given that the at-least-once policy still requires a lock to achieve exactly-once behavior, we're going to use the atomicity of SQL writes as the lock in our queue.

Data model

To this point, we've been alluding to a task as a single data object, but in practice, we only really need the task's identifier in order to coordinate its lock and owner. Any worker receiving a task identifier for execution could read the task's information from another table. As such, we could use two tables: one for the task assignment and one for the task information, potentially saving bandwidth on database operations.

For simplicity, we'll consider a single table with all of the relevant information stored in a details field. Consider the following data model definition for a Task:

table tasks:
    id serial
    details text
    status text -- One of: queued, executing, completed
    operator_id text
    expiry timestamp
    created timestamp
    last_updated timestamp

The lifecycle of a task in our model is the following:

  • a task begins in the queued state when it is written to the table,
  • a task enters the executing state when a worker is assigned the task and it begins execution, and
  • a worker updates a task to the completed state when it finishes execution.
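
The data model and lifecycle above can be sketched as a concrete schema. This is a minimal sketch, not the article's actual schema: it assumes SQLite (via Python's standard sqlite3 module) as a stand-in database, stores timestamps as epoch seconds, and uses hypothetical helper names (`open_queue`, `enqueue`). The CHECK constraint is an added safeguard that rejects status values outside the three lifecycle states.

```python
import sqlite3

# Assumption: SQLite stands in for the real database. Column types are
# illustrative; timestamps are epoch seconds so callers can pass "now" in.
SCHEMA = """
CREATE TABLE tasks (
    id           INTEGER PRIMARY KEY AUTOINCREMENT,  -- stands in for `serial`
    details      TEXT NOT NULL,
    status       TEXT NOT NULL DEFAULT 'queued'
                 CHECK (status IN ('queued', 'executing', 'completed')),
    operator_id  TEXT,            -- NULL until a task is assigned
    expiry       REAL,            -- NULL unless the task is executing
    created      REAL NOT NULL,
    last_updated REAL NOT NULL
)
"""


def open_queue(path=":memory:"):
    """Open a database (hypothetical helper) and create the tasks table."""
    conn = sqlite3.connect(path)
    conn.execute(SCHEMA)
    return conn


def enqueue(conn, details, now):
    """Write a new task; it starts in the 'queued' state by default."""
    cur = conn.execute(
        "INSERT INTO tasks (details, created, last_updated) VALUES (?, ?, ?)",
        (details, now, now),
    )
    conn.commit()
    return cur.lastrowid
```

Passing `now` explicitly rather than calling the database clock is a choice made here only to keep the sketch deterministic.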

Implementation

We use a Manager process to assign tasks to workers, and we run two instances of the Manager to improve fault tolerance and maintain availability during deployments. Managers repeatedly query the database for queued tasks and assign them to workers. Additionally, Managers look for tasks that have reached their maximum execution time (i.e., "timed out"), and reassign those tasks to new workers.

A worker can be a local thread on the same machine, or a remote machine that can accept work. The Manager doesn't really care as long as it can assume a reliable communication channel to the worker. (Obviously, the Manager might perform additional jobs such as worker liveness checks or reporting, but we omit those details here.)

Database queries

The database queries form the backbone of our SQL queue and focus on two areas: identifying unfinished tasks and completing tasks. The Manager (or another process) is responsible for queueing new tasks by writing them to the table. Then, the Manager queries the table periodically to identify tasks that are ready to be executed.

Find executable tasks:

    select
        *
    from tasks t
    where t.status = 'queued' or (t.status = 'executing' and t.expiry <= NOW())

Our Manager randomizes the returned tasks to reduce contention across Manager instances when selecting a task. Then, the Manager attempts to lock a task so that it can assign the task to a worker.
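
The selection and shuffling step can be sketched as follows. This is a hedged illustration rather than the Manager's real code: it assumes SQLite as a stand-in database with epoch-second timestamps, and `find_executable` and `make_demo_db` are hypothetical names. The query itself mirrors the one above, with the current time passed in as a parameter.

```python
import random
import sqlite3

# Assumption: SQLite stands in for the real database, and timestamps are
# stored as epoch seconds so that "now" can be passed in explicitly.
FIND_EXECUTABLE = """
SELECT id, last_updated
FROM tasks
WHERE status = 'queued'
   OR (status = 'executing' AND expiry <= ?)
"""


def find_executable(conn, now, rng=random):
    """Return (id, last_updated) pairs for runnable tasks, in random order."""
    rows = conn.execute(FIND_EXECUTABLE, (now,)).fetchall()
    rng.shuffle(rows)  # lowers the chance that two Managers pick the same task
    return rows


def make_demo_db():
    """Build an in-memory table holding one task in each interesting state."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE tasks (id INTEGER PRIMARY KEY, details TEXT, status TEXT,"
        " operator_id TEXT, expiry REAL, created REAL, last_updated REAL)"
    )
    conn.executemany(
        "INSERT INTO tasks (details, status, expiry, created, last_updated)"
        " VALUES (?, ?, ?, 0, 0)",
        [
            ("never started", "queued", None),
            ("timed out", "executing", 150.0),
            ("still running", "executing", 500.0),
            ("done", "completed", None),
        ],
    )
    conn.commit()
    return conn
```

The function returns last_updated alongside each id because the lock attempt needs the value that was read here.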

Attempt to lock a task

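    -- Succeeds (1 row updated) only if the row is unchanged since it was read.
    -- Column names in SET are left unqualified, which PostgreSQL requires.
    update tasks
        set status = 'executing',
            operator_id = $operator_id,
            expiry = $expiry,
            last_updated = NOW()
    where id = $id and last_updated = $last_updated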

The Manager calculates the expiry value as the current time plus the maximum amount of time it expects a worker to take to execute the task, plus some buffer. If the query reports 0 rows updated, the Manager did not obtain the lock. If it reports 1 row updated, the Manager successfully locked the task and can now assign it to a worker for execution. If the worker fails to execute the task or takes too long, the expiry passes and the task is selected again by the first query, which finds executable tasks.

This query uses the last_updated column as an indicator of whether a row has been written since it was last read. This optimistic concurrency control mechanism therefore fails if a row can be written or updated without also updating last_updated. It also fails if two writes can land within the same timestamp tick, so the resolution of the last_updated timestamp must be finer than the system's shortest read-then-write cycle.
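
To make the optimistic lock concrete, here is a minimal sketch of two Managers racing for the same task. It assumes SQLite as a stand-in database with epoch-second timestamps; `try_lock`, `make_demo_db`, and the two timing constants are hypothetical names and values chosen for illustration, not taken from the original system.

```python
import sqlite3

# Hypothetical tuning values; the article does not specify them.
MAX_EXECUTION_SECONDS = 300
BUFFER_SECONDS = 30

LOCK_TASK = """
UPDATE tasks
SET status = 'executing', operator_id = ?, expiry = ?, last_updated = ?
WHERE id = ? AND last_updated = ?
"""


def try_lock(conn, task_id, seen_last_updated, operator_id, now):
    """Attempt to claim a task; return True only if this caller won."""
    expiry = now + MAX_EXECUTION_SECONDS + BUFFER_SECONDS
    cur = conn.execute(
        LOCK_TASK, (operator_id, expiry, now, task_id, seen_last_updated)
    )
    conn.commit()
    # 1 row updated: the row was unchanged since we read it, so we hold it.
    # 0 rows updated: someone else wrote the row first.
    return cur.rowcount == 1


def make_demo_db():
    """Build an in-memory table with a single queued task (id 1)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE tasks (id INTEGER PRIMARY KEY, details TEXT, status TEXT,"
        " operator_id TEXT, expiry REAL, created REAL, last_updated REAL)"
    )
    conn.execute(
        "INSERT INTO tasks (details, status, created, last_updated)"
        " VALUES ('demo', 'queued', 100.0, 100.0)"
    )
    conn.commit()
    return conn
```

If both Managers read the row while last_updated was 100.0, the first call to `try_lock` changes that value, so the second call matches no rows and reports failure. Note that the sketch inherits the resolution caveat: if the winning write happened to store the same timestamp it had read, the second caller would also succeed.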

The worker will then read the task from the table by its id and use the details to execute the task. You may wish to have your worker update the task row to specify its own operator_id to aid in debugging once it begins executing the task. Regardless, when the worker completes execution of the task, it updates the row to indicate that it's complete.

Update a completed task

    -- 'completed' matches the status values listed in the data model.
    update tasks
        set status = 'completed',
            operator_id = NULL,
            expiry = NULL,
            last_updated = NOW()
    where id = $id and last_updated = $last_updated

This system has workers marking tasks as complete, but if you wanted to consolidate writes to this table within the Manager exclusively, you could have the Manager look for tasks that satisfy some external property, such as the presence of a log file or a row in another table, before marking tasks as complete.
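
The worker-side completion update can be sketched in the same style. This is an illustration under assumptions: SQLite stands in for the database, timestamps are epoch seconds, the status value is the `completed` name from the data model, and `complete_task` and `make_demo_db` are hypothetical names. It also assumes the worker knows the last_updated value written when its task was locked.

```python
import sqlite3

COMPLETE_TASK = """
UPDATE tasks
SET status = 'completed', operator_id = NULL, expiry = NULL, last_updated = ?
WHERE id = ? AND last_updated = ?
"""


def complete_task(conn, task_id, seen_last_updated, now):
    """Mark a task completed; return False if the row changed underneath us."""
    cur = conn.execute(COMPLETE_TASK, (now, task_id, seen_last_updated))
    conn.commit()
    return cur.rowcount == 1


def make_demo_db():
    """Build an in-memory table with one task (id 1) locked at t=101."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE tasks (id INTEGER PRIMARY KEY, details TEXT, status TEXT,"
        " operator_id TEXT, expiry REAL, created REAL, last_updated REAL)"
    )
    conn.execute(
        "INSERT INTO tasks (details, status, operator_id, expiry, created,"
        " last_updated) VALUES ('demo', 'executing', 'worker-1', 431.0, 100.0, 101.0)"
    )
    conn.commit()
    return conn
```

A worker whose task timed out and was reassigned holds a stale last_updated value, so its completion attempt updates 0 rows instead of overwriting the new assignment.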

Discussion

A major benefit to this approach is that it has no external or third-party dependencies. For any system that already relies on a relational database, this approach incurs no additional infrastructure.

A second major advantage of this approach is that it automatically produces a historical record or log of all tasks that were executed. You may wish to trim the tasks table depending on how quickly it grows in size, but broadly speaking, the audit log it provides is very useful for debugging problems.

Lastly, this approach is both scalable as you add more workers, and fault tolerant to worker and Manager failures. With only three database queries, the Manager or worker can fail at any point, and the task will eventually be retried. To provide further protection against a duplicate execution, you can introduce an external lock or state management to track a worker's progress in executing a task. In a similar vein, you may wish to add a column for retries of a task.
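
One possible shape for the retries column is sketched below. Everything here is an assumption made for illustration: the `attempts` column name, the `MAX_ATTEMPTS` limit, the helper names, and the use of SQLite with epoch-second timestamps. The idea is to count each successful lock and refuse to lock a task that has already used up its attempts.

```python
import sqlite3

MAX_ATTEMPTS = 3  # hypothetical limit; pick a value that suits your tasks

LOCK_WITH_ATTEMPTS = """
UPDATE tasks
SET status = 'executing', operator_id = ?, expiry = ?, last_updated = ?,
    attempts = attempts + 1
WHERE id = ? AND last_updated = ? AND attempts < ?
"""


def try_lock_with_attempts(conn, task_id, seen_last_updated, operator_id,
                           now, expiry):
    """Claim a task and count the attempt; fail once the limit is reached."""
    cur = conn.execute(
        LOCK_WITH_ATTEMPTS,
        (operator_id, expiry, now, task_id, seen_last_updated, MAX_ATTEMPTS),
    )
    conn.commit()
    return cur.rowcount == 1


def make_demo_db():
    """Build an in-memory table with one queued task and an attempts column."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE tasks (id INTEGER PRIMARY KEY, details TEXT, status TEXT,"
        " operator_id TEXT, expiry REAL, created REAL, last_updated REAL,"
        " attempts INTEGER NOT NULL DEFAULT 0)"
    )
    conn.execute(
        "INSERT INTO tasks (details, status, created, last_updated)"
        " VALUES ('demo', 'queued', 100.0, 100.0)"
    )
    conn.commit()
    return conn
```

With this variant, a task that keeps timing out stays in the executing state with an expired expiry after its last allowed attempt, so a separate query or alert would be needed to surface it.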

The major downside or risk to this approach of queueing is that this setup couples the throughput/volume of your system processing to the read/write capacity of your database. If your database is "small" and has an otherwise low volume of writes, but your work queue has high volume/throughput requirements, you may end up in a situation where you have to scale your database solely because of the requirements of the work queue. And, of course, if you don't proactively monitor the growth of your volume, the contention caused by workers reading and writing on this table could negatively impact the rest of your database operations and your system as a whole.

A second drawback to this approach is that you are managing the complexity yourself. With Amazon SQS you are outsourcing all implementation details to a third party. But with this approach, you need to make sure that the queries and table indices are correct. Similarly, and related to the first downside, a self-built approach like this doesn't give you the warm-and-fuzzy feeling that you might get from Amazon's eleven 9's of service reliability or throughput guarantees. Still, over time, the operational maturity will increase your confidence.

Conclusion

The simplicity of a SQL queue makes it attractive as an alternative to a managed, third-party queue abstraction. In this article, we presented a minimal implementation of one such SQL-based task queue. If you like this article and would like to discuss more, you can reach us at info@reflect.run. We'd love to hear from you!

Oldest comments (1)

Vincent Milum Jr

For those running more modern databases, this is now a built-in feature in MariaDB 10.6.

SKIP LOCKED handles pretty much everything for concurrency to ensure only one worker can pull any given row, and allows multiple workers to work in parallel without colliding.

mariadb.com/kb/en/select/#skip-locked