通過實(shí)時計算Flink版讀取云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版
本文介紹如何通過阿里云實(shí)時計算Flink版實(shí)時讀取云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版(原分析型數(shù)據(jù)庫PostgreSQL版)數(shù)據(jù),包括版本限制、語法示例、創(chuàng)建和運(yùn)行Flink作業(yè)、WITH參數(shù)、CACHE參數(shù)、類型映射和參數(shù)支持等。
版本限制
創(chuàng)建3.6.0及以上版本實(shí)時計算集群。
創(chuàng)建6.0版本云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版實(shí)例(實(shí)時計算集群和云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版實(shí)例需要位于同一VPC下,且云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版實(shí)例的白名單規(guī)則允許Flink集群網(wǎng)段訪問)。
語法示例
CREATE TABLE dim_adbpg(
id int,
username varchar,
INDEX(id)
) with(
type='custom',
tableFactoryClass='com.alibaba.blink.customersink.ADBPGCustomSourceFactory',
url='jdbc:postgresql://內(nèi)網(wǎng)連接串/databasename',
tableName='tablename',
userName='username',
password='password',
joinMaxRows='100',
maxRetryTimes='1',
connectionMaxActive='5',
retryWaitTime='100',
targetSchema='public',
caseSensitive='0',
cache='LRU',
cacheSize='1000',
cacheTTLMs='10000',
cacheReloadTimeBlackList='2017-10-24 14:00 -> 2017-10-24 15:00',
partitionedJoin='true'
);
-- join時需要指定在代碼中加入維表標(biāo)識 FOR SYSTEM_TIME AS OF PROCTIME()
INSERT INTO print_sink
SELECT R.c1, R.a2, R.a3, R.a4, R.a5, R.a6, R.a6, R.a8, R.a9, R.a10, R.a11, R.a13, T.username
FROM s_member_cart_view AS R
left join
dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS T
on R.c1 = T.id;
WITH參數(shù)
參數(shù)名 | 參數(shù)含義 | 備注 |
URL | 云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版實(shí)例的連接地址 | 必填,需要填寫格式為jdbc:postgresql://<云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版內(nèi)網(wǎng)連接串>/databaseName 的內(nèi)網(wǎng)連接地址。 |
type | 表類型 | 必填。 |
tableName | 云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版源表名 | 必填,填寫維表對應(yīng)的云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版數(shù)據(jù)倉庫中的表名。 |
userName | 云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版數(shù)據(jù)庫賬號 | 必填。 |
password | 云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版密碼 | 必填。 |
joinMaxRows | 左表一條記錄連接右表的最大記錄數(shù) | 非必填,表示在一對多連接時,左表一條記錄連接右表的最大記錄數(shù)(默認(rèn)值為1024)。在一對多連接的記錄數(shù)過多時,可能會極大地影響流任務(wù)的性能,因此您需要增大Cache的內(nèi)存(cacheSize限制的是左表key的個數(shù))。 |
maxRetryTimes | 單次SQL失敗后重試次數(shù) | 非必填,實(shí)際執(zhí)行時,可能會因為各種因素造成執(zhí)行失敗,比如網(wǎng)絡(luò)或者IO不穩(wěn)定,超時等原因,云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版維表支持SQL執(zhí)行失敗后自動重試,用maxRetryTimes參數(shù)可以設(shè)定重試次數(shù)。默認(rèn)值為3。 |
connectionMaxActive | 連接池最大連接數(shù) | 非必填,云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版維表中內(nèi)置連接池,設(shè)置合理的連接池最大連接數(shù)可以兼顧效率和安全性,默認(rèn)值為5。 |
retryWaitTime | 重試休眠時間 | 非必填,每次SQL失敗重試之間的sleep間隔,單位ms,默認(rèn)值100。 |
targetSchema | 查詢的ADBPG schema | 非必填,默認(rèn)值public。 |
caseSensitive | 是否大小寫敏感 | 非必填,默認(rèn)值0,即不敏感;填1可以設(shè)置為敏感。 |
CACHE參數(shù)
參數(shù)名 | 參數(shù)含義 | 備注 |
cache | 緩存策略 | 目前云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版支持以下三種緩存策略:
|
cacheSize | 設(shè)置LRU緩存的最大行數(shù) | 非必填,默認(rèn)為10000行。 |
cacheTTLMs | 緩存更新時間間隔。系統(tǒng)會根據(jù)您設(shè)置的緩存更新時間間隔,重新加載一次維表中的最新數(shù)據(jù),保證源表能JOIN到維表的最新數(shù)據(jù)。 | 非必填,單位為毫秒。默認(rèn)不設(shè)置此參數(shù),表示不重新加載維表中的新數(shù)據(jù)。 |
cacheReloadTimeBlackList | 更新時間黑名單。在緩存策略選擇為ALL時,啟用更新時間黑名單,防止在此時間內(nèi)做Cache更新(例如雙11場景)。 | 非必填,默認(rèn)空,格式為 '2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00' 。其中分割符使用情況如下:
|
partitionedJoin | 是否開啟partitionedJoin。在開啟partitionedJoin優(yōu)化時,主表會在關(guān)聯(lián)維表前,先按照J(rèn)oin KEY進(jìn)行Shuffle,這樣做有以下優(yōu)點(diǎn):
| 非必填,默認(rèn)情況下為false,表示不開啟partitionedJoin。 |
類型映射
實(shí)時計算字段類型 | ADB PG版字段類型 |
BOOLEAN | BOOLEAN |
TINYINT | SMALLINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
DOUBLE | DOUBLE PRECISION |
VARCHAR | TEXT |
DATETIME | TIMESTAMP |
DATE | DATE |
FLOAT | REAL |
DECIMAL | DOUBLE PRECISION |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
創(chuàng)建和運(yùn)行Flink作業(yè)
登錄實(shí)時計算控制臺,在頁面頂部菜單欄上,鼠標(biāo)懸停在用戶頭像上,單擊項目管理。在項目管理>項目列表頁面,單擊項目名進(jìn)入自己創(chuàng)建的項目。
單擊開發(fā)>新建作業(yè),創(chuàng)建數(shù)據(jù)寫入的Flink SQL作業(yè)。
目前采用Flink自定義維表的方式支持讀取云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版目標(biāo)表數(shù)據(jù),使用自定義維表功能上線前需要在資源引用界面上傳及引用.jar包,編寫完作業(yè)后點(diǎn)擊資源引用>新建資源>上傳JAR包>更多>引用。下載JAR包。
完成作業(yè)開發(fā)后,依次點(diǎn)擊保存、上線,即可上線該任務(wù)。
繼續(xù)點(diǎn)擊運(yùn)維,啟動對應(yīng)項目即可啟動任務(wù)。
代碼示例
這里給出讀取云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版數(shù)據(jù)打印到Flink日志中的Flink SQL示例:
--SQL
--********************************************************************--
--Author: zihua
--CreateTime: 2019-09-07 10:34:34
--********************************************************************--
CREATE TABLE s_member_cart
(
a1 int,
a2 tinyint ,
a3 smallint ,
a4 int,
a5 boolean,
a6 FLOAT ,
a7 DECIMAL ,
a8 double,
a9 date ,
a10 time ,
a11 timestamp ,
a12 tinyint
) WITH (
type='random'
);
CREATE VIEW s_member_cart_view AS
SELECT MOD(a1, 10) c1, a2, a3, a4, a5, a6, a6, a8, a9, a10, a11, case when a12 >0 then 'test1' else 'test5' end as b12,'{ "customer": "中文56", "items": {"product": "Beer","qty": 6}}' a13
FROM s_member_cart;
--adbpg dim index
CREATE TABLE dim_adbpg(
id int,
username varchar,
INDEX(id)
) with(
type='custom',
tableFactoryClass='com.alibaba.blink.customersink.ADBPGCustomSourceFactory',
url='jdbc:postgresql://內(nèi)網(wǎng)連接串/databasename',
tableName='tablename',
userName='username',
password='password',
joinMaxRows='100',
maxRetryTimes='1',
connectionMaxActive='5',
retryWaitTime='100',
targetSchema='public',
caseSensitive='0',
cache='LRU',
cacheSize='1000',
cacheTTLMs='10000',
cacheReloadTimeBlackList='2017-10-24 14:00 -> 2017-10-24 15:00',
partitionedJoin='true'
);
-- ads sink.
CREATE TABLE print_sink (
B1 int,
B2 tinyint ,
B3 smallint ,
B4 int,
B5 boolean,
B6 FLOAT ,
B7 FLOAT ,
B8 double,
B9 date ,
B10 time ,
B11 timestamp ,
B12 varchar,
B15 varchar,
PRIMARY KEY(B1)
) with (
type='print'
);
INSERT INTO print_sink
SELECT R.c1, R.a2, R.a3, R.a4, R.a5, R.a6, R.a6, R.a8, R.a9, R.a10, R.a11, R.a13, T.username
FROM s_member_cart_view AS R
left join
dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS T
on R.c1 = T.id;