任務(wù)實(shí)例
Task是MaxCompute的基本計(jì)算單元,例如SQL Task。本文為您介紹PyODPS中任務(wù)實(shí)例的基本操作。
基本操作
Task在執(zhí)行時(shí)會(huì)被實(shí)例化, 以MaxCompute實(shí)例(下文簡(jiǎn)稱為實(shí)例或Instance)的形式存在。實(shí)例常見(jiàn)的操作如下:
list_instances()
:獲取項(xiàng)目空間下的所有的Instance。exist_instance()
:判斷某實(shí)例是否存在。get_instance()
:獲取實(shí)例。stop_instance()
:停止實(shí)例。僅支持對(duì)運(yùn)行中的實(shí)例進(jìn)行停止實(shí)例操作,否則會(huì)報(bào)錯(cuò)。說(shuō)明通過(guò)Instance對(duì)象調(diào)用
stop
方法也可以用于停止Instance,如o.get_instance('instance id').stop
。
示例
獲取Instancede的ID
for instance in o.list_instances(): print(instance.id)
判斷Instance是否存在
print(o.exist_instance('my_instance_id'))
獲取Logview地址
對(duì)于SQL等任務(wù),通過(guò)Instance對(duì)象調(diào)用
get_logview_address
方法即可獲取Logview地址。#從已有的instance對(duì)象獲取Logview地址。 instance = o.run_sql('desc pyodps_iris') print(instance.get_logview_address()) #從instance id獲取Logview地址。 instance = o.get_instance('my_instance_id') print(instance.get_logview_address())
對(duì)于PAI類型任務(wù),需要先枚舉其子任務(wù),再獲取子任務(wù)的Logview。
instance = o.run_xflow('AppendID', 'algo_public', {'inputTableName': 'input_table', 'outputTableName': 'output_table'}) for sub_inst_name, sub_inst in o.get_xflow_sub_instances(instance).items(): print('%s: %s' % (sub_inst_name, sub_inst.get_logview_address()))
任務(wù)實(shí)例狀態(tài)
一個(gè)Instance的狀態(tài)可以是Running
、Suspended
或者 Terminated
。關(guān)于任務(wù)實(shí)例狀態(tài)的方法有以下幾種:
status
:屬性來(lái)獲取狀態(tài)。is_terminated
:返回當(dāng)前Instance是否已經(jīng)執(zhí)行完畢的結(jié)果。is_successful
:返回當(dāng)前Instance是否正確地執(zhí)行完畢的結(jié)果。任務(wù)處于運(yùn)行中或者執(zhí)行失敗都會(huì)返回False。
代碼示例如下。
獲取Instance狀態(tài)。
instance=o.get_instance('my_instance_id') print(instance.status) print(instance.status.value)
返回值示例:
Status.TERMINATED Terminated
判斷當(dāng)前示例是否已經(jīng)執(zhí)行完畢。
instance=o.get_instance('my_instance_id') from odps.models import Instance print(instance.status == Instance.Status.TERMINATED)
返回值為
True
表示已執(zhí)行完畢。
調(diào)用wait_for_completion
方法會(huì)阻塞直到Instance執(zhí)行完成。wait_for_success
方法同樣會(huì)阻塞,但如果最終任務(wù)執(zhí)行失敗,會(huì)拋出相關(guān)異常。
子任務(wù)操作
一個(gè)Instance在運(yùn)行時(shí),可能包含一個(gè)或者多個(gè)子任務(wù),這些子任務(wù)稱為Task(注意:這里的Task不同于MaxCompute的計(jì)算單元)。關(guān)于Task的方法有以下幾種:
get_task_names
:通過(guò)此方法獲取所有的Task任務(wù)。該方法返回一個(gè)所有子任務(wù)的名稱列表。instance=o.get_instance('my_instance_id') instance.get_task_names()
返回值示例:
['jdbc_sql_task']
get_task_result
:指定Task名稱,獲取該Task的執(zhí)行結(jié)果。該方法是以字典的形式返回每個(gè)Task的執(zhí)行結(jié)果。 代碼示例如下:獲取子任務(wù)名稱。
instance=o.execute_sql('select*frompyodps_irislimit1') print(instance.get_task_names())
返回值示例:
['AnonymousSQLTask']
獲取子任務(wù)運(yùn)行結(jié)果。
print(instance.get_task_result('AnonymousSQLTask'))
返回值示例:
"sepallength","sepalwidth","petallength","petalwidth","name" 4.9,3.0,1.4,0.2,"Iris-setosa"
獲取子任務(wù)結(jié)果。
print(instance.get_task_results())
返回值示例:
OrderedDict([('AnonymousSQLTask', '"sepallength","sepalwidth","petallength","petalwidth","name"\n4.9,3.0,1.4,0.2,"Iris-setosa"\n')])
get_task_progress
:該方法獲取在任務(wù)實(shí)例運(yùn)行時(shí)Task當(dāng)前的運(yùn)行進(jìn)度。instance=o.get_instance('20160519101349613gzbzufck2') while not instance.is_terminated(): for task_name in instance.get_task_names(): print(instance.id, instance.get_task_progress(task_name).get_stage_progress_formatted_string()) time.sleep(10)
返回值示例:
20160519101349613gzbzufck2 2016-05-19 18:14:03 M1_Stg1_job0:0/1/1[100%]