當您需要將Kafka數據寫入云原生數據倉庫AnalyticDB PostgreSQL版,且不希望使用其他數據集成工具時,可以通過實時數據消費功能直接消費Kafka數據,減少實時處理組件依賴,提升寫入吞吐。
Apache Kafka是一個容錯、低延遲、分布式的發布-訂閱消息系統。Streaming Server支持從Apache和Confluent Kafka發行版中加載Kafka數據。通過云原生數據倉庫AnalyticDB PostgreSQL版可讀外表對Kafka數據進行轉換,并將數據寫入云原生數據倉庫AnalyticDB PostgreSQL版目標表中。
前提條件
Kafka服務與云原生數據倉庫AnalyticDB PostgreSQL版實例需在同一專有網絡(VPC)。
已在Kafka服務中生成了大量樣例數據。本文以阿里云云消息隊列Kafka版為例,具體信息如下。
接入點信息:alikafka-post-cn-wwo3hflb****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflb****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflb****-3-vpc.alikafka.aliyuncs.com:9092
Topic:test_topic
consumer group:test_consumer_group
已在云原生數據倉庫AnalyticDB PostgreSQL版中創建目標用戶和目標表,同時在任務中使用的數據庫用戶需要具備以下權限。
使用gpfdist協議創建只讀外表的權限。
任務中配置的數據庫Schema的USAGE和CREATE權限。
任務中配置的寫入目標表的SELECT和INSERT權限。
本文以
liss_test
用戶和liss_test.liss_test_plaintext
表為例。CREATE role liss_test with login; ALTER role liss_test with password 'lissTest****'; ALTER role liss_test CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist'); \c - liss_test CREATE DATABASE liss_test; \c liss_test CREATE SCHEMA liss_test; CREATE TABLE liss_test.liss_test_plaintext ( column_1 varchar(32), column_2 bigint, column_3 numeric, column_4 varchar(32), column_5 varchar(32) ) distributed by (column_1, column_2);
使用限制
云原生數據倉庫 AnalyticDB PostgreSQL 版6.0實例需為v6.6.0及以上版本。云原生數據倉庫 AnalyticDB PostgreSQL 版7.0實例需為v7.0.3及以上版本。AnalyticDB PostgreSQL版Serverless模式實例暫不支持。
實時數據消費目前僅支持INSERT、MERGE(UPSERT)、UPDATE三種語法,暫不支持DELETE與READ。
使用MERGE(UPSERT)或UPDATE時,需要云原生數據倉庫 AnalyticDB PostgreSQL 版表有主鍵索引。
使用實時數據消費,不同分區(Partition)之間需要使用主鍵列做分區因子,否則可能會造成全局死鎖錯誤,導致部分數據更新失敗。
實時數據消費當前僅支持Kafka消息隊列,暫不支持CDC格式的數據源。
當前的版本向導模式支持CSV和Delimited兩種格式的數據源,專業模式支持CSV、Delimited和protobuf三種格式的數據源。
操作步驟
步驟一:開啟實時數據服務
在控制臺左上角,選擇實例所在地域。
找到目標實例,單擊實例ID。
在控制臺左側導航欄單擊實時數據消費,再單擊左上角開啟實時數據服務。
在彈出的對話框中填寫名稱及服務描述并單擊確定。開通完成后,可在控制臺看到服務狀態和連接信息。
說明服務規格當前不可選,默認為8CU。
步驟二:新增實時數據源
- 登錄云原生數據倉庫AnalyticDB PostgreSQL版控制臺。
- 在控制臺左上角,選擇實例所在地域。
- 找到目標實例,單擊實例ID。
在左側導航欄,單擊實時數據消費。
在實時數據源卡片中,單擊新增數據源,并完成以下配置。
配置項
描述
關聯數據服務
在下拉框中選擇已創建的實時數據服務。
數據源名稱
自定義數據源名稱。
數據源描述
自定義數據源描述。
數據源類型
目前僅支持Kafka。
brokers
Kafka接入點信息。
阿里云的Kafka服務,可登錄阿里云控制臺獲取默認接入點。具體操作,請參見查看接入點。
自建的kafka服務,Brokers需要填寫Kafka服務具體的
`hostname:port`
或`ip:port`
信息。
topic
Kafka的Topic名稱。
format
當前版本向導模式支持CSV和Delimited兩種格式的數據源,專業模式支持CSV、Delimited和protobuf三種格式的數據源。
列分隔符
可設置任意單字符分隔符。
單擊確定。
步驟三:新增實時任務
在實時任務卡片中,單擊新增實時任務,并完成以下配置。
請根據業務需要選擇向導模式或專業模式。
向導模式:可以通過控制臺中的指引來快速搭建任務。
專業模式:可以通過提交YAML的方式向Streaming Server提交任務,功能相比于向導模式更豐富。
向導模式
配置項
描述
基本信息
任務名稱
定義任務的名稱,任務名稱不可以重復,必填。
任務描述
描述任務內容,選填。
配置模式
向導模式。
源端配置
數據源
選擇在新增實時數據源中配置的數據源,目前僅支持Kafka為源的數據源。
group_name
Kafka的消費者組。
failback_offset
消費位點。
earliest:從最早可用位點消費。
latest:從最新的位點開始消費。
投遞保證
流計算中的一致性語義,支持:
ATLEAST:在Kafka中的數據至少有一次被寫入云原生數據倉庫AnalyticDB PostgreSQL版。
EXACTLY:在Kafka中的數據有且僅有一次被寫入云原生數據倉庫AnalyticDB PostgreSQL版。
目標端配置
目標庫
需要寫入的云原生數據倉庫AnalyticDB PostgreSQL版目標數據庫名稱。
Schema
云原生數據倉庫AnalyticDB PostgreSQL版的模式名稱。
目標表
需要寫入的云原生數據倉庫AnalyticDB PostgreSQL版目標表名稱。
賬號
當前任務使用的云原生數據倉庫AnalyticDB PostgreSQL版數據庫賬號。
密碼
賬號密碼。
寫入模式
目前僅支持INSERT、UPDATE和MERGE三種寫入模式。
INSERT:將數據直接寫入目標表。
UPDATE:當輸入列中的MatchColumns與目標表中的列匹配,更新UpdateColumns中列出的目標表列。
MERGE:當寫入數據與目標表列的值相等時,使用寫入數據更新目標表列的現有數據。當寫入數據與目標表列的值不相等時,直接將數據寫入目標表。MERGE寫入模式可類比于UPSERT(UPDATE and INSERT),關于UPSERT的寫入方式,請參見使用INSERT ON CONFLICT覆蓋寫入數據。
說明MatchColumns與UpdateColumns的含義請參見下文字段類型的描述。
ErrorLimitCount
錯誤數據的容忍閾值。當寫入的錯誤數據到達ErrorLimitCount時,Streaming Server會自動停止將數據源的數據寫入云原生數據倉庫AnalyticDB PostgreSQL版。0表示Streaming Server遇到第一次錯誤數據時就會停止數據寫入。目前該參數未啟用,填0即可。
字段映射
源字段
Kafka消息中的Value字段名,需要按照在Value中出現的順序指定所有的字段名。
目標字段
云原生數據倉庫AnalyticDB PostgreSQL版目標表的字段名。
字段類型
目前支持以下三種類型:
MatchColumns:作為寫入時的Join條件列作為更新條件,用于判斷目標表中哪些行需要被更新。
UpdateColumns:如果某一行數據符合更新條件,那么在UpdateColumns中的列會被更新為新的值。
空(不填):即使某一行數據符合更新條件,該字段也不會被更新為新的值。
在UPDATE和MERGE寫入時,Streaming Server會先將數據寫入一個臨時表,然后利用MatchColumns作為條件列與目標表進行Join:
如果有匹配的數據,則會更新UpdateColumns中的數據。
如果沒有匹配的數據時,則會根據寫入模式有以下兩種情況:
寫入模式為UPDATE時,數據不會被寫入。
寫入模式為MERGE時,數據會被寫入。
專業模式
配置項
描述
基本信息
任務名稱
定義任務的名稱,任務名稱不可以重復,必填。
任務描述
描述任務內容,選填。
配置模式
專業模式。
數據源
選擇在新增實時數據源中配置的數據源,目前僅支持Kafka為源的數據源。
YAML
可以通過YAML配置更復雜的寫入邏輯。本文的YAML配置示例如下。更多詳情,請參見附錄:YAML配置說明。
DATABASE: liss_test USER: liss_test PASSWORD: lissTest**** HOST: gp-2ze517f9l7****-master.gpdb.rds-aliyun-pre.rds.aliyuncs.com PORT: 5432 KAFKA: INPUT: SOURCE: BROKERS: alikafka-post-cn-wwo3hflbo002-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflbo002-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-wwo3hflbo002-3-vpc.alikafka.aliyuncs.com:9092 TOPIC: test_topic FALLBACK_OFFSET: EARLIEST VALUE: COLUMNS: - NAME: column_1 TYPE: varchar(32) - NAME: column_2 TYPE: bigint - NAME: column_3 TYPE: numeric - NAME: column_4 TYPE: varchar(32) - NAME: column_5 TYPE: varchar(32) FORMAT: delimited DELIMITED_OPTION: DELIMITER: "|" ERROR_LIMIT: 20 OUTPUT: SCHEMA: liss_test TABLE: liss_test_plaintext MODE: MERGE MATCH_COLUMNS: - column_1 - column_2 UPDATE_COLUMNS: - column_3 - column_4 - column_5 MAPPING: - NAME: column_1 EXPRESSION: column_1 - NAME: column_2 EXPRESSION: column_2 - NAME: column_3 EXPRESSION: column_3 - NAME: column_4 EXPRESSION: column_4 - NAME: column_5 EXPRESSION: column_5 COMMIT: MAX_ROW: 1000 MINIMAL_INTERVAL: 1000 CONSISTENCY: ATLEAST POLL: BATCHSIZE: 1000 TIMEOUT: 1000 PROPERTIES: group.id: test_consumer_group
單擊確定,并等待實時任務狀態為運行中,即可將數據源中的數據寫入云原生數據倉庫AnalyticDB PostgreSQL版。
在任務啟動后會在目標端配置的Schema(專業模式為METADATA.SCHEMA中配置的schema)下生成任務的兩種輔助表,其格式分別為:
lissext_$UID
:本任務定義的gpfdist外表,用于將數據寫入云原生數據倉庫AnalyticDB PostgreSQL版。lisskafka_mission_info_$UID
:用于存儲任務當前位點推進的情況,保障數據寫入的一致性。目前為了保障寫入任務的高可用,每個寫入任務會生成4個子任務,所以每啟動一個寫入任務,會生成4張表。UID是每個寫入任務的唯一標識ID。
附錄:YAML配置說明
YAML配置文件格式如下。
DATABASE: <db_name>
USER: <user_name>
PASSWORD: <password>
HOST: <host>
PORT: <adbpg_port>
VERSION: 2
KAFKA:
INPUT:
SOURCE:
BROKERS: <kafka_broker_host:broker_port> [, ... ]
TOPIC: <kafka_topic>
[PARTITIONS: (<partition_numbers>)]
[FALLBACK_OFFSET: { earliest | latest }]
[VALUE:
COLUMNS:
- NAME: { <column_name> }
TYPE: <column_data_type>
[ ... ]
FORMAT: <value_data_format>
[[DELIMITED_OPTION:
DELIMITER: <delimiter_string>
[QUOTE: <quote_char>]
[ESCAPE: <escape_char>] ] |
[CSV_OPTION:
[DELIMITER: <delim_char>]
[QUOTE: <quote_char>]
[NULL_STRING: <nullstr_val>]
[ESCAPE: <escape_char>]
[KEY:
COLUMNS:
- NAME: { <column_name> }
TYPE: <column_data_type>
[ ... ]
FORMAT: <key_data_format>
[[DELIMITED_OPTION:
DELIMITER: <delimiter_string> |
[QUOTE: <quote_char>]
[ESCAPE: <escape_char>] ] |
[CSV_OPTION:
[DELIMITER: <delim_char>]
[QUOTE: <quote_char>]
[NULL_STRING: <nullstr_val>]
[ESCAPE: <escape_char>]
[META:
COLUMNS:
- NAME: <meta_column_name>
TYPE: { json | jsonb }
FORMAT: json]
[ERROR_LIMIT: { <num_errors> | <percentage_errors> }]
{ OUTPUT:
[SCHEMA: <output_schema_name>]
TABLE: <table_name>
[MODE: <mode>]
[MATCH_COLUMNS:
- <match_column_name>
[ ... ]]
[ORDER_COLUMNS:
- <order_column_name>
[ ... ]]
[UPDATE_COLUMNS:
- <update_column_name>
[ ... ]]
[MAPPING:
- NAME: <target_column_name>
EXPRESSION: { <source_column_name> | <expression> }
[ ... ]
|
<target_column_name> : { <source_column_name> | <expression> }
[ ... ] ] }
[METADATA:
[SCHEMA: <metadata_schema_name>]]
COMMIT:
MAX_ROW: <num_rows>
MINIMAL_INTERVAL: <wait_time>
CONSISTENCY: { strong | at-least | at-most | none }
[POLL:
BATCHSIZE: <num_records>
TIMEOUT: <poll_time>]
[PROPERTIES:
<kafka_property_name>: <kafka_property_value>
[ ... ]]
[SCHEDULE:
RETRY_INTERVAL: <retry_time>
MAX_RETRIES: <num_retries> ]
數據庫相關配置
參數 | 描述 | 是否必填 |
DATABASE | 目標端云原生數據倉庫AnalyticDB PostgreSQL版實例的數據庫名稱。 | 是 |
USER | 云原生數據倉庫AnalyticDB PostgreSQL版實例的賬號。 | 是 |
PASSWORD | 云原生數據倉庫AnalyticDB PostgreSQL版實例的賬號密碼。 | 是 |
HOST | 目標端云原生數據倉庫AnalyticDB PostgreSQL版實例的內網地址。 | 是 |
PORT | 云原生數據倉庫AnalyticDB PostgreSQL版實例的端口號。 | 是 |
VERSION | 當前采用的YAML文件格式版本,預留字段,無限制。 | 否 |
KAFKA:INPUT配置
KAFKA:INPUT:SOURCE
參數 | 描述 | 是否必填 | 參數值限制 |
BROKERS | Kafka接入點信息。
如有多個使用英文逗號( | 是 | 對應kafka consumer bootstrap.server 配置,需要填寫有效的Brokers地址,否則會報錯。 |
TOPIC | Kafka Topic名稱。 | 是 | 僅支持單個Topic。 |
PARTITIONS | 分區編號。 如有多個分區編號,使用英文逗號( | 否 | 例如:1,2,3,4,5 |
FALLBACK_OFFSET | 消費位點。
| 是 | 無 |
KAFKA:INPUT:KEY和KAFKA:INPUT:VALUE
Kafka消息的Key值字段名稱、數據類型和數據格式。
Kafka消息的Value字段名稱、數據類型和數據格式。
必須按照在Key和Value中出現的順序指定所有Kafka數據元素。
KAFKA:INPUT:KEY
和KAFKA:INPUT:VALUE
至少需要配置一個,如果兩個都未配置會報錯。
參數 | 描述 | 是否必填 | 參數值限制 |
COLUMNS | 如果定義 如果定義 | 是 | 無 |
NAME | 定義Kafka消息中的列名。該列名主要在 | 是 | 無 |
TYPE | 定義Kafka消息中列的類型,數據類型需要與這個列在目標數據庫中的類型保持一致。 由于Kafka消息中Key和Value的格式不透明,因此當前Streaming Server默認從Kafka消息中獲取的數據格式為文本形式。 | 是 | 云原生數據倉庫AnalyticDB PostgreSQL版支持的數據類型請參見數據類型。 如果Kafka消息的列與目標列的類型不一致,請在Mapping中的expression部分對類型進行轉換。 |
FORMAT | 定義Kafka消息數據的類型,當前支持CSV、Delimited和protobuf。 | 是 | 無 |
KAFKA:INPUT:META
META不是必填項,當您需要展示Message Meta信息時配置。
參數 | 描述 | 是否必填 | 參數值限制 |
COLUMNS | 定義Meta信息,為一組NAME,TYPE。 | 是 | 無 |
NAME | Meta名稱,可以指定為其他的名稱,默認使用 | 是 | 無 |
TYPE | 只能使用Text類型。 | 是 | Text |
FORMAT | 只能使用Text類型。 | 是 | Text |
KAFKA:INPUT:ERROR_LIMIT
錯誤數據的容忍閾值。當寫入的錯誤數據達到ERROR_LIMIT時,Streaming Server會退出當前任務,自動停止將數據源的數據寫入云原生數據倉庫AnalyticDB PostgreSQL版。默認值為0,即Streaming Server會在出現第一次錯誤數據時就退出當前任務,停止數據寫入。ERROR_LIMIT值必須大于1。
目前該參數未啟用,不選擇或者填0即可。
KAFKA:OUTPUT配置
數據庫相關配置
數據寫入到云原生數據倉庫AnalyticDB PostgreSQL版數據庫的相關配置,包括Kafka值到目標數據庫的映射、寫入模式等。
參數 | 描述 | 是否必填 |
SCHEMA | 寫入云原生數據倉庫AnalyticDB PostgreSQL版的目標表所在的Schema。 | 是 |
TABLE | 目標表的名稱。 | 是 |
MODE | 寫入模式,目前支持INSERT、UPDATE和MERGE三種方式。 | 是 |
MATCH_COLUMNS | 當寫入模式為UPDATE和MERGE時生效。 指定目標表的部分列,當寫入數據與目標表數據匹配時,目標表中這部分數據會根據UPDATE或MERGE模式對數據進行更新。 建議MATCH_COLUMNS使用目標表的主鍵或者唯一鍵。 | 否 |
ORDER_COLUMNS | 在寫入模式(MODE)為MERGE時生效。 當寫入數據根據MATCH_COLUMNS存在多個匹配行時,使用ORDER_COLUMNS對這些數據進行排序,以確定具有最大值的輸入行,Streaming Server使用該行來更新目標。 | 否 |
UPDATE_COLUMNS | 當寫入模式為UPDATE和MERGE時生效。 如果寫入數據能夠根據MATCH_COLUMNS匹配到目標表數據,則會基于UPDATE_COLUMNS更新對應的列。 | 否 |
在使用MERGE和UPDATE模式時,如果不指定ORDER_COLUMNS,當寫入數據根據MATCH_COLUMNS匹配到多行相同時,則會隨機取一條作為結果寫入。
在指定了ORDER_COLUMNS后,其排序結果是
a desc,b desc,c desc
。
KAFKA:OUTPUT:MAPPING
參數 | 描述 | 是否必填 |
NAME | 目標列名稱。 | 是 |
EXPRESSION | 可以是源端的列名( | 是 |
KAFKA:METADATA配置
參數 | 描述 | 是否必填 | 參數限制 |
schema | Streaming Server創建的外表和其他輔助表所在的Schema名稱。 | 否 | 默認取值 |
KAFKA:COMMIT配置
COMMIT用于控制向數據庫提交數據的行為。
參數 | 描述 | 是否必填 | 參數限制 |
MAX_ROW | 指定一次寫入目標庫的最大Batch Size。 | 否 | 單位為行,默認:500。 |
MINIMAL_INTERVAL | 在兩個Batch寫入之間的等待時間。如果超過該時間,會嘗試再寫一次。 | 否 | 單位為毫秒(ms),默認:1000。 |
CONSISTENCY | 數據一致性保證。 | 否 | 目前僅支持ATLEAST,即kafka中的數據至少會寫入目標數據庫一次。 |
KAFKA:POLL配置
POLL用于控制Kafka Consumer消費數據的行為。
參數 | 描述 | 是否必填 | 參數限制 |
BATCHSIZE | 一次從Topic中拿出的event數量。保留字段,目前沒有實現相關功能。 | 否 | 單位為行,默認:64。 |
TIMEOUT | Kafka Consumer從Kafka中獲取event等待的超時時間。 | 否 | 單位為毫秒(ms),默認:5000。 |
KAFKA:PROPERTIES配置
PROPERTIES用于配置Kafka Connect,當前采用白名單制,僅支持配置group.id
,auto.offset.reset
和isolation.level
。詳細信息,請參見Kafka Connect Configs。