MaxCompute不提供Graph開發(fā)插件,您可以基于Eclipse開發(fā)MaxCompute Graph程序。
- 編寫Graph代碼,使用本地調(diào)試進行基本的測試。
- 進行集群調(diào)試,驗證結(jié)果。
開發(fā)示例
本節(jié)以SSSP算法為例,為您介紹如何使用Eclipse開發(fā)和調(diào)試Graph程序。
- 創(chuàng)建Java工程,工程名為graph_examples。
- 將MaxCompute客戶端lib目錄下的Jar包加到Eclipse工程的Java Build Path中。
- 開發(fā)MaxCompute Graph程序。
實際開發(fā)過程中,通常會先復(fù)制一個例子(例如單源最短距離),然后進行修改。在本示例中,僅修改了Package的路徑為package com.aliyun.odps.graph.example。
- 編譯打包。
在Eclipse環(huán)境中,右鍵單擊源代碼目錄(圖中的src目錄), 生成Jar包,選擇目標(biāo)Jar包的保存路徑,例如D:\\odps\\clt\\odps-graph-example-sssp.jar。
- 使用MaxCompute客戶端運行SSSP,詳情請參見運行Graph。
本地調(diào)試
MaxCompute Graph支持本地調(diào)試模式,您可以使用Eclipse進行斷點調(diào)試。
- 下載一個odps-graph-local的Maven包。
- 選擇Eclipse工程,右鍵單擊Graph作業(yè)主程序(包含
main
函數(shù))文件,配置其運行參數(shù)( )。 - 在Arguments頁簽中,設(shè)置Program arguments參數(shù)為1 sssp_in sssp_out,作為主程序的輸入?yún)?shù)。
- 在Arguments頁簽中,設(shè)置VM arguments參數(shù)如下。
-Dodps.runner.mode=local -Dodps.project.name=<project.name> -Dodps.end.point=<end.point> -Dodps.access.id=<access.id> -Dodps.access.key=<access.key>
- 對于本地模式(即不指定odps.end.point參數(shù)),需要在warehouse創(chuàng)建sssp_in、sssp_out表,為輸入表sssp_in添加數(shù)據(jù),輸入數(shù)據(jù)如下所示。
1,"2:2,3:1,4:4" 2,"1:2,3:2,4:1" 3,"1:1,2:2,5:1" 4,"1:4,2:1,5:1" 5,"3:1,4:1"
關(guān)于warehouse的介紹請參見本地運行。
- 單擊Run,即可本地跑SSSP。
說明 參數(shù)設(shè)置可參見MaxCompute客戶端中conf/odps_config.ini的設(shè)置,上述為幾個常用參數(shù),其他參數(shù)說明如下:
- odps.runner.mode:取值為local,本地調(diào)試功能必須指定。
- odps.project.name:指定當(dāng)前Project,必須指定。
- odps.end.point:指定當(dāng)前MaxCompute服務(wù)的地址,可以不指定。如果不指定,只從warehouse讀取表或資源的meta和數(shù)據(jù),不存在則拋異常。如果指定,會先從warehouse讀取,不存在時會遠程連接MaxCompute讀取。
- odps.access.id:連接MaxCompute服務(wù)的AccessKey ID,只在指定odps.end.point時有效。
- odps.access.key:連接MaxCompute服務(wù)的AccessKey Secret,只在指定odps.end.point時有效。
- odps.cache.resources:指定使用的資源列表,效果與Jar命令的
-resources
相同。 - odps.local.warehouse:本地warehouse路徑,不指定時默認為./warehouse。
在Eclipse中本地運行SSSP的調(diào)試輸出信息,如下所示。Counters: 3 com.aliyun.odps.graph.local.COUNTER TASK_INPUT_BYTE=211 TASK_INPUT_RECORD=5 TASK_OUTPUT_BYTE=161 TASK_OUTPUT_RECORD=5 graph task finish
本地作業(yè)臨時目錄
- counters:存放作業(yè)運行的一些計數(shù)信息。
- inputs:存放作業(yè)的輸入數(shù)據(jù),優(yōu)先取自本地的warehouse,如果本地沒有,會通過MaxCompute SDK從服務(wù)端讀取(如果設(shè)置了odps.end.point),默認一個input只讀10條數(shù)據(jù),可以通過
-Dodps.mapred.local.record.limit
參數(shù)進行修改,但是也不能超過1萬條記錄。 - outputs:存放作業(yè)的輸出數(shù)據(jù),如果本地warehouse中存在輸出表,outputs中的結(jié)果數(shù)據(jù)在作業(yè)執(zhí)行完后會覆蓋本地warehouse中對應(yīng)的表。
- resources:存放作業(yè)使用的資源,與輸入類似,優(yōu)先取自本地的warehouse,如果本地沒有,會通過MaxCompute SDK從服務(wù)端讀取(如果設(shè)置了odps.end.point)。
- job.xml:作業(yè)配置。
- superstep:存放每一輪迭代的持久化信息。
集群調(diào)試
通過本地的調(diào)試后,您即可提交作業(yè)到集群進行測試。
- 配置MaxCompute客戶端。
- 使用
add jar /path/work.jar -f;
命令更新Jar包。 - 使用Jar命令運行作業(yè),查看運行日志和結(jié)果數(shù)據(jù)。
性能調(diào)優(yōu)
setSplitSize(long)
:輸入表切分大小,單位MB,大于0,默認64。setNumWorkers(int)
:設(shè)置作業(yè)Worker數(shù)量,范圍為[1,1000],默認值為1。Worker數(shù)由作業(yè)輸入字節(jié)數(shù)和splitSize
決定。setWorkerCPU(int)
:Map CPU資源,100為1CPU核,[50,800]之間,默認值為200。setWorkerMemory(int)
:Map內(nèi)存資源,單位MB,范圍為[256,12288],默認值為4096。setMaxIteration(int)
:設(shè)置最大迭代次數(shù),默認為-1,小于或等于0時表示最大迭代次數(shù)不作為作業(yè)終止條件。setJobPriority(int)
:設(shè)置作業(yè)優(yōu)先級,范圍為[0,9],默認值為9,數(shù)值越大優(yōu)先級越小。
- 可以考慮使用
setNumWorkers
方法增加Worker數(shù)目。 - 可以考慮使用
setSplitSize
方法減少切分大小,提高作業(yè)載入數(shù)據(jù)速度。 - 加大Worker的CPU或內(nèi)存。
- 設(shè)置最大迭代次數(shù),如果有些應(yīng)用結(jié)果精度要求不高,可以考慮減少迭代次數(shù),盡快結(jié)束。
setNumWorkers
與setSplitSize
配合使用,可以提高數(shù)據(jù)的載入速度。假設(shè)setNumWorkers
為workerNum
,setSplitSize
為splitSize
,總輸入字節(jié)數(shù)為inputSize
,則輸入被切分后的塊數(shù)splitNum=inputSize/splitSize
。workerNum
和splitNum
之間的關(guān)系如下:
- 情況一:
splitNum
=workerNum
,每個Worker負責(zé)載入1個Split。 - 情況二:
splitNum
>workerNum
,每個Worker負責(zé)載入1個或多個Split。 - 情況三:
splitNum
<workerNum
,每個Worker負責(zé)載入0個或1一個Split。
因此,應(yīng)調(diào)節(jié)workerNum
和splitSize
,在滿足前兩種情況時,數(shù)據(jù)載入比較快。迭代階段只需調(diào)節(jié)workerNum
。如果設(shè)置runtime partitioning
為False,則建議直接使用setSplitSize
控制Worker數(shù)量,或者保證滿足前兩種情況。當(dāng)出現(xiàn)第三種情況時,部分Worker上的切片會為0,可以在Jar命令前使用set odps.graph.split.size=<m>; set odps.graph.worker.num=<n>;
與setNumWorkers
,和setSplitSize
等效。
- 嘗試Combiner,將這些Key對應(yīng)點的消息進行本地聚合,減少消息發(fā)生。
開發(fā)人員可定義Combiner以減少存儲消息的內(nèi)存和網(wǎng)絡(luò)數(shù)據(jù)流量,縮短作業(yè)的執(zhí)行時間。
- 改進業(yè)務(wù)邏輯。
數(shù)據(jù)量大時,讀取磁盤中的數(shù)據(jù)可能耗費一部分處理時間。減少需要讀取的數(shù)據(jù)字節(jié)數(shù)可以提高總體的吞吐量,提高作業(yè)性能。您可通過以下方法進行改進:
- 減少輸入數(shù)據(jù)量:對某些決策性質(zhì)的應(yīng)用,處理數(shù)據(jù)采樣后子集所得到的結(jié)果只可能影響結(jié)果的精度,而并不會影響整體的準(zhǔn)確性,因此可以考慮先對數(shù)據(jù)進行特定采樣后再導(dǎo)入輸入表中進行處理。
- 避免讀取用不到的字段:MaxCompute Graph框架的TableInfo類支持讀取指定的列(以列名數(shù)組方式傳入),而非整個表或表分區(qū),這樣也可以減少輸入的數(shù)據(jù)量,提高作業(yè)性能。
內(nèi)置Jar包
-libjars
帶上這些Jar包。
- commons-codec-1.3.jar
- commons-io-2.0.1.jar
- commons-lang-2.5.jar
- commons-logging-1.0.4.jar
- commons-logging-api-1.0.4.jar
- guava-14.0.jar
- json.jar
- log4j-1.2.15.jar
- slf4j-api-1.4.3.jar
- slf4j-log4j12-1.4.3.jar
- xmlenc-0.52.jar