本文以一個示例為您演示開源Flink如何實時寫入數據至Hologres。
前提條件
操作步驟
Hologres創建結果表。
Holoogres實例連接開發工具后,您需要創建一張結果表,用于接收實時寫入的數據。示例語句如下。
begin; create table order_details(user_id bigint, user_name text, item_id bigint, item_name text, price numeric(38, 2), province text, city text, ip text, longitude text, latitude text, sale_timestamp timestamptz not null, primary key(user_id, item_id)); call set_table_property ('order_details', 'segment_key', 'sale_timestamp'); commit;
下載并編譯Flink的JAR文件。
下載并安裝Hologres Connector依賴的JAR文件hologres-flink-connector-1.10-jar-with-dependencies.jar,示例語句如下。
mvn install:install-file -Dfile=hologres-flink-connector-1.10-jar-with-dependencies.jar -DgroupId=org.apache.flink -DartifactId=hologres-flink-connector -Dversion=1.10 -Dpackaging=jar -DgeneratePom=true
進入Hologres的Git官方示例庫,下載并編譯JAR文件,示例語句如下。
git clone https://github.com/hologres/hologres-flink-examples.git cd hologres-flink-examples git checkout -b example mvn package -DskipTests
提交Flink作業。
編譯完JAR文件后,配置作業參數,提交Flink作業,示例語句如下。
說明示例使用命令行方式提交Flink作業,您也可以選擇使用Flink Web頁面提交作業。
flink run -c io.hologres.flink.example.HologresSinkExample ../hologres-flink-example/target/hologres-flink-examples-1.0.0-jar-with-dependencies.jar --endpoint $ENDPOINT --username $USERNAME --password $PASSWORD --database $DATABASE --tablename order_details
參數說明如下表所示。
參數
描述
示例
endpoint
Hologres的Endpoint地址。
進入Hologres管理控制臺的實例詳情頁,從網絡信息獲取Endpoint。
說明本地Flink請使用Hologres的公共網絡地址,阿里云VPC網絡請使用Hologres的VPC網絡地址。
ssseeee-cn-hangzhou.hologres.aliyuncs.com:80
username
當前阿里云賬號的AccessKey ID。
您可以單擊AccessKey 管理,獲取AccessKey ID。
無
password
當前阿里云賬號的AccessKey Secret。
您可以單擊AccessKey 管理,獲取AccessKey Secret。
無
database
連接的Hologres數據庫名稱。
hologres_demo
tablename
Hologres接收數據的表名稱。
order_details
Hologres查詢數據。
成功啟動任務后,您可以在Hologres中實時查詢寫入的數據。示例語句如下。
select count(1) from order_details; select item_id, sum(price) as total from order_details group by item_id limit 10;