通過(guò)向OSS上傳文件觸發(fā)工作流
本文介紹如何集成阿里云對(duì)象存儲(chǔ)OSS與阿里云輕量消息隊(duì)列(原 MNS),通過(guò)將數(shù)據(jù)上傳文件至對(duì)象存儲(chǔ)OSS中,自動(dòng)觸發(fā)工作流運(yùn)行文件,并生成結(jié)果。
前提條件
已開(kāi)通以下服務(wù)與功能。
開(kāi)啟事件驅(qū)動(dòng)功能。具體操作,請(qǐng)參見(jiàn)開(kāi)啟事件驅(qū)動(dòng)功能。
開(kāi)通輕量消息隊(duì)列(原 MNS)。該功能會(huì)涉及輕量消息隊(duì)列(原 MNS)相關(guān)計(jì)費(fèi),具體收費(fèi)情況,請(qǐng)參見(jiàn)計(jì)費(fèi)說(shuō)明。
開(kāi)通對(duì)象存儲(chǔ)OSS。該功能會(huì)涉及OSS相關(guān)計(jì)費(fèi),具體收費(fèi)情況,請(qǐng)參見(jiàn)計(jì)費(fèi)概述。
已創(chuàng)建工作流集群并下載阿里云Argo CLI。
創(chuàng)建工作流集群,請(qǐng)參見(jiàn)創(chuàng)建工作流集群。
下載并安裝阿里云Argo CLI,請(qǐng)參見(jiàn)阿里云Argo CLI。
創(chuàng)建資源入口:連接工作流集群后,在本地終端操作。具體操作,請(qǐng)參見(jiàn)獲取集群KubeConfig并通過(guò)kubectl工具連接集群。
步驟一:配置OSS Bucket事件通知
單擊Bucket 列表,創(chuàng)建OSS Bucket或者選擇已有Bucket。如需創(chuàng)建,請(qǐng)參見(jiàn)創(chuàng)建存儲(chǔ)空間。
在文件管理頁(yè)面,選擇數(shù)據(jù)處理 > 事件通知,單擊創(chuàng)建規(guī)則,完成相關(guān)參數(shù)配置后,單擊確定。如需自定義規(guī)則,請(qǐng)參見(jiàn)通過(guò)事件通知實(shí)時(shí)處理OSS文件變動(dòng)。
配置項(xiàng)
示例
規(guī)則名稱
upload-complete
事件類型
PutObject, PostObject
資源描述
選擇前后綴,并設(shè)置后綴為
complete
,即上傳以.complete
結(jié)尾的文件,觸發(fā)事件后運(yùn)行工作流。接收終端
選擇隊(duì)列,名稱設(shè)置為
oss-event-queue
。規(guī)則創(chuàng)建后,在輕量消息隊(duì)列(原 MNS)中會(huì)自動(dòng)創(chuàng)建一個(gè)主題與之對(duì)應(yīng)。
單擊隊(duì)列列表,創(chuàng)建隊(duì)列oss-event-queue,創(chuàng)建方法,請(qǐng)參見(jiàn)創(chuàng)建隊(duì)列。創(chuàng)建完成后,在隊(duì)列詳情頁(yè)面的接入點(diǎn)區(qū)域獲取Endpoint。
重要創(chuàng)建隊(duì)列的名稱需要與步驟3中配置的接收終端的隊(duì)列名稱保持一致。
步驟二:創(chuàng)建Event Bus
Event Bus可以被命名空間中的事件驅(qū)動(dòng)工作流共享。如果已經(jīng)創(chuàng)建,請(qǐng)執(zhí)行步驟三:創(chuàng)建Event Source。
方式一:使用NATS
創(chuàng)建
event-bus.yaml
文件。Event Bus示例代碼如下所示:apiVersion: argoproj.io/v1alpha1 kind: EventBus metadata: name: default spec: nats: native: replicas: 3 auth: token
執(zhí)行以下命令,創(chuàng)建EventBus。
kubectl apply -f event-bus.yaml
說(shuō)明命令執(zhí)行成功后,會(huì)在default命名空間下創(chuàng)建Event Bus Pod。后續(xù)操作需在同一命名空間下。
執(zhí)行以下命令,查看Event Bus Pod是否正常啟動(dòng)。
kubectl get pod
方式二:使用輕量消息隊(duì)列(原 MNS)
在主題列表頁(yè)面創(chuàng)建主題argoeventbus,并在主題詳情頁(yè)面的接入點(diǎn)區(qū)域獲取Endpoint。
使用RAM管理員登錄RAM控制臺(tái)。
創(chuàng)建RAM用戶,授權(quán)
AliyunMNSFullAccess
,并獲取RAM用戶的AK和SK。執(zhí)行以下命令,創(chuàng)建Secret用于存儲(chǔ)AK和SK。
kubectl create secret generic mns-secret\ --from-literal=accesskey=*** \ --from-literal=secretkey=***
創(chuàng)建
event-bus-mns.yaml
文件,EventBus示例代碼如下所示:apiVersion: argoproj.io/v1alpha1 kind: EventBus metadata: name: default spec: alimns: accessKey: key: accesskey name: mns-secret secretKey: key: secretkey name: mns-secret topic: argoeventbus # 對(duì)應(yīng)輕量消息隊(duì)列(原 MNS)中的主題名稱。 endpoint: http://165***368.mns.<region>.aliyuncs.com
執(zhí)行以下命令,創(chuàng)建
event-bus-mns.yaml
。kubectl apply -f event-bus.yaml
使用輕量消息隊(duì)列(原 MNS)方式創(chuàng)建Event Bus時(shí),不會(huì)創(chuàng)建Pod。
如需使用Trigger功能,請(qǐng)使用NATS方式創(chuàng)建EventBus。目前輕量消息隊(duì)列(原 MNS)方式不支持開(kāi)源Argo Event的Sensor Trigger。
步驟三:創(chuàng)建Event Source
使用RAM管理員登錄RAM控制臺(tái)。
創(chuàng)建RAM用戶,為其授予
AliyunMNSFullAccess
權(quán)限,并獲取RAM用戶的AK和SK。具體操作,請(qǐng)參見(jiàn)創(chuàng)建RAM用戶、為RAM用戶授權(quán)、創(chuàng)建AccessKey和查看RAM用戶的AccessKey信息。執(zhí)行以下命令,創(chuàng)建Secret用于存儲(chǔ)AK和SK。
kubectl create secret generic mns-secret\ --from-literal=accesskey=*** \ --from-literal=secretkey=***
創(chuàng)建
event-source.yaml
文件,Event Source示例代碼如下所示:apiVersion: argoproj.io/v1alpha1 kind: EventSource metadata: name: ali-mns spec: mns: example: jsonBody: true accessKey: key: accesskey name: mns-secret secretKey: key: secretkey name: mns-secret queue: oss-event-queue # 步驟一中創(chuàng)建的輕量消息隊(duì)列(原 MNS)名稱。 waitTimeSeconds: 20 endpoint: http://165***368.mns.<region>.aliyuncs.com # 步驟一中創(chuàng)建的輕量消息隊(duì)列(原 MNS)接入點(diǎn)。
執(zhí)行以下命令,創(chuàng)建Event Source。
kubectl apply -f event-source.yaml
執(zhí)行以下命令,查看Event Source Pod是否正常啟動(dòng)。
kubectl get pod
步驟四:創(chuàng)建Event Sensor
創(chuàng)建
event-sensor.yaml
文件,在Event Sensor中嵌入待執(zhí)行的工作流定義。Event Sensor示例代碼如下所示:apiVersion: argoproj.io/v1alpha1 kind: Sensor metadata: name: process-oss-file spec: template: serviceAccountName: default dependencies: - name: dep1 eventSourceName: ali-mns eventName: example triggers: - template: name: process-oss-file-workflow k8s: operation: create source: resource: apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: generateName: process-oss-file- namespaces: default spec: entrypoint: process-oss-file volumes: - name: workdir persistentVolumeClaim: claimName: pvc-oss arguments: parameters: - name: message # this is the value that should be overridden value: event message templates: - name: process-oss-file steps: - - name: parse-event-body template: parse-event-body - - name: process-file template: process-file arguments: parameters: - name: file-name value: "{{steps.parse-event-body.outputs.parameters.file-name}}" - name: parse-event-body container: image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/jq-alpine command: [sh,-c] args: - echo "Event body:"; echo {{workflow.parameters.message}} | base64 -d; TriggerFileName=$(echo {{workflow.parameters.message}} | base64 -d | jq .events[0].oss.object.key | cut -c2- | rev | cut -c2- |rev); echo "" && echo "TriggerFileName from event is $TriggerFileName"; Tmp=${TriggerFileName%%.complete} && DataFileName=${Tmp##*/}; echo "DataFileName after cutting .complete is $DataFileName, and pass file name to next step"; echo $DataFileName > /tmp/file-name.txt outputs: parameters: - name: file-name valueFrom: path: /tmp/file-name.txt - name: process-file inputs: parameters: - name: file-name container: image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/jq-alpine imagePullPolicy: Always command: [sh,-c] args: - echo "Show data-file:" && echo ""; ls -l /mnt/vol/{{inputs.parameters.file-name}}; echo "Content of data file:" && echo ""; cat /mnt/vol/{{inputs.parameters.file-name}} ; echo "" && echo "Finished" ; volumeMounts: - name: workdir mountPath: /mnt/vol parameters: - src: dependencyName: dep1 dataKey: body dest: spec.arguments.parameters.0.value
執(zhí)行以下命令,創(chuàng)建Event Sensor。
kubectl apply -f event-sensor.yaml
執(zhí)行以下命令,查看Event Sensor Pod是否正常啟動(dòng)。
kubectl get pod
使用輕量消息隊(duì)列(原 MNS)方式創(chuàng)建Eventbus時(shí),在Event Sensor創(chuàng)建完成后,會(huì)自動(dòng)創(chuàng)建一個(gè)輕量消息隊(duì)列(原 MNS)與之對(duì)應(yīng),隊(duì)列命名格式為:ackone-argowf-<namespace>-<sensor-name>-<sensor-uid>。
步驟五:驗(yàn)證向OSS上傳文件觸發(fā)工作流
向步驟一:配置OSS Bucket事件通知中的OSS Bucket中上傳以下2個(gè)文件(該文件需自備),觸發(fā)工作流運(yùn)行。
datafile:數(shù)據(jù)文件,文本格式,內(nèi)容自定義。
datafile.complete:trigger文件,可以是空文件。
執(zhí)行以下命令,在工作流集群中查看工作流運(yùn)行情況。
argo list
預(yù)期輸出如下:
NAME STATUS AGE DURATION PRIORITY process-oss-file-kmb4k Running 13s 13s 0
執(zhí)行以下命令,獲取工作流日志,查看消息內(nèi)容。
argo logs process-oss-file-kmb4k
重要該命令中的工作流名稱必須和上一步驟中返回的工作流名稱一致,
ali-mns-workflow-5prz7
僅為示例值,請(qǐng)您修改為實(shí)際環(huán)境中的返回值。消息內(nèi)容使用Base64編碼。
預(yù)期輸出如下:
步驟六:清除Event相關(guān)資源
依次執(zhí)行以下命令,清除Event相關(guān)資源。
kubectl delete sensor process-oss-file kubectl delete eventsource ali-mns kubectl delete eventbus default
執(zhí)行以下命令查看Pod,確認(rèn)所有資源已清除。
kubectl get pod