EMR-3.38.3及后續版本的DataFlow集群,可以通過數據湖元數據DLF(Data Lake Formation)作為元數據讀取DataLake集群或自定義集群中的數據。本文為您介紹Dataflow集群如何連接DLF,并讀取Hudi全量數據。
前提條件
使用限制
DataFlow集群和DataLake集群需要在同一VPC下。
操作流程
步驟一:環境準備
拷貝DataLake集群中${HIVE_CONF_DIR}
下的hive-site.xml到DataFlow集群。
例如,${HIVE_CONF_DIR}
為/etc/taihao-apps/hive-conf/。
mkdir /etc/taihao-apps/hive-conf
scp root@<master-1-1節點內網的IP地址>:/etc/taihao-apps/hive-conf/hive-site.xml /etc/taihao-apps/hive-conf/
步驟二:啟動Flink SQL
重要
- 務必將DLF的依賴包放置在Hive依賴包的前面,其中DLF依賴包中嵌入了Hudi的依賴。
- 無需關注Datalake集群中的Hive版本,Hive依賴均使用2.3.6版本的。
執行以下命令,啟動Flink SQL。
sql-client.sh \
-j /opt/apps/FLINK/flink-current/opt/catalogs/dlf/ververica-connector-dlf-1.13-vvr-4.0.15-SNAPSHOT-jar-with-dependencies.jar \
-j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.13-vvr-4.0.15-SNAPSHOT-jar-with-dependencies.jar
測試時可設置以下配置。
set sql-client.verbose=true;
set sql-client.execution.result-mode=tableau;
set execution.checkpointing.interval=1000;
步驟三:創建并驗證Catalog
進入Flink SQL后,分別創建DLF Catalog和Hive Catalog用于讀取Hudi表和Hive表。
- 執行以下命令,創建Catalog。
- 創建DLF Catalog
CREATE CATALOG dlf_catalog WITH ( 'type' = 'dlf', 'access.key.id' = '<yourAccessKeyId>', --您阿里云賬號的AccessKey ID。 'access.key.secret' = '<yourAccessKeyId>', --您阿里云賬號的AccessKey Secret。 'warehouse' = 'oss://oss-bucket/warehouse/test.db', 'oss.endpoint' = '<oss.endpoint>', --從${HADOOP_CONF_DIR}/core-site.xml中獲取。 'dlf.endpoint' = '<dlf.endpoint>', --從${HIVE_CONF_DIR}/hive-site.xml中獲取。 'dlf.region-id' = '<dlf.region-id>' --從${HIVE_CONF_DIR}/hive-site.xml中獲取。 );
- 創建Hive Catalog重要 無需關注Datalake集群中的Hive版本,
hive-version
均使用2.3.6。CREATE CATALOG hive_catalog WITH ( 'type' = 'hive', 'default-database' = 'default', 'hive-version' = '2.3.6', 'hive-conf-dir' = '/etc/taihao-apps/hive-conf/', 'hadoop-conf-dir' = '/etc/taihao-apps/hadoop-conf/' );
Catalog創建成功后,均會返回以下信息。[INFO] Execute statement succeed.
- 創建DLF Catalog
- 執行以下命令,驗證Catalog。
- 驗證DLF Catalog
select * from dlf_catalog.test.hudi_table;
- 驗證Hive Catalog
select * from hive_catalog.test.hive_table;
- 驗證DLF Catalog
步驟四:Flink SQL寫入Hudi
- 場景一:數據入湖使用Datagen Connector隨機生成上游Source數據,入湖Hudi表。
-- 構建上游Source數據 CREATE TABLE datagen_source ( uuid int, age int, ts bigint, ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '10' ); -- 創建Hudi表 CREATE TABLE dlf_catalog.test.hudi_tbl1( id int not null, age int, ts bigint ) with( 'connector'='hudi', 'path' = 'oss://oss-bucket/warehouse/test.db/hudi_tbl1', 'table.type'='COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field'='id', 'hive_sync.enable'='true', 'hive_sync.table'='hudi_tbl1', -- required, Hive新建的表名。 'hive_sync.db'='test', -- required, Hive新建的數據庫名。 'hive_sync.mode' = 'hms' -- required, 將hive sync mode設置為hms, 默認jdbc。 ); --入湖 insert into dlf_catalog.test.hudi_tbl1 select uuid as id, age, ts from default_catalog.default_database.datagen_source; -- 查詢驗證 select * from dlf_catalog.test.hudi_tbl1;
- 場景二:維度打寬入湖
使用ODS層的Hudi數據和Hive的維度表關聯打寬填充維度字段,最后寫入新的Hudi表。
例如,已有表dlf_catalog.test.hive_dim_tbl,表結構如下。id int name string
詳細示例如下。-- 創建目標Hudi表 CREATE TABLE dlf_catalog.test.hudi_tbl2( id int not null, name string, age int, ts bigint ) with( 'connector'='hudi', 'path' = 'oss://oss-bucket/warehouse/test.db/hudi_tbl2', 'table.type'='COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field'='id', 'hive_sync.enable'='true', 'hive_sync.table'='hudi_tbl2', -- required, Hive新建的表名。 'hive_sync.db'='test', -- required, Hive新建的數據庫名。 'hive_sync.mode' = 'hms' -- required, 將hive sync mode設置為hms, 默認jdbc。 ); -- 關聯Hive維度表后入湖 insert into dlf_catalog.test.hudi_tbl2 select s.id, name, age, ts from dlf_catalog.test.hudi_tbl1 s join hive_catalog.test.hive_dim_tbl t on s.id = t.id;
步驟五:DataLake集群查詢Hudi
登錄DataLake集群查詢Hudi數據。登錄集群詳情請參見登錄集群。
- Spark查詢
Spark查詢詳情,請參見Hudi與Spark SQL集成。
- 執行以下命令,啟動spark-sql。
spark-sql \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
如果您集群的Spark是Spark 3,且Hudi為0.11及以上版本,則需額外添加以下配置。--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
- 執行以下命令,驗證表信息。
- 查詢hudi_tbl1
select * from test.hudi_tbl1;
- 查詢hudi_tbl2
select * from test.hudi_tbl2;
- 查詢hudi_tbl1
- 執行以下命令,啟動spark-sql。
- Hudi查詢
- 執行以下命令,啟動Hive CLI。
hive
- 執行以下命令,驗證表信息。
- 查詢hudi_tbl1
select * from test.hudi_tbl1;
- 查詢hudi_tbl2
select * from test.hudi_tbl2;
- 查詢hudi_tbl1
- 執行以下命令,啟動Hive CLI。