支持流式入庫的系統都基本遵循了一個思路,流式數據按照小批量數據寫小文件到存儲系統,然后定時合并這些文件。例如,Hive和Delta Lake。Kudu也支持流式入庫,但是Kudu的存儲是自己設計的,不屬于基于大數據存儲系統之上的解決方案。本文以Kafka數據源為例介紹流式入庫操作。
前提條件
使用限制
創建的DataLake集群或Custom集群需要與DataFlow集群在同一VPC和交換機下,不支持跨VPC。
流式入庫演變
階段 | 詳細情況 |
以前 | 以前針對流式入庫的需求,通常都是自己動手,事實表按照時間劃分Partition,粒度比較細。例如,五分鐘一個Partition,每當一個Partition運行完成,觸發一個INSERT OVERWRITE動作,合并該Partition內的文件重新寫入分區。但是這么做有以下幾個問題:
Hive從0.13版本提供了事務支持,并且從2.0版本開始提供了Hive Streaming功能來實現流式入庫的支持。但是在實際使用Hive Streaming功能的案例并不多見。其主要原因如下:
|
現在 | 有了Delta,可以很方便地應對流式入庫的場景。只需要以下四個動作:
|
Delta實例展示
從上游Kafka中讀取數據,寫入Delta表。
使用SSH方式登錄DataFlow集群,詳情請參見登錄集群。
執行以下命令,創建Kafka Topic。
sudo su - kafka kafka-topics.sh --partitions 3 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic delta_stream_sample --create
說明core-1-1
為DataFlow集群中Broker節點的內網IP地址或主機名。準備一個Python腳本,不斷向Kafka內發送數據。
#! /usr/bin/env python3 import json import time from kafka import KafkaProducer from kafka.errors import KafkaError bootstrap = ['core-1-1:9092'] topic = 'delta_stream_sample' def gnerator(): id = 0 line = {} while True: line['id'] = id line['date'] = '2019-11-11' line['name'] = 'Robert' line['sales'] = 123 yield line id = id + 1 def sendToKafka(): producer = KafkaProducer(bootstrap_servers=bootstrap) for line in gnerator(): data = json.dumps(line).encode('utf-8') # Asynchronous by default future = producer.send(topic, data) # Block for 'synchronous' sends try: record_metadata = future.get(timeout=10) except KafkaError as e: # Decide what to do if produce request failed pass time.sleep(0.1) sendToKafka()
為了方便,數據只有
id
不一樣。{"id": 0, "date": "2019-11-11", "name": "Robert", "sales": 123} {"id": 1, "date": "2019-11-11", "name": "Robert", "sales": 123} {"id": 2, "date": "2019-11-11", "name": "Robert", "sales": 123} {"id": 3, "date": "2019-11-11", "name": "Robert", "sales": 123} {"id": 4, "date": "2019-11-11", "name": "Robert", "sales": 123} {"id": 5, "date": "2019-11-11", "name": "Robert", "sales": 123}
啟動一個Spark Streaming作業,從Kafka讀數據,寫入Delta表。
編寫Spark代碼。
以Scala版代碼為例,代碼示例如下。
import org.apache.spark.SparkConf import org.apache.spark.sql.{SparkSession, functions} import org.apache.spark.sql.types.{DataTypes, StructField} object StreamToDelta { def main(args: Array[String]): Unit = { val targetDir = "/tmp/delta_table" val checkpointLocation = "/tmp/delta_table_checkpoint" // 192.168.XX.XX 為kafka內網IP地址 val bootstrapServers = "192.168.XX.XX:9092" val topic = "delta_stream_sample" val schema = DataTypes.createStructType(Array[StructField]( DataTypes.createStructField("id", DataTypes.LongType, false), DataTypes.createStructField("date", DataTypes.DateType, false), DataTypes.createStructField("name", DataTypes.StringType, false), DataTypes.createStructField("sales", DataTypes.StringType, false))) val sparkConf = new SparkConf() //StreamToDelta為scala的類名 val spark = SparkSession .builder() .config(sparkConf) .appName("StreamToDelta") .getOrCreate() val lines = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", bootstrapServers) .option("subscribe", topic) .option("maxOffsetsPerTrigger", 1000) .option("startingOffsets", "earliest") .option("failOnDataLoss", value = false) .load() .select(functions.from_json(functions.col("value").cast("string"), schema).as("json")) .select("json.*") val query = lines.writeStream .outputMode("append") .format("delta") .option("checkpointLocation", checkpointLocation) .start(targetDir) query.awaitTermination() } }
打包程序并部署到DataLake集群。
本地調試完成后,通過以下命令打包。
mvn clean install
使用SSH方式登錄DataLake集群,詳情信息請參見登錄集群。
上傳JAR包至DataLake集群。
本示例是上傳到DataLake集群的根目錄下。
提交運行Spark作業。
執行以下命令,通過spark-submit提交Spark作業。
spark-submit \ --master yarn \ --deploy-mode cluster \ --driver-memory 1g \ --executor-cores 2 \ --executor-memory 3g \ --num-executors 1 \ --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \ --class com.aliyun.delta.StreamToDelta \ delta-demo-1.0.jar
說明delta-demo-1.0.jar為您打包好的JAR包。--class和JAR包請根據您實際信息修改。
另外新建一個spark-shell,確認已經讀到數據。
Scala
執行以下命令,進入spark-shell客戶端。
spark-shell --master local
執行以下Scala語句,查詢數據。
val df = spark.read.format("delta").load("/tmp/delta_table") df.select("*").orderBy("id").show(10000)
SQL
執行以下命令,進入streaming-sql客戶端。
streaming-sql --master local
執行以下SQL語句,查詢數據。
SELECT * FROM delta_table ORDER BY id LIMIT 10000;
現在已經寫入了2285條數據。
|2295|2019-11-11|Robert| 123| |2296|2019-11-11|Robert| 123| |2297|2019-11-11|Robert| 123| |2275|2019-11-11|Robert| 123| |2276|2019-11-11|Robert| 123| |2277|2019-11-11|Robert| 123| |2278|2019-11-11|Robert| 123| |2279|2019-11-11|Robert| 123| |2280|2019-11-11|Robert| 123| |2281|2019-11-11|Robert| 123| |2282|2019-11-11|Robert| 123| |2283|2019-11-11|Robert| 123| |2284|2019-11-11|Robert| 123| |2285|2019-11-11|Robert| 123| +----+----------+------+-----+
Exactly-Once測試
停掉Spark Streaming作業,再重新啟動。重新讀一下表,讀數據正常的話,數據能夠從上次斷掉的地方銜接上。
Scala
df.select("*").orderBy("id").show(10000)
SQL
SELECT * FROM delta_table ORDER BY id LIMIT 10000;
|2878|2019-11-11|Robert| 123| |2879|2019-11-11|Robert| 123| |2880|2019-11-11|Robert| 123| |2881|2019-11-11|Robert| 123| |2882|2019-11-11|Robert| 123| |2883|2019-11-11|Robert| 123| |2884|2019-11-11|Robert| 123| |2885|2019-11-11|Robert| 123| |2886|2019-11-11|Robert| 123| |2887|2019-11-11|Robert| 123| |2888|2019-11-11|Robert| 123| |2889|2019-11-11|Robert| 123| |2890|2019-11-11|Robert| 123| |2891|2019-11-11|Robert| 123|