日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Aggregator機制

本文將為您介紹Aggregator的執行機制、相關API,并以Kmeans Clustering為例說明Aggregator的具體用法。

Aggregator是MaxCompute Graph作業中常用的特征,特別適用于解決機器學習問題。MaxCompute Graph中,Aggregator用于匯總并處理全局信息。

Aggregator機制

Aggregator的邏輯分為兩部分:

  • 一部分在所有Worker上執行,即分布式執行。

  • 一部分只在Aggregator Owner所在的Worker上執行,即單點執行。

其中,在所有Worker上執行的操作包括創建初始值及局部聚合,然后將局部聚合結果發送給Aggregator Owner所在的Worker上。Aggregator Owner所在的Worker上聚合普通Worker發送過來的局部聚合對象,得到全局聚合結果,然后判斷迭代是否結束。全局聚合的結果會在下一輪超步(迭代)分發給所有Worker,供下一輪迭代使用。Aggregator機制

Aggregator的基本流程如下:

  1. 每個Worker啟動時執行createStartupValue用于創建AggregatorValue。

  2. 每輪迭代開始前,每個Worker執行createInitialValue來初始化本輪的AggregatorValue。

  3. 一輪迭代中每個點通過context.aggregate()來執行aggregate()實現Worker內的局部迭代。

  4. 每個Worker將局部迭代結果發送給AggregatorOwner所在的Worker。

  5. AggregatorOwner所在Worker執行多次merge,實現全局聚合。

  6. AggregatorOwner所在Worker執行terminate用于處理全局聚合結果,并決定是否結束迭代。

Aggregator的API

Aggregator共提供了5個API供您實現。API的調用時機及常規用途如下:

  • createStartupValue(context)

    該API在所有Worker上執行一次,調用時機是所有超步開始之前,通常用于初始化AggregatorValue。在第0輪超步中,調用WorkerContext.getLastAggregatedValue()ComputeContext.getLastAggregatedValue()可以獲取該API初始化的AggregatorValue對象。

  • createInitialValue(context)

    該API在所有Worker上每輪超步開始時調用一次,用于初始化本輪迭代所用的AggregatorValue。通常操作是通過 WorkerContext.getLastAggregatedValue()得到上一輪迭代的結果,然后執行部分初始化操作。

  • aggregate(value, item)

    該API同樣在所有Worker上執行,與上述API不同的是,該API由用戶顯示調用ComputeContext#aggregate(item)來觸發,而上述兩個API由框架自動調用。該API用于執行局部聚合操作,其中第一個參數value是本Worker在該輪超步已經聚合的結果(初始值是createInitialValue返回的對象),第二個參數是您的代碼調用ComputeContext#aggregate(item)傳入的參數。該API中通常用item來更新value實現聚合。所有aggregate執行完后,得到的value就是該Worker的局部聚合結果,然后由框架發送給Aggregator Owner所在的Worker。

  • merge(value, partial)

    該API執行于Aggregator Owner所在Worker,用于合并各Worker局部聚合的結果,達到全局聚合對象。與aggregate類似,value是已經聚合的結果,而partial待聚合的對象,同樣用partial更新value

    假設有3個Worker,分別是w0、w1、w2,其局部聚合結果是p0、p1、p2。例如,發送到Aggregator Owner所在Worker的順序為p1、p0、p2,則merge執行次序為:

    1. 首先執行merge(p1, p0),這樣p1和p0就聚合為p1。

    2. 然后執行merge(p1, p2),p1和p2聚合為p1,而p1即為本輪超步全局聚合的結果。

    由上述示例可見,當只有一個Worker時,不需要執行merge方法,即merge()不會被調用。

  • terminate(context, value)

    當Aggregator Owner所在Worker執行完merge()后,框架會調用terminate(context, value)執行最后的處理。其中第二個參數value,即為merge()最后得到全局聚合,在該方法中可以對全局聚合繼續修改。執行完terminate()后,框架會將全局聚合對象分發給所有Worker,供下一輪超步使用。terminate()方法的一個特殊之處在于,如果返回True,則整個作業就結束迭代,否則繼續執行。在機器學習場景中,通常判斷收斂后返回True以結束作業。

Kmeans Clustering示例

下面以典型的Kmeans Clustering為例,為您介紹Aggregator的具體用法。

說明

完整代碼請參見Kmeans,此處為解析代碼。

  • GraphLoader部分

    GraphLoader部分用于加載輸入表,并轉換為圖的點或邊。這里我們輸入表的每行數據為一個樣本,一個樣本構造一個點,并用Vertex的Value來存放樣本。

    首先定義一個Writable類KmeansValue作為Vertex的value類型。

    public static class KmeansValue implements Writable {
        DenseVector sample;
        public KmeansValue() {
        }
        public KmeansValue(DenseVector v) {
            this.sample = v;
        }
        @Override
            public void write(DataOutput out) throws IOException {
            wirteForDenseVector(out, sample);
        }
        @Override
            public void readFields(DataInput in) throws IOException {
            sample = readFieldsForDenseVector(in);
        }
    }

    KmeansValue中封裝一個DenseVector對象來存放一個樣本,這里DenseVector類型來自matrix-toolkits-java,而wirteForDenseVector()readFieldsForDenseVector()用于實現序列化及反序列化。

    自定義的KmeansReader代碼,如下所示。

    public static class KmeansReader extends
        GraphLoader<LongWritable, KmeansValue, NullWritable, NullWritable> {
        @Override
            public void load(
            LongWritable recordNum,
            WritableRecord record,
            MutationContext<LongWritable, KmeansValue, NullWritable, NullWritable> context)
            throws IOException {
            KmeansVertex v = new KmeansVertex();
            v.setId(recordNum);
            int n = record.size();
            DenseVector dv = new DenseVector(n);
            for (int i = 0; i < n; i++) {
                dv.set(i, ((DoubleWritable)record.get(i)).get());
            }
            v.setValue(new KmeansValue(dv));
            context.addVertexRequest(v);
        }
    }

    KmeansReader中,每讀入一行數據創建一個點,這里用recordNum作為點的ID,將record內容轉換成DenseVector對象并封裝進VertexValue中。

  • Vertex部分

    自定義的KmeansVertex代碼如下。邏輯非常簡單,每輪迭代要做的事情就是將自己維護的樣本執行局部聚合。具體邏輯參見下面Aggregator的實現。

    public static class KmeansVertex extends
        Vertex<LongWritable, KmeansValue, NullWritable, NullWritable> {
        @Override
            public void compute(
            ComputeContext<LongWritable, KmeansValue, NullWritable, NullWritable> context,
            Iterable<NullWritable> messages) throws IOException {
            context.aggregate(getValue());
        }
    }
  • Aggregator部分

    整個Kmeans的主要邏輯集中在Aggregator中。首先是自定義的KmeansAggrValue,用于維護要聚合及分發的內容。

    public static class KmeansAggrValue implements Writable {
        DenseMatrix centroids;
        DenseMatrix sums; // used to recalculate new centroids
        DenseVector counts; // used to recalculate new centroids
        @Override
            public void write(DataOutput out) throws IOException {
            wirteForDenseDenseMatrix(out, centroids);
            wirteForDenseDenseMatrix(out, sums);
            wirteForDenseVector(out, counts);
        }
        @Override
            public void readFields(DataInput in) throws IOException {
            centroids = readFieldsForDenseMatrix(in);
            sums = readFieldsForDenseMatrix(in);
            counts = readFieldsForDenseVector(in);
        }
    }

    KmeansAggrValue維護了3個對象:

    • centroids是當前的K個中心點。如果樣本是m維,centroids就是一個K*m的矩陣。

    • sums是和centroids大小一樣的矩陣,每個元素記錄了到特定中心點最近的樣本特定維之和。例如sums(i,j)是到第i個中心點最近的樣本的第j維度之和。

    • counts是個K維的向量,記錄到每個中心點距離最短的樣本個數。sumscounts一起用于計算新的中心點,也是要聚合的主要內容。

    接下來是自定義的Aggregator實現類KmeansAggregator,按照上述API的順序分析其實現。

    1. createStartupValue()的實現。

      public static class KmeansAggregator extends Aggregator<KmeansAggrValue> {
          public KmeansAggrValue createStartupValue(WorkerContext context) throws IOException {
              KmeansAggrValue av = new KmeansAggrValue();
              byte[] centers = context.readCacheFile("centers");
              String lines[] = new String(centers).split("\n");
              int rows = lines.length;
              int cols = lines[0].split(",").length; // assumption rows >= 1
              av.centroids = new DenseMatrix(rows, cols);
              av.sums = new DenseMatrix(rows, cols);
              av.sums.zero();
              av.counts = new DenseVector(rows);
              av.counts.zero();
              for (int i = 0; i < lines.length; i++) {
                  String[] ss = lines[i].split(",");
                  for (int j = 0; j < ss.length; j++) {
                      av.centroids.set(i, j, Double.valueOf(ss[j]));
                  }
              }
              return av;
          }
      }

      在該方法中初始化一個KmeansAggrValue對象,然后從資源文件centers中讀取初始中心點,并賦值給centroids。而sumscounts初始化為0。

    2. createInitialValue()的實現。

      @Override
      public KmeansAggrValue createInitialValue(WorkerContext context)
          throws IOException {
          KmeansAggrValue av = (KmeansAggrValue)context.getLastAggregatedValue(0);
          // reset for next iteration
          av.sums.zero();
          av.counts.zero();
          return av;
      }

      該方法首先獲取上一輪迭代的KmeansAggrValue,然后將sumscounts清零,只保留了上一輪迭代出的centroids

    3. aggregate()的實現。

      @Override
      public void aggregate(KmeansAggrValue value, Object item)
          throws IOException {
          DenseVector sample = ((KmeansValue)item).sample;
          // find the nearest centroid
          int min = findNearestCentroid(value.centroids, sample);
          // update sum and count
          for (int i = 0; i < sample.size(); i ++) {
              value.sums.add(min, i, sample.get(i));
          }
          value.counts.add(min, 1.0d);
      }

      該方法中調用findNearestCentroid()找到樣本item距離最近的中心點索引,然后將其各個維度加到sums上,最后counts計數加1。

    以上3個方法執行于所有Worker上,實現局部聚合。在Aggregator Owner所在Worker執行的全局聚合相關操作如下:

    1. merge()的實現。

      @Override
      public void merge(KmeansAggrValue value, KmeansAggrValue partial)
          throws IOException {
          value.sums.add(partial.sums);
          value.counts.add(partial.counts);
      }

      merge的實現邏輯很簡單,就是把各個Worker聚合出的sumscounts相加即可。

    2. terminate()的實現。

      @Override
      public boolean terminate(WorkerContext context, KmeansAggrValue value)
          throws IOException {
          // Calculate the new means to be the centroids (original sums)
          DenseMatrix newCentriods = calculateNewCentroids(value.sums, value.counts, value.centroids);
          // print old centroids and new centroids for debugging
          System.out.println("\nsuperstep: " + context.getSuperstep() +
                             "\nold centriod:\n" + value.centroids + " new centriod:\n" + newCentriods);
          boolean converged = isConverged(newCentriods, value.centroids, 0.05d);
          System.out.println("superstep: " + context.getSuperstep() + "/"
                             + (context.getMaxIteration() - 1) + " converged: " + converged);
          if (converged || context.getSuperstep() == context.getMaxIteration() - 1) {
              // converged or reach max iteration, output centriods
              for (int i = 0; i < newCentriods.numRows(); i++) {
                  Writable[] centriod = new Writable[newCentriods.numColumns()];
                  for (int j = 0; j < newCentriods.numColumns(); j++) {
                      centriod[j] = new DoubleWritable(newCentriods.get(i, j));
                  }
                  context.write(centriod);
              }
              // true means to terminate iteration
              return true;
          }
          // update centriods
          value.centroids.set(newCentriods);
          // false means to continue iteration
          return false;
      }

      teminate()中首先根據sumscounts調用calculateNewCentroids()求平均計算出新的中心點。然后調用isConverged()根據新老中心點歐拉距離判斷是否已經收斂。如果收斂或迭代次數達到最大數,則將新的中心點輸出并返回True,以結束迭代。否則更新中心點并返回False以繼續迭代。

  • main方法

    main方法用于構造GraphJob,然后設置相應配置,并提交作業。

    public static void main(String[] args) throws IOException {
        if (args.length < 2)
            printUsage();
        GraphJob job = new GraphJob();
        job.setGraphLoaderClass(KmeansReader.class);
        job.setRuntimePartitioning(false);
        job.setVertexClass(KmeansVertex.class);
        job.setAggregatorClass(KmeansAggregator.class);
        job.addInput(TableInfo.builder().tableName(args[0]).build());
        job.addOutput(TableInfo.builder().tableName(args[1]).build());
        // default max iteration is 30
        job.setMaxIteration(30);
        if (args.length >= 3)
            job.setMaxIteration(Integer.parseInt(args[2]));
        long start = System.currentTimeMillis();
        job.run();
        System.out.println("Job Finished in "
                           + (System.currentTimeMillis() - start) / 1000.0 + " seconds");
    }
    說明

    job.setRuntimePartitioning(false)設置為False后,各個Worker加載的數據不再根據Partitioner重新分區,即誰加載的數據誰維護。