Build an incremental data warehouse by using Hudi CDC

This topic describes the parameters related to the Hudi CDC feature and provides usage examples.

Background information

CDC (Change Data Capture) refers to the process of identifying and capturing changes made to data in database tables and delivering the changes to a downstream system for further processing. The Hudi CDC feature allows you to use a Hudi table as a source and directly obtain the changed data.

Limits

The Hudi CDC feature is supported only in clusters of EMR-3.45.0 or later and EMR-5.11.0 or later, in which the Hudi version is 0.12.2.

Parameters

CDC write parameters

hoodie.table.cdc.enabled

Specifies whether to enable CDC. Valid values:

  • true: enables CDC.

  • false (default): disables CDC.

hoodie.table.cdc.supplemental.logging.mode

The storage mode of CDC data. Three levels are available:

  • op_key_only: stores the key and the operation type of each changed record.

  • data_before: stores the key, the operation type, and the pre-change value of each changed record.

  • data_before_after (default): stores the key, the operation type, and both the pre-change and post-change values of each changed record.
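
The following minimal Scala sketch shows how these write parameters can be passed as Hudi write options on a Spark DataFrame writer. The table name and field names here are illustrative; a complete end-to-end example appears in the DataFrame section below.

    // Illustrative sketch: CDC write parameters as Hudi write options.
    val cdcWriteOpts = Map(
      "hoodie.table.name" -> "hudi_cdc_test",                       // illustrative table name
      "hoodie.datasource.write.recordkey.field" -> "id",
      "hoodie.datasource.write.precombine.field" -> "ts",
      "hoodie.table.cdc.enabled" -> "true",                         // enable CDC on the table
      "hoodie.table.cdc.supplemental.logging.mode" -> "data_before" // persist key, op, and pre-image
    )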

CDC read parameters

hoodie.datasource.query.type

The query type. To use the CDC feature, set this parameter to incremental.

Default value: snapshot.

hoodie.datasource.query.incremental.format

The incremental query format. To use the CDC feature, set this parameter to cdc.

Default value: latest_state.

hoodie.datasource.read.begin.instanttime

The start instant time of the incremental query.

hoodie.datasource.read.end.instanttime

The end instant time of the incremental query. This parameter is optional.
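
The optional end instant lets you read a bounded window of changes. The following minimal Scala sketch assumes a SparkSession named spark, a CDC-enabled Hudi table at basePath, and two instant timestamps t1 and t2 (strings) taken from the table timeline; these names are illustrative, not part of the examples below.

    // Illustrative sketch: read only the changes in the window (t1, t2].
    val cdcWindow = spark.read.format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.query.incremental.format", "cdc")
      .option("hoodie.datasource.read.begin.instanttime", t1) // exclusive lower bound
      .option("hoodie.datasource.read.end.instanttime", t2)   // inclusive upper bound
      .load(basePath)
    cdcWindow.show(false)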

Examples

Spark SQL

  1. On the configuration page of the Spark service, add a configuration item to the spark-defaults.conf tab: set the spark.serializer parameter to org.apache.spark.serializer.KryoSerializer. For more information about how to add a configuration item, see Add configuration items.
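
    Optionally, you can check in a spark-shell session that the setting has taken effect. This is only a sanity check, not a required step:

    // Optional check: print the configured serializer.
    // spark.conf.get throws NoSuchElementException if the key is not set.
    println(spark.conf.get("spark.serializer"))
    // Expected output: org.apache.spark.serializer.KryoSerializer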

  2. Execute the following statement to create a table:

    create table hudi_cdc_test (
      id bigint,
      name string,
      ts bigint
    ) using hudi
    tblproperties (
      type = 'cow',
      primaryKey = 'id',
      preCombineField = 'ts',
      'hoodie.table.cdc.enabled' = 'true',
      'hoodie.table.cdc.supplemental.logging.mode' = 'data_before_after'
    );
  3. Execute the following statements to write data to the table and query the table:

    insert into hudi_cdc_test values (1, 'a1', 1000), (2, 'a2', 1001);
    select * from hudi_cdc_test;

    The following information is returned:

    20230129220605215    20230129220605215_0_0    1        0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet    1    a1    1000
    20230129220605215    20230129220605215_0_1    2        0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet    2    a2    1001
  4. Obtain the timestamp of the last commit from the .hoodie directory, and then perform a CDC query.

    1. Obtain the timestamp of the last commit. In the following listing of the .hoodie directory, the timestamp of the last commit is 20230129220605215:

      -rw-r--r--  1 zxy  staff   1.2K  1 29 22:06 20230129220605215.commit
      -rw-r--r--  1 zxy  staff     0B  1 29 22:06 20230129220605215.commit.requested
      -rw-r--r--  1 zxy  staff   798B  1 29 22:06 20230129220605215.inflight
    2. Execute the following statement to perform a CDC query.

      Because the query interval is left-open and right-closed, 1 is subtracted from the timestamp to obtain the start time.

      select * from hudi_table_changes("hudi_cdc_test", "20230129220605214");

      The following information is returned:

      i    20230129220605215    NULL    {"_hoodie_record_key":"1","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet","_hoodie_commit_seqno":"20230129220605215_0_0","name":"a1","_hoodie_commit_time":"20230129220605215","ts":1000,"id":1}
      i    20230129220605215    NULL    {"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet","_hoodie_commit_seqno":"20230129220605215_0_1","name":"a2","_hoodie_commit_time":"20230129220605215","ts":1001,"id":2}
  5. Execute the following statements to write data to the table again and query the table:

    insert into hudi_cdc_test values (2, 'a2', 1002);
    select * from hudi_cdc_test;

    The following information is returned:

    20230129220605215    20230129220605215_0_0    1        0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet    1    a1    1000
    20230129221304930    20230129221304930_0_1    2        0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet    2    a2    1002
  6. Obtain the timestamp of the last commit and subtract 1 from it as described in Step 4, and then perform a CDC query.

    For example, the obtained timestamp is 20230129221304930. Execute the following statement to perform a CDC query:

    select * from hudi_table_changes("hudi_cdc_test", "20230129221304929");

    The following information is returned:

    u    20230129221304930    {"_hoodie_commit_time": "20230129220605215", "_hoodie_commit_seqno": "20230129220605215_0_1", "_hoodie_record_key": "2", "_hoodie_partition_path": "", "_hoodie_file_name": "0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet", "id": 2, "name": "a2", "ts": 1001}    {"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet","_hoodie_commit_seqno":"20230129221304930_0_1","name":"a2","_hoodie_commit_time":"20230129221304930","ts":1002,"id":2}
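
In downstream processing, you typically branch on the operation type (the first column in the outputs above: i for inserts, u for updates). The following minimal Scala sketch runs the same CDC query through spark.sql and keeps only the update records. It assumes a SparkSession named spark and that the result columns follow the CDC schema (op, ts_ms, before, after) shown in the DataFrame section below.

    // Illustrative sketch: keep only the update records from the CDC feed.
    import org.apache.spark.sql.functions.col

    val updates = spark
      .sql("""select * from hudi_table_changes("hudi_cdc_test", "20230129221304929")""")
      .filter(col("op") === "u")
    updates.show(false)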

DataFrame

  1. Make preparations.

    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
    
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .withExtensions(new HoodieSparkSessionExtension)
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    import spark.implicits._
    
    val basePath = "/tmp/test/hudi_cdc_test"
    
    val writeOpts = Map(
      "hoodie.table.name" -> "hudi_cdc_test",
      "hoodie.datasource.write.recordkey.field" -> "id",
      "hoodie.datasource.write.precombine.field" -> "ts",
      "hoodie.table.cdc.enabled" -> "true",
      "hoodie.table.cdc.supplemental.logging.mode" -> "op_key_only"
    )
    
    val readOpts = Map(
      "hoodie.datasource.query.type" -> "incremental",
      "hoodie.datasource.query.incremental.format" -> "cdc"
    )
  2. Write data by using df1.

    val df1 = Seq((1, "a1", 1000), (2, "a2", 1001)).toDF("id", "name", "ts")
    df1.write.format("hudi")
      .options(writeOpts)
      .mode(SaveMode.Append)
      .save(basePath)
    df1.show(false)

    The following information is returned:

    +---+----+----+
    |id |name|ts  |
    +---+----+----+
    |1  |a1  |1000|
    |2  |a2  |1001|
    +---+----+----+
  3. Read the change data as cdc1.

    val metaClient = HoodieTableMetaClient.builder()
      .setBasePath(basePath)
      .setConf(spark.sessionState.newHadoopConf())
      .build()
    
    val timestamp1 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
    val cdc1 = spark.read.format("hudi")
      .options(readOpts)
      .option("hoodie.datasource.read.begin.instanttime", (timestamp1.toLong - 1).toString)
      .load(basePath)
    cdc1.show(false)

    The following information is returned:

    +---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |op |ts_ms            |before|after                                                                                                                                                                                                                                                                      |
    +---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |i  |20230128030951890|null  |{"_hoodie_record_key":"1","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet","_hoodie_commit_seqno":"20230128030951890_0_0","name":"a1","_hoodie_commit_time":"20230128030951890","ts":1000,"id":1}|
    |i  |20230128030951890|null  |{"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet","_hoodie_commit_seqno":"20230128030951890_0_1","name":"a2","_hoodie_commit_time":"20230128030951890","ts":1001,"id":2}|
    +---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  4. Write data by using df2.

    val df2 = Seq((2, "a2", 1002)).toDF("id", "name", "ts")
    df2.write.format("hudi")
      .options(writeOpts)
      .mode(SaveMode.Append)
      .save(basePath)
    df2.show(false)

    The following information is returned:

    +---+----+----+
    |id |name|ts  |
    +---+----+----+
    |2  |a2  |1002|
    +---+----+----+
  5. Read the change data as cdc2.

    val timestamp2 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
    val cdc2 = spark.read.format("hudi")
      .options(readOpts)
      .option("hoodie.datasource.read.begin.instanttime", (timestamp2.toLong - 1).toString)
      .load(basePath)
    cdc2.show(false)

    The following information is returned:

    +---+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |op |ts_ms            |before                                                                                                                                                                                                                                                                                    |after                                                                                                                                                                                                                                                                      |
    +---+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |u  |20230128031235363|{"_hoodie_commit_time": "20230128030951890", "_hoodie_commit_seqno": "20230128030951890_0_1", "_hoodie_record_key": "2", "_hoodie_partition_path": "", "_hoodie_file_name": "6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet", "id": 2, "name": "a2", "ts": 1001}|{"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-60-52_20230128031235363.parquet","_hoodie_commit_seqno":"20230128031235363_0_1","name":"a2","_hoodie_commit_time":"20230128031235363","ts":1002,"id":2}|
    +---+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
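
To build an incremental data warehouse on top of this feed, a downstream job typically checkpoints the last instant it has processed and periodically reads everything committed after it. The following minimal Scala sketch continues from the variables defined above; the checkpoint handling and the flattening of the after payload are illustrative assumptions, not part of the Hudi API.

    // Illustrative sketch: poll the CDC feed from a checkpointed instant and
    // flatten the JSON `after` payload into columns for downstream consumption.
    import org.apache.spark.sql.functions.{col, get_json_object}

    val lastProcessed = timestamp1 // in practice, load this from durable checkpoint storage
    val changes = spark.read.format("hudi")
      .options(readOpts)
      // begin.instanttime is exclusive: reads changes committed after lastProcessed
      .option("hoodie.datasource.read.begin.instanttime", lastProcessed)
      .load(basePath)

    val flattened = changes
      .withColumn("id", get_json_object(col("after"), "$.id").cast("long"))
      .withColumn("name", get_json_object(col("after"), "$.name"))
      .withColumn("ts", get_json_object(col("after"), "$.ts").cast("long"))
      .select("op", "ts_ms", "id", "name", "ts")
    flattened.show(false)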