Job Queues in PostgreSQL with SKIP LOCKED

Toshiro Nishimura, 2017-10-03

Summary

PostgreSQL 9.5+ offers the SKIP LOCKED option for SELECT ... FOR UPDATE statements, allowing us to implement simple and correct job queues.

Introduction

While there are various dedicated systems for job queueing, ranging from centralized servers such as RabbitMQ, to DIY libraries like ZeroMQ, to cloud services like Amazon SQS, there’s a problem for data analysts: we eventually need the jobs and their results in a database in order to analyze them. If you’re like me, data analysis does not start until the data gets into a relational database.

To save us from adding yet another complex piece of software into our data processing infrastructure, recent versions of PostgreSQL allow us to easily build a “good enough” queueing system leveraging the RDBMS’s built-in transactional capabilities. The key ingredient is “SKIP LOCKED”.

The Tables

First, let’s create a table to hold our incoming jobs, and a results table to hold the output of processing. We also add a handful of dummy jobs:

CREATE TABLE jobs (
    job_id            BIGSERIAL PRIMARY KEY,
    retries_remaining INT NOT NULL DEFAULT 3,
    time_created      TIMESTAMP WITHOUT TIME ZONE DEFAULT NOW() NOT NULL,

    -- these can be whatever you want
    job_property1     TEXT,
    job_property2     TEXT
);
CREATE INDEX jobs_time ON jobs(time_created);
CREATE INDEX jobs_retries ON jobs(retries_remaining);

CREATE TABLE results (
    job_id         BIGINT PRIMARY KEY REFERENCES jobs(job_id),

    -- ditto - put job results here
    result_detail1 TEXT,
    result_detail2 TEXT
);

INSERT INTO jobs( job_property1, job_property2 )
VALUES 
    ( 'big job 1', 'do important stuff 1'),
    ( 'big job 2', 'do important stuff 2'),
    ( 'big job 3', 'do important stuff 3'),
    ( 'big job 4', 'do important stuff 4');

Retrieving and processing a job

Next, we initiate a job by starting a transaction which locks a single job in the jobs table. Examples are written in Python with the psycopg2 module:

import psycopg2

conn = psycopg2.connect("dbname=yourdb")
cur = conn.cursor()

cur.execute("""
UPDATE jobs j1 SET retries_remaining = retries_remaining - 1
WHERE j1.job_id = ( 
    SELECT j2.job_id FROM jobs j2 
    WHERE j2.retries_remaining > 0
    ORDER BY j2.time_created FOR UPDATE SKIP LOCKED LIMIT 1 
)
RETURNING j1.job_id, j1.job_property1, j1.job_property2;
""")

The magic sauce is the SKIP LOCKED in the subquery. SKIP LOCKED locks the row, as an ordinary FOR UPDATE would. However, it also allows other SELECTs in the meantime to skip over that row and proceed with queries that would otherwise block on it (such as those with an ORDER BY). We also decrement the retries counter up front so we don’t have to do it later. Of course, without FOR UPDATE, our dequeueing statement would suffer from a race condition: two workers could retrieve the same job simultaneously.
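
To see the skipping in action, run the dequeue in two psql sessions side by side. This is a minimal sketch against the tables above:

-- Session 1: lock the oldest available job and hold the transaction open.
BEGIN;
SELECT job_id FROM jobs
WHERE retries_remaining > 0
ORDER BY time_created
LIMIT 1 FOR UPDATE SKIP LOCKED;   -- returns job 1

-- Session 2: the same query, run before session 1 commits, does not
-- block on the locked row; it silently skips it and takes the next one.
BEGIN;
SELECT job_id FROM jobs
WHERE retries_remaining > 0
ORDER BY time_created
LIMIT 1 FOR UPDATE SKIP LOCKED;   -- returns job 2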

Next, we attempt to run the job. If successful, we create a row in results, delete the job from the queue, and commit. If unsuccessful, we still commit the transaction, keeping the decremented retry count, and allow another worker to attempt the job (until retries_remaining reaches 0).

record = cur.fetchone()
if record is None:
    print("no more jobs")
else:
    (job_id, _, _) = record

    ## Do something fancy and long-running here, setting
    ## successful_in_doing_something_fancy accordingly.

    if successful_in_doing_something_fancy:

        cur.execute("""
        INSERT INTO results (job_id, result_detail1, result_detail2)
        VALUES ( %s, %s, %s );
        """, (job_id, "some result 1", "some result 2"))

        cur.execute("""
        DELETE FROM jobs WHERE job_id = %s;
        """, (job_id,))

# Commit either way: on success this persists the result and the DELETE;
# on failure it persists only the retries_remaining decrement, freeing
# the job for another worker to retry.
conn.commit()
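
Putting the pieces together, a worker process might loop until the queue is drained. This is a sketch: do_fancy_work is a hypothetical stand-in for your actual job logic.

import psycopg2

def worker():
    conn = psycopg2.connect("dbname=yourdb")
    while True:
        cur = conn.cursor()
        cur.execute("""
        UPDATE jobs j1 SET retries_remaining = retries_remaining - 1
        WHERE j1.job_id = (
            SELECT j2.job_id FROM jobs j2
            WHERE j2.retries_remaining > 0
            ORDER BY j2.time_created FOR UPDATE SKIP LOCKED LIMIT 1
        )
        RETURNING j1.job_id, j1.job_property1, j1.job_property2;
        """)
        record = cur.fetchone()
        if record is None:
            conn.commit()
            break                       # queue drained
        (job_id, prop1, prop2) = record
        try:
            detail1, detail2 = do_fancy_work(prop1, prop2)  # hypothetical
            cur.execute("""
            INSERT INTO results (job_id, result_detail1, result_detail2)
            VALUES ( %s, %s, %s );
            """, (job_id, detail1, detail2))
            cur.execute("DELETE FROM jobs WHERE job_id = %s;", (job_id,))
            conn.commit()               # success: result saved, job removed
        except Exception:
            conn.commit()               # failure: keep the decrement so
                                        # retries eventually run out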

Performance

An ad-hoc job queue in PostgreSQL should not be expected to rival a dedicated queueing system.
However, the above method on a completely unoptimized installation of PostgreSQL 9.5, running on an AWS c4.2xlarge (8 cores, 16 GB RAM, $0.40/hour), yields a not-too-shabby 7200 retrievals per second. That’s without any trickery such as unlogged tables.
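
If you want a rough number on your own hardware, you can time bare dequeue-and-delete cycles against a pre-filled jobs table. A crude sketch (not the exact benchmark behind the figure above):

import time
import psycopg2

conn = psycopg2.connect("dbname=yourdb")
cur = conn.cursor()

N = 10000   # assumes at least N rows were inserted into jobs beforehand
start = time.time()
for _ in range(N):
    cur.execute("""
    DELETE FROM jobs
    WHERE job_id = (
        SELECT job_id FROM jobs
        ORDER BY time_created
        LIMIT 1 FOR UPDATE SKIP LOCKED
    )
    RETURNING job_id;
    """)
    conn.commit()
print("%.0f retrievals/sec" % (N / (time.time() - start)))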

Keep the queue table small

Since a dequeue with SKIP LOCKED has to scan past locked (and otherwise ineligible) rows to find an available one, it’s important for performance to keep the job queue table as small as possible. In the above example, I initially attempted to keep completed jobs in the jobs table with a completed boolean flag, linking them to the results table. However, performance degraded steadily as the table grew. To remedy this, the example deletes the row from the jobs table after completion.
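
For reference, the slower variant I abandoned looked roughly like this; every dequeue had to scan past an ever-growing pile of finished rows:

-- Rejected approach: completed jobs linger in the queue table.
ALTER TABLE jobs ADD COLUMN completed BOOLEAN NOT NULL DEFAULT FALSE;

-- Dequeue must now filter out finished jobs as well as locked ones...
SELECT job_id FROM jobs
WHERE NOT completed AND retries_remaining > 0
ORDER BY time_created
LIMIT 1 FOR UPDATE SKIP LOCKED;

-- ...and completion is an UPDATE that leaves the row behind,
-- instead of a DELETE that keeps the table small.
UPDATE jobs SET completed = TRUE WHERE job_id = 1;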

Anti-Patterns

A common pattern I’ve seen for implementing queues in databases is to use some sort of “status” field, indicating whether a job is new, being processed by a worker, or finished. There are several problems with this pattern. First, it increases IO on the jobs table, since every state transition is a separate write. Using this article’s method, there is a single write when a job finishes.

Second, it becomes difficult to tell when a job has failed. In our method, a worker failure results in the transaction rolling back, which releases the lock. (It also resets retries_remaining, so it’s not perfect, but that’s better than a job being stuck in an intermediate state forever.)

Third, without FOR UPDATE SKIP LOCKED, it is cumbersome to avoid race conditions. A worker has to retrieve a candidate job id, run something like UPDATE jobs SET status = 'in-process' WHERE status = 'new' AND job_id = my_candidate, and verify that it affected exactly one row, repeating if not. This is necessary because another worker may have retrieved the same candidate job and run the statement before yours, in which case you try again.
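
In code, that claim-by-status dance looks something like the sketch below. It assumes a status column on jobs, which is not part of this article’s schema:

# Anti-pattern sketch: claim a job by flipping a status column.
while True:
    cur.execute("""
    SELECT job_id FROM jobs
    WHERE status = 'new'
    ORDER BY time_created LIMIT 1;
    """)
    row = cur.fetchone()
    if row is None:
        break                       # nothing to claim
    candidate = row[0]
    cur.execute("""
    UPDATE jobs SET status = 'in-process'
    WHERE status = 'new' AND job_id = %s;
    """, (candidate,))
    if cur.rowcount == 1:
        conn.commit()               # we won the race; the job is ours
        break
    conn.rollback()                 # another worker beat us to it; retry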