可視化MapReduce模型
可視化MapReduce模型在MapReduce模型的基礎(chǔ)上,新增了可視化可運(yùn)維的能力。您無(wú)需修改后端代碼,只需在SchedulerX控制臺(tái)將分布式模型改為可視化MapReduce,即可新增一個(gè)子任務(wù)列表頁(yè)面,并且可以查看每個(gè)子任務(wù)的詳情、結(jié)果和日志,同時(shí)支持每個(gè)子任務(wù)級(jí)別的重跑。
注意事項(xiàng)
僅專(zhuān)業(yè)版支持。
子任務(wù)個(gè)數(shù)不能超過(guò)1000個(gè)。
單個(gè)子任務(wù)的大小不能超過(guò)64 KB。
子任務(wù)顯示自定義標(biāo)簽信息時(shí),子任務(wù)對(duì)象需要實(shí)現(xiàn)指定接口。
ProcessResult的result返回值不能超過(guò)1000 Byte。
如果使用reduce,所有子任務(wù)結(jié)果會(huì)緩存在Master節(jié)點(diǎn),該情況對(duì)Master節(jié)點(diǎn)內(nèi)存壓力較大,建議子任務(wù)個(gè)數(shù)和result返回值不要太大。如果沒(méi)有reduce需求,使用MapJobProcessor接口即可。
SchedulerX不保證子任務(wù)絕對(duì)執(zhí)行一次。在特殊條件下會(huì)failover,可能導(dǎo)致子任務(wù)重復(fù)執(zhí)行,需要業(yè)務(wù)方自行實(shí)現(xiàn)冪等。
接口
繼承MapReduce模型所有接口,任務(wù)處理代碼開(kāi)發(fā)模型與MapReduce模型完全一致。具體信息,請(qǐng)參見(jiàn)MapReduce模型。
(可選)在MapReduce模型接口基礎(chǔ)上,支持設(shè)置每個(gè)子任務(wù)的標(biāo)簽展示(子任務(wù)對(duì)象需要實(shí)現(xiàn)com.alibaba.schedulerx.worker.processor.BizSubTask接口)。
接口
解釋
是否必選
public Map<String, String> labelMap()
實(shí)現(xiàn)輸出子任務(wù)標(biāo)簽信息,用于展示對(duì)應(yīng)子任務(wù)對(duì)象的業(yè)務(wù)自定義特征信息(如:賬戶(hù)名、商品Code、城市區(qū)域等)。
否
與MapReduce對(duì)比
對(duì)比項(xiàng) | MapReduce | 可視化MapReduce |
子任務(wù)數(shù)量 | 可支持百萬(wàn)級(jí) | 小于等于1000。 |
任務(wù)開(kāi)發(fā)模式 | 兩者相同 | |
子任務(wù)列表 | 不支持 | 支持。 |
子任務(wù)運(yùn)行詳情 | 不支持 | 支持,單個(gè)子任務(wù)執(zhí)行記錄、執(zhí)行狀態(tài)、日志、鏈路追蹤、運(yùn)行堆棧。 |
子任務(wù)標(biāo)簽 | 不支持 | 支持,子任務(wù)實(shí)現(xiàn)BizSubTask接口可查看業(yè)務(wù)標(biāo)簽信息。 |
子任務(wù)操作 | 不支持 | 支持,單個(gè)子任務(wù)支持停止、重跑。 |
任務(wù)開(kāi)發(fā)演示
賬戶(hù)批量處理
案例描述:對(duì)一批銀行賬戶(hù)進(jìn)行批量處理,每個(gè)賬號(hào)作為獨(dú)立的子任務(wù)在整個(gè)集群中進(jìn)行全局并行處理,并且每一個(gè)子任務(wù)在執(zhí)行列表中需要顯示其對(duì)應(yīng)的賬戶(hù)信息以便查看,可以方便的掌握每一個(gè)賬號(hào)地處理狀態(tài)及其執(zhí)行詳細(xì)信息。如下將提供相應(yīng)demo代碼供參考使用。
自定義賬號(hào)信息子任務(wù)對(duì)象,每個(gè)子任務(wù)對(duì)象支持展示其標(biāo)簽信息,需實(shí)現(xiàn)接口com.alibaba.schedulerx.worker.processor.BizSubTask,并實(shí)現(xiàn)labelMap方法。
public class ParallelAccountInfo implements BizSubTask { /** * 主鍵 */ private long id; private String name; private String accountId; public ParallelAccountInfo(long id, String name, String accountId) { this.id = id; this.name = name; this.accountId = accountId; } /** * 實(shí)現(xiàn)labelMap方法,用于設(shè)置對(duì)應(yīng)子任務(wù)的標(biāo)簽信息 * @return */ @Override public Map<String, String> labelMap() { Map<String, String> labelMap = new HashMap(); labelMap.put("戶(hù)名", name); return labelMap; } }
子任務(wù)對(duì)象實(shí)現(xiàn)對(duì)應(yīng)接口后,子任務(wù)列表才可展示出每個(gè)子任務(wù)對(duì)象獨(dú)有的標(biāo)簽信息(例如:案例中的戶(hù)名)用于區(qū)分每一個(gè)賬戶(hù)對(duì)象的業(yè)務(wù)處理情況,且支持按標(biāo)簽搜索。
賬號(hào)業(yè)務(wù)任務(wù)處理Processor,實(shí)現(xiàn)對(duì)單個(gè)賬號(hào)的業(yè)務(wù)邏輯處理,繼承com.alibaba.schedulerx.worker.processor.MapReduceJobProcessor。
public class ParallelJob extends MapReduceJobProcessor { private static final Logger logger = LoggerFactory.getLogger("schedulerx"); @Override public ProcessResult reduce(JobContext context) throws Exception { return new ProcessResult(true); } @Override public ProcessResult process(JobContext context) throws Exception { if(isRootTask(context)){ logger.info("構(gòu)建并行計(jì)算的子任務(wù)列表..."); List<ParallelAccountInfo> list = new LinkedList(); /** * 判斷如果是rootTask的情況下,構(gòu)建并行計(jì)算子任務(wù)對(duì)象列表 * 在實(shí)際業(yè)務(wù)場(chǎng)景中,用戶(hù)可自行根據(jù)業(yè)務(wù)場(chǎng)景加載子任務(wù)對(duì)象且該業(yè)務(wù)對(duì)象實(shí)現(xiàn)BizSubTask接口 * 場(chǎng)景案例: * 1、從數(shù)據(jù)庫(kù)中加載未被處理的客戶(hù)賬戶(hù)信息 * 2、構(gòu)建省份城市地區(qū)信息列表,按區(qū)域分發(fā)任務(wù)處理 * 3、根據(jù)業(yè)務(wù)標(biāo)簽作為子任務(wù)分類(lèi),如:電器、日用品、食品等 * 4、可根據(jù)時(shí)間作為子任務(wù)分類(lèi),如:按月(1月、2月...) */ for(int i=0; i < 20; i++){ list.add(new ParallelAccountInfo(i, "CUS"+StringUtils.leftPad(i+"", 4, "0"), "AC"+StringUtils.leftPad(i+"", 12, "0"))); } return map(list, "transfer"); }else { /** * 非rootTask,用戶(hù)可以獲取對(duì)應(yīng)的子任務(wù)信息進(jìn)行相應(yīng)的業(yè)務(wù)處理 */ ParallelAccountInfo obj = (ParallelAccountInfo)context.getTask(); // 針對(duì)獲取的 obj子任務(wù)信息,進(jìn)行業(yè)務(wù)邏輯處理 // do something logger.info("處理子任務(wù)信息:{}", JSON.toJSONString(obj)); return new ProcessResult(true); } } }
完成任務(wù)開(kāi)發(fā)部署后,在控制臺(tái)配置相應(yīng)定時(shí)任務(wù)運(yùn)行,請(qǐng)參見(jiàn)操作步驟。
操作步驟
任務(wù)配置
登錄分布式任務(wù)調(diào)度平臺(tái),在左側(cè)導(dǎo)航欄,單擊任務(wù)管理。
在任務(wù)管理頁(yè)面,單擊創(chuàng)建任務(wù)。
在創(chuàng)建任務(wù)面板,執(zhí)行模式下拉列表選擇可視化MapReduce。
在高級(jí)配置區(qū)域配置相關(guān)信息。其他配置項(xiàng),請(qǐng)參見(jiàn)任務(wù)管理高級(jí)配置參數(shù)說(shuō)明。
配置項(xiàng)
說(shuō)明
分發(fā)策略
輪詢(xún)策略(默認(rèn)):每個(gè)Worker平均分配等量子任務(wù),適用于每個(gè)子任務(wù)處理耗時(shí)基本一致的場(chǎng)景。
WorkerLoad最優(yōu)策略:由主節(jié)點(diǎn)自動(dòng)感知Worker節(jié)點(diǎn)的負(fù)載情況,適用于子任務(wù)和Worker機(jī)器處理耗時(shí)有較大差異的場(chǎng)景。
說(shuō)明客戶(hù)端版本為1.10.14及以上。
子任務(wù)單機(jī)并發(fā)數(shù)
即單機(jī)執(zhí)行線(xiàn)程數(shù),默認(rèn)為5。如需加快執(zhí)行速度,可以調(diào)大該值。如果下游或者數(shù)據(jù)庫(kù)無(wú)法承接,可適當(dāng)調(diào)小。
子任務(wù)失敗重試次數(shù)
子任務(wù)失敗會(huì)自動(dòng)重試,默認(rèn)為0。
子任務(wù)失敗重試間隔
子任務(wù)失敗重試間隔,單位:秒,默認(rèn)為0。
子任務(wù)failover策略
當(dāng)執(zhí)行節(jié)點(diǎn)宕機(jī)下線(xiàn)后,是否將子任務(wù)重新分發(fā)給其他機(jī)器執(zhí)行。開(kāi)啟該配置后,發(fā)生failover時(shí),子任務(wù)可能會(huì)重復(fù)執(zhí)行,需自行做好冪等。
說(shuō)明客戶(hù)端版本為1.8.13及以上。
主節(jié)點(diǎn)參與執(zhí)行
主節(jié)點(diǎn)是否參與子任務(wù)執(zhí)行。在線(xiàn)可運(yùn)行Worker數(shù)量必須不低于2臺(tái),在子任務(wù)數(shù)量特別大時(shí),推薦關(guān)閉該參數(shù)。
說(shuō)明客戶(hù)端版本為1.8.13及以上。
可視化能力
任務(wù)執(zhí)行后,您可以在執(zhí)行列表頁(yè)面,單擊詳情查看對(duì)應(yīng)子任務(wù)的詳細(xì)執(zhí)行信息。
在子任務(wù)列表頁(yè)簽查看每個(gè)子任務(wù)處理的狀態(tài)。
在子任務(wù)列表頁(yè)簽,單擊子任務(wù)操作列的日志,可以查看每個(gè)子任務(wù)運(yùn)行的業(yè)務(wù)日志信息,分析執(zhí)行狀態(tài)結(jié)果。
任務(wù)執(zhí)行記錄在運(yùn)行中時(shí),在當(dāng)前執(zhí)行詳情頁(yè)簽,單擊查看堆棧,可以查看對(duì)應(yīng)機(jī)器處理線(xiàn)程運(yùn)行中的情況,分析當(dāng)前任務(wù)運(yùn)行異常情況。
在子任務(wù)列表頁(yè)簽,當(dāng)接入鏈路追蹤后,單擊對(duì)應(yīng)的TraceId,可以查詢(xún)每個(gè)子任務(wù)的執(zhí)行調(diào)用鏈路。具體操作,請(qǐng)參見(jiàn)如何接入鏈路追蹤。