0 背景
AttachCluster作業是批量計算最新推出的作業類型。它結合了固定集群作業和AutoCluster作業的優勢,既能自動管理集群生命周期,彈性伸縮資源,又能使用分布式緩存節省資源。本文的目的在于介紹在阿里云批量計算服務上運行AttachCluster作業。
1 使用限制
- 支持創建集群時自定義系統盤和數據盤大小
不支持作業中自定義系統盤
創建默認集群中定義實例系統盤大小為SystemDiskSize之后,提交到該集群中的所有AttachCluster作業都默認設置為SystemDiskSize。提交job的時候,該字段填0或者不填寫。
不支持作業中自定義數據盤
創建默認集群中定義實例數據盤大小為DataDiskSize之后,提交到該集群中的所有AttachCluster作業的數據盤大小都默認設置為DataDiskSize。提交job的時候,該字段填0或者不填寫。
不支持APP作業模式, 支持DAG作業模式
作業中填寫的鏡像必須是m-xxx開頭的鏡像, 不是img開頭的鏡像(提交job的時候務必仔細檢查這里!)
2 準備工作
2.1 開通阿里云批量計算服務
要使用批量計算服務,請根據官方文檔里面的指導開通批量計算和其依賴的相關服務,如OSS等。
2.2 升級Python SDK
若您未安裝批量計算Python SDK,請您參照安裝方法安裝該SDK。如果您已經安裝,請參照Python SDK升級方法,升級批量計算Python SDK至最新版。
3 創建集群
AttachCluster作業首次使用時,需要創建一個集群,創建方法可參考官方文檔 。該集群對配置沒有特殊需求,實例數可設置為0。以下是創建集群的Python源代碼。
import time
import random
import string
import batchcompute
from batchcompute import CN_SHENZHEN as REGION
from batchcompute import Client, ClientError
from batchcompute.resources import (
JobDescription, TaskDescription, DAG,
GroupDescription, ClusterDescription,
Configs, Networks, VPC, Classic, Mounts, Notification, Topic
)
# AccessKey credentials for the BatchCompute client (fill in your own).
ACCESS_KEY_ID = 'Your Access Key Id'
ACCESS_KEY_SECRET = 'Your Access Key Secret'
# Image and instance type used when creating the cluster.
IMAGE_ID = 'img-ubuntu'
INSTANCE_TYPE = 'ecs.sn2ne.large'
# BatchCompute client bound to the region imported above.
client = Client(REGION, ACCESS_KEY_ID, ACCESS_KEY_SECRET)
def create_cluster(idempotent_token=''):
    """Create a BatchCompute cluster for AttachCluster jobs and return its Id.

    Args:
        idempotent_token: optional token that makes the creation request
            idempotent; pass '' to disable idempotency.

    Returns:
        The created cluster Id (str), or "" when creation fails.
    """
    try:
        # Cluster description.
        cluster_desc = ClusterDescription()
        cluster_desc.Name = "test-cluster"
        cluster_desc.Description = "demo"
        cluster_desc.ImageId = IMAGE_ID
        cluster_desc.InstanceType = INSTANCE_TYPE

        # Group description.
        group_desc1 = GroupDescription()
        group_desc1.DesiredVMCount = 4
        group_desc1.InstanceType = 'ecs.sn1ne.large'  # group-specific instance type
        group_desc1.ResourceType = 'OnDemand'
        cluster_desc.add_group('group1', group_desc1)
        #cluster_desc.add_group('group2', group_desc2)

        # Configs: disks.
        configs = Configs()
        configs.add_system_disk(50, 'cloud_efficiency')
        configs.add_data_disk(500, 'cloud_efficiency', '/home/my-data-disk')

        # Configs: networks (VPC).
        networks = Networks()
        vpc = VPC()
        vpc.CidrBlock = '192.168.0.0/16'
        #vpc.VpcId = 'vpc-xxxxx'
        networks.VPC = vpc
        configs.Networks = networks
        cluster_desc.Configs = configs

        # print() function works on both Python 2.6+ and Python 3;
        # the original py2-only print statement fails on Python 3.
        print(cluster_desc)
        rsp = client.create_cluster(cluster_desc, idempotent_token)
        # Return the cluster id for the subsequent AttachCluster jobs.
        return rsp.Id
    except ClientError as e:
        print(e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())
        return ""
if __name__ == '__main__':
    # Create the cluster without an idempotent token and show its Id.
    # print() works on Python 2.6+ and 3; the py2-only statement form did not.
    cluster_id = create_cluster()
    print(cluster_id)
4 創建作業
在創建作業的時候需要步驟2中的集群Id,填入task的AutoCluster的ClusterId字段中。以下是創建作業的Python源代碼。
from batchcompute import Client, ClientError
from batchcompute import CN_SHENZHEN as REGION
from batchcompute.resources import (
ClusterDescription, GroupDescription, Configs, Networks, VPC,
JobDescription, TaskDescription, DAG,Mounts,
AutoCluster,Disks,Notification,
)
access_key_id = "" # your access key id
access_key_secret = "" # your access key secret
image_id = "m-8vbd8lo9xxxx" # id of an image created beforehand; it must already be registered with BatchCompute and must start with m-, not img-
instance_type = "ecs.sn1.medium" # instance type
inputOssPath = "oss://xxx/input/" # your input oss path
outputOssPath = "oss://xxx/output/" # your output oss path
stdoutOssPath = "oss://xxx/log/stdout/" # your stdout oss path
stderrOssPath = "oss://xxx/log/stderr/" # your stderr oss path
def getAutoClusterDesc():
    """Build the AutoCluster description for an AttachCluster job.

    Returns:
        An AutoCluster object whose ClusterId points at the cluster
        created in the previous step.
    """
    auto_desc = AutoCluster()
    # For an AttachCluster job, fill in the cluster Id created in the
    # previous step. BUGFIX: this must be a string literal — the original
    # unquoted `cls-xxxxx` raised a NameError at runtime.
    auto_desc.ClusterId = "cls-xxxxx"
    auto_desc.ImageId = image_id
    auto_desc.ReserveOnFail = False
    # Instance type.
    auto_desc.InstanceType = instance_type
    # case 1: spot instances with a user-set price cap
    # auto_desc.ResourceType = "Spot"
    # auto_desc.SpotStrategy = "SpotWithPriceLimit"
    # auto_desc.SpotPriceLimit = 0.5
    # case 2: automatic bidding, capped at the pay-as-you-go price
    # auto_desc.ResourceType = "Spot"
    # auto_desc.SpotStrategy = "SpotAsPriceGo"
    # case 3: pay-as-you-go
    auto_desc.ResourceType = "OnDemand"
    # Configs.
    configs = Configs()
    # Configs.Networks.
    networks = Networks()
    vpc = VPC()
    # case 1: only CidrBlock is given
    vpc.CidrBlock = '192.168.0.0/16'
    # case 2: both CidrBlock and VpcId are given; the VPC identified by
    # VpcId must have exactly the CidrBlock passed in
    # vpc.CidrBlock = '172.26.0.0/16'
    # vpc.VpcId = "vpc-8vbfxdyhxxxx"
    networks.VPC = vpc
    configs.Networks = networks
    # Customizing the system disk is not supported for AttachCluster jobs.
    # configs.add_system_disk(size=0, type_='cloud_efficiency')
    # Customizing the data disk is not supported either.
    # case 1: Linux
    # configs.add_data_disk(size=0, type_='cloud_efficiency', mount_point='/path/to/mount/')
    # case 2: Windows
    # configs.add_data_disk(size=0, type_='cloud_efficiency', mount_point='E:')
    # Number of instances.
    configs.InstanceCount = 1
    auto_desc.Configs = configs
    return auto_desc
def getDagJobDesc(clusterId = None):
    """Build a DAG-type JobDescription for an AttachCluster job.

    Args:
        clusterId: unused here; kept for interface compatibility.

    Returns:
        A JobDescription containing one echo task whose AutoCluster
        references an existing cluster.
    """
    job_desc = JobDescription()
    dag_desc = DAG()
    mounts_desc = Mounts()
    job_desc.Name = "testBatchSdkJob"
    job_desc.Description = "test job"
    job_desc.Priority = 1
    # Subscribe to job-finished / job-failed events via an MNS topic.
    noti_desc = Notification()
    noti_desc.Topic['Name'] = "test-topic"
    noti_desc.Topic['Endpoint'] = "http://[UserId].mns.[Region].aliyuncs.com/"
    noti_desc.Topic['Events'] = ["OnJobFinished", "OnJobFailed"]
    # job_desc.Notification = noti_desc
    job_desc.JobFailOnInstanceFail = False
    # When True the job is released automatically right after it succeeds.
    job_desc.AutoRelease = False
    job_desc.Type = "DAG"
    echo_task = TaskDescription()
    # echo_task.InputMapping = {"oss://xxx/input/": "/home/test/input/",
    #                           "oss://xxx/test/file": "/home/test/test/file"}
    echo_task.InputMapping = {inputOssPath: "/home/test/input/"}
    echo_task.OutputMapping = {"/home/test/output/":outputOssPath}
    # Command line that triggers the program.
    # case 1: run a Linux command line
    echo_task.Parameters.Command.CommandLine = "/bin/bash -c 'echo BatchcomputeService'"
    # case 2: run via Windows CMD.exe
    # echo_task.Parameters.Command.CommandLine = "cmd /c 'echo BatchcomputeService'"
    # case 3: ship an executable file
    # PackagePath holds the executable or binary package referenced by CommandLine
    # echo_task.Parameters.Command.PackagePath = "oss://xxx/package/test.sh"
    # echo_task.Parameters.Command.CommandLine = "sh test.sh"
    # Environment variables visible to the program at runtime.
    echo_task.Parameters.Command.EnvVars["key1"] = "value1"
    echo_task.Parameters.Command.EnvVars["key2"] = "value2"
    # Stdout OSS address: print output is uploaded there in real time.
    echo_task.Parameters.StdoutRedirectPath = stdoutOssPath
    # Stderr OSS address: raised errors are uploaded there in real time.
    echo_task.Parameters.StderrRedirectPath = stderrOssPath
    # Task timeout (seconds).
    echo_task.Timeout = 600
    # Number of instances for this task.
    # The env var BATCH_COMPUTE_INSTANCE_ID ranges from 0 to InstanceCount-1;
    # read it inside your program to shard data for concurrent execution.
    echo_task.InstanceCount = 1
    # Retry count after a task failure.
    echo_task.MaxRetryCount = 0
    # NAS data mount.
    # The network and the NAS must be inside the same VPC.
    nasMountEntry = {
        "Source": "nas://xxxx.nas.aliyuncs.com:/",
        "Destination": "/home/mnt/",
        "WriteSupport":True,
    }
    mounts_desc.add_entry(nasMountEntry)
    mounts_desc.Locale = "utf-8"
    mounts_desc.Lock = False
    # echo_task.Mounts = mounts_desc
    # For an AttachCluster job this cluster field stays empty;
    # the real cluster Id goes into the AutoCluster description below.
    echo_task.ClusterId = ""
    echo_task.AutoCluster = getAutoClusterDesc()
    # Add the task to the DAG.
    dag_desc.add_task('echoTask', echo_task)
    # Multiple tasks may be added, each configured independently.
    # dag_desc.add_task('echoTask2', echo_task)
    # Dependencies declares inter-task ordering: echoTask2 depends on
    # echoTask; echoTask3 depends on echoTask2.
    # dag_desc.Dependencies = {"echoTask":["echoTask2"], "echoTask2":["echoTask3"]}
    job_desc.DAG = dag_desc
    return job_desc
if __name__ == "__main__":
    # Submit the AttachCluster job and report the new job Id.
    client = Client(REGION, access_key_id, access_key_secret)
    try:
        job_desc = getDagJobDesc()
        job_id = client.create_job(job_desc).Id
        print('job created: %s' % job_id)
    except ClientError as e:
        # `except ClientError as e` is valid on Python 2.6+ and required on
        # Python 3; the original `except ClientError,e` is py2-only syntax.
        print(e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())
AttachCluster作業創建已經完成。