SmartData 3.0.x版本支持Flink可恢復(fù)性寫入OSS,SmartData 3.1.x版本支持Flink可恢復(fù)性寫入JindoFS或OSS。通過(guò)Flink自有的檢查點(diǎn)(Checkpoint)機(jī)制,當(dāng)寫入存儲(chǔ)介質(zhì)的作業(yè)發(fā)生局部失敗時(shí),作業(yè)可以迅速自動(dòng)恢復(fù),并繼續(xù)寫入。

背景信息

可恢復(fù)性寫入功能支持將數(shù)據(jù)以EXACTLY_ONCE語(yǔ)義寫入存儲(chǔ)介質(zhì),在大數(shù)據(jù)場(chǎng)景下保證了數(shù)據(jù)的安全性和一致性。

在Flink作業(yè)中的用法

  • 通用配置
    為了支持EXACTLY_ONCE語(yǔ)義寫入JindoFS或OSS,您需要執(zhí)行如下配置:
    1. 打開(kāi)Flink的檢查點(diǎn)(Checkpoint)。
      示例如下。
      1. 通過(guò)如下方式建立的StreamExecutionEnvironment。
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      2. 執(zhí)行如下命令,啟動(dòng)Checkpoint。
        env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
    2. 使用可以重發(fā)的數(shù)據(jù)源,例如Kafka。
  • 便捷使用

    您無(wú)需額外引入依賴,只需使用帶有jfs://oss://前綴的路徑,就可以使用該功能。JindoFS可以自動(dòng)識(shí)別jfs://oss://前綴,并啟用該功能。

    例如,以DataStream<String>的對(duì)象OutputStream為例。
    1. 添加Sink。
      • 將其寫入JindoFS時(shí),您可以執(zhí)行如下命令添加Sink。
        String outputPath = "jfs://<user-defined-jfs-namespace>/<user-defined-jfs-dir>"
        StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
                new Path(outputPath),
                new SimpleStringEncoder<String>("UTF-8")
        ).build();
        outputStream.addSink(sink);
      • 將其寫入OSS,您可以執(zhí)行如下命令添加Sink。
        String outputPath = "oss://<user-defined-oss-bucket>/<user-defined-oss-dir>"
        StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
                new Path(outputPath),
                new SimpleStringEncoder<String>("UTF-8")
        ).build();
        outputStream.addSink(sink);
    2. 使用env.execute()執(zhí)行Flink作業(yè)即可。

自定義配置

您在提交Flink作業(yè)時(shí),可以自定義參數(shù),以開(kāi)啟或控制特定功能。

例如,以yarn-cluster模式提交Flink作業(yè)時(shí),通過(guò)-yD配置。示例如下。
<flink_home>/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...
SmartData 支持通過(guò)配置開(kāi)啟熵注入(Entropy Injection)功能或控制分片上傳(Multipart Upload)的并行度。
  • 熵注入(SmartData 3.1.x及其后續(xù)版本)
    該功能可以匹配寫入路徑的一段特定字符串,用一段隨機(jī)的字符串進(jìn)行替換,以削弱所謂片區(qū)效應(yīng),提高寫入效率。
    • 如果是寫入JindoFS(Block或Cache模式),則需要提供下列配置。
      jfs.entropy.key=<user-defined-key>
      jfs.entropy.length=<user-defined-length>
    • 如果是寫入OSS,則需要提供下列配置。
      oss.entropy.key=<user-defined-key>
      oss.entropy.length=<user-defined-length>
  • 分片上傳(SmartData 3.0.x及其后續(xù)版本)

    當(dāng)寫入場(chǎng)景為OSS或JindoFS Cache模式時(shí),可恢復(fù)性讀寫功能會(huì)自動(dòng)調(diào)用高效的分片上傳機(jī)制,將待上傳的文件分為多個(gè)數(shù)據(jù)塊分別上傳,最后組合。目前支持配置參數(shù)oss.upload.max.concurrent.uploads,用來(lái)控制上傳數(shù)據(jù)塊的并行度,如果設(shè)置較高的數(shù)值則可能會(huì)提高寫入效率(但也會(huì)占用更多資源)。默認(rèn)情況下,該值為當(dāng)前可用的處理器數(shù)量。