Job Queues in PostgreSQL with SKIP LOCKED
Toshiro Nishimura, 2017-10-03
PostgreSQL 9.5+ offers the SKIP LOCKED option for SELECT statements, allowing us to implement simple and correct queues.
While there are various dedicated systems for job queueing, ranging from centralized servers such as RabbitMQ, to DIY libraries in the form of ZeroMQ, to cloud services like Amazon SQS, there’s a problem for data analysts: we eventually need the jobs and results in a database in order to analyze them. If you’re like me, data analysis does not start until the data gets into a relational database.
To save us from adding yet another complex piece of software into our data processing infrastructure, recent versions of PostgreSQL allow us to easily build a “good enough” queueing system leveraging the RDBMS’s built-in transactional capabilities. The key ingredient is “SKIP LOCKED”.
First, let’s create a table which will hold our incoming jobs, and a results table which will hold the results of processing. We also add a handful of dummy jobs:
    CREATE TABLE jobs (
        job_id BIGSERIAL PRIMARY KEY,
        retries_remaining INT NOT NULL DEFAULT 3,
        time_created TIMESTAMP WITHOUT TIME ZONE DEFAULT NOW() NOT NULL,
        -- these can be whatever you want
        job_property1 TEXT,
        job_property2 TEXT
    );
    CREATE INDEX jobs_time ON jobs(time_created);
    CREATE INDEX jobs_retries ON jobs(retries_remaining);

    CREATE TABLE results (
        -- No REFERENCES jobs(job_id) here: a finished job is deleted from
        -- jobs, and a foreign key would make that DELETE fail.
        job_id BIGINT PRIMARY KEY,
        -- ditto - put job results here
        result_detail1 TEXT,
        result_detail2 TEXT
    );

    INSERT INTO jobs (job_property1, job_property2)
    VALUES
        ('big job 1', 'do important stuff 1'),
        ('big job 2', 'do important stuff 2'),
        ('big job 3', 'do important stuff 3'),
        ('big job 4', 'do important stuff 4');
Retrieving and processing a job
Next, we initiate a job by starting a transaction which locks a single job in the jobs table. Examples are written in Python with the psycopg2 module:
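    import psycopg2

    conn = psycopg2.connect("dbname=yourdb")
    cur = conn.cursor()

    # Claim the oldest job that still has retries left and is not locked
    # by another worker. The row stays locked until we commit or roll back.
    cur.execute("""
        UPDATE jobs j1 SET retries_remaining = retries_remaining - 1
        WHERE j1.job_id = (
            SELECT j2.job_id FROM jobs j2
            WHERE j2.retries_remaining > 0
            ORDER BY j2.time_created
            FOR UPDATE SKIP LOCKED
            LIMIT 1
        )
        RETURNING j1.job_id, j1.job_property1, j1.job_property2;
    """)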
The magic sauce is the FOR UPDATE SKIP LOCKED in the subquery. It locks the selected row, as would an ordinary FOR UPDATE. However, it also allows other workers’ SELECTs in the meantime to ignore that row instead of waiting for it, so queries that would otherwise require it (such as those with ORDER BY) can still run.
We also decrement the retries counter so we don’t have to do it later.
Of course, without FOR UPDATE, our dequeueing statement suffers from a race condition: two workers can attempt to retrieve the same job simultaneously.
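To make the difference concrete, here is a toy in-memory model of the behaviour described above. It is only an illustration, not PostgreSQL code: ToyTable and its method names are made up for this sketch, and a Python set stands in for the row locks held by open transactions.

```python
class ToyTable:
    """In-memory stand-in for the jobs table; not real PostgreSQL locking."""

    def __init__(self, job_ids):
        self.job_ids = list(job_ids)  # assumed already in time_created order
        self.locked = set()           # rows locked by open transactions

    def select_for_update_skip_locked(self):
        """Lock and return the first unlocked job, or None if none is free."""
        for job_id in self.job_ids:
            if job_id not in self.locked:
                self.locked.add(job_id)
                return job_id
        return None

    def select_without_lock(self):
        """Return the first job without locking it."""
        return self.job_ids[0] if self.job_ids else None


table = ToyTable([1, 2, 3])

# Two workers dequeue before either one commits: each gets its own job.
print(table.select_for_update_skip_locked())  # 1
print(table.select_for_update_skip_locked())  # 2

# Without a lock, both workers are handed the same job.
print(table.select_without_lock(), table.select_without_lock())  # 1 1
```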
Next, we attempt to run a job. If successful, we create a row in results, delete the job from the queue, and commit. If unsuccessful, we still commit the transaction, which keeps the decremented counter and allows another worker to attempt the job (until retries_remaining reaches 0).
    record = cur.fetchone()
    if record is None:
        print("no more jobs")
    else:
        (job_id, _, _) = record

        ## Do something fancy and long-running here.
        successful_in_doing_something_fancy = True  # placeholder: set this from your own processing

        if successful_in_doing_something_fancy:
            cur.execute("""
                INSERT INTO results (job_id, result_detail1, result_detail2)
                VALUES (%s, %s, %s);
            """, (job_id, "some result 1", "some result 2"))
            cur.execute("""
                DELETE FROM jobs WHERE job_id = %s;
            """, (job_id,))

    # Commit in every case: on failure this keeps the decremented retry counter.
    conn.commit()
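The dequeue and processing steps can also be folded into one function that a worker calls in a loop. The following is a minimal sketch of one possible arrangement, not the only one. run_one_job and handler are hypothetical names; conn is assumed to be a DB-API connection such as the one returned by psycopg2.connect; the handler is assumed to return a pair of result strings on success and None on failure; and the results table is assumed to have no foreign key to jobs, since the job row is deleted.

```python
DEQUEUE_SQL = """
    UPDATE jobs j1 SET retries_remaining = retries_remaining - 1
    WHERE j1.job_id = (
        SELECT j2.job_id FROM jobs j2
        WHERE j2.retries_remaining > 0
        ORDER BY j2.time_created
        FOR UPDATE SKIP LOCKED
        LIMIT 1
    )
    RETURNING j1.job_id, j1.job_property1, j1.job_property2;
"""


def run_one_job(conn, handler):
    """Dequeue one job, run handler on it, and commit.

    Returns True if a job was dequeued, False if the queue was empty.
    handler (hypothetical) takes (property1, property2) and returns a
    (detail1, detail2) tuple on success or None on failure.
    """
    cur = conn.cursor()
    cur.execute(DEQUEUE_SQL)
    record = cur.fetchone()
    if record is None:
        conn.commit()  # nothing was locked; just end the transaction
        return False

    job_id, prop1, prop2 = record
    result = handler(prop1, prop2)
    if result is not None:
        cur.execute(
            "INSERT INTO results (job_id, result_detail1, result_detail2) "
            "VALUES (%s, %s, %s);",
            (job_id, result[0], result[1]),
        )
        cur.execute("DELETE FROM jobs WHERE job_id = %s;", (job_id,))

    # On failure only the decremented retry counter is committed.
    conn.commit()
    return True
```

A worker could then run something like "while run_one_job(conn, my_handler): pass" and sleep for a moment whenever the function returns False.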
Nobody should expect an ad-hoc job queue in PostgreSQL to rival a dedicated queueing system.
However, the above method on a completely unoptimized installation of PostgreSQL 9.5 running on an AWS c4.2xlarge (8 cores, 16 GB RAM, $0.40/hour) yields a not-too-shabby 7200 retrievals per second. That’s without any trickery such as unlogged tables.
Keep the queue table small
Since locking a row with SKIP LOCKED forces the DB, upon every SELECT on the table, to iterate through the entire table to find unlocked rows, it’s important for performance to keep the job queue table as small as possible. In the above example, I initially attempted to keep completed jobs in the jobs table with a completed boolean flag, and then link it to the results table. However, performance degraded steadily as the table got larger. To remedy this, the example deletes the row from the jobs table after completion.
A common pattern I’ve seen for implementing queues in databases is to use some sort of “status” field, indicating whether a job is new, being processed by a worker, or finished. There are several problems with this pattern. First, it increases IO on the jobs table, since every status change is another write. Using this article’s method, there is a single write when a job finishes.
Second, it becomes difficult to tell when a job has failed. In our method, a worker failure would result in the transaction rolling back. (It would also reset retries_remaining, so it’s not perfect, but better than being stuck in an intermediate stage forever.)
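For failures that Python can catch, one way to soften that imperfection is to wrap the work in a savepoint, so that partial writes are undone while the earlier retries_remaining decrement is still committed. This is a sketch under assumptions, not part of the method above: finish_job and work are hypothetical names, conn is assumed to be a DB-API connection with the dequeue transaction still open, and a hard crash of the worker process still rolls everything back, counter included.

```python
def finish_job(conn, work):
    """Run work() inside the open dequeue transaction.

    work (hypothetical) is a zero-argument callable that does the job and
    writes its results through the same connection.
    Returns True if the work succeeded, False if it raised an exception.
    """
    cur = conn.cursor()
    cur.execute("SAVEPOINT job_work;")
    try:
        work()
    except Exception:
        # Undo partial writes made by work(), keep the earlier
        # retries_remaining decrement, and commit it.
        cur.execute("ROLLBACK TO SAVEPOINT job_work;")
        conn.commit()
        return False
    conn.commit()
    return True
```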
Third, without FOR UPDATE SKIP LOCKED, it is cumbersome to avoid race conditions. A worker would have to retrieve a candidate job id, and do something like UPDATE jobs SET status = 'in-process' WHERE status = 'new' AND job_id = my_candidate and make sure it affected a single row, repeating if not. This is because it is possible that another worker retrieved the same candidate job and ran the statement before yours, in which case you try again.
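For comparison, the retry loop that the status-field pattern requires might look roughly like this. It is a sketch only: claim_job_with_status and max_attempts are hypothetical names, the jobs table is assumed to have a status column with the values 'new', 'in-process' and 'finished' (the schema in this article has no such column), and conn is assumed to be a DB-API connection.

```python
def claim_job_with_status(conn, max_attempts=10):
    """Claim a job using a status column instead of SKIP LOCKED.

    Returns the claimed job_id, or None if the queue is empty or every
    attempt lost the race to another worker.
    """
    cur = conn.cursor()
    for _ in range(max_attempts):
        # Step 1: pick a candidate without locking it.
        cur.execute(
            "SELECT job_id FROM jobs WHERE status = 'new' "
            "ORDER BY time_created LIMIT 1;"
        )
        row = cur.fetchone()
        if row is None:
            conn.commit()
            return None  # queue is empty
        candidate = row[0]

        # Step 2: try to claim it; this only matches if it is still 'new'.
        cur.execute(
            "UPDATE jobs SET status = 'in-process' "
            "WHERE status = 'new' AND job_id = %s;",
            (candidate,),
        )
        claimed = cur.rowcount == 1
        conn.commit()
        if claimed:
            return candidate
        # Another worker claimed the candidate first; pick a new one.
    return None
```

Every lost race costs an extra round trip, and a worker that dies after claiming a job leaves it marked 'in-process' until something else resets it.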