Designer提供自定義Python腳本的功能,您可以使用Python腳本組件自定義安裝依賴包及運行自定義的Python函數。本文為您介紹Python腳本組件的配置方法及使用示例。
背景信息
Python腳本組件位于Designer組件的自定義算法組件文件夾下。
前提條件
已完成DLC相關權限授權,授權方法詳情請參見云產品依賴與授權:DLC。
由于Python腳本需要依賴于DLC作為底層計算資源,因此您需要在工作空間關聯DLC計算資源,詳情請參見管理工作空間。
由于Python腳本依賴OSS作為代碼存儲環境,因此您需要先創建OSS Bucket,詳情請參見創建存儲空間。
重要創建的OSS Bucket必須與Designer和DLC在同一地域。
已在工作空間中,為使用該組件的RAM賬號添加算法開發角色,詳情請參見管理工作空間成員。如果操作賬號還需同時使用MaxCompute作為數據源,還需要同時添加MaxCompute開發角色。
可視化配置組件
輸入樁
Python腳本組件共有4個輸入端口,均可以連接OSS路徑或MaxCompute表類型的數據。
OSS路徑輸入
來自上游組件的OSS輸入,會被掛載到Python腳本執行的節點上,系統會將掛載后的文件路徑,以arguments的形式,傳遞給Python腳本,無需手工配置。arguments的規范如
python main.py --input1 /ml/input/data/input1
,代表第一個輸入端口輸入的OSS路徑。在Python腳本中可以按照讀本地文件的方式訪問/ml/input/data/input1
來讀取掛載后的文件。MaxCompute表輸入
MaxCompute表的輸入不支持掛載,系統會將對應的表信息以URI的形式,作為arguments傳遞給Python腳本,無需手工配置。arguments的規范如
python main.py --input1 odps://some-project-name/tables/table
,代表第一個輸入端口輸入的MaxCompute表。對于MaxCompute URI形式的輸入,您可以使用該組件代碼模板內的parse_odps_url函數解析出對應的ProjectName、TableName和Partition等元信息,詳情請參見使用示例。
輸出樁
Python腳本組件共有4個輸出端口,其中輸出端口1和輸出端口2是OSS路徑輸出端口,輸出端口3和輸出端口4是MaxCompute表輸出端口。
OSS路徑輸出
該組件的腳本設置頁簽的任務輸出路徑參數配置的OSS路徑,會被系統自動掛載到
/ml/output/
下。該組件的輸出端口OSS輸出-1和OSS輸出-2,分別對應子目錄/ml/output/output1
和/ml/output/output2
。在腳本中可以按照寫本地文件的方式將需要傳遞給下游節點的文件寫到這兩個目錄中。MaxCompute表輸出
如果當前工作空間配置了MaxCompute項目,系統會自動傳遞一個臨時表URI到Python腳本,例如:
python main.py --output3 odps://<some-project-name>/tables/<output-table-name>
,您可以通過PyODPS來創建臨時表URI中指定的表,并將Python腳本處理完成的數據寫出到這個表,最后通過組件連線將表傳遞給下游組件,詳情可參考下文中的示例。
組件參數
腳本設置
參數
描述
任務輸出路徑
選擇任務輸出的OSS路徑。
配置好的OSS目錄會掛載到作業容器的
/ml/output/
路徑下,任務寫出到/ml/output/
路徑下的數據,會被持久化保存到對應的OSS目錄。組件的輸出端口OSS輸出-1和OSS輸出-2分別對應
/ml/output/
路徑下的子路徑output1和output2。當組件的OSS輸出端口接入下游組件時,下游組件接收到的數據為對應子路徑的數據。
設置代碼源
(任選一種即可)
編輯框提交
Python代碼:選擇代碼保存的OSS路徑,編輯框中寫入的代碼會保存在該OSS路徑下。Python代碼的文件名稱默認為main.py。
重要第一次單擊保存前,請確認指定的代碼保存路徑下無同名文件,避免文件被覆蓋。
Python代碼編輯器:編輯框內默認提供示例代碼,詳情請參見使用示例。您可以直接在編輯器內編寫代碼。
指定Git配置
Git地址:Git倉庫地址。
代碼分支:代碼分支,默認為master。
代碼Commit:Commit的優先級大于Branch,如果您填寫了該參數,則Branch不生效。
Git用戶名:如果您需要訪問私有代碼集,則需要指定該參數。
Git訪問Token:訪問私有代碼倉庫時必填。更多信息,請參見附錄:獲取GitHub賬號的Token。
選擇代碼配置
選擇代碼配置:選擇已創建的代碼配置。具體操作,請參見代碼配置。
代碼分支:代碼分支,默認為master。
代碼Commit:Commit的優先級大于Branch,如果您填寫了該參數,則Branch不生效。
從OSS中選擇文件或目錄
在OSS路徑選擇對應代碼上傳的路徑。
執行命令
在文本框中,輸入您需要執行的命令,比如:
python main.py
。說明系統會自動按照腳本名稱和組件輸入輸出端口的連接情況來生成執行命令,無需手動配置。
高級選項
第三方依賴庫:在文本框中,您可以通過添加腳本的方式安裝第三方依賴庫,格式與Python的requirement.txt相同,具體如下所示。節點執行前,會自動安裝文本框中配置的第三方依賴庫。
cycler==0.10.0 # via matplotlib kiwisolver==1.2.0 # via matplotlib matplotlib==3.2.1 numpy==1.18.5 pandas==1.0.4 pyparsing==2.4.7 # via matplotlib python-dateutil==2.8.1 # via matplotlib, pandas pytz==2020.1 # via pandas scipy==1.4.1 # via seaborn
是否開啟容錯監控:勾選該參數后,會出現容錯監控配置文本框,您可以在文本框中通過添加容錯監控具體參數,指定容錯監控的內容。參數詳情請參見AIMaster:彈性自動容錯引擎。
執行配置
參數
描述
選擇資源組
支持選擇DLC公共資源組或DLC專屬資源組:
當選擇公共資源組時,您需要配置選擇機器實例類型參數,支持配置CPU或GPU機器實例。默認為:ecs.c6.large。
當選擇專屬資源組時,您需要配置任務所需的CPU(核數)、內存(GB)、共享內存(GB)和GPU(卡數)。
默認為當前工作空間下的DLC云原生資源的默認資源組。
專有網絡配置
支持選擇已創建的專有網絡進行掛載。
安全組
支持選擇已創建的安全組進行掛載。
高級選項
選中該參數后,支持配置以下參數:
選擇機器實例個數:您可以按照實際需要配置機器實例個數,默認為1。
選擇作業鏡像:默認為開源的xgboost1.6.0版本,如果您需要使用深度學習框架,則需要修改鏡像。
選擇任務類型:僅當提交的代碼按照分布式實現時,才需要修改該參數,支持以下取值:
XGBoost/LightGBM Job
TensorFlow Job
PyTorch Job
MPI Job
使用示例
默認示例代碼解析
Python腳本組件默認提供的示例代碼如下。
import os
import argparse
import json
"""
Python V2 組件示例代碼
"""
# 當前工作空間下的默認MaxCompute執行環境,包含MaxCompute項目的名稱以及Endpoint。
# 需要當前的工作空間下有MaxCompute項目時,作業的執行環境才會注入。
# 示例: {"endpoint": "http://service.cn.maxcompute.aliyun-inc.com/api", "odpsProject": "lq_test_mc_project"}。
ENV_JOB_MAX_COMPUTE_EXECUTION = "JOB_MAX_COMPUTE_EXECUTION"
def init_odps():
from odps import ODPS
# 當前工作空間的默認MaxCompute項目信息。
mc_execution = json.loads(os.environ[ENV_JOB_MAX_COMPUTE_EXECUTION])
o = ODPS(
access_id="<YourAccessKeyId>",
secret_access_key="<YourAccessKeySecret>",
# 請根據Project所在的Region選擇,比如:http://service.cn-shanghai.maxcompute.aliyun-inc.com/api。
endpoint=mc_execution["endpoint"],
project=mc_execution["odpsProject"],
)
return o
def parse_odps_url(table_uri):
from urllib import parse
parsed = parse.urlparse(table_uri)
project_name = parsed.hostname
r = parsed.path.split("/", 2)
table_name = r[2]
if len(r) > 3:
partition = r[3]
else:
partition = None
return project_name, table_name, partition
def parse_args():
parser = argparse.ArgumentParser(description="PythonV2 component script example.")
parser.add_argument("--input1", type=str, default=None, help="Component input port 1.")
parser.add_argument("--input2", type=str, default=None, help="Component input port 2.")
parser.add_argument("--input3", type=str, default=None, help="Component input port 3.")
parser.add_argument("--input4", type=str, default=None, help="Component input port 4.")
parser.add_argument("--output1", type=str, default=None, help="Output OSS port 1.")
parser.add_argument("--output2", type=str, default=None, help="Output OSS port 2.")
parser.add_argument("--output3", type=str, default=None, help="Output MaxComputeTable 1.")
parser.add_argument("--output4", type=str, default=None, help="Output MaxComputeTable 2.")
args, _ = parser.parse_known_args()
return args
def write_table_example(args):
# 示例:通過執行SQL語句復制PAI提供的公共表數據,作為當前組件輸出端口3指定的臨時表。
output_table_uri = args.output3
o = init_odps()
project_name, table_name, partition = parse_odps_url(output_table_uri)
o.run_sql(f"create table {project_name}.{table_name} as select * from pai_online_project.heart_disease_prediction;")
def write_output1(args):
# 示例:將數據結果寫入掛載的OSS路徑(輸出端口1的子目錄),對應的結果可以通過連線傳遞到下游組件。
output_path = args.output1
os.makedirs(output_path, exist_ok=True)
p = os.path.join(output_path, "result.text")
with open(p, "w") as f:
f.write("TestAccuracy=0.88")
if __name__ == "__main__":
args = parse_args()
print("Input1={}".format(args.input1))
print("Output1={}".format(args.output1))
# write_table_example(args)
# write_output1(args)
常用函數說明:
init_odps():初始化一個ODPS實例,用來讀取MaxCompute表數據,需要填寫您的AccessKeyId和AccessKeySecret,關于如何獲取AccessKey,詳情請參見獲取AccessKey。
更多關于操作MaxCompute表的API,詳情請參見PyODPS。
parse_odps_url(table_uri):解析輸入的MaxCompute表的URI,返回解析完成得到的項目名稱、表名和分區。table_uri格式為:
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/
,比如:odps://test/tables/iris/pa=1/pb=1
,其中pa=1/pb=1為一個多級分區。parse_args():解析傳入腳本的arguments,輸入輸出數據會以arguments的方式傳遞給執行的腳本。
使用示例1:Python腳本組件與其他組件串聯使用
參考并修改心臟病預測案例模板,來說明Python腳本組件如何與Designer其他組件串聯使用。工作流配置說明:
創建心臟病預測案例模板并進入工作流,具體操作請參見心臟病預測。
將Python腳本組件拖入畫布并重命名為SMOTE,并配置以下代碼。
重要在我們使用的鏡像中沒有imblearn庫,需要在該組件的腳本設置頁簽的第三方依賴庫中配置imblearn。在節點執行前,會自動安裝該庫。
import argparse import json import os from odps.df import DataFrame from imblearn.over_sampling import SMOTE from urllib import parse from odps import ODPS ENV_JOB_MAX_COMPUTE_EXECUTION = "JOB_MAX_COMPUTE_EXECUTION" def init_odps(): # 當前工作空間的默認MaxCompute項目信息。 mc_execution = json.loads(os.environ[ENV_JOB_MAX_COMPUTE_EXECUTION]) o = ODPS( access_id="<替換成您自己的AccessKey>", secret_access_key="<替換成您自己的AccessKeySecret>", # 請根據Project所在的Region選擇,比如:http://service.cn-shanghai.maxcompute.aliyun-inc.com/api。 endpoint=mc_execution["endpoint"], project=mc_execution["odpsProject"], ) return o def get_max_compute_table(table_uri, odps): parsed = parse.urlparse(table_uri) project_name = parsed.hostname table_name = parsed.path.split('/')[2] table = odps.get_table(project_name + "." + table_name) return table def run(): parser = argparse.ArgumentParser(description='PythonV2 component script example.') parser.add_argument( '--input1', type=str, default=None, help='Component input port 1.' ) parser.add_argument( '--output3', type=str, default=None, help='Component input port 1.' ) args, _ = parser.parse_known_args() print('Input1={}'.format(args.input1)) print('output3={}'.format(args.output3)) o = init_odps() imbalanced_table = get_max_compute_table(args.input1, o) df = DataFrame(imbalanced_table).to_pandas() sm = SMOTE(random_state=2) X_train_res, y_train_res = sm.fit_resample(df, df['ifhealth'].ravel()) new_table = o.create_table(get_max_compute_table(args.output3, o).name, imbalanced_table.schema, if_not_exists=True) with new_table.open_writer() as writer: writer.write(X_train_res.values.tolist()) if __name__ == '__main__': run()
其中access_id和secret_access_key需要配置您自己的AccessKey和AccessKeySecret。關于如何獲取AccessKey,詳情請參見獲取AccessKey。
將SMOTE組件接入拆分組件的下游,使用經典的SMOTE算法對拆分完得到的訓練數據做過采樣,對訓練集里樣本數量較少的類別進行過采樣,合成新的樣本來緩解類不平衡。
將SMOTE組件得到的新數據接入邏輯回歸二分類組件做訓練。
將訓練得到的模型與左側分支中的模型一樣,連接相同的預測數據和評估組件做橫向對比。組件運行成功后,單擊進入可視化頁面,查看最終評估結果。
額外做過采樣并未對模型效果有特別明顯的提升,說明原樣本分布及模型效果都比較好。
使用示例2:使用Designer做純DLC任務的編排
您可以在Designer中連接多個自定義Python腳本組件,來實現一組DLC任務的Pipeline編排和定時調度。以下圖為例,按照Directed Acyclic Graph(DAG)圖順序啟動4個DLC任務。
如果DLC的執行代碼不需要讀取上游節點數據,也不需要給下游節點傳遞數據,則節點之間的連線只表示調度執行的依賴關系和先后順序。
后續您可以將使用Designer開發完成的整個工作流,一鍵部署到DataWorks做定時調度,具體操作請參見使用DataWorks離線調度Designer工作流。
使用示例3:將全局變量傳入Python腳本組件
配置全局變量。
在Designer工作流頁面單擊空白畫布,在右側全局變量頁簽配置全局變量。
使用以下兩種方式將已配置的全局變量傳入Python腳本組件,您可以任意選擇一種方式。
單擊Python腳本組件節點,在右側腳本設置頁簽選中高級選項,在執行命令中配置傳入參數為全局變量。
修改Python代碼,使用argparser解析參數。
更新后的Python代碼如下,該代碼以步驟1配置的全局變量為例,您需要根據實際配置的全局變量更新代碼。后續您可以直接將更新后的代碼替換到Python腳本組件節點腳本設置頁簽的代碼編輯區域。
import os import argparse import json """ Python V2 組件示例代碼 """ ENV_JOB_MAX_COMPUTE_EXECUTION = "JOB_MAX_COMPUTE_EXECUTION" def init_odps(): from odps import ODPS mc_execution = json.loads(os.environ[ENV_JOB_MAX_COMPUTE_EXECUTION]) o = ODPS( access_id="<YourAccessKeyId>", secret_access_key="<YourAccessKeySecret>", endpoint=mc_execution["endpoint"], project=mc_execution["odpsProject"], ) return o def parse_odps_url(table_uri): from urllib import parse parsed = parse.urlparse(table_uri) project_name = parsed.hostname r = parsed.path.split("/", 2) table_name = r[2] if len(r) > 3: partition = r[3] else: partition = None return project_name, table_name, partition def parse_args(): parser = argparse.ArgumentParser(description="PythonV2 component script example.") parser.add_argument("--input1", type=str, default=None, help="Component input port 1.") parser.add_argument("--input2", type=str, default=None, help="Component input port 2.") parser.add_argument("--input3", type=str, default=None, help="Component input port 3.") parser.add_argument("--input4", type=str, default=None, help="Component input port 4.") parser.add_argument("--output1", type=str, default=None, help="Output OSS port 1.") parser.add_argument("--output2", type=str, default=None, help="Output OSS port 2.") parser.add_argument("--output3", type=str, default=None, help="Output MaxComputeTable 1.") parser.add_argument("--output4", type=str, default=None, help="Output MaxComputeTable 2.") # 根據已配置的全局變量,新增代碼。 parser.add_argument("--arg1", type=str, default=None, help="Argument 1.") parser.add_argument("--arg2", type=int, default=None, help="Argument 2.") args, _ = parser.parse_known_args() return args def write_table_example(args): output_table_uri = args.output3 o = init_odps() project_name, table_name, partition = parse_odps_url(output_table_uri) o.run_sql(f"create table {project_name}.{table_name} as select * from pai_online_project.heart_disease_prediction;") def write_output1(args): output_path = args.output1 os.makedirs(output_path, exist_ok=True) p = os.path.join(output_path, "result.text") with open(p, "w") as f: f.write("TestAccuracy=0.88") if __name__ == "__main__": args = parse_args() print("Input1={}".format(args.input1)) print("Output1={}".format(args.output1)) # 根據已配置的全局變量,新增代碼。 print("Argument1={}".format(args.arg1)) print("Argument2={}".format(args.arg2)) # write_table_example(args) # write_output1(args)