SmartData 3.0.x版本支持Flink可恢復(fù)性寫入OSS,SmartData 3.1.x版本支持Flink可恢復(fù)性寫入JindoFS或OSS。通過(guò)Flink自有的檢查點(diǎn)(Checkpoint)機(jī)制,當(dāng)寫入存儲(chǔ)介質(zhì)的作業(yè)發(fā)生局部失敗時(shí),作業(yè)可以迅速自動(dòng)恢復(fù),并繼續(xù)寫入。
背景信息
可恢復(fù)性寫入功能支持將數(shù)據(jù)以EXACTLY_ONCE語(yǔ)義寫入存儲(chǔ)介質(zhì),在大數(shù)據(jù)場(chǎng)景下保證了數(shù)據(jù)的安全性和一致性。
在Flink作業(yè)中的用法
- 通用配置
為了支持EXACTLY_ONCE語(yǔ)義寫入JindoFS或OSS,您需要執(zhí)行如下配置:
- 打開(kāi)Flink的檢查點(diǎn)(Checkpoint)。
示例如下。
- 通過(guò)如下方式建立的StreamExecutionEnvironment。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- 執(zhí)行如下命令,啟動(dòng)Checkpoint。
env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
- 通過(guò)如下方式建立的StreamExecutionEnvironment。
- 使用可以重發(fā)的數(shù)據(jù)源,例如Kafka。
- 打開(kāi)Flink的檢查點(diǎn)(Checkpoint)。
- 便捷使用
您無(wú)需額外引入依賴,只需使用帶有jfs://或oss://前綴的路徑,就可以使用該功能。JindoFS可以自動(dòng)識(shí)別jfs://或oss://前綴,并啟用該功能。
例如,以DataStream<String>的對(duì)象OutputStream為例。- 添加Sink。
- 將其寫入JindoFS時(shí),您可以執(zhí)行如下命令添加Sink。
String outputPath = "jfs://<user-defined-jfs-namespace>/<user-defined-jfs-dir>" StreamingFileSink<String> sink = StreamingFileSink.forRowFormat( new Path(outputPath), new SimpleStringEncoder<String>("UTF-8") ).build(); outputStream.addSink(sink);
- 將其寫入OSS,您可以執(zhí)行如下命令添加Sink。
String outputPath = "oss://<user-defined-oss-bucket>/<user-defined-oss-dir>" StreamingFileSink<String> sink = StreamingFileSink.forRowFormat( new Path(outputPath), new SimpleStringEncoder<String>("UTF-8") ).build(); outputStream.addSink(sink);
- 將其寫入JindoFS時(shí),您可以執(zhí)行如下命令添加Sink。
- 使用
env.execute()
執(zhí)行Flink作業(yè)即可。
- 添加Sink。
自定義配置
您在提交Flink作業(yè)時(shí),可以自定義參數(shù),以開(kāi)啟或控制特定功能。
-yD
配置。示例如下。<flink_home>/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...
- 熵注入(SmartData 3.1.x及其后續(xù)版本)
該功能可以匹配寫入路徑的一段特定字符串,用一段隨機(jī)的字符串進(jìn)行替換,以削弱所謂片區(qū)效應(yīng),提高寫入效率。
- 如果是寫入JindoFS(Block或Cache模式),則需要提供下列配置。
jfs.entropy.key=<user-defined-key> jfs.entropy.length=<user-defined-length>
- 如果是寫入OSS,則需要提供下列配置。
oss.entropy.key=<user-defined-key> oss.entropy.length=<user-defined-length>
- 如果是寫入JindoFS(Block或Cache模式),則需要提供下列配置。
- 分片上傳(SmartData 3.0.x及其后續(xù)版本)
當(dāng)寫入場(chǎng)景為OSS或JindoFS Cache模式時(shí),可恢復(fù)性讀寫功能會(huì)自動(dòng)調(diào)用高效的分片上傳機(jī)制,將待上傳的文件分為多個(gè)數(shù)據(jù)塊分別上傳,最后組合。目前支持配置參數(shù)oss.upload.max.concurrent.uploads,用來(lái)控制上傳數(shù)據(jù)塊的并行度,如果設(shè)置較高的數(shù)值則可能會(huì)提高寫入效率(但也會(huì)占用更多資源)。默認(rèn)情況下,該值為當(dāng)前可用的處理器數(shù)量。