日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

任務類型

1. 背景

? 一個作業(Job)由一組任務(Task)及其依賴關系組成,每個任務可以有一個或多個執行實例(Instance)。具體詳情看名詞解釋。目前的任務類型分為兩種:并發任務和 DAG(Directed Acyclic Graph) 任務。

2. 任務概述

2.1 并發任務

作業中的一個任務可以指定在多個實例上運行程序,這些實例運行的任務程序都是一樣的,但是可以處理不同的數據。

2.2 DAG任務

作業中的多個任務之間可以有 DAG 依賴關系。即前面的任務運行完成后, 后面的任務才開始運行。

3. 任務實現

這兩種任務是在提交的 Job 中指定相關字段實現的,下面以 Python SDK 為例給出實現方式,代碼的完整程序見快速開始。

3.1 并發任務實現

在提交的 Job 中,填寫 InstanceCount 字段。指明任務需要的實例數。該字段就是實現任務的并發功能。

from batchcompute.resources import (
    JobDescription, TaskDescription, DAG
)
# create my_task
 my_task = TaskDescription()
 my_task.InstanceCount = 3 #指定需要實例數:3臺VM

如果并發任務需要處理不同片段的數據,這個時候在需要運行的任務程序中使用環境變量:BATCH_COMPUTE_DAG_INSTANCE_ID(實例 ID)來區分,就可以處理不同片段的數據。下面的示例程序是快速開始的count代碼,假設輸入數據已經放在OSS中。您需要下載OSS的sdk

import oss2 #oss sdk
from conf import conf
import os 
import json

endpoint = os.environ.get('BATCH_COMPUTE_OSS_HOST') #OSS Host
auth = oss2.Auth(conf['access_key_id'], conf['access_key_secret'])

def download_file(oss_path, filename):
    (bucket, key) = parse_oss_path(oss_path)
    bucket_tool = oss2.Bucket(auth, endpoint, bucket)
    bucket_tool.get_object_to_file(key, filename)

def upload_file(filename, oss_path):
    (bucket, key) = parse_oss_path(oss_path)
    bucket_tool = oss2.Bucket(auth, endpoint, bucket)
    bucket_tool.put_object_from_file(key,filename)

def put_data(data, oss_path):
    (bucket, key) = parse_oss_path(oss_path)
    bucket_tool = oss2.Bucket(auth, endpoint, bucket)
    bucket_tool.put_object(key, data)

def parse_oss_path(oss_path):
    s = oss_path[len('oss://'):]
    [bucket, key] = s.split('/',1)
    return (bucket,key)

def main():
    # instance_id: should be start from 0
    instance_id = os.environ['BATCH_COMPUTE_DAG_INSTANCE_ID']
    data_path = conf['data_path']
    split_results = 'split_results'
    filename = 'part_%s.txt' %  instance_id
    pre = data_path[0: data_path.rfind('/')]
    print('download form: %s/%s/' % (pre, split_results))

    # 1. download a part
    download_file('%s/%s/%s.txt' % (pre, split_results, instance_id ), filename)
    # 2. parse, calculate
    with open(filename) as f:
        txt = f.read()
    m = {
        'INFO': 0,
        'WARN': 0,
        'ERROR': 0,
        'DEBUG': 0
    }
    for k in m:
       m[k] = len(re.findall(k, txt))
    print(m)
    # 3. upload result to oss
    upload_to = '%s/count_results/%s.json' % (pre, instance_id )
    print('upload to %s' % upload_to)
    put_data(json.dumps(m), upload_to)

3.2 DAG任務實現

在提交的job中,填寫 Dependencies 字段。指明任務之間的依賴關系。下面的圖中,首先理清各個任務之間的依賴關系,count1 和 count2 是并行的任務,它們依賴 split 任務,merge任務依賴 count1 和 count2。

img

依據上面的依賴關系,在Job中可以這樣描述:

from batchcompute.resources import (
    JobDescription, TaskDescription, DAG, AutoCluster
)

job_desc = JobDescription()
#以下省略task的描述內容
split = TaskDescription()
count1 = TaskDescription() 
count2 = TaskDescription()
merge = TaskDescription()

task_dag = DAG()
task_dag.add_task(task_name="split", task=split)
task_dag.add_task(task_name="count1", task=count1)
task_dag.add_task(task_name="count2", task=count2)
task_dag.add_task(task_name="merge", task=merge)
task_dag.Dependencies = {
  'split': ['count1', 'count2'],
  'count1': ['merge'],
  'count2': ['merge']
}

job_desc.DAG = task_dag

整個作業的任務執行順序是:

  • split 運行完成后,count1 和 count2 同時開始運行,count1 和 count2 都完成后,merge 才開始運行。

  • merge 運行完成,整個作業結束。