PolarDB PostgreSQL版支持使用邏輯復制來復制數據定義命令 (DDL)。
背景信息
原生PostgreSQL僅支持表數據的同步,需要用戶手動在發布端和訂閱端建立相同定義的表,才能確保表數據正確的同步。
PolarDB PostgreSQL版擴展了邏輯復制的能力,支持數據定義語言(DDL)邏輯復制,即通過發布訂閱的方式可以復制數據庫對象的CREATE/ALTER/DROP
行為到訂閱端。
前提條件
DDL邏輯復制功能需要將wal_level
參數設置為logical
,修改參數的具體操作請參見設置集群參數。
語法
CREATE PUBLICATION
CREATE PUBLICATION name [ FOR TABLE [ ONLY ] table_name [ * ] [, ...] | FOR ALL TABLES ] [ WITH ( publication_parameter [= value] [, ... ] ) ] publication_parameter: ... pubddl = '(none | table | all)'
其中,
publication_parameter
增加了pubddl
參數,取值為none
、table
、all
。none(默認):表示不復制DDL。
table:表示僅復制
table
相關的DDL語句。CREATE TABLE
ALTER TABLE
DROP TABLE
CREATE TABLE AS
all:表示復制所有的DDL語句,目前支持的DDL語句如下:
ALTER INDEX
ALTER SEQUENCE
ALTER TABLE
ALTER TYPE
CREATE INDEX
CREATE SCHEMA
CREATE SEQUENCE
CREATE TABLE
CREATE TABLE AS
CREATE TYPE
CREATE TYPE HEADER
CREATE TYPE BODY
DROP INDEX
DROP SCHEMA
DROP SEQUENCE
DROP TABLE
DROP TYPE
說明如果指定
pubddl = 'all'
,則必須指定FOR ALL TABLES
。 全局命令可以在任何數據庫執行,目前不支持復制,全局命令包括ROLE
、DATABASE
、TableSpace
語句和一些GrantStmt
/RevokeStmt
(如果目標對象是全局對象)。
CREATE SUBSCRIPTION
CREATE SUBSCRIPTION subscription_name CONNECTION 'conninfo' PUBLICATION publication_name [, ...] [ WITH ( subscription_parameter [= value] [, ... ] ) ] subscription_parameter: ... dump_schema = false/true
其中,
subscription_parameter
新增dump_schema
參數,支持在創建訂閱時將發布端的存量對象定義dump
到訂閱端,默認值為false,取值如下:false(默認):表示不支持在創建訂閱時將發布端的存量對象定義
dump
到訂閱端。true:表示支持在創建訂閱時將發布端的存量對象定義
dump
到訂閱端。
dump_schema
使用了pg_dump/pg_restore工具,需要確保集群訪問限制支持host='127.0.0.1'
連接,否則會恢復失敗。 dump的文件存放在集群的本地目錄pg_logical/schemadumps
中,恢復或出錯后會刪除。
參數說明
參數 | 說明 |
polar_enable_ddl_replication | 開啟或關閉DDL邏輯復制功能。取值如下:
|
polar_enable_debug_ddl_replication | 開啟或關閉debug ddl replicaiton,打印更多日志。取值如下: true:開啟debug ddl replicaiton。 false(默認):關閉debug ddl replicaiton。 |
示例
創建一個支持DDL的發布。
CREATE PUBLICATION mypub FOR ALL TABLES with (pubddl = 'all');
顯示結果如下:
CREATE PUBLICATION
創建一個訂閱。
CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;
顯示結果如下:
NOTICE: created replication slot "mysub" on publisher CREATE SUBSCRIPTION
在發布端執行SQL。
# 創建表 CREATE TABLE t1(id int ,val char(3)); # 插入數據 INSERT INTO t1 values (1,'a'); INSERT INTO t1 values (2,'b'); INSERT INTO t1 values (3,'c'); # 修改表 ALTER TABLE t1 ADD COLUMN c int GENERATED BY DEFAULT AS IDENTITY, ALTER COLUMN c SET GENERATED ALWAYS; # 查看表內容 SELECT * FROM t1; id | val | c ----+-----+--- 1 | a | 1 2 | b | 2 3 | c | 3 (3 rows) # 查看注釋 \d+ t1 Table "public.t1" Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description --------+--------------+-----------+----------+------------------------------+----------+-------------+--------------+------------- id | integer | | | | plain | | | val | character(3) | | | | extended | | | c | integer | | not null | generated always as identity | plain | | | Publications: "mypub" Replica Identity: FULL Access method: heap
在訂閱端查看復制情況。
# 查看表內容 SELECT * FROM t1; id | val | c ----+-----+--- 1 | a | 1 2 | b | 2 3 | c | 3 (3 rows) # 查看注釋 \d+ t1 Table "public.t1" Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description --------+--------------+-----------+----------+------------------------------+----------+-------------+--------------+------------- id | integer | | | | plain | | | val | character(3) | | | | extended | | | c | integer | | not null | generated always as identity | plain | | | Replica Identity: FULL Access method: heap
在發布端刪除表。
DROP TABLE t1;
在訂閱端查看復制情況。
SELECT * FROM t1;
顯示結果如下:
ERROR: relation "t1" does not exist LINE 1: SELECT * FROM t1;
解碼插件
解碼插件新增以下兩個回調接口。
/*
* Output plugin callbacks
*/
typedef struct OutputPluginCallbacks
{
...
LogicalDecodeDDLMessageCB ddl_cb;
/* streaming of changes */
...
LogicalDecodeStreamDDLMessageCB stream_ddl_cb;
} OutputPluginCallbacks;
/*
* Called for the logical decoding DDL messages.
*/
typedef void (*LogicalDecodeDDLMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
const char *prefix,
Oid relid,
DeparsedCommandType cmdtype,
Size message_size,
const char *message);
/*
* Callback for streaming logical decoding DDL messages from in-progress
* transactions.
*/
typedef void (*LogicalDecodeStreamDDLMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
const char *prefix,
Oid relid,
DeparsedCommandType cmdtype,
Size message_size,
const char *message);
test_decoding
插件中已經實現ddl message的方法,使用test decoding
插件的方法如下:
CREATE PUBLICATION mypub FOR ALL TABLES with (pubddl = 'all');
SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
create table t3(id int);
SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid |
data
------------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------------------------------------
0/C001BF10 | 783 | BEGIN 783
0/C001EBC0 | 783 | message: prefix: deparse, relid: 16418, cmdtype: Simple, sz: 1505 content:{"fmt": "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D (%{table_elements:, }s) %{inherits}s %{tablespace}s %{on_commit}s %{partition_by}s
%{access_method}s %{with_clause}s", "identity": {"objname": "t3", "schemaname": "public"}, "inherits": {"fmt": "INHERITS (%{parents:, }D)", "parents": null, "present": false}, "on_commit": {"fmt": "ON COMMIT %{on_commit_value}s", "present": fal
se, "on_commit_value": null}, "tablespace": {"fmt": "TABLESPACE %{tablespace}I", "present": false, "tablespace": null}, "persistence": "", "with_clause": {"fmt": "WITH", "present": false}, "partition_by": {"fmt": "PARTITION BY %{definition}s", "
present": false, "definition": null}, "access_method": {"fmt": "USING %{access_method}I", "present": false, "access_method": null}, "if_not_exists": "", "table_elements": [{"fmt": "%{name}I %{coltype}T STORAGE %{colstorage}s %{compression}s %{co
llation}s %{not_null}s %{default}s %{identity_column}s %{generated_column}s", "name": "id", "type": "column", "coltype": {"typmod": "", "typarray": false, "typename": "int4", "schemaname": "pg_catalog"}, "default": {"fmt": "DEFAULT", "present":
false}, "not_null": "", "collation": {"fmt": "COLLATE", "present": false}, "colstorage": "plain", "compression": {"fmt": "COMPRESSION %{compression_method}I", "present": false, "compression_method": null}, "identity_column": {"fmt": "%{identity_
type}s", "identity_type": {"fmt": "", "present": false}}, "generated_column": {"fmt": "GENERATED ALWAYS AS", "present": false}}]}
0/C001EE98 | 783 | COMMIT 783
select polar_catalog.ddl_deparse_expand_command('{"fmt": "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D (%{table_elements:, }s) %{inherits}s %{tablespace}s %{on_commit}s %{partition_by}s %{access_method}s %{with_clause}s", "identity": {"objname": "t3", "schemaname": "public"}, "inherits": {"fmt": "INHERITS (%{parents:, }D)", "parents": null, "present": false}, "on_commit": {"fmt": "ON COMMIT %{on_commit_value}s", "present": false, "on_commit_value": null}, "tablespace": {"fmt": "TABLESPACE %{tablespace}I", "present": false, "tablespace": null}, "persistence": "", "with_clause": {"fmt": "WITH", "present": false}, "partition_by": {"fmt": "PARTITION BY %{definition}s", "present": false, "definition": null}, "access_method": {"fmt": "USING %{access_method}I", "present": false, "access_method": null}, "if_not_exists": "", "table_elements": [{"fmt": "%{name}I %{coltype}T STORAGE %{colstorage}s %{compression}s %{collation}s %{not_null}s %{default}s %{identity_column}s %{generated_column}s", "name": "id", "type": "column", "coltype": {"typmod": "", "typarray": false, "typename": "int4", "schemaname": "pg_catalog"}, "default": {"fmt": "DEFAULT", "present": false}, "not_null": "", "collation": {"fmt": "COLLATE", "present": false}, "colstorage": "plain", "compression": {"fmt": "COMPRESSION %{compression_method}I", "present": false, "compression_method": null}, "identity_column": {"fmt": "%{identity_type}s", "identity_type": {"fmt": "", "present": false}}, "generated_column": {"fmt": "GENERATED ALWAYS AS", "present": false}}]}');
ddl_deparse_expand_command
-------------------------------------------------------------------------
CREATE TABLE public.t3 (id pg_catalog.int4 STORAGE plain )
(1 row)
新增系統表與系統函數
polar_catalog.ddl_deparse_to_json
定義:
ddl_deparse_to_json(IN pg_ddl_command) RETURN text
說明:將內部的parsetree解析成JSON字符串。
參數:輸入pg_ddl_command類型parsetree, 返回text類型的JSON串。
polar_catalog.ddl_deparse_expand_command
定義:
ddl_deparse_expand_command(IN text) RETURN text
說明:將JSON string解析成SQL string。
參數:輸入text類型JSON串, 返回text類型的SQL字符串。
polar_catalog.polar_publication
定義如下:
TABLE polar_publication ( puboid Oid primary key, -- publication oid pubddl "char", -- publication是否支持ddl object pubglobal "char", -- publication是否支持global object(未來支持) pubflags int -- 預留標記位 );