1. 背景
? 一個作業(Job)由一組任務(Task)及其依賴關系組成,每個任務可以有一個或多個執行實例(Instance)。具體詳情看名詞解釋。目前的任務類型分為兩種:并發任務和 DAG(Directed Acyclic Graph) 任務。
2. 任務概述
2.1 并發任務
作業中的一個任務可以指定在多個實例上運行程序,這些實例運行的任務程序都是一樣的,但是可以處理不同的數據。
2.2 DAG任務
作業中的多個任務之間可以有 DAG 依賴關系。即前面的任務運行完成后, 后面的任務才開始運行。
3. 任務實現
這兩種任務是在提交的 Job 中指定相關字段實現的,下面以 Python SDK 為例給出實現方式,代碼的完整程序見快速開始。
3.1 并發任務實現
在提交的 Job 中,填寫 InstanceCount 字段。指明任務需要的實例數。該字段就是實現任務的并發功能。
from batchcompute.resources import (
JobDescription, TaskDescription, DAG
)
# create my_task
my_task = TaskDescription()
my_task.InstanceCount = 3 #指定需要實例數:3臺VM
如果并發任務需要處理不同片段的數據,這個時候在需要運行的任務程序中使用環境變量:BATCH_COMPUTE_DAG_INSTANCE_ID(實例 ID)來區分,就可以處理不同片段的數據。下面的示例程序是快速開始的count代碼,假設輸入數據已經放在OSS中。您需要下載OSS的sdk。
import oss2 #oss sdk
from conf import conf
import os
import json
endpoint = os.environ.get('BATCH_COMPUTE_OSS_HOST') #OSS Host
auth = oss2.Auth(conf['access_key_id'], conf['access_key_secret'])
def download_file(oss_path, filename):
(bucket, key) = parse_oss_path(oss_path)
bucket_tool = oss2.Bucket(auth, endpoint, bucket)
bucket_tool.get_object_to_file(key, filename)
def upload_file(filename, oss_path):
(bucket, key) = parse_oss_path(oss_path)
bucket_tool = oss2.Bucket(auth, endpoint, bucket)
bucket_tool.put_object_from_file(key,filename)
def put_data(data, oss_path):
(bucket, key) = parse_oss_path(oss_path)
bucket_tool = oss2.Bucket(auth, endpoint, bucket)
bucket_tool.put_object(key, data)
def parse_oss_path(oss_path):
s = oss_path[len('oss://'):]
[bucket, key] = s.split('/',1)
return (bucket,key)
def main():
# instance_id: should be start from 0
instance_id = os.environ['BATCH_COMPUTE_DAG_INSTANCE_ID']
data_path = conf['data_path']
split_results = 'split_results'
filename = 'part_%s.txt' % instance_id
pre = data_path[0: data_path.rfind('/')]
print('download form: %s/%s/' % (pre, split_results))
# 1. download a part
download_file('%s/%s/%s.txt' % (pre, split_results, instance_id ), filename)
# 2. parse, calculate
with open(filename) as f:
txt = f.read()
m = {
'INFO': 0,
'WARN': 0,
'ERROR': 0,
'DEBUG': 0
}
for k in m:
m[k] = len(re.findall(k, txt))
print(m)
# 3. upload result to oss
upload_to = '%s/count_results/%s.json' % (pre, instance_id )
print('upload to %s' % upload_to)
put_data(json.dumps(m), upload_to)
3.2 DAG任務實現
在提交的job中,填寫 Dependencies 字段。指明任務之間的依賴關系。下面的圖中,首先理清各個任務之間的依賴關系,count1 和 count2 是并行的任務,它們依賴 split 任務,merge任務依賴 count1 和 count2。
依據上面的依賴關系,在Job中可以這樣描述:
from batchcompute.resources import (
JobDescription, TaskDescription, DAG, AutoCluster
)
job_desc = JobDescription()
#以下省略task的描述內容
split = TaskDescription()
count1 = TaskDescription()
count2 = TaskDescription()
merge = TaskDescription()
task_dag = DAG()
task_dag.add_task(task_name="split", task=split)
task_dag.add_task(task_name="count1", task=count1)
task_dag.add_task(task_name="count2", task=count2)
task_dag.add_task(task_name="merge", task=merge)
task_dag.Dependencies = {
'split': ['count1', 'count2'],
'count1': ['merge'],
'count2': ['merge']
}
job_desc.DAG = task_dag
整個作業的任務執行順序是:
split 運行完成后,count1 和 count2 同時開始運行,count1 和 count2 都完成后,merge 才開始運行。
merge 運行完成,整個作業結束。