本文通過示例為您介紹如何在E-MapReduce on ACK的Flink集群中配置OSS來存儲Flink作業的Checkpoint和Savepoint。

前提條件

已在E-MapReduce on ACK控制臺創建Flink集群,詳情請參見快速入門。

操作步驟

Flink on ACK使用的默認鏡像已處理好讀寫OSS所需的依賴,您只需按文檔配置好相應的參數即可。

  1. 通過kubectl連接Kubernetes集群,詳情請參見通過kubectl工具連接集群。
    您也可以通過API等方式連接Kubernetes集群,詳情請參見使用Kubernetes API。
  2. 新建basic-emr-oss-example.yaml文件,文件內容如下。
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: basic-emr-oss-example
    spec:
      flinkVersion: v1_13
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "2"
        state.savepoints.dir: oss://xxxxx
        state.checkpoints.dir: oss://xxxxx
        fs.oss.endpoint: <endpoint, e.g. oss-cn-hangzhou-internal.aliyuncs.com>
        fs.oss.accessKeyId: <accessKeyId>
        fs.oss.accessKeySecret: <accessKeySecret>
      serviceAccount: flink
      podTemplate:
        spec:
          serviceAccount: flink
          containers:
            - name: flink-main-container
              volumeMounts:
                - mountPath: /flink-data
                  name: flink-volume
          volumes:
            - name: flink-volume
              emptyDir: {}
    
      jobManager:
        replicas: 1
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1
    
      job:
        jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
        parallelism: 2
        upgradeMode: stateless
    說明
    • 文件名您可以自定義,本文以basic-emr-oss-example.yaml為例介紹。
    • 本文以Flink 1.13版本為例,如果您使用其他版本請修改flinkVersion的配置。
    以下參數需要您手動替換。
    參數 描述
    state.savepoints.dir Savepoint的保存目錄。
    state.checkpoints.dir Checkpoint的保存目錄。
    fs.oss.endpoint OSS的Endpoint。例如,oss-cn-***-internal.aliyuncs.com。
    fs.oss.accessKeyId OSS的AccessKey ID。
    fs.oss.accessKeySecret OSS的AccessKey Secret。
  3. 執行以下命令,提交作業。
    kubectl apply -f basic-emr-oss-example.yaml

    提交成功后,您可以通過OSS或者Flink Web UI查看Checkpoint的使用與更新。Flink Web UI的訪問方式請參見訪問Flink Web UI。