This topic describes the parameters related to the Hudi CDC feature and provides usage examples.
Background information
CDC (Change Data Capture) refers to a scenario in which changes to data in database tables are identified and captured, then delivered downstream for further processing. With Hudi CDC, you can use a Hudi table as a source and directly obtain the changed data.
Usage limits
The Hudi CDC feature is supported only on clusters of EMR-3.45.0 or a later minor version and EMR-5.11.0 or a later minor version, and only when the Hudi version is 0.12.2.
Related parameters
CDC write parameters
| Parameter | Description |
| --- | --- |
| hoodie.table.cdc.enabled | Specifies whether to enable CDC. Valid values: true (enable CDC) and false (disable CDC). Default value: false. |
| hoodie.table.cdc.supplemental.logging.mode | Storage mode of CDC files. Three levels are available: op_key_only (persist only the operation type and the record key), data_before (additionally persist the before-image of the record), and data_before_after (persist both the before-image and the after-image). |
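The difference between the three logging modes can be pictured with a small sketch. The helper below is hypothetical (it is not Hudi's on-disk format); it only illustrates which parts of a change record each mode persists:

```python
def cdc_payload(mode, op, key, before=None, after=None):
    """Hypothetical sketch: fields persisted per supplemental logging mode."""
    record = {"op": op, "record_key": key}
    if mode in ("data_before", "data_before_after"):
        record["before"] = before  # before-image of the changed row
    if mode == "data_before_after":
        record["after"] = after    # after-image of the changed row
    return record

minimal = cdc_payload("op_key_only", "u", "2")
full = cdc_payload("data_before_after", "u", "2",
                   before={"id": 2, "ts": 1001}, after={"id": 2, "ts": 1002})
print(sorted(minimal))  # ['op', 'record_key']
print(sorted(full))     # ['after', 'before', 'op', 'record_key']
```

The trade-off: with op_key_only the missing images must be reconstructed from data files at query time (cheaper writes, more expensive reads), while data_before_after is the opposite.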
CDC read parameters
| Parameter | Description |
| --- | --- |
| hoodie.datasource.query.type | Query type. To use CDC, set this parameter to incremental. Default value: snapshot. |
| hoodie.datasource.query.incremental.format | Incremental query format. To use CDC, set this parameter to cdc. Default value: latest_state. |
| hoodie.datasource.read.begin.instanttime | Start time of the incremental query. |
| hoodie.datasource.read.end.instanttime | End time of the incremental query. This parameter is optional. |
Examples
Spark SQL
On the Configure tab of the Spark service page, add a configuration item named spark.serializer to the spark-defaults.conf section and set its value to org.apache.spark.serializer.KryoSerializer. For information about how to add a configuration item, see Add configuration items.
Run the following command to create a table:
create table hudi_cdc_test (
  id bigint,
  name string,
  ts bigint
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts',
  'hoodie.table.cdc.enabled' = 'true',
  'hoodie.table.cdc.supplemental.logging.mode' = 'data_before_after'
);
Run the following commands to write data to the table and query the table:
insert into hudi_cdc_test values (1, 'a1', 1000), (2, 'a2', 1001);
select * from hudi_cdc_test;
The following information is returned:
20230129220605215  20230129220605215_0_0  1  0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet  1  a1  1000
20230129220605215  20230129220605215_0_1  2  0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet  2  a2  1001
Obtain the timestamp of the last commit from the .hoodie directory and run a CDC query.
Obtain the timestamp of the last commit:
-rw-r--r--  1 zxy  staff  1.2K  1 29 22:06 20230129220605215.commit
-rw-r--r--  1 zxy  staff    0B  1 29 22:06 20230129220605215.commit.requested
-rw-r--r--  1 zxy  staff  798B  1 29 22:06 20230129220605215.inflight
Run the following command to run a CDC query.
Because the query interval is left-open and right-closed, subtract 1 from the timestamp and use the result as the start time.
select * from hudi_table_changes("hudi_cdc_test", "20230129220605214");
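The two manual steps above, picking the latest completed commit out of .hoodie and subtracting 1 because the interval is left-open, can be sketched in plain code (hypothetical helpers, not part of Hudi):

```python
def latest_commit_ts(hoodie_files):
    """Newest completed commit timestamp among .hoodie file names."""
    commits = [f[:-len(".commit")] for f in hoodie_files if f.endswith(".commit")]
    # Hudi instant timestamps sort lexicographically in chronological order.
    return max(commits)

def begin_instant(commit_ts):
    """Left-open interval (begin, end]: start one tick earlier so the commit is included."""
    return str(int(commit_ts) - 1)

files = ["20230129220605215.commit",
         "20230129220605215.commit.requested",
         "20230129220605215.inflight"]
print(begin_instant(latest_commit_ts(files)))  # 20230129220605214
```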
The following information is returned (columns: op, ts_ms, before, after):
i  20230129220605215  NULL  {"_hoodie_record_key":"1","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet","_hoodie_commit_seqno":"20230129220605215_0_0","name":"a1","_hoodie_commit_time":"20230129220605215","ts":1000,"id":1}
i  20230129220605215  NULL  {"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet","_hoodie_commit_seqno":"20230129220605215_0_1","name":"a2","_hoodie_commit_time":"20230129220605215","ts":1001,"id":2}
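Each returned row carries an operation type (i for insert), the commit timestamp, a before-image (NULL for inserts), and an after-image as JSON. A minimal sketch of replaying such rows downstream (field names taken from the output above; the helper itself is hypothetical):

```python
import json

# One CDC row shaped like the query output above (after-image trimmed for brevity).
row = {"op": "i", "ts_ms": "20230129220605215", "before": None,
       "after": '{"_hoodie_record_key":"1","name":"a1","ts":1000,"id":1}'}

def apply_change(table, row):
    """Replay a single CDC row into an in-memory key -> record map."""
    if row["op"] in ("i", "u"):
        after = json.loads(row["after"])
        key = after["_hoodie_record_key"]
        # Keep only data columns, dropping Hudi metadata fields.
        table[key] = {k: v for k, v in after.items() if not k.startswith("_hoodie")}
    elif row["op"] == "d":
        before = json.loads(row["before"])
        table.pop(before["_hoodie_record_key"], None)
    return table

state = apply_change({}, row)
print(state)  # {'1': {'name': 'a1', 'ts': 1000, 'id': 1}}
```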
Run the following commands to write data again and query the table:
insert into hudi_cdc_test values (2, 'a2', 1002);
select * from hudi_cdc_test;
The following information is returned:
20230129220605215  20230129220605215_0_0  1  0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet  1  a1  1000
20230129221304930  20230129221304930_0_1  2  0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet  2  a2  1002
As in Step 3, obtain the timestamp of the last commit, subtract 1, and run a CDC query. For example, if the obtained timestamp is 20230129221304930, run the following command:
select * from hudi_table_changes("hudi_cdc_test", "20230129221304929");
The following information is returned (columns: op, ts_ms, before, after; the before-image and after-image are shown on separate lines for readability):
u  20230129221304930
before: {"_hoodie_commit_time": "20230129220605215", "_hoodie_commit_seqno": "20230129220605215_0_1", "_hoodie_record_key": "2", "_hoodie_partition_path": "", "_hoodie_file_name": "0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet", "id": 2, "name": "a2", "ts": 1001}
after: {"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet","_hoodie_commit_seqno":"20230129221304930_0_1","name":"a2","_hoodie_commit_time":"20230129221304930","ts":1002,"id":2}
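For an update (op u), both images are present, so a consumer can compute exactly which columns changed. A minimal sketch using the data columns of the record above (the helper is hypothetical):

```python
import json

before = json.loads('{"id": 2, "name": "a2", "ts": 1001}')
after = json.loads('{"id": 2, "name": "a2", "ts": 1002}')

def changed_columns(before, after):
    """Return {column: (old, new)} for data columns whose values differ."""
    return {k: (before.get(k), after[k])
            for k in after
            if not k.startswith("_hoodie") and before.get(k) != after[k]}

print(changed_columns(before, after))  # {'ts': (1001, 1002)}
```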
DataFrame
Make preparations:
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension

val spark: SparkSession = SparkSession.builder()
  .master("local[4]")
  .withExtensions(new HoodieSparkSessionExtension)
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()
import spark.implicits._

val basePath = "/tmp/test/hudi_cdc_test"
val writeOpts = Map(
  "hoodie.table.name" -> "hudi_cdc_test",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.table.cdc.enabled" -> "true",
  "hoodie.table.cdc.supplemental.logging.mode" -> "op_key_only"
)
val readOpts = Map(
  "hoodie.datasource.query.type" -> "incremental",
  "hoodie.datasource.query.incremental.format" -> "cdc"
)
Write data with df1:
val df1 = Seq((1, "a1", 1000), (2, "a2", 1001)).toDF("id", "name", "ts")
df1.write.format("hudi")
  .options(writeOpts)
  .mode(SaveMode.Append)
  .save(basePath)
df1.show(false)
The following information is returned:
+---+----+----+
|id |name|ts  |
+---+----+----+
|1  |a1  |1000|
|2  |a2  |1001|
+---+----+----+
Read the cdc1 data:
val metaClient = HoodieTableMetaClient.builder()
  .setBasePath(basePath)
  .setConf(spark.sessionState.newHadoopConf())
  .build()
val timestamp1 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
val cdc1 = spark.read.format("hudi")
  .options(readOpts)
  .option("hoodie.datasource.read.begin.instanttime", (timestamp1.toLong - 1).toString)
  .load(basePath)
cdc1.show(false)
The following information is returned (columns: op, ts_ms, before, after):
i  20230128030951890  null  {"_hoodie_record_key":"1","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet","_hoodie_commit_seqno":"20230128030951890_0_0","name":"a1","_hoodie_commit_time":"20230128030951890","ts":1000,"id":1}
i  20230128030951890  null  {"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet","_hoodie_commit_seqno":"20230128030951890_0_1","name":"a2","_hoodie_commit_time":"20230128030951890","ts":1001,"id":2}
Write data with df2:
val df2 = Seq((2, "a2", 1002)).toDF("id", "name", "ts")
df2.write.format("hudi")
  .options(writeOpts)
  .mode(SaveMode.Append)
  .save(basePath)
df2.show(false)
The following information is returned:
+---+----+----+
|id |name|ts  |
+---+----+----+
|2  |a2  |1002|
+---+----+----+
Read the cdc2 data:
val timestamp2 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
val cdc2 = spark.read.format("hudi")
  .options(readOpts)
  .option("hoodie.datasource.read.begin.instanttime", (timestamp2.toLong - 1).toString)
  .load(basePath)
cdc2.show(false)
The following information is returned (columns: op, ts_ms, before, after; the before-image and after-image are shown on separate lines for readability):
u  20230128031235363
before: {"_hoodie_commit_time": "20230128030951890", "_hoodie_commit_seqno": "20230128030951890_0_1", "_hoodie_record_key": "2", "_hoodie_partition_path": "", "_hoodie_file_name": "6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet", "id": 2, "name": "a2", "ts": 1001}
after: {"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-60-52_20230128031235363.parquet","_hoodie_commit_seqno":"20230128031235363_0_1","name":"a2","_hoodie_commit_time":"20230128031235363","ts":1002,"id":2}
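In both examples, the incremental CDC read returns only the changes from commits whose timestamps fall in the (begin, end] window. A simplified model of that filtering (not Hudi's implementation):

```python
def commits_in_window(commits, begin, end=None):
    """Left-open, right-closed window: begin < ts <= end (end defaults to open-ended)."""
    return [ts for ts in sorted(commits)
            if ts > begin and (end is None or ts <= end)]

commits = ["20230128030951890", "20230128031235363"]
# begin = last commit timestamp - 1, so only the latest commit is selected.
print(commits_in_window(commits, begin="20230128031235362"))
# ['20230128031235363']
```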