
The team I've been working in for the last year or so have had great success using Postgres-as-queue. We've managed to avoid the following:
- Infrastructure/configuration - I'd estimate each line of terraform to be an order of magnitude more risk/maintenance/faff than each line of Python.
- Slow/clunky multi-container testing.
- The need for expertise in anything beyond Python + Postgres.
- Elaborate retry/dead-letter-queue mechanisms.
- Thinking about data serialisation over the wire.
In a nutshell, it's simpler - there are just way fewer moving parts.
As we're using a monolithic codebase with a reasonable ORM, we also have none of the CMD-clickability issues that plague ad-hoc SNS/PubSub/Kafka architectures.
The main objection to doing Postgres-as-queue is a performance one, along the lines of "don't put unnecessary extra load on the db/don't increase row churn". Let's construct a reasonable example demonstrating that queue usage shouldn't introduce much extra load in many cases. As always, before following anyone's advice on this kind of stuff, profile, profile, profile!
In the (fairly unusual) case that you're doing many tasks, none of which touch the db (say constructing and sending emails from static data), you can ignore this blog post and get on with life. In another case, you may be operating at some crazy scale where these downsides start applying - again, run the numbers and profile.
Let's imagine the db load introduced by a hypothetical task - I currently work in the energy industry, so the example might be: a customer submits a meter reading, we queue a task to write the reading and update some account balance - the load looks like:
- Receive the message from the broker.
- Make 3 primary key `SELECT`s totalling 0.3ms db time.
- Make 2 slightly hairier `SELECT`s with some `JOIN`s/`GROUP BY`s totalling 4ms db time.
- Perform 2 `UPDATE`s totalling 2ms db time (and some row churn).
- ACK the message.
In the new Postgres-as-queue world, this looks like:
- Poll for a message that needs processing, on finding one, `UPDATE` the status, totalling 1ms db time.
- Make 3 primary key `SELECT`s totalling 0.3ms db time.
- Make 2 slightly hairier `SELECT`s with some `JOIN`s/`GROUP BY`s totalling 4ms db time.
- Perform 2 `UPDATE`s totalling 2ms db time (and some row churn).
- ACK the message by `UPDATE`ing the status, totalling 0.5ms db time (and some row churn).
In this example, our db time has gone up from 6.3ms per task to 7.8ms. These figures are totally fictional, but we've demonstrated a reasonable way of thinking about the overhead.
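The overhead arithmetic can be sanity-checked in a couple of lines (the per-step figures are the fictional ones from the example above):

```python
# Fictional per-task db times from the example, in milliseconds
broker_ms = 0.3 + 4 + 2            # SELECTs + UPDATEs with an external broker
queue_ms = 1 + 0.3 + 4 + 2 + 0.5   # the same work, plus the claim and ACK UPDATEs

overhead = queue_ms / broker_ms - 1
print(f"{broker_ms:.1f}ms -> {queue_ms:.1f}ms ({overhead:.0%} more db time)")
# → 6.3ms -> 7.8ms (24% more db time)
```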
If we had just one worker polling for tasks, we could ignore locking and transactions, but we want to have many, so we have to use `FOR UPDATE SKIP LOCKED`. This atomically locks the row at the point where it selects it - there's discussion of the ins and outs in this excellent blog post by 2ndQuadrant.
For our example implementation, we have an event table that looks like:

```
id   | status   | updated_at
-----|----------|--------------------------
UUID | SMALLINT | TIMESTAMP WITH TIME ZONE
```

We have an `INDEX` on `(status, updated_at)`. In reality we have many tables, one per queue.
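As a sketch, the DDL for such a table might look like the following (the table and index names are assumptions to match the example):

```sql
CREATE TABLE event_queue (
    id UUID PRIMARY KEY,
    status SMALLINT NOT NULL,
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now()
);

CREATE INDEX event_queue_status_updated_at
    ON event_queue (status, updated_at);
```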
Our polling workers run a loop like:

```python
for _ in shutdown_handler.loop():
    event_meta = get_event_to_process(
        where_status_eq=TO_PROCESS,
        set_status_to=PROCESSING,
    )
    if event_meta is None:
        time.sleep(x)
        continue
    try:
        handle_event(event_meta)  # do the actual work (name illustrative)
        set_status(event_meta, PROCESSED)
    except Exception:
        set_status(event_meta, ERRORED, ...)
```
And `get_event_to_process(...)` performs SQL along the lines of:

```sql
WITH ids AS MATERIALIZED (
    SELECT id FROM event_queue
    WHERE status = {where_status_eq}
    ORDER BY updated_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE event_queue
SET status = {set_status_to}
WHERE id = ANY(SELECT id FROM ids)
RETURNING id
```

Note the use of `MATERIALIZED` to force the CTE to evaluate eagerly before the `UPDATE` (aside: I'd like a Postgres expert to assert that this query is truly race condition free).
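For illustration, here's roughly what a `get_event_to_process` wrapper might look like - the psycopg2-style parameter binding and the assumption of an autocommit connection are mine, only the SQL itself comes from the article:

```python
# Sketch of a claim-one-event helper; assumes `conn` is a psycopg2
# connection in autocommit mode, and that statuses are SMALLINT constants.
CLAIM_SQL = """
WITH ids AS MATERIALIZED (
    SELECT id FROM event_queue
    WHERE status = %(where_status_eq)s
    ORDER BY updated_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE event_queue
SET status = %(set_status_to)s
WHERE id = ANY(SELECT id FROM ids)
RETURNING id
"""

def get_event_to_process(conn, where_status_eq: int, set_status_to: int):
    """Atomically claim one event, returning its id, or None if none are ready."""
    with conn.cursor() as cur:
        cur.execute(
            CLAIM_SQL,
            {"where_status_eq": where_status_eq, "set_status_to": set_status_to},
        )
        row = cur.fetchone()
    return row[0] if row is not None else None
```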
`set_status(...)` just performs an update of `status` and `updated_at` for a particular row.
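Spelled out, that's a one-liner along these lines (placeholder style as above):

```sql
UPDATE event_queue
SET status = {new_status}, updated_at = now()
WHERE id = {event_id}
```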
Because you're simply interacting with a persistent table rather than some black-box queue, it's easy to add bells and whistles as your requirements change.
Retrying
Sometimes tasks fail/timeout. We have jobs that periodically poll for old tasks that have weird statuses and attempt to retry them as appropriate.
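Such a retry sweep boils down to a statement like the following - a sketch, where the 10-minute staleness cutoff is an arbitrary assumption:

```sql
UPDATE event_queue
SET status = {TO_PROCESS}, updated_at = now()
WHERE status = {PROCESSING}
  AND updated_at < now() - INTERVAL '10 minutes'
```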
Ignore before
We have one more timestamp column on our `event_queue` tables - `ignore_before`. This is useful in two scenarios:
- We can represent timeouts (eg. "send an email if we didn't receive inbound x after 10 days") as regular ol' events.
- We want to batch up certain types of outbound event, so we can set their `ignore_before` to "at the next whole hour" and bundle up a load of events at dispatch-time.
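To honour `ignore_before`, the polling query's `WHERE` clause just grows one extra condition, something like:

```sql
WHERE status = {where_status_eq}
  AND (ignore_before IS NULL OR ignore_before <= now())
```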
Cruft cleanup
You may want to have cron jobs that delete queue data older than some retention period.
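A cleanup job like that is a single statement - a sketch, with the 90-day retention being an arbitrary assumption:

```sql
DELETE FROM event_queue
WHERE status = {PROCESSED}
  AND updated_at < now() - INTERVAL '90 days'
```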
Shutdown handler
The following is a nice helper for polling loops that aids with shutdown handling, and times itself out after an hour of no activity.
```python
import os
import signal
import threading
from types import FrameType
from typing import Iterator

INTERRUPT_TIMEOUT = 60 * 60  # kill the process after an hour of inactivity

work_done: threading.Event = threading.Event()

def kill_after_timeout() -> None:
    global work_done
    work_done = threading.Event()
    if work_done.wait(INTERRUPT_TIMEOUT):
        return  # work completed within the timeout, don't kill
    os.kill(os.getpid(), signal.SIGKILL)

class ShutdownHandler:
    def __init__(self, max_loops: int | None = None) -> None:
        self.exit_flag = False
        self.max_loops = max_loops
        self.loops = 0
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, signal: int, frame: FrameType | None) -> None:
        self.exit_flag = True

    def loop(self) -> Iterator[None]:
        global work_done
        while True:
            if self.exit_flag or (
                self.max_loops is not None and self.loops >= self.max_loops
            ):
                work_done.set()
                return
            self.loops += 1
            threading.Thread(target=kill_after_timeout, daemon=True).start()
            yield None
            work_done.set()
```
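The timeout mechanism above hinges on `threading.Event.wait` returning `True` when the event is set in time. A self-contained toy demonstrating the idea (the names here are illustrative, not from the worker code):

```python
import threading

done = threading.Event()
results: list[bool] = []

def watchdog(timeout_s: float) -> None:
    # wait() returns True if the event was set within the timeout;
    # in the real handler, a False result is where os.kill would fire
    results.append(done.wait(timeout_s))

t = threading.Thread(target=watchdog, args=(5.0,))
t.start()
done.set()  # simulate the loop finishing an iteration in time
t.join()
print(results)  # [True] - the watchdog saw the work complete, no kill
```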