MaxCompute不提供Graph開發(fā)插件,您可以基于Eclipse開發(fā)MaxCompute Graph程序。

開發(fā)流程如下:
  1. 編寫Graph代碼,使用本地調(diào)試進行基本的測試。
  2. 進行集群調(diào)試,驗證結(jié)果。

開發(fā)示例

本節(jié)以SSSP算法為例,為您介紹如何使用Eclipse開發(fā)和調(diào)試Graph程序。

操作步驟:
  1. 創(chuàng)建Java工程,工程名為graph_examples
  2. 將MaxCompute客戶端lib目錄下的Jar包加到Eclipse工程的Java Build Path中。
  3. 開發(fā)MaxCompute Graph程序。

    實際開發(fā)過程中,通常會先復(fù)制一個例子(例如單源最短距離),然后進行修改。在本示例中,僅修改了Package的路徑為package com.aliyun.odps.graph.example

  4. 編譯打包。

    在Eclipse環(huán)境中,右鍵單擊源代碼目錄(圖中的src目錄),Export > Java > JAR file生成Jar包,選擇目標(biāo)Jar包的保存路徑,例如D:\\odps\\clt\\odps-graph-example-sssp.jar

  5. 使用MaxCompute客戶端運行SSSP,詳情請參見運行Graph

本地調(diào)試

MaxCompute Graph支持本地調(diào)試模式,您可以使用Eclipse進行斷點調(diào)試。

操作步驟:
  1. 下載一個odps-graph-local的Maven包。
  2. 選擇Eclipse工程,右鍵單擊Graph作業(yè)主程序(包含main函數(shù))文件,配置其運行參數(shù)(Run As > Run Configurations)。
  3. Arguments頁簽中,設(shè)置Program arguments參數(shù)為1 sssp_in sssp_out,作為主程序的輸入?yún)?shù)。
  4. Arguments頁簽中,設(shè)置VM arguments參數(shù)如下。
    -Dodps.runner.mode=local
    -Dodps.project.name=<project.name>
    -Dodps.end.point=<end.point>
    -Dodps.access.id=<access.id> 
    -Dodps.access.key=<access.key>
    參數(shù)配置
  5. 對于本地模式(即不指定odps.end.point參數(shù)),需要在warehouse創(chuàng)建sssp_insssp_out表,為輸入表sssp_in添加數(shù)據(jù),輸入數(shù)據(jù)如下所示。
    1,"2:2,3:1,4:4"
    2,"1:2,3:2,4:1"
    3,"1:1,2:2,5:1"
    4,"1:4,2:1,5:1"
    5,"3:1,4:1"

    關(guān)于warehouse的介紹請參見本地運行

  6. 單擊Run,即可本地跑SSSP。
    說明 參數(shù)設(shè)置可參見MaxCompute客戶端中conf/odps_config.ini的設(shè)置,上述為幾個常用參數(shù),其他參數(shù)說明如下:
    • odps.runner.mode:取值為local,本地調(diào)試功能必須指定。
    • odps.project.name:指定當(dāng)前Project,必須指定。
    • odps.end.point:指定當(dāng)前MaxCompute服務(wù)的地址,可以不指定。如果不指定,只從warehouse讀取表或資源的meta和數(shù)據(jù),不存在則拋異常。如果指定,會先從warehouse讀取,不存在時會遠程連接MaxCompute讀取。
    • odps.access.id:連接MaxCompute服務(wù)的AccessKey ID,只在指定odps.end.point時有效。
    • odps.access.key:連接MaxCompute服務(wù)的AccessKey Secret,只在指定odps.end.point時有效。
    • odps.cache.resources:指定使用的資源列表,效果與Jar命令的-resources相同。
    • odps.local.warehouse:本地warehouse路徑,不指定時默認為./warehouse
    在Eclipse中本地運行SSSP的調(diào)試輸出信息,如下所示。
    Counters: 3
             com.aliyun.odps.graph.local.COUNTER
                     TASK_INPUT_BYTE=211
                     TASK_INPUT_RECORD=5
                     TASK_OUTPUT_BYTE=161
                     TASK_OUTPUT_RECORD=5
     graph task finish
    說明 上述示例中,需要本地warehouse下有sssp_insssp_out表。sssp_insssp_out的詳細信息請參見編寫Graph(可選)

本地作業(yè)臨時目錄

每運行一次本地調(diào)試,都會在Eclipse工程目錄下新建一個臨時目錄。
本地運行的Graph作業(yè),臨時目錄包括以下幾個目錄和文件:
  • counters:存放作業(yè)運行的一些計數(shù)信息。
  • inputs:存放作業(yè)的輸入數(shù)據(jù),優(yōu)先取自本地的warehouse,如果本地沒有,會通過MaxCompute SDK從服務(wù)端讀取(如果設(shè)置了odps.end.point),默認一個input只讀10條數(shù)據(jù),可以通過-Dodps.mapred.local.record.limit參數(shù)進行修改,但是也不能超過1萬條記錄。
  • outputs:存放作業(yè)的輸出數(shù)據(jù),如果本地warehouse中存在輸出表,outputs中的結(jié)果數(shù)據(jù)在作業(yè)執(zhí)行完后會覆蓋本地warehouse中對應(yīng)的表。
  • resources:存放作業(yè)使用的資源,與輸入類似,優(yōu)先取自本地的warehouse,如果本地沒有,會通過MaxCompute SDK從服務(wù)端讀取(如果設(shè)置了odps.end.point)。
  • job.xml:作業(yè)配置。
  • superstep:存放每一輪迭代的持久化信息。
說明 如果需要本地調(diào)試時輸出詳細日志,需要在src目錄下放一個log4j的配置文件log4j.properties_odps_graph_cluster_debug

集群調(diào)試

通過本地的調(diào)試后,您即可提交作業(yè)到集群進行測試。

操作步驟如下:
  1. 配置MaxCompute客戶端。
  2. 使用add jar /path/work.jar -f;命令更新Jar包。
  3. 使用Jar命令運行作業(yè),查看運行日志和結(jié)果數(shù)據(jù)。
說明 集群運行Graph的詳細介紹請參見編寫Graph(可選)

性能調(diào)優(yōu)

對性能有所影響的Graph Job配置項如下:
  • setSplitSize(long):輸入表切分大小,單位MB,大于0,默認64。
  • setNumWorkers(int):設(shè)置作業(yè)Worker數(shù)量,范圍為[1,1000],默認值為1。Worker數(shù)由作業(yè)輸入字節(jié)數(shù)和splitSize決定。
  • setWorkerCPU(int):Map CPU資源,100為1CPU核,[50,800]之間,默認值為200。
  • setWorkerMemory(int):Map內(nèi)存資源,單位MB,范圍為[256,12288],默認值為4096。
  • setMaxIteration(int):設(shè)置最大迭代次數(shù),默認為-1,小于或等于0時表示最大迭代次數(shù)不作為作業(yè)終止條件。
  • setJobPriority(int):設(shè)置作業(yè)優(yōu)先級,范圍為[0,9],默認值為9,數(shù)值越大優(yōu)先級越小。
通常情況下的調(diào)優(yōu)建議:
  • 可以考慮使用setNumWorkers方法增加Worker數(shù)目。
  • 可以考慮使用setSplitSize方法減少切分大小,提高作業(yè)載入數(shù)據(jù)速度。
  • 加大Worker的CPU或內(nèi)存。
  • 設(shè)置最大迭代次數(shù),如果有些應(yīng)用結(jié)果精度要求不高,可以考慮減少迭代次數(shù),盡快結(jié)束。
接口setNumWorkerssetSplitSize配合使用,可以提高數(shù)據(jù)的載入速度。假設(shè)setNumWorkersworkerNumsetSplitSizesplitSize,總輸入字節(jié)數(shù)為inputSize,則輸入被切分后的塊數(shù)splitNum=inputSize/splitSizeworkerNumsplitNum之間的關(guān)系如下:
  • 情況一:splitNum=workerNum,每個Worker負責(zé)載入1個Split。
  • 情況二:splitNum>workerNum,每個Worker負責(zé)載入1個或多個Split。
  • 情況三:splitNum<workerNum,每個Worker負責(zé)載入0個或1一個Split。

因此,應(yīng)調(diào)節(jié)workerNumsplitSize,在滿足前兩種情況時,數(shù)據(jù)載入比較快。迭代階段只需調(diào)節(jié)workerNum。如果設(shè)置runtime partitioning為False,則建議直接使用setSplitSize控制Worker數(shù)量,或者保證滿足前兩種情況。當(dāng)出現(xiàn)第三種情況時,部分Worker上的切片會為0,可以在Jar命令前使用set odps.graph.split.size=<m>; set odps.graph.worker.num=<n>; setNumWorkers,和setSplitSize等效。

另外一種常見的性能問題為數(shù)據(jù)傾斜,反應(yīng)到Counters就是某些Worker處理的點或邊數(shù)量遠超過其他Worker。數(shù)據(jù)傾斜的原因通常是某些Key對應(yīng)的點、邊,或者消息的數(shù)量遠超過其他Key,這些Key被分到少量的Worker處理,導(dǎo)致這些Worker運行時間較長。解決方法:
  • 嘗試Combiner,將這些Key對應(yīng)點的消息進行本地聚合,減少消息發(fā)生。

    開發(fā)人員可定義Combiner以減少存儲消息的內(nèi)存和網(wǎng)絡(luò)數(shù)據(jù)流量,縮短作業(yè)的執(zhí)行時間。

  • 改進業(yè)務(wù)邏輯。
    數(shù)據(jù)量大時,讀取磁盤中的數(shù)據(jù)可能耗費一部分處理時間。減少需要讀取的數(shù)據(jù)字節(jié)數(shù)可以提高總體的吞吐量,提高作業(yè)性能。您可通過以下方法進行改進:
    • 減少輸入數(shù)據(jù)量:對某些決策性質(zhì)的應(yīng)用,處理數(shù)據(jù)采樣后子集所得到的結(jié)果只可能影響結(jié)果的精度,而并不會影響整體的準(zhǔn)確性,因此可以考慮先對數(shù)據(jù)進行特定采樣后再導(dǎo)入輸入表中進行處理。
    • 避免讀取用不到的字段:MaxCompute Graph框架的TableInfo類支持讀取指定的列(以列名數(shù)組方式傳入),而非整個表或表分區(qū),這樣也可以減少輸入的數(shù)據(jù)量,提高作業(yè)性能。

內(nèi)置Jar包

以下Jar包會默認加載到運行Graph程序的JVM中,您不必上傳這些資源,也不必在命令行的-libjars帶上這些Jar包。
  • commons-codec-1.3.jar
  • commons-io-2.0.1.jar
  • commons-lang-2.5.jar
  • commons-logging-1.0.4.jar
  • commons-logging-api-1.0.4.jar
  • guava-14.0.jar
  • json.jar
  • log4j-1.2.15.jar
  • slf4j-api-1.4.3.jar
  • slf4j-log4j12-1.4.3.jar
  • xmlenc-0.52.jar
說明 在JVM的Classpath中,上述內(nèi)置Jar包會位于您的Jar包之前,可能產(chǎn)生版本沖突。例如您的程序中使用了commons-codec-1.5.jar某個類的函數(shù),但是這個函數(shù)不在commons-codec-1.3.jar中,這時如果1.3版本無法實現(xiàn)您的需求,則只能等待MaxCompute升級新版本。