日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

流式入庫

支持流式入庫的系統都基本遵循了一個思路,流式數據按照小批量數據寫小文件到存儲系統,然后定時合并這些文件。例如,Hive和Delta Lake。Kudu也支持流式入庫,但是Kudu的存儲是自己設計的,不屬于基于大數據存儲系統之上的解決方案。本文以Kafka數據源為例介紹流式入庫操作。

前提條件

  • 已在E-MapReduce控制臺上,創建選擇了DeltaLake服務的DataLake集群或Custom集群,詳情請參見創建集群

  • 已在E-MapReduce控制臺上,創建選擇了Kafka服務的DataFlow集群,詳情請參見創建集群

使用限制

創建的DataLake集群或Custom集群需要與DataFlow集群在同一VPC和交換機下,不支持跨VPC。

流式入庫演變

階段

詳細情況

以前

以前針對流式入庫的需求,通常都是自己動手,事實表按照時間劃分Partition,粒度比較細。例如,五分鐘一個Partition,每當一個Partition運行完成,觸發一個INSERT OVERWRITE動作,合并該Partition內的文件重新寫入分區。但是這么做有以下幾個問題:

  • 缺少讀寫隔離,易造成讀端失敗或者產生數據準確性問題。

  • 流式作業沒有Exactly-Once保證,入庫作業失敗后需要人工介入,確保數據不會寫重或者寫漏(如果是SparkStreaming,有At-Least-Once保證)。

Hive從0.13版本提供了事務支持,并且從2.0版本開始提供了Hive Streaming功能來實現流式入庫的支持。但是在實際使用Hive Streaming功能的案例并不多見。其主要原因如下:

  • Hive事務的實現修改了底層文件,導致公共的存儲格式等僅能夠被Hive讀取,導致很多使用SparkSQL、Presto等進行數據分析的用戶無法使用該功能。

  • Hive事務目前僅支持ORC。

  • Hive的模式為Merge-on-read,需要對小文件進行Sort-Merge。小文件數量增多之后讀性能急劇下降,所以用戶需要及時進行小文件的合并。而小文件的合并作業經常失敗,影響用戶業務效率。

  • Hive這種模式無法拓展到Data Lake場景,僅僅停留在Data Warehouse場景。在Data Lake場景中,數據來源以及數據需求都是多樣性的。

現在

有了Delta,可以很方便地應對流式入庫的場景。只需要以下四個動作:

  1. 建表。

  2. 啟動Spark Streaming任務寫入數據。

  3. 定時Optimize(例如:每個Partition寫入完成)。

  4. 定時Vacuum(例如:每天)。

Delta實例展示

從上游Kafka中讀取數據,寫入Delta表。

  1. 使用SSH方式登錄DataFlow集群,詳情請參見登錄集群

  2. 執行以下命令,創建Kafka Topic。

    sudo su - kafka
    kafka-topics.sh --partitions 3 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic delta_stream_sample --create
    說明

    core-1-1為DataFlow集群中Broker節點的內網IP地址或主機名。

  3. 準備一個Python腳本,不斷向Kafka內發送數據。

    #! /usr/bin/env python3
    
    import json
    import time
    
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    
    bootstrap = ['core-1-1:9092']
    topic = 'delta_stream_sample'
    
    def gnerator():
        id = 0
        line = {}
        while True:
            line['id'] = id
            line['date'] = '2019-11-11'
            line['name'] = 'Robert'
            line['sales'] = 123
            yield line
            id = id + 1  
    
    def sendToKafka():
        producer = KafkaProducer(bootstrap_servers=bootstrap)
    
        for line in gnerator():
            data = json.dumps(line).encode('utf-8')
    
            # Asynchronous by default
            future = producer.send(topic, data)
    
            # Block for 'synchronous' sends
            try:
                record_metadata = future.get(timeout=10)
            except KafkaError as e:
                # Decide what to do if produce request failed
                pass
            time.sleep(0.1)
    
    sendToKafka()

    為了方便,數據只有id不一樣。

    {"id": 0, "date": "2019-11-11", "name": "Robert", "sales": 123}
    {"id": 1, "date": "2019-11-11", "name": "Robert", "sales": 123}
    {"id": 2, "date": "2019-11-11", "name": "Robert", "sales": 123}
    {"id": 3, "date": "2019-11-11", "name": "Robert", "sales": 123}
    {"id": 4, "date": "2019-11-11", "name": "Robert", "sales": 123}
    {"id": 5, "date": "2019-11-11", "name": "Robert", "sales": 123}
  4. 啟動一個Spark Streaming作業,從Kafka讀數據,寫入Delta表。

    1. 編寫Spark代碼。

      以Scala版代碼為例,代碼示例如下。

      import org.apache.spark.SparkConf
      import org.apache.spark.sql.{SparkSession, functions}
      import org.apache.spark.sql.types.{DataTypes, StructField}
      
      object StreamToDelta {
        def main(args: Array[String]): Unit = {
          val targetDir = "/tmp/delta_table"
          val checkpointLocation = "/tmp/delta_table_checkpoint"
          // 192.168.XX.XX 為kafka內網IP地址
          val bootstrapServers = "192.168.XX.XX:9092"
          val topic = "delta_stream_sample"
      
          val schema = DataTypes.createStructType(Array[StructField](
            DataTypes.createStructField("id", DataTypes.LongType, false),
            DataTypes.createStructField("date", DataTypes.DateType, false),
            DataTypes.createStructField("name", DataTypes.StringType, false),
            DataTypes.createStructField("sales", DataTypes.StringType, false)))
      
          val sparkConf = new SparkConf()
      
          //StreamToDelta為scala的類名
          val spark = SparkSession
            .builder()
            .config(sparkConf)
            .appName("StreamToDelta")
            .getOrCreate()
      
          val lines = spark
            .readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", bootstrapServers)
            .option("subscribe", topic)
            .option("maxOffsetsPerTrigger", 1000)
            .option("startingOffsets", "earliest")
            .option("failOnDataLoss", value = false)
            .load()
            .select(functions.from_json(functions.col("value").cast("string"), schema).as("json"))
            .select("json.*")
      
          val query = lines.writeStream
            .outputMode("append")
            .format("delta")
            .option("checkpointLocation", checkpointLocation)
            .start(targetDir)
      
          query.awaitTermination()
        }
      }
    2. 打包程序并部署到DataLake集群。

      1. 本地調試完成后,通過以下命令打包。

        mvn clean install
      2. 使用SSH方式登錄DataLake集群,詳情信息請參見登錄集群

      3. 上傳JAR包至DataLake集群。

        本示例是上傳到DataLake集群的根目錄下。

    3. 提交運行Spark作業。

      執行以下命令,通過spark-submit提交Spark作業。

      spark-submit \
       --master yarn \
       --deploy-mode cluster \
       --driver-memory 1g \
       --executor-cores 2 \
       --executor-memory 3g \
       --num-executors 1 \
       --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
       --class com.aliyun.delta.StreamToDelta \
       delta-demo-1.0.jar
      說明

      delta-demo-1.0.jar為您打包好的JAR包。--class和JAR包請根據您實際信息修改。

  5. 另外新建一個spark-shell,確認已經讀到數據。

    • Scala

      1. 執行以下命令,進入spark-shell客戶端。

        spark-shell --master local 
      2. 執行以下Scala語句,查詢數據。

        val df = spark.read.format("delta").load("/tmp/delta_table")
        df.select("*").orderBy("id").show(10000)
    • SQL

      1. 執行以下命令,進入streaming-sql客戶端。

        streaming-sql --master local
      2. 執行以下SQL語句,查詢數據。

        SELECT * FROM delta_table ORDER BY id LIMIT 10000;

        現在已經寫入了2285條數據。

        |2295|2019-11-11|Robert|  123|
        |2296|2019-11-11|Robert|  123|
        |2297|2019-11-11|Robert|  123|
        |2275|2019-11-11|Robert|  123|
        |2276|2019-11-11|Robert|  123|
        |2277|2019-11-11|Robert|  123|
        |2278|2019-11-11|Robert|  123|
        |2279|2019-11-11|Robert|  123|
        |2280|2019-11-11|Robert|  123|
        |2281|2019-11-11|Robert|  123|
        |2282|2019-11-11|Robert|  123|
        |2283|2019-11-11|Robert|  123|
        |2284|2019-11-11|Robert|  123|
        |2285|2019-11-11|Robert|  123|
        +----+----------+------+-----+

Exactly-Once測試

停掉Spark Streaming作業,再重新啟動。重新讀一下表,讀數據正常的話,數據能夠從上次斷掉的地方銜接上。

  • Scala

    df.select("*").orderBy("id").show(10000)
  • SQL

    SELECT * FROM delta_table ORDER BY id LIMIT 10000;
    |2878|2019-11-11|Robert|  123|
    |2879|2019-11-11|Robert|  123|
    |2880|2019-11-11|Robert|  123|
    |2881|2019-11-11|Robert|  123|
    |2882|2019-11-11|Robert|  123|
    |2883|2019-11-11|Robert|  123|
    |2884|2019-11-11|Robert|  123|
    |2885|2019-11-11|Robert|  123|
    |2886|2019-11-11|Robert|  123|
    |2887|2019-11-11|Robert|  123|
    |2888|2019-11-11|Robert|  123|
    |2889|2019-11-11|Robert|  123|
    |2890|2019-11-11|Robert|  123|
    |2891|2019-11-11|Robert|  123|