通過實(shí)時(shí)計(jì)算Flink版寫入數(shù)據(jù)到云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版
本文介紹如何通過阿里云實(shí)時(shí)計(jì)算Flink版寫入數(shù)據(jù)到AnalyticDB PostgreSQL版。
使用限制
該功能暫不支持AnalyticDB PostgreSQL版Serverless模式。
僅Flink實(shí)時(shí)計(jì)算引擎VVR 6.0.0及以上版本支持云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版連接器。
僅Flink實(shí)時(shí)計(jì)算引擎VVR 8.0.1及以上版本支持云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版7.0版本。
說明如果您使用了自定義連接器,具體操作請參見管理自定義連接器。
前提條件
已創(chuàng)建Flink全托管工作空間。具體操作,請參見開通Flink全托管。
已創(chuàng)建AnalyticDB PostgreSQL版實(shí)例。具體操作,請參見創(chuàng)建實(shí)例。
AnalyticDB PostgreSQL版實(shí)例和Flink全托管工作空間需要位于同一VPC下。
配置AnalyticDB PostgreSQL版實(shí)例
- 登錄云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版控制臺(tái)。
將Flink工作空間所屬的網(wǎng)段加入AnalyticDB PostgreSQL版的白名單。如何添加白名單,請參見設(shè)置白名單。
單擊登錄數(shù)據(jù)庫,連接數(shù)據(jù)庫的更多方式,請參見客戶端連接。
在AnalyticDB PostgreSQL版實(shí)例上創(chuàng)建一張表。
建表SQL示例如下:
CREATE TABLE test_adbpg_table( b1 int, b2 int, b3 text, PRIMARY KEY(b1) );
配置實(shí)時(shí)計(jì)算Flink
在Flink全托管頁簽,單擊目標(biāo)工作空間操作列下的控制臺(tái)。
在左側(cè)導(dǎo)航欄,單擊數(shù)據(jù)連接。
在數(shù)據(jù)連接頁面,單擊創(chuàng)建自定義連接器。
上傳自定義連接器JAR文件。
說明獲取AnalyticDB PostgreSQL版自定義Flink Connector的JAR包,請參見AnalyticDB PostgreSQL Connector。
JAR包的版本需要與實(shí)時(shí)計(jì)算平臺(tái)的Flink引擎版本一致。
上傳完成后,單擊下一步。
系統(tǒng)會(huì)對您上傳的自定義連接器內(nèi)容進(jìn)行解析。如果解析成功,您可以繼續(xù)下一步。如果解析失敗,請確認(rèn)您上傳的自定義連接器代碼是否符合Flink社區(qū)標(biāo)準(zhǔn)。
單擊完成。
創(chuàng)建完成的自定義連接器會(huì)出現(xiàn)在連接器列表中。
創(chuàng)建Flink作業(yè)
登錄實(shí)時(shí)計(jì)算控制臺(tái),在Flink全托管頁簽,單擊目標(biāo)工作空間操作列下的控制臺(tái)。
在左側(cè)導(dǎo)航欄,單擊SQL開發(fā),單擊新建,選擇空白的流作業(yè)草稿,單擊下一步。
在新建文件草稿對話框,填寫作業(yè)配置信息。
作業(yè)參數(shù)
說明
示例
文件名稱
作業(yè)的名稱。
說明作業(yè)名稱在當(dāng)前項(xiàng)目中必須保持唯一。
adbpg-test
存儲(chǔ)位置
指定該作業(yè)的代碼文件所屬的文件夾。
您還可以在現(xiàn)有文件夾右側(cè),單擊圖標(biāo),新建子文件夾。
作業(yè)草稿
引擎版本
當(dāng)前作業(yè)使用的Flink的引擎版本。引擎版本號(hào)含義、版本對應(yīng)關(guān)系和生命周期重要時(shí)間點(diǎn)詳情請參見引擎版本介紹。
vvr-6.0.7-flink-1.15
單擊創(chuàng)建。
寫入數(shù)據(jù)到AnalyticDB PostgreSQL版
編寫作業(yè)代碼。
創(chuàng)建隨機(jī)源表
datagen_source
和對應(yīng)AnalyticDB PostgreSQL版的表test_adbpg_table
,將以下作業(yè)代碼拷貝到作業(yè)文本編輯區(qū)。CREATE TABLE datagen_source ( f_sequence INT, f_random INT, f_random_str STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second'='5', 'fields.f_sequence.kind'='sequence', 'fields.f_sequence.start'='1', 'fields.f_sequence.end'='1000', 'fields.f_random.min'='1', 'fields.f_random.max'='1000', 'fields.f_random_str.length'='10' ); CREATE TABLE test_adbpg_table ( `B1` bigint , `B2` bigint , `B3` VARCHAR , `B4` VARCHAR, PRIMARY KEY(B1) not ENFORCED ) with ( 'connector' = 'adbpg-nightly-1.13', 'password' = 'xxx', 'tablename' = 'test_adbpg_table', 'username' = 'xxxx', 'url' = 'jdbc:postgresql://url:5432/schema', 'maxretrytimes' = '2', 'batchsize' = '50000', 'connectionmaxactive' = '5', 'conflictmode' = 'ignore', 'usecopy' = '0', 'targetschema' = 'public', 'exceptionmode' = 'ignore', 'casesensitive' = '0', 'writemode' = '1', 'retrywaittime' = '200' );
其中
datagen_source
表的參數(shù)無需修改,test_adbpg_table
表的參數(shù)需要根據(jù)您實(shí)際情況進(jìn)行修改,參數(shù)說明如下:參數(shù)
是否必填
說明
connector
是
connector名稱,固定為
adbpg-nightly-版本號(hào)
,例如adbpg-nightly-1.13
。url
是
AnalyticDB PostgreSQL版的JDBC連接地址。格式為
jdbc:postgresql://<內(nèi)網(wǎng)地址>:<端口>/<連接的數(shù)據(jù)庫名稱>
,示例如下jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:5432/postgres
。tablename
是
AnalyticDB PostgreSQL版的表名。
username
是
AnalyticDB PostgreSQL版的數(shù)據(jù)庫賬號(hào)。
password
是
AnalyticDB PostgreSQL版的數(shù)據(jù)庫賬號(hào)密碼。
maxretrytimes
否
SQL執(zhí)行失敗后重試次數(shù),默認(rèn)為3次。
batchsize
否
一次批量寫入的最大條數(shù),默認(rèn)為50000條。
exceptionmode
否
數(shù)據(jù)寫入過程中出現(xiàn)異常時(shí)的處理策略。支持以下兩種處理策略:
ignore(默認(rèn)值):忽略出現(xiàn)異常時(shí)寫入的數(shù)據(jù)。
strict:數(shù)據(jù)寫入異常時(shí),故障轉(zhuǎn)移(Failover)并報(bào)錯(cuò)。
conflictmode
否
當(dāng)出現(xiàn)主鍵沖突或者唯一索引沖突時(shí)的處理策略。支持以下四種處理策略:
ignore :忽略主鍵沖突,保留之前的數(shù)據(jù)。
strict:主鍵沖突時(shí),故障轉(zhuǎn)移(Failover)并報(bào)錯(cuò)。
update:主鍵沖突時(shí),更新新到的數(shù)據(jù)。
upsert(默認(rèn)值):主鍵沖突時(shí),采用UPSERT方式寫入數(shù)據(jù)。
AnalyticDB PostgreSQL版通過INSERT ON CONFLICT和COPY ON CONFLICT實(shí)現(xiàn)UPSERT寫入數(shù)據(jù),如果目標(biāo)表為分區(qū)表,則需要內(nèi)核小版本為V6.3.6.1及以上,如何升級(jí)內(nèi)核小版本,請參見版本升級(jí)。
targetschema
否
AnalyticDB PostgreSQL版的Schema,默認(rèn)為public。
writemode
否
寫入方式。取值說明:
0 :采用BATCH INSERT方式寫入數(shù)據(jù)。
1(默認(rèn)值):采用COPY API寫入數(shù)據(jù)。
2:采用BATCH UPSERT方式寫入數(shù)據(jù)。
verbose
否
是否輸出connector運(yùn)行日志。取值說明:
0(默認(rèn)):不輸出運(yùn)行日志。
1:輸出運(yùn)行日志。
retrywaittime
否
出現(xiàn)異常重試時(shí)間隔的時(shí)間。單位為ms,默認(rèn)值為100。
batchwritetimeoutms
否
攢批寫入數(shù)據(jù)時(shí)最長攢批時(shí)間,超過此事件會(huì)觸發(fā)這一批的寫入。單位為毫秒(ms),默認(rèn)值為50000。
connectionmaxactive
否
連接池參數(shù),單個(gè)Task manager中連接池同一時(shí)刻最大連接數(shù)。默認(rèn)值為5。
casesensitive
否
列名和表名是否區(qū)分大小寫,取值說明:
0(默認(rèn)):不區(qū)分大小寫。
1:區(qū)分大小寫。
說明支持參數(shù)和類型映射,詳情請參見連接器云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版。
啟動(dòng)作業(yè)。
在作業(yè)開發(fā)頁面頂部,單擊部署,在彈出的對話框中,單擊確定。
說明Session集群適用于非生產(chǎn)環(huán)境的開發(fā)測試環(huán)境,您可以使用Session集群模式調(diào)試作業(yè),提高作業(yè)JM(Job Manager)資源利用率和作業(yè)啟動(dòng)速度。但不推薦您將作業(yè)提交至Session集群中,因?yàn)闀?huì)存在業(yè)務(wù)穩(wěn)定性問題,詳情請參見作業(yè)調(diào)試。
在作業(yè)運(yùn)維頁面,單擊目標(biāo)作業(yè)操作列的啟動(dòng)。
單擊啟動(dòng)。
觀察同步結(jié)果
連接AnalyticDB PostgreSQL版數(shù)據(jù)庫。具體操作,請參見客戶端連接。
執(zhí)行以下語句查詢
test_adbpg_table
表。SELECT * FROM test_adbpg_table;
數(shù)據(jù)正常寫入到AnalyticDB PostgreSQL版中,返回示例如下。