PolarDB PostgreSQL版提供wal2json插件,可以將邏輯日志文件輸出為JSON格式。
前提條件
支持的PolarDB PostgreSQL版的版本如下:
PostgreSQL 14(內核小版本14.5.1.0及以上)
PostgreSQL 11(內核小版本1.1.29及以上)
您可通過如下語句查看PolarDB PostgreSQL版的內核小版本的版本號:
PostgreSQL 14
select version();
PostgreSQL 11
show polar_version;
背景信息
wal2json是邏輯解碼插件,具體功能如下:
可以訪問由
INSERT
和UPDATE
生成的元組。根據配置的副本身份,可以訪問
UPDATE
和DELETE
舊行版本。可以使用流協議(邏輯復制插槽)或特殊的SQL API來使用更改。
wal2json插件會在每個事務中生成一個JSON對象。JSON對象中提供了所有新/舊元組,額外選項還可以包括事務時間戳、限定架構、數據類型、事務ID等屬性。詳情請參見通過SQL獲取JSON對象。
注意事項
由于PolarDB PostgreSQL版采用的replication方式為
REPLICA_IDENTITY_FULL
,所以在更新和刪除時顯示的數據為整行數據,并不是默認的只涉及更新和刪除前后的列。要想修改只涉及更新前后的列,需要關閉polar_create_table_with_full_replica_identity
參數。該參數不能通過控制臺進行修改,請進行修改。
wal2json插件需要用到邏輯編解碼的功能,需要將
wal_level
參數修改為logical
,該參數可以在控制臺上進行修改。
通過SQL獲取JSON對象
wal2json插件不需要通過create extension
來創建,而是通過邏輯復制槽來裝載wal2json插件進行使用。
創建完含有wal2json插件的邏輯復制槽后,通過如下命令獲取WAL中的JSON對象。
-- 創建帶主鍵和不帶主鍵的表
CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
-- 創建wal2json類型的邏輯復制槽
SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
-- 提交事務,寫入WAL
BEGIN;
INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table2_with_pk (b, c) VALUES('Replication', now());
DELETE FROM table2_with_pk WHERE a < 3;
INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir');
-- it is not added to stream because there isn't a pk or a replica identity
UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;
-- 獲取WAL中的JSON對象
SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1');
SELECT 'stop' FROM pg_drop_replication_slot('test_slot');
結果如下:
$ psql -At -f /tmp/example2.sql postgres
CREATE TABLE
CREATE TABLE
init
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
DELETE 2
INSERT 0 1
UPDATE 1
COMMIT
psql:/tmp/example2.sql:17: WARNING: table "table2_without_pk" without primary key or replica identity is nothing
{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "delete",
"schema": "public",
"table": "table2_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [1, "2018-03-27 12:05:29.914496"]
}
}
,{
"kind": "delete",
"schema": "public",
"table": "table2_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [2, "2018-03-27 12:05:29.914496"]
}
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_without_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "numeric(5,2)", "text"],
"columnvalues": [1, 2.34, "Tapir"]
}
]
}
stop
參數說明
wal2json相關參數說明如下:
參數 | 說明 |
change | 每次DML的WAL條目。例如,Insert、Update、Delete、Truncate的WAL記錄。 |
changeset | 若干個change的集合。 |
include-xids | 用于控制是否添加xid到每一個changeset,默認值為false,取值如下:
|
include-timestamp | 用于控制是否添加timestamp到每一個changeset,默認值為false,取值如下:
|
include-schemas | 用于控制是否添加schema到每一個change,默認值為true,取值如下:
|
include-types | 用于控制是否添加type到每一個change,默認值為true,取值如下:
|
include-typmod | 將修飾符添加到具有修飾符的類型(例如,varchar(20)而不是varchar),默認值為true,取值如下: |
include-type-oids | 用于控制是否添加類型oids,默認值為false,取值如下:
|
include-not-null | 用于控制是否添加
|
pretty-print | 用于控制是否向JSON結構添加空格和縮進,進行格式化,默認值為false,取值如下:
|
write-in-chunks | 用于控制是否是每次change后都寫,而不是每個changeset,默認值為false,取值如下:
|
include-lsn | 用于控制是否添加nextlsn到每一個changeset,默認值為false,取值如下:
|
filter-tables | 排除指定表。默認為空,表示不過濾任何表。 說明
|
add-tables | 指定解析特定表,默認是解析所有schema下的所有表。用法和filter-tables相同。 |
filter-msg-prefixes | 排除指定prefix的行,通常用于 |
add-msg-prefixes | 增加指定prefix的行,通常用于 |
format-version | 定義使用哪種輸出格式,默認值為1。取值如下:
|
actions | 定義輸出哪種操作。默認為所有(Insert,Update,Delete和Truncate)。如果您使用 |
示例
以include-xids
為例說明參數如何使用。
創建表和邏輯復制槽,并插入一行數據。
DROP TABLE IF EXISTS tbl; CREATE TABLE tbl (id int); SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); INSERT INTO tbl VALUES (1);
將參數名和參數內容依次填入函數。
SELECT count(*) = 1, count(distinct ((data::json)->'xid')::text) = 1 FROM pg_logical_slot_get_changes( 'regression_slot', NULL, NULL, 'format-version', '1', 'include-xids', '1');
原理設計
更多信息和原理設計請參見官方使用文檔。