Postgres as queue

Original link: https://leontrolski.github.io/postgres-as-queue.html

Using PostgreSQL as a queue, rather than a separate service or piece of infrastructure, can simplify many aspects of system design and operation while lowering running costs. Although it adds a small amount of database load for many tasks, the approach removes the need for complex configuration and integrates more easily with an existing codebase. The idea is to update a status column on queued messages, which allows many parallel workers to consume the queue concurrently without elaborate transaction and lock management. Because tasks live inside the relational database, extra features such as retries, ignore-before timestamps, and cruft cleanup become easier to implement, and a shutdown handler lets the application exit gracefully. By using PostgreSQL as a queue, developers can significantly reduce system complexity and cost without giving up the capabilities that successful software development requires.

Implementing queues on a separate messaging layer caused us problems, owing to the difficulty of tracking the state of completed steps within larger workflows. Using PostgreSQL as the queue, with queue state held in the database itself, solved these problems: every part of a workflow gets consistent state updates and reliable delivery. Overall, the transition simplified our infrastructure and eased many technical pain points while providing a more complete and efficient experience.

Original article

The team I've been working in for the last year or so have had great success using Postgres-as-queue. We've managed to avoid the following:

  • Infrastructure/configuration - I'd estimate each line of terraform to be an order of magnitude more risk/maintenance/faff than each line of Python.
  • Slow/clunky multi-container testing.
  • The need for expertise in anything beyond Python + Postgres.
  • Elaborate retry/dead-letter-queue mechanisms.
  • Thinking about data serialisation over the wire.

In a nutshell, it's simpler - there are just way fewer moving parts.

As we're using a monolithic codebase with a reasonable ORM, we also have none of the CMD-clickability issues that plague ad-hoc SNS/PubSub/Kafka architectures.

The main objection to doing Postgres-as-queue is a performance one, along the lines of "don't put unnecessary extra load on the db/don't increase row churn". Let's construct a reasonable example demonstrating that queue usage shouldn't introduce much extra load in many cases. As always, before following anyone's advice on this kind of stuff, profile, profile, profile!

In the (fairly unusual) case that you're doing many tasks, none of which touch the db (say constructing and sending emails from static data), you can ignore this blog post and get on with life. In another case, you may be operating at some crazy scale where these downsides start applying; again, run the numbers and profile.

Let's imagine the db load introduced by a hypothetical task - I currently work in the energy industry, so the example might be: a customer submits a meter reading, we queue a task to write the reading and update some account balance - the load looks like:

  • Receive the message from the broker.
  • Make 3 primary key SELECTs totalling 0.3ms db time.
  • Make 2 slightly hairier SELECTs with some JOINs/GROUP BYs totalling 4ms db time.
  • Perform 2 UPDATEs totalling 2ms db time (and some row churn).
  • ACK the message.

In the new Postgres-as-queue world, this looks like:

  • Poll for a message that needs processing, on finding one, UPDATE the status, totalling 1ms db time.
  • Make 3 primary key SELECTs totalling 0.3ms db time.
  • Make 2 slightly hairier SELECTs with some JOINs/GROUP BYs totalling 4ms db time.
  • Perform 2 UPDATEs totalling 2ms db time (and some row churn).
  • ACK the message by UPDATEing the status totalling 0.5ms db time (and some row churn).

In this example, our db time has gone up from 6.3ms per task to 7.8ms. These figures are totally fictional, but we've demonstrated a reasonable way of thinking about the overhead.
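Plugging the fictional numbers in makes the overhead concrete:

```python
# db time per task, in ms, using the fictional figures above
broker_ms = 0.3 + 4 + 2              # PK SELECTs + hairier SELECTs + UPDATEs
queue_ms = 1 + 0.3 + 4 + 2 + 0.5     # claim + the same work + status ACK

overhead = (queue_ms - broker_ms) / broker_ms
print(f"{broker_ms:.1f}ms -> {queue_ms:.1f}ms per task (+{overhead:.0%})")
```

So, for this workload, the queue machinery adds roughly a quarter to the per-task db time - noticeable, but hardly the difference between a healthy database and a melting one.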

If we had just one worker polling for tasks, we could ignore locking and transactions, but we want to have many, so we have to use FOR UPDATE SKIP LOCKED. This atomically locks the row at the point where it selects it - there's discussion of ins and outs in this excellent blog post by 2ndQuadrant.

For our example implementation, we have an event table that looks like:

id   | status   | updated_at
------------------------------------------
UUID | SMALLINT | TIMESTAMP WITH TIME ZONE

We have an INDEX on (status, updated_at). In reality we have many tables, one per queue.
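In DDL, that table and index might look like this (names are illustrative, not our real schema):

```sql
CREATE TABLE event_queue (
    id         UUID PRIMARY KEY,
    status     SMALLINT NOT NULL,
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL
);

-- matches the polling query's WHERE status = ... ORDER BY updated_at
CREATE INDEX event_queue_poll_idx ON event_queue (status, updated_at);
```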

Our polling workers run a loop like:

for _ in shutdown_handler.loop():
    event_meta = get_event_to_process(
        where_status_eq=TO_PROCESS,
        set_status_to=PROCESSING,
    )
    if event_meta is None:
        time.sleep(x)  # no work found - back off before polling again
        continue

    try:
        ...  # do the actual work for this event
        set_status(event_meta, PROCESSED)
    except Exception:
        set_status(event_meta, ERRORED, ...)

And get_event_to_process(...) performs SQL along the lines of:

WITH ids AS MATERIALIZED (
    SELECT id FROM event_queue
    WHERE status = {where_status_eq}
    ORDER BY updated_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE event_queue
SET status = {set_status_to}
WHERE id = ANY(SELECT id FROM ids)
RETURNING id

Note the use of MATERIALIZED to force the CTE to evaluate eagerly before the UPDATE (aside: I'd like a postgres expert to assert that this query is truly race condition free).

set_status(...) just performs an update of status and updated_at for a particular row.

Because you're simply interacting with a persistent table rather than some black-box queue, it's easy to add bells and whistles as your requirements change.

Retrying

Sometimes tasks fail/timeout. We have jobs that periodically poll for old tasks that have weird statuses and attempt to retry them as appropriate.
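Such a sweep might, for instance, push stale PROCESSING rows back to TO_PROCESS on the assumption the claiming worker died (a sketch; the status values and cutoff are assumptions, not our exact job):

```python
TO_PROCESS, PROCESSING = 0, 1

def retry_stuck_sql(stuck_after_minutes: int) -> str:
    # rows claimed long ago but never ACKed are presumed dead workers;
    # bumping updated_at sends them to the back of the queue
    return (
        "UPDATE event_queue "
        f"SET status = {TO_PROCESS}, updated_at = now() "
        f"WHERE status = {PROCESSING} "
        f"AND updated_at < now() - interval '{stuck_after_minutes} minutes'"
    )
```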

Ignore before

We have one more timestamp column on our event_queue tables - ignore_before. This is useful in two scenarios:

  • We can represent timeouts (eg. "send an email if we didn't receive inbound x after 10 days") as regular ol' events.
  • We want to batch up certain types of outbound event, so we can set their ignore_before to "at the next whole hour" and bundle up a load of events at dispatch-time.
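With that column in place, the claim query only needs one extra predicate (a sketch, building just the WHERE fragment):

```python
def poll_predicate(status: int) -> str:
    # extra condition for the claim query's WHERE clause:
    # skip rows whose ignore_before is still in the future
    return (
        f"status = {status} "
        "AND (ignore_before IS NULL OR ignore_before <= now())"
    )
```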

Cruft cleanup

You may want to have cron jobs that delete queue data older than some time.
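A cron-driven cleanup might run a statement like this (a sketch; the retention window and terminal statuses are assumptions):

```python
PROCESSED, ERRORED = 2, 3

def cleanup_sql(older_than_days: int) -> str:
    # delete terminal-status rows that are past the retention window
    return (
        "DELETE FROM event_queue "
        f"WHERE status IN ({PROCESSED}, {ERRORED}) "
        f"AND updated_at < now() - interval '{older_than_days} days'"
    )
```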

Shutdown handler

The following is a nice helper for polling loops that aids with shutdown handling, and times itself out after an hour of no activity.

import os, signal, threading
from types import FrameType
from typing import Iterator

INTERRUPT_TIMEOUT = 60 * 60  # seconds - kill the process after an hour of inactivity
work_done = threading.Event()


def kill_after_timeout() -> None:
    global work_done
    work_done = threading.Event()
    if work_done.wait(INTERRUPT_TIMEOUT):
        return  # work finished in time, this watchdog stands down
    os.kill(os.getpid(), signal.SIGKILL)


class ShutdownHandler:
    def __init__(self, max_loops: int | None = None) -> None:
        self.exit_flag = False
        self.max_loops = max_loops  # handy for bounding the loop in tests
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, signum: int, frame: FrameType | None) -> None:
        self.exit_flag = True

    def loop(self) -> Iterator[None]:
        global work_done
        loops = 0
        while True:
            if self.exit_flag or loops == self.max_loops:
                work_done.set()
                return
            # start a fresh watchdog for this iteration
            threading.Thread(target=kill_after_timeout, daemon=True).start()
            yield None
            work_done.set()  # tell the watchdog this iteration finished
            loops += 1