日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

開源Flink寫入OSS-HDFS服務

開源Flink不支持流式寫入OSS-HDFS服務,也不支持以EXACTLY_ONCE語義寫入存儲介質。當您希望開源Flink以EXACTLY_ONCE語義流式寫入OSS-HDFS服務,需要結合JindoSDK。

說明

如果您不希望通過Flink流式寫入OSS-HDFS服務前部署JindoSDK,您可以選擇阿里云實時計算Flink完成OSS-HDFS服務讀寫需求。更多信息,請參見實時計算Flink讀寫OSS或者OSS-HDFS

前提條件

  • 已創建ECS實例。具體步驟,請參見選購ECS實例

  • 已開通并授權訪問OSS-HDFS服務。具體操作,請參見開通并授權訪問OSS-HDFS服務

  • 已自行下載并安裝開源版本Flink,且版本不低于1.10.1。Flink 1.16.0及更高版本的可用性尚未得到驗證。關于Apache Flink的安裝包及版本說明,請參見Apache Flink

配置JindoSDK

  1. 登錄已創建的ECS實例。具體操作,請參見連接ECS實例

  2. 下載并解壓最新版本JindoSDK JAR包。下載地址,請參見GitHub

  3. 將JindoSDK解壓縮后的plugins/flink/目錄下的jindo-flink-${version}-full.jar文件移動至Flink所在根目錄下的lib文件夾。

    mv plugins/flink/jindo-flink-${version}-full.jar lib/
重要
  • 如果存在Apache Flink自帶的Flink OSS Connector,需將其移除,即從Flink的lib目錄或者plugins/oss-fs-hadoop路徑下移除flink-oss-fs-hadoop-${flink-version}.jar

  • JindoSDK配置完成后,無需額外配置即支持以常規Flink流式作業的方法進行使用。寫入OSS-HDFS服務以及OSS服務須使用相同的前綴oss://,JindoSDK會自動識別寫入的內容。

示例

  1. 通用配置

    為了支持EXACTLY_ONCE語義寫入OSS-HDFS,您需要執行如下配置:

    1. 打開Flink的檢查點(Checkpoint)。

      示例如下。

      1. 通過如下方式建立的StreamExecutionEnvironment。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      2. 執行如下命令,啟動Checkpoint。

        env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
    2. 使用可以重發的數據源,例如Kafka。

  2. 便捷使用

    您無需額外引入依賴,只需攜帶oss://前綴的路徑,并使用OSS-HDFS服務的Bucket及Endpoint,即可啟用Flink。

    1. 添加Sink。

      以將DataStream<String>的對象OutputStream寫入OSS-HDFS為例。

      String outputPath = "oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>"
      StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(
              new Path(outputPath),
              new SimpleStringEncoder<String>("UTF-8")
      ).build();
      outputStream.addSink(sink);
      重要

      在OSS-HDFS服務的Bucket中帶有.<oss-hdfs-endpoint>的字段為可選項。如果您希望省略該字段,請確保已在Flink或Hadoop組件中正確配置了OSS-HDFS服務的Endpoint。

    2. 使用env.execute()執行Flink作業。

(可選)自定義配置

您在提交Flink作業時,可以自定義參數,以開啟或控制特定功能。

例如,通過-yD配置以yarn-cluster模式提交Flink作業時,示例如下:

<flink_home>/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...

您可以開啟熵注入(Entropy Injection)功能。熵注入可以匹配寫入路徑的一段特定字符串,用一段隨機的字符串進行替換,以削弱所謂片區效應,提高寫入效率。

當寫入場景為OSS-HDFS時,需要完成下列配置。

oss.entropy.key=<user-defined-key>
oss.entropy.length=<user-defined-length>

寫入新文件時,路徑中與<user-defined-key>相同的字符串會被替換為一個隨機字符串,隨機串的長度為<user-defined-length>,且<user-defined-length>必須大于零。