原生SDK概述
本文為您介紹較為常用的MapReduce核心接口。
如果您使用Maven,可以從Maven庫(kù)中搜索odps-sdk-mapred獲取不同版本的Java SDK,相關(guān)配置信息如下。
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-mapred</artifactId>
<version>0.40.10-public</version>
</dependency>
數(shù)據(jù)類型
MapReduce支持的數(shù)據(jù)類型為BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME和DECIMAL類型。MaxCompute數(shù)據(jù)類型與Java數(shù)據(jù)類型的對(duì)應(yīng)關(guān)系如下。
MaxCompute SQL Type | Java Type |
BIGINT | LONG |
STRING | STRING |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATETIME | DATE |
DECIMAL | BIGDECIMAL |
MapReduce主要接口
主要接口 | 描述 |
MapperBase | 用戶自定義的Map函數(shù)需要繼承自此類。處理輸入表的記錄對(duì)象,加工處理成鍵值對(duì)集合輸出到Reduce階段,或者不經(jīng)過(guò)Reduce階段直接輸出結(jié)果記錄到結(jié)果表。不經(jīng)過(guò)Reduce階段而直接輸出計(jì)算結(jié)果的作業(yè),也可稱之為MapOnly作業(yè)。 |
ReducerBase | 用戶自定義的Reduce函數(shù)需要繼承自此類。對(duì)與一個(gè)鍵(Key)關(guān)聯(lián)的一組數(shù)值集(Values)進(jìn)行歸約計(jì)算。 |
TaskContext | 是MapperBase及ReducerBase多個(gè)成員函數(shù)的輸入?yún)?shù)之一,含有任務(wù)運(yùn)行的上下文信息。 |
JobClient | 用于提交和管理作業(yè),提交方式包括阻塞(同步)方式及非阻塞(異步) 方式。 |
RunningJob | 作業(yè)運(yùn)行時(shí)對(duì)象,用于跟蹤運(yùn)行中的MapReduce作業(yè)實(shí)例。 |
JobConf | 描述一個(gè)MapReduce任務(wù)的配置,通常在主程序(main函數(shù))中定義JobConf對(duì)象,然后通過(guò)JobClient提交作業(yè)給MaxCompute服務(wù)。 |
MapperBase
主要函數(shù)接口。
主要接口 | 描述 |
void cleanup(TaskContext context) | 在Map階段結(jié)束時(shí),map方法之后調(diào)用。 |
void map(long key, Record record, TaskContext context) | map方法,處理輸入表的記錄。 |
void setup(TaskContext context) | 在Map階段開始時(shí),map方法之前調(diào)用。 |
ReducerBase
主要函數(shù)接口。
主要接口 | 描述 |
void cleanup( TaskContext context) | 在Reduce階段結(jié)束時(shí),reduce方法之后調(diào)用。 |
void reduce(Record key, Iterator<Record > values, TaskContext context) | reduce方法,處理輸入表的記錄。 |
void setup( TaskContext context) | 在Reduce階段開始時(shí),reduce方法之前調(diào)用。 |
TaskContext
主要函數(shù)接口。
主要接口 | 描述 |
TableInfo[] getOutputTableInfo() | 獲取輸出的表信息。 |
Record createOutputRecord() | 創(chuàng)建默認(rèn)輸出表的記錄對(duì)象。 |
Record createOutputRecord(String label) | 創(chuàng)建給定label輸出表的記錄對(duì)象。 |
Record createMapOutputKeyRecord() | 創(chuàng)建Map輸出Key的記錄對(duì)象。 |
Record createMapOutputValueRecord() | 創(chuàng)建Map輸出Value的記錄對(duì)象。 |
void write(Record record) | 寫記錄到默認(rèn)輸出,用于Reduce端寫出數(shù)據(jù),可以在Reduce端多次調(diào)用。 |
void write(Record record, String label) | 寫記錄到給定label輸出,用于Reduce端寫出數(shù)據(jù)。可以在 Reduce端多次調(diào)用。 |
void write(Record key, Record value) | Map寫記錄到中間結(jié)果,可以在Map函數(shù)中多次調(diào)用。 可以在Map端多次調(diào)用。 |
BufferedInputStream readResourceFileAsStream(String resourceName) | 讀取文件類型資源。 |
Iterator<Record > readResourceTable(String resourceName) | 讀取表類型資源。 |
Counter getCounter(Enum<? > name) | 獲取給定名稱的Counter對(duì)象。 |
Counter getCounter(String group, String name) | 獲取給定組名和名稱的Counter對(duì)象。 |
void progress() | 向MapReduce框架報(bào)告心跳信息。 如果用戶方法處理時(shí)間很長(zhǎng),且中間沒有調(diào)用框架,可以調(diào)用這個(gè)方法避免task超時(shí),框架默認(rèn)600秒超時(shí)。 |
MaxCompute的TaskContext接口中提供了progress功能,但此功能是為防止Worker長(zhǎng)時(shí)間運(yùn)行未結(jié)束,被框架誤認(rèn)為超時(shí)而被殺的情況出現(xiàn)。此接口更類似于向框架發(fā)送心跳信息,并不是用來(lái)匯報(bào)Worker進(jìn)度。
MaxCompute MapReduce默認(rèn)Worker超時(shí)時(shí)間為10分鐘(系統(tǒng)默認(rèn)配置,不受用戶控制),如果超過(guò)10分鐘,Worker仍然沒有向框架發(fā)送心跳(調(diào)用progress接口),框架會(huì)強(qiáng)制停止該Worker,MapReduce任務(wù)失敗退出。因此,建議您在Mapper/Reducer函數(shù)中,定期調(diào)用progress接口,防止框架認(rèn)為Worker超時(shí),誤殺任務(wù)。
JobConf
主要函數(shù)接口。
主要接口 | 描述 |
void setResources(String resourceNames) | 聲明本作業(yè)使用的資源。只有聲明的資源才能在運(yùn)行Mapper/Reducer時(shí)通過(guò)TaskContext對(duì)象讀取。 |
void setMapOutputKeySchema(Column[] schema) | 設(shè)置Mapper輸出到Reducer的Key屬性。 |
void setMapOutputValueSchema(Column[] schema) | 設(shè)置Mapper輸出到Reducer的Value屬性。 |
void setOutputKeySortColumns(String[] cols) | 設(shè)置Mapper輸出到Reducer的Key排序列。 |
void setOutputGroupingColumns(String[] cols) | 設(shè)置Key分組列。 |
void setMapperClass(Class<? extends Mapper > theClass) | 設(shè)置作業(yè)的Mapper函數(shù)。 |
void setPartitionColumns(String[] cols) | 設(shè)置作業(yè)指定的分區(qū)列。默認(rèn)是Mapper輸出Key的所有列。 |
void setReducerClass(Class<? extends Reducer > theClass) | 設(shè)置作業(yè)的Reducer。 |
void setCombinerClass(Class<? extends Reducer > theClass) | 設(shè)置作業(yè)的combiner。在Map端運(yùn)行,作用類似于單個(gè)Map對(duì)本地的相同Key值做Reduce。 |
void setSplitSize(long size) | 設(shè)置分片大小,單位MB,默認(rèn)值256。 |
void setNumReduceTasks(int n) | 設(shè)置Reducer任務(wù)數(shù),默認(rèn)為Mapper任務(wù)數(shù)的1/4。 |
void setMemoryForMapTask(int mem) | 設(shè)置Mapper任務(wù)中單個(gè)Worker的內(nèi)存大小,單位MB, 默認(rèn)值2048。 |
void setMemoryForReduceTask(int mem) | 設(shè)置Reducer任務(wù)中單個(gè)Worker的內(nèi)存大小,單位MB, 默認(rèn)值 2048。 |
通常情況下,GroupingColumns包含在KeySortColumns中,KeySortColumns和PartitionColumns要包含在Key中。
在Map端,Mapper輸出的Record會(huì)根據(jù)設(shè)置的PartitionColumns計(jì)算哈希值,決定分配到哪個(gè)Reducer,會(huì)根據(jù)KeySortColumns對(duì)Record進(jìn)行排序。
在Reduce端,輸入Records,再按照KeySortColumns排序后,會(huì)根據(jù)GroupingColumns指定的列對(duì)輸入的Records進(jìn)行分組,即會(huì)順序遍歷輸入的Records,把GroupingColumns所指定列相同的Records作為一次reduce函數(shù)調(diào)用的輸入。
JobClient
主要函數(shù)接口。
主要接口 | 描述 |
static RunningJob runJob(JobConf job) | 阻塞(同步)方式提交MapReduce作業(yè)后立即返回。 |
static RunningJob submitJob(JobConf job) | 非阻塞(異步)方式提交MapReduce作業(yè)后立即返回。 |
RunningJob
主要函數(shù)接口。
主要接口 | 描述 |
String getInstanceID() | 獲取作業(yè)運(yùn)行實(shí)例ID,用于查看運(yùn)行日志和作業(yè)管理。 |
boolean isComplete() | 查詢作業(yè)是否結(jié)束。 |
boolean isSuccessful() | 查詢作業(yè)實(shí)例是否運(yùn)行成功。 |
void waitForCompletion() | 等待直至作業(yè)實(shí)例結(jié)束。一般用于異步方式提交的作業(yè)。 |
JobStatus getJobStatus() | 查詢作業(yè)實(shí)例運(yùn)行狀態(tài)。 |
void killJob() | 結(jié)束此作業(yè)。 |
Counters getCounters() | 獲取Conter信息。 |
InputUtils
主要函數(shù)接口。
主要接口 | 描述 |
static void addTable(TableInfo table, JobConf conf) | 添加表table到任務(wù)輸入,可以被調(diào)用多次 ,新加入的表以append方式添加到輸入隊(duì)列中。 |
static void setTables(TableInfo [] tables, JobConf conf) | 添加多張表到任務(wù)輸入中。 |
OutputUtils
主要函數(shù)接口。
主要接口 | 描述 |
static void addTable(TableInfo table, JobConf conf) | 添加表table到任務(wù)輸出,可以被調(diào)用多次 ,新加入的表以append方式添加到輸出隊(duì)列中。 |
static void setTables(TableInfo[] tables, JobConf conf) | 添加多張表到任務(wù)輸出中。 |
Pipeline
Pipeline是MR2的主體類。可以通過(guò)Pipeline.builder構(gòu)建一個(gè)Pipeline。Pipeline的主要接口如下。
public Builder addMapper(Class<? extends Mapper> mapper)
public Builder addMapper(Class<? extends Mapper> mapper,
Column[] keySchema, Column[] valueSchema, String[] sortCols,
SortOrder[] order, String[] partCols,
Class<? extends Partitioner> theClass, String[] groupCols)
public Builder addReducer(Class<? extends Reducer> reducer)
public Builder addReducer(Class<? extends Reducer> reducer,
Column[] keySchema, Column[] valueSchema, String[] sortCols,
SortOrder[] order, String[] partCols,
Class<? extends Partitioner> theClass, String[] groupCols)
public Builder setOutputKeySchema(Column[] keySchema)
public Builder setOutputValueSchema(Column[] valueSchema)
public Builder setOutputKeySortColumns(String[] sortCols)
public Builder setOutputKeySortOrder(SortOrder[] order)
public Builder setPartitionColumns(String[] partCols)
public Builder setPartitionerClass(Class<? extends Partitioner> theClass)
public Builder setOutputGroupingColumns(String[] cols)
示例如下。
Job job = new Job();
Pipeline pipeline = Pipeline.builder()
.addMapper(TokenizerMapper.class)
.setOutputKeySchema(
new Column[] { new Column("word", OdpsType.STRING) })
.setOutputValueSchema(
new Column[] { new Column("count", OdpsType.BIGINT) })
.addReducer(SumReducer.class)
.setOutputKeySchema(
new Column[] { new Column("count", OdpsType.BIGINT) })
.setOutputValueSchema(
new Column[] { new Column("word", OdpsType.STRING),
new Column("count", OdpsType.BIGINT) })
.addReducer(IdentityReducer.class).createPipeline();
job.setPipeline(pipeline);
job.addInput(...)
job.addOutput(...)
job.submit();
如上所示,您可以在main函數(shù)中構(gòu)建一個(gè)Map之后,連續(xù)接兩個(gè)Reduce的MapReduce任務(wù)。如果您比較熟悉MapReduce的基礎(chǔ)功能,即可輕松使用MR2。
建議您在使用MR2功能前,先了解MapReduce的基礎(chǔ)用法。
JobConf僅能夠配置Map后接單Reduce的MapReduce任務(wù)。