日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

本文為您介紹較為常用的MapReduce核心接口。

如果您使用Maven,可以從Maven庫(kù)中搜索odps-sdk-mapred獲取不同版本的Java SDK,相關(guān)配置信息如下。

<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-sdk-mapred</artifactId>
    <version>0.40.10-public</version>
</dependency>

數(shù)據(jù)類型

MapReduce支持的數(shù)據(jù)類型為BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME和DECIMAL類型。MaxCompute數(shù)據(jù)類型與Java數(shù)據(jù)類型的對(duì)應(yīng)關(guān)系如下。

MaxCompute SQL Type

Java Type

BIGINT

LONG

STRING

STRING

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

DATETIME

DATE

DECIMAL

BIGDECIMAL

MapReduce主要接口

主要接口

描述

MapperBase

用戶自定義的Map函數(shù)需要繼承自此類。處理輸入表的記錄對(duì)象,加工處理成鍵值對(duì)集合輸出到Reduce階段,或者不經(jīng)過(guò)Reduce階段直接輸出結(jié)果記錄到結(jié)果表。不經(jīng)過(guò)Reduce階段而直接輸出計(jì)算結(jié)果的作業(yè),也可稱之為MapOnly作業(yè)。

ReducerBase

用戶自定義的Reduce函數(shù)需要繼承自此類。對(duì)與一個(gè)鍵(Key)關(guān)聯(lián)的一組數(shù)值集(Values)進(jìn)行歸約計(jì)算。

TaskContext

是MapperBase及ReducerBase多個(gè)成員函數(shù)的輸入?yún)?shù)之一,含有任務(wù)運(yùn)行的上下文信息。

JobClient

用于提交和管理作業(yè),提交方式包括阻塞(同步)方式及非阻塞(異步) 方式。

RunningJob

作業(yè)運(yùn)行時(shí)對(duì)象,用于跟蹤運(yùn)行中的MapReduce作業(yè)實(shí)例。

JobConf

描述一個(gè)MapReduce任務(wù)的配置,通常在主程序(main函數(shù))中定義JobConf對(duì)象,然后通過(guò)JobClient提交作業(yè)給MaxCompute服務(wù)。

MapperBase

主要函數(shù)接口。

主要接口

描述

void cleanup(TaskContext context)

在Map階段結(jié)束時(shí),map方法之后調(diào)用。

void map(long key, Record record, TaskContext context)

map方法,處理輸入表的記錄。

void setup(TaskContext context)

在Map階段開始時(shí),map方法之前調(diào)用。

ReducerBase

主要函數(shù)接口。

主要接口

描述

void cleanup( TaskContext context)

在Reduce階段結(jié)束時(shí),reduce方法之后調(diào)用。

void reduce(Record key, Iterator<Record > values, TaskContext context)

reduce方法,處理輸入表的記錄。

void setup( TaskContext context)

在Reduce階段開始時(shí),reduce方法之前調(diào)用。

TaskContext

主要函數(shù)接口。

主要接口

描述

TableInfo[] getOutputTableInfo()

獲取輸出的表信息。

Record createOutputRecord()

創(chuàng)建默認(rèn)輸出表的記錄對(duì)象。

Record createOutputRecord(String label)

創(chuàng)建給定label輸出表的記錄對(duì)象。

Record createMapOutputKeyRecord()

創(chuàng)建Map輸出Key的記錄對(duì)象。

Record createMapOutputValueRecord()

創(chuàng)建Map輸出Value的記錄對(duì)象。

void write(Record record)

寫記錄到默認(rèn)輸出,用于Reduce端寫出數(shù)據(jù),可以在Reduce端多次調(diào)用。

void write(Record record, String label)

寫記錄到給定label輸出,用于Reduce端寫出數(shù)據(jù)。可以在 Reduce端多次調(diào)用。

void write(Record key, Record value)

Map寫記錄到中間結(jié)果,可以在Map函數(shù)中多次調(diào)用。 可以在Map端多次調(diào)用。

BufferedInputStream readResourceFileAsStream(String resourceName)

讀取文件類型資源。

Iterator<Record > readResourceTable(String resourceName)

讀取表類型資源。

Counter getCounter(Enum<? > name)

獲取給定名稱的Counter對(duì)象。

Counter getCounter(String group, String name)

獲取給定組名和名稱的Counter對(duì)象。

void progress()

向MapReduce框架報(bào)告心跳信息。 如果用戶方法處理時(shí)間很長(zhǎng),且中間沒有調(diào)用框架,可以調(diào)用這個(gè)方法避免task超時(shí),框架默認(rèn)600秒超時(shí)。

  • MaxCompute的TaskContext接口中提供了progress功能,但此功能是為防止Worker長(zhǎng)時(shí)間運(yùn)行未結(jié)束,被框架誤認(rèn)為超時(shí)而被殺的情況出現(xiàn)。此接口更類似于向框架發(fā)送心跳信息,并不是用來(lái)匯報(bào)Worker進(jìn)度。

  • MaxCompute MapReduce默認(rèn)Worker超時(shí)時(shí)間為10分鐘(系統(tǒng)默認(rèn)配置,不受用戶控制),如果超過(guò)10分鐘,Worker仍然沒有向框架發(fā)送心跳(調(diào)用progress接口),框架會(huì)強(qiáng)制停止該Worker,MapReduce任務(wù)失敗退出。因此,建議您在Mapper/Reducer函數(shù)中,定期調(diào)用progress接口,防止框架認(rèn)為Worker超時(shí),誤殺任務(wù)。

JobConf

主要函數(shù)接口。

主要接口

描述

void setResources(String resourceNames)

聲明本作業(yè)使用的資源。只有聲明的資源才能在運(yùn)行Mapper/Reducer時(shí)通過(guò)TaskContext對(duì)象讀取。

void setMapOutputKeySchema(Column[] schema)

設(shè)置Mapper輸出到Reducer的Key屬性。

void setMapOutputValueSchema(Column[] schema)

設(shè)置Mapper輸出到Reducer的Value屬性。

void setOutputKeySortColumns(String[] cols)

設(shè)置Mapper輸出到Reducer的Key排序列。

void setOutputGroupingColumns(String[] cols)

設(shè)置Key分組列。

void setMapperClass(Class<? extends Mapper > theClass)

設(shè)置作業(yè)的Mapper函數(shù)。

void setPartitionColumns(String[] cols)

設(shè)置作業(yè)指定的分區(qū)列。默認(rèn)是Mapper輸出Key的所有列。

void setReducerClass(Class<? extends Reducer > theClass)

設(shè)置作業(yè)的Reducer。

void setCombinerClass(Class<? extends Reducer > theClass)

設(shè)置作業(yè)的combiner。在Map端運(yùn)行,作用類似于單個(gè)Map對(duì)本地的相同Key值做Reduce。

void setSplitSize(long size)

設(shè)置分片大小,單位MB,默認(rèn)值256。

void setNumReduceTasks(int n)

設(shè)置Reducer任務(wù)數(shù),默認(rèn)為Mapper任務(wù)數(shù)的1/4。

void setMemoryForMapTask(int mem)

設(shè)置Mapper任務(wù)中單個(gè)Worker的內(nèi)存大小,單位MB, 默認(rèn)值2048。

void setMemoryForReduceTask(int mem)

設(shè)置Reducer任務(wù)中單個(gè)Worker的內(nèi)存大小,單位MB, 默認(rèn)值 2048。

  • 通常情況下,GroupingColumns包含在KeySortColumns中,KeySortColumns和PartitionColumns要包含在Key中。

  • 在Map端,Mapper輸出的Record會(huì)根據(jù)設(shè)置的PartitionColumns計(jì)算哈希值,決定分配到哪個(gè)Reducer,會(huì)根據(jù)KeySortColumns對(duì)Record進(jìn)行排序。

  • 在Reduce端,輸入Records,再按照KeySortColumns排序后,會(huì)根據(jù)GroupingColumns指定的列對(duì)輸入的Records進(jìn)行分組,即會(huì)順序遍歷輸入的Records,把GroupingColumns所指定列相同的Records作為一次reduce函數(shù)調(diào)用的輸入。

JobClient

主要函數(shù)接口。

主要接口

描述

static RunningJob runJob(JobConf job)

阻塞(同步)方式提交MapReduce作業(yè)后立即返回。

static RunningJob submitJob(JobConf job)

非阻塞(異步)方式提交MapReduce作業(yè)后立即返回。

RunningJob

主要函數(shù)接口。

主要接口

描述

String getInstanceID()

獲取作業(yè)運(yùn)行實(shí)例ID,用于查看運(yùn)行日志和作業(yè)管理。

boolean isComplete()

查詢作業(yè)是否結(jié)束。

boolean isSuccessful()

查詢作業(yè)實(shí)例是否運(yùn)行成功。

void waitForCompletion()

等待直至作業(yè)實(shí)例結(jié)束。一般用于異步方式提交的作業(yè)。

JobStatus getJobStatus()

查詢作業(yè)實(shí)例運(yùn)行狀態(tài)。

void killJob()

結(jié)束此作業(yè)。

Counters getCounters()

獲取Conter信息。

InputUtils

主要函數(shù)接口。

主要接口

描述

static void addTable(TableInfo table, JobConf conf)

添加表table到任務(wù)輸入,可以被調(diào)用多次 ,新加入的表以append方式添加到輸入隊(duì)列中。

static void setTables(TableInfo [] tables, JobConf conf)

添加多張表到任務(wù)輸入中。

OutputUtils

主要函數(shù)接口。

主要接口

描述

static void addTable(TableInfo table, JobConf conf)

添加表table到任務(wù)輸出,可以被調(diào)用多次 ,新加入的表以append方式添加到輸出隊(duì)列中。

static void setTables(TableInfo[] tables, JobConf conf)

添加多張表到任務(wù)輸出中。

Pipeline

Pipeline是MR2的主體類。可以通過(guò)Pipeline.builder構(gòu)建一個(gè)Pipeline。Pipeline的主要接口如下。

    public Builder addMapper(Class<? extends Mapper> mapper)
    public Builder addMapper(Class<? extends Mapper> mapper,
           Column[] keySchema, Column[] valueSchema, String[] sortCols,
           SortOrder[] order, String[] partCols,
           Class<? extends Partitioner> theClass, String[] groupCols)
    public Builder addReducer(Class<? extends Reducer> reducer)
    public Builder addReducer(Class<? extends Reducer> reducer,
           Column[] keySchema, Column[] valueSchema, String[] sortCols,
           SortOrder[] order, String[] partCols,
           Class<? extends Partitioner> theClass, String[] groupCols)
    public Builder setOutputKeySchema(Column[] keySchema)
    public Builder setOutputValueSchema(Column[] valueSchema)
    public Builder setOutputKeySortColumns(String[] sortCols)
    public Builder setOutputKeySortOrder(SortOrder[] order)
    public Builder setPartitionColumns(String[] partCols)
    public Builder setPartitionerClass(Class<? extends Partitioner> theClass)
    public Builder setOutputGroupingColumns(String[] cols)

示例如下。

    Job job = new Job();
    Pipeline pipeline = Pipeline.builder()
     .addMapper(TokenizerMapper.class)
     .setOutputKeySchema(
         new Column[] { new Column("word", OdpsType.STRING) })
     .setOutputValueSchema(
         new Column[] { new Column("count", OdpsType.BIGINT) })
     .addReducer(SumReducer.class)
     .setOutputKeySchema(
         new Column[] { new Column("count", OdpsType.BIGINT) })
     .setOutputValueSchema(
         new Column[] { new Column("word", OdpsType.STRING),
         new Column("count", OdpsType.BIGINT) })
     .addReducer(IdentityReducer.class).createPipeline();
    job.setPipeline(pipeline);  
    job.addInput(...)
    job.addOutput(...)
    job.submit();

如上所示,您可以在main函數(shù)中構(gòu)建一個(gè)Map之后,連續(xù)接兩個(gè)Reduce的MapReduce任務(wù)。如果您比較熟悉MapReduce的基礎(chǔ)功能,即可輕松使用MR2。

說(shuō)明
  • 建議您在使用MR2功能前,先了解MapReduce的基礎(chǔ)用法。

  • JobConf僅能夠配置Map后接單Reduce的MapReduce任務(wù)。