EMR-3.38.3及后續版本的DataFlow集群,可以通過數據湖元數據DLF(Data Lake Formation)作為元數據讀取DataLake集群或自定義集群中的數據。本文為您介紹Dataflow集群如何連接DLF,并讀取Hudi全量數據。

前提條件

  • 已在E-MapReduce控制臺上創建DataFlow集群和DataLake集群,詳情請參見創建集群
    重要 創建DataLake集群時,元數據需為DLF統一元數據
  • 已開通數據湖構建DLF,詳情請參見快速入門

使用限制

DataFlow集群和DataLake集群需要在同一VPC下。

操作流程

  1. 步驟一:環境準備
  2. 步驟二:啟動Flink SQL
  3. 步驟三:創建并驗證Catalog
  4. 步驟四:Flink SQL寫入Hudi
  5. 步驟五:DataLake集群查詢Hudi

步驟一:環境準備

拷貝DataLake集群中${HIVE_CONF_DIR}下的hive-site.xml到DataFlow集群。

例如,${HIVE_CONF_DIR}/etc/taihao-apps/hive-conf/

mkdir /etc/taihao-apps/hive-conf
scp root@<master-1-1節點內網的IP地址>:/etc/taihao-apps/hive-conf/hive-site.xml /etc/taihao-apps/hive-conf/

步驟二:啟動Flink SQL

重要
  • 務必將DLF的依賴包放置在Hive依賴包的前面,其中DLF依賴包中嵌入了Hudi的依賴。
  • 無需關注Datalake集群中的Hive版本,Hive依賴均使用2.3.6版本的。
執行以下命令,啟動Flink SQL。
sql-client.sh \
-j /opt/apps/FLINK/flink-current/opt/catalogs/dlf/ververica-connector-dlf-1.13-vvr-4.0.15-SNAPSHOT-jar-with-dependencies.jar \
-j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.13-vvr-4.0.15-SNAPSHOT-jar-with-dependencies.jar
測試時可設置以下配置。
set sql-client.verbose=true;
set sql-client.execution.result-mode=tableau;
set execution.checkpointing.interval=1000;

步驟三:創建并驗證Catalog

進入Flink SQL后,分別創建DLF Catalog和Hive Catalog用于讀取Hudi表和Hive表。

  1. 執行以下命令,創建Catalog。
    • 創建DLF Catalog
      CREATE CATALOG dlf_catalog WITH (
           'type' = 'dlf',
           'access.key.id' = '<yourAccessKeyId>', --您阿里云賬號的AccessKey ID。
           'access.key.secret' = '<yourAccessKeyId>', --您阿里云賬號的AccessKey Secret。
           'warehouse' = 'oss://oss-bucket/warehouse/test.db',
           'oss.endpoint' = '<oss.endpoint>', --從${HADOOP_CONF_DIR}/core-site.xml中獲取。
           'dlf.endpoint' = '<dlf.endpoint>', --從${HIVE_CONF_DIR}/hive-site.xml中獲取。
           'dlf.region-id' = '<dlf.region-id>' --從${HIVE_CONF_DIR}/hive-site.xml中獲取。
       );
    • 創建Hive Catalog
      重要 無需關注Datalake集群中的Hive版本,hive-version均使用2.3.6。
      CREATE CATALOG hive_catalog WITH (
           'type' = 'hive',
           'default-database' = 'default',
           'hive-version' = '2.3.6',
           'hive-conf-dir' = '/etc/taihao-apps/hive-conf/',
           'hadoop-conf-dir' = '/etc/taihao-apps/hadoop-conf/'
       );
    Catalog創建成功后,均會返回以下信息。
    [INFO] Execute statement succeed.
  2. 執行以下命令,驗證Catalog。
    • 驗證DLF Catalog
      select * from dlf_catalog.test.hudi_table;
    • 驗證Hive Catalog
      select * from hive_catalog.test.hive_table;

步驟四:Flink SQL寫入Hudi

  • 場景一:數據入湖
    使用Datagen Connector隨機生成上游Source數據,入湖Hudi表。
    -- 構建上游Source數據
    CREATE TABLE datagen_source (
      uuid int,
      age int,
      ts bigint,
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10'
    );
    
    -- 創建Hudi表
    CREATE TABLE dlf_catalog.test.hudi_tbl1(
      id int not null,
      age int,
      ts bigint
    )
    with(
      'connector'='hudi',
      'path' = 'oss://oss-bucket/warehouse/test.db/hudi_tbl1',
      'table.type'='COPY_ON_WRITE',
      'hoodie.datasource.write.recordkey.field'='id',
      'hive_sync.enable'='true',
      'hive_sync.table'='hudi_tbl1',    -- required, Hive新建的表名。
      'hive_sync.db'='test',            -- required, Hive新建的數據庫名。
      'hive_sync.mode' = 'hms'          -- required, 將hive sync mode設置為hms, 默認jdbc。
    );
    
    --入湖
    insert into dlf_catalog.test.hudi_tbl1
    select uuid as id, age, ts
    from default_catalog.default_database.datagen_source;
    
    -- 查詢驗證
    select * from dlf_catalog.test.hudi_tbl1;
  • 場景二:維度打寬入湖

    使用ODS層的Hudi數據和Hive的維度表關聯打寬填充維度字段,最后寫入新的Hudi表。

    例如,已有表dlf_catalog.test.hive_dim_tbl,表結構如下。
    id                      int
    name                    string
    詳細示例如下。
    -- 創建目標Hudi表
    CREATE TABLE dlf_catalog.test.hudi_tbl2(
      id int not null,
      name string,
      age int,
      ts bigint
    )
    with(
      'connector'='hudi',
      'path' = 'oss://oss-bucket/warehouse/test.db/hudi_tbl2',
      'table.type'='COPY_ON_WRITE',
      'hoodie.datasource.write.recordkey.field'='id',
      'hive_sync.enable'='true',
      'hive_sync.table'='hudi_tbl2',    -- required, Hive新建的表名。
      'hive_sync.db'='test',            -- required, Hive新建的數據庫名。
      'hive_sync.mode' = 'hms'          -- required, 將hive sync mode設置為hms, 默認jdbc。
    );
    
    -- 關聯Hive維度表后入湖
    insert into dlf_catalog.test.hudi_tbl2
    select s.id, name, age, ts
    from dlf_catalog.test.hudi_tbl1 s join hive_catalog.test.hive_dim_tbl t on s.id = t.id;

步驟五:DataLake集群查詢Hudi

登錄DataLake集群查詢Hudi數據。登錄集群詳情請參見登錄集群

  • Spark查詢

    Spark查詢詳情,請參見Hudi與Spark SQL集成

    1. 執行以下命令,啟動spark-sql。
      spark-sql \
      --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
      --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
      如果您集群的Spark是Spark 3,且Hudi為0.11及以上版本,則需額外添加以下配置。
      --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
    2. 執行以下命令,驗證表信息。
      • 查詢hudi_tbl1
        select * from test.hudi_tbl1;
      • 查詢hudi_tbl2
        select * from test.hudi_tbl2;
  • Hudi查詢
    1. 執行以下命令,啟動Hive CLI。
      hive
    2. 執行以下命令,驗證表信息。
      • 查詢hudi_tbl1
        select * from test.hudi_tbl1;
      • 查詢hudi_tbl2
        select * from test.hudi_tbl2;