Hologres與實時計算Blink獨享模式(原產品線)深度融合,支持使用Connector的方式寫入數據至Hologres結果表,您可以立即查詢寫入的數據。本文為您介紹實時計算Blink獨享模式(原產品線)如何寫入數據至Hologres結果表。
使用限制
不同Blink獨享模式的版本開發語義不同,在使用之前,請先確定Blink獨享模式的版本,并根據版本示例使用。
請確保開通的實時計算與Hologres地域一致,以免連接失敗。
Blink獨享模式3.6之前的版本未內置Hologres Connector,實時寫入數據至Hologres需要引用JAR文件,請您使用自助升級或加入Hologres釘釘交流群反饋,詳情請參見如何獲取更多的在線支持?。
說明建議您升級至3.6及以上的版本進行作業。
Blink獨享模式3.7版本支持自動創建Hologres分區表,但是您需要在作業中配置
createparttable='true'
。同時,使用分區表的注意事項如下:Hologres當前僅支持List分區。
創建分區表時,需要顯示指定的分區列。目前僅支持text和int4類型的分區列,并且分區的值不能包含短劃線(-),例如
2020-09-12
。如果分區表配置了主鍵(pk),則分區列必須是pk的一部分。
創建分區子表時,子表分區列的值必須為固定值。
寫入分區子表的數據對應的分區列值,必須與子表創建時定義的值嚴格匹配,否則會報錯。
當前不支持DEFAULT分區功能。
當導入數據的Hologres目標表設置了主鍵,實時寫入的默認語義不會按照主鍵進行更新,后續導入的主鍵數據如果重復,則會被丟棄。
Hologres為異步寫入數據,您在進行作業時需要添加
blink.checkpoint.fail_on_checkpoint_error=true
配置,當作業發生異常時才會觸發Failover。Blink3.7.6及以上版本不需要添加該參數。
DDL語義
創建Hologres結果表的語句如下。
create table Hologres_sink(
name varchar,
age BIGINT,
birthday BIGINT
) with (
type='hologres',
dbname='<yourDbname>', --Hologres的數據庫名稱。
tablename='<yourTablename>', --Hologres用于接收數據的表名稱。
username='<yourUsername>', --當前阿里云賬號的AccessKey ID。
password='<yourPassword>', --當前阿里云賬號的AccessKey Secret。
endpoint='<yourEndpoint>'); --當前Hologres實例VPC網絡的Endpoint。
WITH參數
參數 | 描述 | 示例 |
type | 結果表的類型。 固定值為hologres。 | hologres |
endpoint | Hologres實例的VPC網絡地址。 進入Hologres管理控制臺,在目標實例詳情頁的網絡信息區域獲取Endpoint。Endpoint需包含端口號,格式為ip:port。 | demo-cn-hangzhou-vpc.hologres.aliyuncs.com:80 |
username | AccessKey ID 您可以單擊AccessKey 管理,獲取AccessKey ID。 | xxxxm3FMWaxxxx |
password | AccessKey Secret 您可以單擊AccessKey 管理,獲取AccessKey Secret。 | xxxxm355fffaxxxx |
dbname | 當前Hologres的數據庫名稱。 | Holodb |
tablename | 當前Hologres數據庫的表名稱。 | blink_test |
arraydelimiter | Hologres Sink支持將一個STRING字段按照field_delimiter切分為數組導入Hologres。 默認值為\u0002。 | \u0002 |
mutatetype | 數據寫入模式,詳情請參見實時數倉Hologres結果表。 默認值為insertorignore。 | insertorignore |
ignoredelete | 是否忽略回撤消息。
說明 該參數僅在使用流式語義時生效。 默認為false。 通常Flink的Groupby會產生回撤消息,回撤消息傳輸到Hologres Connector時會產生Delete請求。 | false |
partitionrouter | 是否寫入分區表。
默認為false。 | false |
createparttable | 當寫入分區表時,是否根據分區值自動創建分區表。Blink獨享 3.7及以上版本支持該功能。 默認值為false。 重要 請您謹慎使用該功能,確保分區值不會出現臟數據,從而導致創建了錯誤的分區表。 | false |
arraydelimiter、mutatetype、ignoredelete、partitionrouter及createparttable參數未在DDL示例語句中展示,如果您在實際應用中需要使用相應參數,可參考上述表格中的參數描述。
實時寫入數據至Hologres普通結果表
Hologres創建表。
在Hologres中創建一張用于接收數據的表。示例建表SQL語句如下。
create table blink_test (a int, b text, c text, d float8, e bigint);
創建實時計算作業。
登錄實時計算控制臺。
創建實時計算作業。
實時計算Blink 3.6及以上版本已支持Hologres數據源,您可以直接調用,示例SQL語句如下。
create table randomSource (a int, b VARCHAR, c VARCHAR, d DOUBLE, e BIGINT) with (type = 'random'); create table test ( a int, b VARCHAR, c VARCHAR, PRIMARY KEY (a) ) with ( type = 'hologres', `endpoint` = '$ip:$port', --當前Hologres的VPC網絡地址以及端口號。 `username` = '當前阿里云賬號的AccessKey ID', `password` = '當前阿里云賬號的AccessKey Secret', `dbname` = '當前Hologres的數據庫名稱', `tablename` = 'blink_test'--當前Hologres接收數據的表名稱。 ); insert into test select a,b,c from randomSource;
上線作業。
完成新建作業后,單擊編輯框的語法檢查,如果顯示成功,則表明語法正確。
單擊保存保存作業。
單擊上線,提交作業至生產環境。根據業務情況填寫作業的上線配置。
啟動作業。
提交作業至生產環境后,您需要手動啟動作業。
在阿里實時計算開發平臺頁面頂部菜單欄右側,單擊運維,跳轉至運維界面,選擇需要啟動的作業,單擊右上角的啟動。
Hologres實時查詢數據。
查詢Hologres中用于接收數據的表,就可以實時獲取到已寫入的數據。示例查詢SQL語句如下。
select * from blink_test;
如何使用寬表Merge/局部更新功能
對于常見的多個流的數據寫入至一張Hologres寬表的場景,具體使用方法如下:
假設Hologres有一張寬表WIDE_TABLE,有A、B、C、D、E幾列,其中A字段是主鍵,Flink一個流包含數據A、B、C,另一個流包含數據A、D、E。
使用Flink SQL聲明兩張Hologres結果表,其中一張表只聲明字段A、B、C,另一張表只聲明字段A、D、E,這兩張表都映射至《WIDE_TABLE》。
兩張結果表的mutatetype屬性都設置成insertorupdate。
兩張結果表的ignoredelete屬性都設置成true,防止回撤消息產生Delete請求。
將兩個流的數據分別Insert至兩張結果表中。
該場景的具體使用限制如下:
寬表必須有主鍵。
每個流的數據都必須包含完整主鍵字段。
列存表的寬表Merge場景在高RPS的情況下,CPU使用率會偏高,建議關閉表中字段的Dictionary encoding。
實時寫入數據至Hologres的分區結果表
Hologres支持通過調用實時數據API接口,直接將數據寫入分區父表中,對應的分區數據將會自動路由至分區子表。您可以直接寫入數據至分區表。實時數據API的描述,詳情請參見實時數據API。
使用限制如下:
Hologres當前版本僅支持List分區。
創建分區表時,需要顯示指定的分區列,分區列的類型僅支持text和 int4。
如果設置了主鍵,分區列必須為主鍵的一部分。
創建分區子表時,子表分區列的值必須為固定值。
寫入分區子表的數據對應的分區列值,必須嚴格與創建子表時定義的值匹配,否則會報錯。
Hologres當前不支持默認分區。
Hologres創建分區表。
在Hologres中創建一張用于接收數據的分區表,并創建對應的分區子表。示例建表SQL語句如下。
--創建分區父表test_message和對應的分區子表。 drop table if exists test_message; begin; create table test_message ( "bizdate" text NOT NULL, "tag" text NOT NULL, "id" int4 NOT NULL, "title" text NOT NULL, "body" text, PRIMARY KEY (bizdate,tag,id) ) PARTITION BY LIST (bizdate); commit;
說明執行命令時,
${bizdate}
參數需要替換為實際值。Blink獨享模式3.7版本才支持自動創建分區,如果您使用的是Blink獨享模式3.7以下的版本,需要在Hologres中提前創建分區子表,否則會導入數據失敗。
Blink獨享創建作業。
在Blink獨享模式中創建作業的示例語句如下。
說明以下示例適用于獨享在Blink獨享模式3.7及以上版本。如果您使用的是在Blink獨享模式3.7以下版本,請升級至3.7及以上版本,或者刪除自動創建分區子表的配置
`createparttable` = 'true'
。create table test_message_src( tag VARCHAR, id INTEGER, title VARCHAR, body VARCHAR ) with ( type = 'random', `interval` = '10', `count` = '100' ); create table test_message_sink ( bizdate VARCHAR, tag VARCHAR, id INTEGER, title VARCHAR, body VARCHAR ) with ( type = 'hologres', `endpoint` = '$ip:$port', --Hologres實例的VPC網絡地址。 `username` ='<AccessID>', --當前阿里云賬號的AccessKey ID。 `password` = '<AccessKey>', --當前阿里云賬號的AccessKey Secret。 `dbname` = '<DBname>', --當前Hologres的數據庫名稱。 `tablename` = '<Tablename>', --當前Hologres數據庫的表名稱。 `partitionrouter` = 'true', --寫入數據至Hologres的分區表。 `createparttable` = 'true', --自動創建Hologres的分區子表。 ); insert into test_message_sink select "20200327",* from test_message_src; insert into test_message_sink select "20200328",* from test_message_src;
上線并啟動作業。
請參考實時寫入數據至Hologres結果表章節中的上線作業和啟動作業步驟。
Hologres實時查詢數據。
查詢Hologres中用于接收數據的表,就可以實時獲取到已寫入的數據。示例查詢SQL語句如下。
select * from test_message; select * from test_message where bizdate = '20200327';
數據類型映射
Blink獨享與Hologres的數據類型映射,請參見數據類型匯總。