# Listen to Database Changes Through the Postgres WAL

Original link: https://peterullrich.com/listen-to-database-changes-through-the-postgres-wal

## Summary: Using the Postgres WAL for Real-Time Change Tracking

This post takes a deep dive into using Postgres' Write-Ahead Log (WAL) to react to database changes, a powerful alternative to `NOTIFY/pg_notify`, which can become a bottleneck in high-throughput systems. The WAL records *all* database changes, enabling features such as replication, backups, and, crucially, Change Data Capture (CDC).

Traditional approaches like `pg_notify` suffer from a single queue, which limits scalability and offers unreliable delivery. The WAL, by contrast, provides a reliable stream of changes. To access it, Postgres must be configured to store "logical" WAL records, which means setting `wal_level = logical` and restarting the database.

The process relies on *replication slots* (dedicated copies of the WAL) to track progress and allow messages to be redelivered if the listener fails. Temporary slots clean themselves up automatically, preventing disk-space problems, but risk missing changes during downtime. Persistent slots retain records so that no data is lost, but require careful management.

The post walks through setting up a listener with Elixir and Postgrex, highlighting how to decode WAL messages (BEGIN, INSERT, COMMIT, etc.) and how to handle potentially duplicated messages. Ultimately, the WAL provides a scalable and reliable way to build real-time features such as notifications and audit logs without hurting database performance.


Welcome back, friends!

Did you hear the news about the inventor of the Knock-Knock joke?

He got a No-bell prize.

Alright, this post will be a bit longer since we'll dive deeply into the Postgres WAL and will explore how to listen for database changes through it.

If you search for ways to react to database changes in Postgres, the first solutions you'll find are NOTIFY/pg_notify and triggers.

So you try out pg_notify on a small, not so active database table. It works! Great.
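The typical setup looks something like this (a hedged sketch; the table and channel names are illustrative, not from the original post):

```sql
-- Send a notification with the new row as JSON on every insert or update.
CREATE OR REPLACE FUNCTION notify_user_change() RETURNS trigger AS $$
BEGIN
  PERFORM pg_notify('user_changes', row_to_json(NEW)::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER users_notify
AFTER INSERT OR UPDATE ON users
FOR EACH ROW EXECUTE FUNCTION notify_user_change();
```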

You roll it out to a larger, much more active database table. It works, but suddenly your query times explode! Instead of doing 1 million transactions per second*, you can now do only 3 (*these numbers were exaggerated for dramatic effect).

So, what happened?

The problem with Postgres' NOTIFY is that all notifications go through a single queue!

Even if you have 20 database connections making 20 transactions in parallel, all of them need to wait for their turn to lock the notification queue, add their notification, and unlock the queue again. This creates a bottleneck, especially in high-throughput databases.

pg_notify also has other limitations:

  • Its notification size is limited to 8000 bytes. This might not be enough to send all the data in the notification, so you'll need to query the database for the full dataset, which adds additional load to your database.
  • It does not wait for an acknowledgment from the listener and does not retry messages, which means that if your listener is dead or can't receive the message, the notification is lost.
  • If you call pg_notify or NOTIFY inside a trigger and change 100,000 rows in a single transaction, the trigger will fire 100,000 times and send out 100,000 notifications, which from a performance perspective is ... not ideal.

You realize that pg_notify doesn't work for you. But there must be another way, right?

There is and it's called the Postgres Write-Ahead Log (WAL).

WAL is a log of every change to your database. Every transaction must save its changes to both the WAL and the data pages, or it will be rolled back.

The WAL's main benefit is that it lets Postgres delay writing data pages to disk after a transaction finishes. Without the WAL, Postgres would have to save all changes to disk right after a transaction completes. If it didn't and the system crashed, the changes would be lost.

With the WAL, Postgres only needs to write one file to disk (the WAL) and can wait to write the data pages later. If a crash happens, it can recreate the data pages from their latest version on disk and the changes tracked in the WAL. This significantly reduces the writes to disk, which are slow compared to changing a row in memory, and it allows Postgres to support much higher transaction throughputs.

The WAL allows Postgres to support a range of other functionalities as well, such as:

  • Database replication - replaying the WAL's changes on a follower database is the most common method for creating database replicas
  • Online/streamed backups - following the WAL means you can have database backups that are always up-to-date. Without the WAL, you'd need to take snapshots at a high frequency and would still miss the latest changes!
  • Point-in-time recovery (PITR) - did you ever wonder how a database provider can rebuild your database at a specific point in time? By recording the WAL and rebuilding your database based on its changes up to that time!
  • Change Data Capture (CDC) - the WAL enables listening to database changes in real-time, which is what this post is all about! You can use it to send notifications and maintain audit logs, all without having to change your application code.

Now, let's see how we can listen to the WAL using Elixir.

This post builds on this exploration, written by my friend Filipe Cabaço, who nerd-sniped me into this topic, which caused me to write about it, which eventually forced you to read about it! So, now you know who to blame if you're unhappy with this post ;-)

Also, a warning: the following code wasn't tested in production. See Supabase's Realtime repo for a production-ready solution (and blame Filipe, who has written a large chunk of it, for any bugs).

I'll introduce the solution piece-by-piece, but you can find the complete and final version on my GitHub.

First, we need to configure Postgres to write logical WAL records. The wal_level setting is similar to a log level and determines how much information is written to the WAL.

By default, it is set to replica which stores enough information to replicate the database, recover lost state after a crash, and enable point-in-time recovery. It balances overhead with functionality and serves well as a default.

If you don't need the functionality of the replica level, you can set your database to track only minimal records. This level supports crash recovery, but not database replication or point-in-time recovery. It is faster than replica though.

For our use-case, we need the third level, logical. This level stores the same data as replica but also tracks changes in a more high-level format of what changed (e.g. "a row with these values was inserted into this table") rather than how it changed (e.g. "these bytes were written to this page"). It can lead to significantly more overhead, especially if you configure tables with REPLICA IDENTITY FULL or make many UPDATE or DELETE calls, so keep an eye on your database usage levels after changing the wal_level.
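On a self-hosted Postgres, you could change the setting like this (a minimal sketch; managed providers usually expose wal_level through their own configuration instead):

```sql
-- Check the current level
SHOW wal_level;

-- Takes effect only after a restart of the Postgres server
ALTER SYSTEM SET wal_level = 'logical';
```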


Next, we need to create a publication and a replication slot. You can find more details about both in the Postgrex.ReplicationConnection documentation.

A publication is a way to tell Postgres which tables and operations (inserts, updates, deletes) you want to track. Think of it as an event "topic" which will only broadcast events for the tables and operations you have specified. Here's an example:

```sql
CREATE PUBLICATION active_user_publication
FOR TABLE users
-- Filters are available only from Postgres 15+
WHERE (active IS TRUE)
WITH (publish = 'insert,delete');
```

The publication above will publish only the INSERT, DELETE, BEGIN, and COMMIT events for the active users in the users table.

Here's another example:

```sql
CREATE PUBLICATION everything_publication FOR ALL TABLES;
```

This publication will publish all events (BEGIN, INSERT, UPDATE, DELETE, and COMMIT) for all rows and tables. That might be a lot, so when you add your listener, it's better to restrict the publication to only what you need. You can always change a publication later with an ALTER PUBLICATION call.
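For example, you could later narrow the publication above down to a single table (a hedged one-liner; the table name is illustrative):

```sql
ALTER PUBLICATION everything_publication SET TABLE users;
```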

Alright, now you know about publications and how to create them. Next, let's dive into replication slots.

A replication slot is a separate copy of the WAL just for our use-case, which is a WAL listener. It tracks how far the listener has read through the WAL and removes WAL records only once the listener has acknowledged them. By using a separate copy of the WAL, we can keep records that the listener hasn't processed yet, even if they've been deleted from the original WAL. It also lets us resend records if the listener crashes.

Now, if you had a peek at Filipe's or Supabase's implementation above, you might have spotted that they use a temporary replication slot. The reason for this is that replication slots can create serious problems for the database if we don't consume them. They will keep on growing until they fill up all disk space allocated to the WAL because Postgres deletes their WAL records only after we consume them.

A temporary replication slot avoids this issue because Postgres deletes it whenever we lose connection to the database. So, if our listener crashes or we remove the listener from our application, Postgres will automatically clean up the replication slot for us which is great from a not-taking-down-the-database perspective.

The downside of using a temporary replication slot is that we have to keep the listener online at all times if we want to listen to all database changes. You might see the problem with this already. If the listener is offline, it will miss change events which is not great from a not-missing-any-events perspective.

If never missing a database change is important to you, a persistent replication slot might be the better choice. Postgres will always write WAL records to it, even if your listener is offline. When it comes back online, the replication slot will send the WAL records it has missed. If the listener crashes before it can acknowledge the records, the replication slot will resend them when the listener restarts.

Be warned though that persistent replication slots allow only one connection, so you can have only one listener at a time. Also, every implementation I've seen in my research for this post has warned against using a persistent slot, but other than filling up the disk, I have yet to understand what other downsides it brings.
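One way to keep an eye on the disk usage is to check how much WAL each slot still retains (a hedged sketch using standard Postgres functions):

```sql
SELECT slot_name,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
```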

Alright, now with these pros and cons in mind, let's continue. I will show you how to create a persistent replication slot. If you want a temporary one, have a look at Supabase's implementation above.
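The actual migrations are on my GitHub; here is a condensed sketch of what they do (the module name is illustrative):

```elixir
defmodule Wal.Repo.Migrations.CreatePublicationAndSlot do
  use Ecto.Migration

  # Creating a logical replication slot can fail inside a transaction
  # that has already performed writes, so run this migration outside one.
  @disable_ddl_transaction true

  def up do
    # Track all operations on all tables. Restrict this to what you need!
    execute("CREATE PUBLICATION wal_listener_publication FOR ALL TABLES")

    # Create a persistent (third argument: false) logical replication
    # slot that uses the pgoutput plugin.
    execute("SELECT pg_create_logical_replication_slot('wal_listener', 'pgoutput', false)")
  end

  def down do
    execute("SELECT pg_drop_replication_slot('wal_listener')")
    execute("DROP PUBLICATION wal_listener_publication")
  end
end
```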

The second argument, pgoutput, is the output plugin the replication slot should use to convert the raw WAL records into a readable output format. pgoutput is the default and comes with Postgres, but we could also use other plugins like wal2json. The third argument, false, means that this replication slot is not temporary.

Alright. Now, if you execute these two migrations with mix ecto.migrate, you should see the following records in your database:

```
> SELECT * FROM pg_publication;

 oid   | pubname                  | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot | pubgencols
-------+--------------------------+----------+--------------+-----------+-----------+-----------+-------------+------------+------------
 16409 | wal_listener_publication |       10 | TRUE         | TRUE      | TRUE      | TRUE      | TRUE        | FALSE      | n
```

```
> SELECT * FROM pg_replication_slots;

slot_name           | wal_listener
plugin              | pgoutput
slot_type           | logical
datoid              | 26370
database            | wal_dev
temporary           | FALSE
active              | FALSE
active_pid          | NULL
xmin                | NULL
catalog_xmin        | 142253
restart_lsn         | 4/4FC8CB80
confirmed_flush_lsn | 4/4FC8CBB9
wal_status          | reserved
safe_wal_size       | NULL
two_phase           | FALSE
two_phase_at        | NULL
inactive_since      | 2025-10-27 14:30:54.98639+00
conflicting         | FALSE
invalidation_reason | NULL
failover            | FALSE
synced              | FALSE
```

If you see an output similar to the one above, your migrations successfully created the publication and replication slot we need to listen to the WAL. So, let's do that next.

To build the listener, we use Postgrex.ReplicationConnection, which handles the low-level communication with Postgres for us. Here is the first part of our Listener module:

```elixir
defmodule Wal.Replication do
  use Postgrex.ReplicationConnection

  require Logger

  def start_link(_opts) do
    config = Wal.Repo.config()

    # Automatically reconnect if we lose connection.
    extra_opts = [
      auto_reconnect: true
    ]

    Postgrex.ReplicationConnection.start_link(
      __MODULE__,
      :ok,
      extra_opts ++ config
    )
  end

  @impl Postgrex.ReplicationConnection
  def init(:ok) do
    {:ok, %{messages: [], relations: %{}}}
  end

  @impl Postgrex.ReplicationConnection
  def handle_connect(state) do
    query =
      """
      START_REPLICATION SLOT wal_listener
      LOGICAL 0/0
      (proto_version '1', publication_names 'wal_listener_publication')
      """

    Logger.debug(query)

    {:stream, query, [], state}
  end
end
```

Now, what happens here?

First, we pull in all the nitty-gritty low-level logic necessary to connect, reconnect, query, and stream from Postgres with just one line:

```elixir
use Postgrex.ReplicationConnection
```

Next, we define the start_link/1 function to start the listener. We pull in the already existing database configuration from our Repo with Wal.Repo.config(). This allows us to connect the WAL listener to our application database without having to duplicate the connection configuration but you could also provide your own connection details.

We add the extra option auto_reconnect: true to instruct Postgrex to automatically reconnect our ReplicationConnection. And lastly, we call the start_link/3 function on Postgrex.ReplicationConnection which uses :gen_statem and the Postgrex.Protocol to create a database connection for us.
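To start the listener, you would add it to your application's supervision tree after the Repo (a hedged sketch; your application module and children will differ):

```elixir
# In your application.ex
children = [
  Wal.Repo,
  Wal.Replication
]

Supervisor.start_link(children, strategy: :one_for_one, name: Wal.Supervisor)
```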

Now that we defined how to start the listener, we must implement the init/1 callback. It simply returns our initial state, which is %{messages: [], relations: %{}}. The messages list will hold the WAL messages we receive from Postgres until we can handle them, and the relations map will hold information about the tables in which the WAL changes occurred. But more about these later.

Lastly, we implement one optional callback handle_connect/1. This callback allows us to send a query to Postgres after the database connection was established.

```sql
START_REPLICATION SLOT wal_listener
LOGICAL 0/0
(proto_version '1', publication_names 'wal_listener_publication')
```

With this query, we tell Postgres that it can start sending the logical WAL records from the wal_listener replication slot our way. We tell it to start at the Log Sequence Number (LSN) 0/0, which is the very first record ever recorded. This doesn't mean it will send us all WAL records ever created though.

Our replication slot starts tracking WAL records only after we create it, and it doesn't backfill records from before. So, unless we create our replication slot before we store a single row, it will miss WAL records. Just keep that in mind.

Lastly, we provide the options proto_version and publication_names. The publication name is the publication we've created before. As proto version, we use version 1. You can also use proto versions 2, 3, or 4, which are useful if you have very large transactions or two-phase transactions.

On proto version 1, Postgres will wait until very large transactions commit and only then send the WAL records, but on versions 2 and 4, it will send the records while the transactions are still in progress, optionally in parallel on version 4. Version 3 supports two-phase commits, which are useful to prepare and commit distributed transactions in multiple databases at once.

If you need these advanced features, make sure to also add the streaming 'on' or 'parallel' option for streaming in-progress transactions and the two_phase 'true' option to the START_REPLICATION query.
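Such a query might look like this (a hedged sketch; check the pgoutput documentation for the options your Postgres version supports):

```sql
START_REPLICATION SLOT wal_listener
LOGICAL 0/0
(proto_version '4', publication_names 'wal_listener_publication',
 streaming 'parallel', two_phase 'true')
```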

Alright, now that we told Postgres to start sending WAL records our way, let's see how we can receive and decode the messages, starting with the keep-alive heartbeat.

Once we've sent START_REPLICATION, you'll see that Postgres will send us two types of messages: XLogData and the Primary Keep-Alive Message. XLogData contains the data of one logical WAL record in binary format. The Primary Keep-Alive Message is a heartbeat from Postgres to our listener and has four purposes:

  1. It checks that our listener is still alive, hence the name. If our listener fails to respond, Postgres will close the connection.
  2. It tells the listener that our Postgres instance is still alive and our replication slot is active.
  3. It provides the LSN of the latest WAL record our replication slot has recorded.
  4. It gives the listener the opportunity to acknowledge the XLogData messages it has received since the last keep-alive message by replying with a Standby Status Update or Hot Standby Feedback message.

Let's implement that last step, which is to receive and reply to a keep-alive message.

Here's how the listener handles this:

```elixir
# Primary Keep-Alive Message
# https://www.postgresql.org/docs/current/protocol-replication.html#PROTOCOL-REPLICATION-PRIMARY-KEEPALIVE-MESSAGE
@impl Postgrex.ReplicationConnection
def handle_data(<<?k, wal_end::64, _server_time::64, should_reply::8>>, state) do
  messages =
    case should_reply do
      # Standby Status Update
      # https://www.postgresql.org/docs/current/protocol-replication.html#PROTOCOL-REPLICATION-STANDBY-STATUS-UPDATE
      1 -> [<<?r, wal_end + 1::64, wal_end + 1::64, wal_end + 1::64, current_time()::64, 0>>]
      0 -> []
    end

  Logger.debug("Responding to keep alive: #{should_reply} - #{inspect(messages)}")

  {:noreply, messages, state}
end
```
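The current_time/0 helper isn't shown here; a minimal version (following the Postgrex.ReplicationConnection docs, which measure microseconds since Postgres' 2000-01-01 epoch) could look like this:

```elixir
# Postgres counts time in microseconds since 2000-01-01 00:00:00 UTC.
@epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)

defp current_time(), do: System.os_time(:microsecond) - @epoch
```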

Our handle_data/2 callback receives both the keep-alive and XLogData messages. Here, we match against the keep-alive message, which starts with the character k. After the identifier, we receive the latest LSN our replication slot has recorded, the server time, and a one-byte flag should_reply, which Postgres might set if it wants to know that our listener is still active.

If we should reply, we respond with a Standby Status Update message. This message starts with an r as identifier, followed by the LSN of the last message that we have received and written to disk (first LSN), that we have flushed to disk (second LSN), and that we have applied (third LSN). In this case, we are not too sensitive about losing messages, which is why we simply return the last LSN from the server three times, assuming that we have received and processed all messages it has sent to us successfully.

However, if we wanted to be absolutely sure that we never lose a message, we could track which messages we wrote to a buffer (first LSN), which messages we flushed from the buffer to disk or other persistence method (second LSN) and which messages we applied to our database or other state (third LSN). Postgres will consider a message acknowledged only after we mark it as "applied". We usually don't need this level of detail but it's good to know about.

We respond with the simplified Standby Status Update message, but we could also have sent a Hot Standby Feedback Message. The difference between Standby and Hot Standby is that Hot Standbys run queries against their state and these queries might touch records that have been deleted on the primary but are still needed on the Hot Standby. For example, imagine that your listener receives the INSERT message for a record, adds it to its state, and then runs a query against that state. Now, if Postgres sends us a DELETE message for that record before our query completes, the listener cannot apply that message until the query completes. Otherwise, it would "pull the rug" from underneath the query so to speak.

So, the listener buffers the message and waits for the query to complete. While it is waiting, it would reply to Postgres with a Hot Standby Feedback Message that tells Postgres "Hey, hang on a sec. I first gotta finish this work before I can acknowledge all the new messages you've sent me". Postgres would then keep the WAL records that it has sent until the listener sends a regular Standby Status Update which acknowledges the old WAL records. Again, you probably won't ever need to send a Hot Standby Feedback Message, but it's good to know about.

To decode the XLogData payloads, we can take inspiration from Supabase's Realtime decoder or the PgoutputDecoder project.

Now, here's how to decode the BEGIN, INSERT, and COMMIT messages:

```elixir
defmodule Wal.Decoder do
  # Begin
  # https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html#PROTOCOL-LOGICALREP-MESSAGE-FORMATS-BEGIN
  def parse(<<?B, lsn_end::64, _timestamp::64, tx_id::32>>) do
    %{type: :begin, lsn_end: to_lsn(lsn_end), tx_id: tx_id}
  end

  # Insert
  # Without transaction_id because we are not streaming transactions
  # https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html#PROTOCOL-LOGICALREP-MESSAGE-FORMATS-INSERT
  def parse(<<?I, relation_id::32, ?N, tuple_data::bytes>>) do
    %{type: :insert, relation_id: relation_id, data: parse_tuple_data(tuple_data)}
  end

  # Commit
  # https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html#PROTOCOL-LOGICALREP-MESSAGE-FORMATS-COMMIT
  def parse(<<?C, _flags::8, lsn::64, lsn_end::64, _timestamp::64>>) do
    %{type: :commit, commit_lsn: to_lsn(lsn), tx_end_lsn: to_lsn(lsn_end)}
  end

  # Relation
  # https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html#PROTOCOL-LOGICALREP-MESSAGE-FORMATS-RELATION
  def parse(<<?R, relation_id::32, rest::bytes>>) do
    # Parses a Relation message
  end

  defp parse_tuple_data(data) do
    # Parses a TupleData struct
  end

  def to_lsn(lsn_int) do
    # Converts an integer LSN to the hex format "XXX/XXX"
  end
end
```

As you can see, every message starts with an identifying character and otherwise follows the specs in the docs. I only stubbed the parse_tuple_data/1 and to_lsn/1 functions and the Relation parser, since this post is already long enough. Please see my partial Decoder on GitHub for more details on the actual implementation.
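To give you a taste of the stubs, here is one possible implementation of to_lsn/1 (a minimal sketch, assuming the standard encoding of an LSN as two 32-bit halves printed as hex):

```elixir
def to_lsn(lsn_int) do
  # An LSN is a 64-bit integer; its textual form is "high/low" in hex,
  # e.g. 4/4FC8CB80 as seen in the pg_replication_slots output above.
  <<high::32, low::32>> = <<lsn_int::64>>
  "#{Integer.to_string(high, 16)}/#{Integer.to_string(low, 16)}"
end
```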

Alright, now our listener can receive and decode WAL messages. Let's take it for a spin and see what events we receive for a typical insert transaction.

Internally, Postgres uses a component called the ReorderBuffer, which buffers WAL records before forwarding them to pgoutput.

The ReorderBuffer doesn't send the WAL records as they come in, but it first buffers and reorders them - hence the name - and only sends them to pgoutput when the transaction commits. If a transaction is rolled back, it won't forward the messages (see 49.6.4) and we'll never receive them either. The ReorderBuffer forwards messages in sequence one transaction at a time, not in parallel. When pgoutput receives these messages, it prepends a BEGIN message to indicate the beginning of a transaction, and sends them our way.

All of this means you don't have to worry about receiving messages out-of-order or interleaved with messages from other transactions (at least not if you use proto_version '1'), or handling rolled back vs committed transactions. Our listener will always receive ordered messages for one committed transaction at a time. Thank you Postgres!
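To make this concrete, here is a hedged sketch (not from the original post) of how handle_data/2 might consume XLogData messages, buffering decoded messages until the COMMIT arrives. The handle_change/1 function is a hypothetical callback into your application:

```elixir
@impl Postgrex.ReplicationConnection
def handle_data(<<?w, _wal_start::64, _wal_end::64, _server_time::64, payload::bytes>>, state) do
  case Wal.Decoder.parse(payload) do
    %{type: :begin} ->
      # A new transaction starts; reset the message buffer.
      {:noreply, [], %{state | messages: []}}

    %{type: :commit} ->
      # The transaction committed; process its messages in order.
      state.messages |> Enum.reverse() |> Enum.each(&handle_change/1)
      {:noreply, [], %{state | messages: []}}

    message ->
      # Buffer e.g. INSERT messages until the transaction commits.
      {:noreply, [], %{state | messages: [message | state.messages]}}
  end
end
```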

You can find the complete implementation here. What you do with the data depends on your use-case. Be creative! If you want to keep exploring, here are some ideas:

  1. Check out Walex or Cainophile, which use the WAL for Change Data Capture (CDC) in Elixir.
  2. Peruse the Supabase Realtime repo for inspiration.
  3. Look into creating a Snapshot when creating the Replication Slot, which saves the current state of the database at the moment you create your replication slot.
  4. Call me a liar or tell me how much this post inspired you over on Bluesky. Your choice.

If you want to learn more, check out my book or video courses (one and two). Follow me on Bluesky or subscribe to my newsletter below if you want to get notified when I publish the next blog post. Until next time! Cheerio 👋
