日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過實(shí)時計算Flink版讀取云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版

本文介紹如何通過阿里云實(shí)時計算Flink版實(shí)時讀取云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版(原分析型數(shù)據(jù)庫PostgreSQL版)數(shù)據(jù),包括版本限制、語法示例、創(chuàng)建和運(yùn)行Flink作業(yè)、WITH參數(shù)、CACHE參數(shù)、類型映射和參數(shù)支持等。

版本限制

  • 創(chuàng)建3.6.0及以上版本實(shí)時計算集群。

  • 創(chuàng)建6.0版本云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版實(shí)例(實(shí)時計算集群和云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版實(shí)例需要位于同一VPC下,且云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版實(shí)例的白名單規(guī)則允許Flink集群網(wǎng)段訪問)。

語法示例

CREATE TABLE dim_adbpg(
        id int,
        username varchar,
        INDEX(id)
) with(
        type='custom',
        tableFactoryClass='com.alibaba.blink.customersink.ADBPGCustomSourceFactory',
        url='jdbc:postgresql://內(nèi)網(wǎng)連接串/databasename',
        tableName='tablename',
        userName='username',
        password='password',
        joinMaxRows='100',
        maxRetryTimes='1',
        connectionMaxActive='5',
        retryWaitTime='100',
        targetSchema='public',
        caseSensitive='0',
        cache='LRU',
        cacheSize='1000',
        cacheTTLMs='10000',
        cacheReloadTimeBlackList='2017-10-24 14:00 -> 2017-10-24 15:00',
        partitionedJoin='true'
);

-- join時需要指定在代碼中加入維表標(biāo)識 FOR SYSTEM_TIME AS OF PROCTIME()
INSERT INTO print_sink
SELECT R.c1, R.a2, R.a3, R.a4, R.a5, R.a6, R.a6, R.a8, R.a9, R.a10, R.a11, R.a13, T.username
FROM s_member_cart_view AS R
left join
dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS T
on R.c1 = T.id;

WITH參數(shù)

參數(shù)名

參數(shù)含義

備注

URL

云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版實(shí)例的連接地址

必填,需要填寫格式為jdbc:postgresql://<云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版內(nèi)網(wǎng)連接串>/databaseName 的內(nèi)網(wǎng)連接地址。

type

表類型

必填。

tableName

云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版源表名

必填,填寫維表對應(yīng)的云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版數(shù)據(jù)倉庫中的表名。

userName

云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版數(shù)據(jù)庫賬號

必填。

password

云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版密碼

必填。

joinMaxRows

左表一條記錄連接右表的最大記錄數(shù)

非必填,表示在一對多連接時,左表一條記錄連接右表的最大記錄數(shù)(默認(rèn)值為1024)。在一對多連接的記錄數(shù)過多時,可能會極大地影響流任務(wù)的性能,因此您需要增大Cache的內(nèi)存(cacheSize限制的是左表key的個數(shù))。

maxRetryTimes

單次SQL失敗后重試次數(shù)

非必填,實(shí)際執(zhí)行時,可能會因為各種因素造成執(zhí)行失敗,比如網(wǎng)絡(luò)或者IO不穩(wěn)定,超時等原因,云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版維表支持SQL執(zhí)行失敗后自動重試,用maxRetryTimes參數(shù)可以設(shè)定重試次數(shù)。默認(rèn)值為3。

connectionMaxActive

連接池最大連接數(shù)

非必填,云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版維表中內(nèi)置連接池,設(shè)置合理的連接池最大連接數(shù)可以兼顧效率和安全性,默認(rèn)值為5。

retryWaitTime

重試休眠時間

非必填,每次SQL失敗重試之間的sleep間隔,單位ms,默認(rèn)值100。

targetSchema

查詢的ADBPG schema

非必填,默認(rèn)值public。

caseSensitive

是否大小寫敏感

非必填,默認(rèn)值0,即不敏感;填1可以設(shè)置為敏感。

CACHE參數(shù)

參數(shù)名

參數(shù)含義

備注

cache

緩存策略

目前云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版支持以下三種緩存策略:

  • None(默認(rèn)值):無緩存。

  • LRU:緩存維表里的部分?jǐn)?shù)據(jù)。源表來一條數(shù)據(jù),系統(tǒng)會先查找Cache,如果沒有找到,則去物理維表中查詢。

  • ALL:緩存維表里的所有數(shù)據(jù)。在Job運(yùn)行前,系統(tǒng)會將維表中所有數(shù)據(jù)加載到Cache中,之后所有的維表查詢都會通過Cache進(jìn)行。如果在Cache中無法找到數(shù)據(jù),則KEY不存在,并在Cache過期后重新加載一遍全量Cache。

cacheSize

設(shè)置LRU緩存的最大行數(shù)

非必填,默認(rèn)為10000行。

cacheTTLMs

緩存更新時間間隔。系統(tǒng)會根據(jù)您設(shè)置的緩存更新時間間隔,重新加載一次維表中的最新數(shù)據(jù),保證源表能JOIN到維表的最新數(shù)據(jù)。

非必填,單位為毫秒。默認(rèn)不設(shè)置此參數(shù),表示不重新加載維表中的新數(shù)據(jù)。

cacheReloadTimeBlackList

更新時間黑名單。在緩存策略選擇為ALL時,啟用更新時間黑名單,防止在此時間內(nèi)做Cache更新(例如雙11場景)。

非必填,默認(rèn)空,格式為 '2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00' 。其中分割符使用情況如下:

  • 用逗號(,)來分隔多個黑名單。

  • 用箭頭(->)來分割黑名單的起始結(jié)束時間。

partitionedJoin

是否開啟partitionedJoin。在開啟partitionedJoin優(yōu)化時,主表會在關(guān)聯(lián)維表前,先按照J(rèn)oin KEY進(jìn)行Shuffle,這樣做有以下優(yōu)點(diǎn):

  • 在緩存策略為LRU時,可以提高緩存命中率。

  • 在緩存策略為ALL時,節(jié)省內(nèi)存資源,因為每個并發(fā)只緩存自己并發(fā)所需要的數(shù)據(jù)。

非必填,默認(rèn)情況下為false,表示不開啟partitionedJoin。

類型映射

實(shí)時計算字段類型

ADB PG版字段類型

BOOLEAN

BOOLEAN

TINYINT

SMALLINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

DOUBLE

DOUBLE PRECISION

VARCHAR

TEXT

DATETIME

TIMESTAMP

DATE

DATE

FLOAT

REAL

DECIMAL

DOUBLE PRECISION

TIME

TIME

TIMESTAMP

TIMESTAMP

創(chuàng)建和運(yùn)行Flink作業(yè)

  1. 登錄實(shí)時計算控制臺,在頁面頂部菜單欄上,鼠標(biāo)懸停在用戶頭像上,單擊項目管理。在項目管理>項目列表頁面,單擊項目名進(jìn)入自己創(chuàng)建的項目。4-1

  2. 單擊開發(fā)>新建作業(yè),創(chuàng)建數(shù)據(jù)寫入的Flink SQL作業(yè)。4-24-3

  3. 目前采用Flink自定義維表的方式支持讀取云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版目標(biāo)表數(shù)據(jù),使用自定義維表功能上線前需要在資源引用界面上傳及引用.jar包,編寫完作業(yè)后點(diǎn)擊資源引用>新建資源>上傳JAR包>更多>引用。3下載JAR包。

  4. 完成作業(yè)開發(fā)后,依次點(diǎn)擊保存、上線,即可上線該任務(wù)。4-4

  5. 繼續(xù)點(diǎn)擊運(yùn)維,啟動對應(yīng)項目即可啟動任務(wù)。18609001

代碼示例

這里給出讀取云原生數(shù)據(jù)倉庫AnalyticDB PostgreSQL版數(shù)據(jù)打印到Flink日志中的Flink SQL示例:

--SQL
--********************************************************************--
--Author: zihua
--CreateTime: 2019-09-07 10:34:34
--********************************************************************--

CREATE TABLE s_member_cart
(
        a1 int,
        a2 tinyint    ,
        a3 smallint  ,
        a4 int,
        a5 boolean,
        a6 FLOAT     ,
        a7 DECIMAL   ,
        a8 double,
        a9 date   ,
        a10 time      ,
        a11 timestamp ,
        a12 tinyint
) WITH (
        type='random'
);

CREATE VIEW s_member_cart_view AS
SELECT MOD(a1, 10) c1, a2, a3, a4, a5, a6, a6, a8, a9, a10, a11, case when a12 >0 then 'test1' else 'test5' end as b12,'{ "customer": "中文56", "items": {"product": "Beer","qty": 6}}' a13
FROM s_member_cart;

--adbpg dim index
CREATE TABLE dim_adbpg(
        id int,
        username varchar,
        INDEX(id)
) with(
        type='custom',
        tableFactoryClass='com.alibaba.blink.customersink.ADBPGCustomSourceFactory',
        url='jdbc:postgresql://內(nèi)網(wǎng)連接串/databasename',
        tableName='tablename',
        userName='username',
        password='password',
        joinMaxRows='100',
        maxRetryTimes='1',
        connectionMaxActive='5',
        retryWaitTime='100',
        targetSchema='public',
        caseSensitive='0',
        cache='LRU',
        cacheSize='1000',
        cacheTTLMs='10000',
        cacheReloadTimeBlackList='2017-10-24 14:00 -> 2017-10-24 15:00',
        partitionedJoin='true'
);

-- ads sink.
CREATE TABLE print_sink (
        B1 int,
        B2 tinyint  ,
        B3 smallint  ,
        B4 int,
        B5 boolean,
        B6 FLOAT     ,
        B7 FLOAT   ,
        B8 double,
        B9 date   ,
        B10 time      ,
        B11 timestamp ,
        B12 varchar,
        B15 varchar,
        PRIMARY KEY(B1)
) with (
        type='print'
);


INSERT INTO print_sink
SELECT R.c1, R.a2, R.a3, R.a4, R.a5, R.a6, R.a6, R.a8, R.a9, R.a10, R.a11, R.a13, T.username
FROM s_member_cart_view AS R
left join
dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS T
on R.c1 = T.id;