日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

執行機制

實現原理

每個UDAF的實現在單一的類中,該類需要繼承自基類AggFunc并實現相關接口。主要接口如下(分為local和global兩個部分):

// 以下四個函數供collect階段調用
virtual bool initCollectInput(const TablePtr &inputTable);
virtual bool initAccumulatorOutput(const TablePtr &outputTable) = 0;
virtual bool collect(Row inputRow, Accumulator *acc) = 0;
virtual bool outputAccumulator(Accumulator *acc, Row outputRow) const = 0;

// 以下四個函數供merge階段使用
virtual bool initMergeInput(const TablePtr &inputTable);
virtual bool initResultOutput(const TablePtr &outputTable);
virtual bool merge(Row inputRow, Accumulator *acc);
virtual bool outputResult(Accumulator *acc, Row outputRow) const;

UDAF在運行時主要分為兩個階段:1.collect階段 2.merge階段。這兩個階段分別運行在Searcher和Qrs上:執行引擎先會在Searcher環境下調用上述collect階段的四個函數,在數據集上完成初步統計;待所有Searcher上的collect階段執行完畢后,會將中間結果匯總到Qrs上,然后在Qrs環境下調用上述merge階段的四個函數,對collect階段的輸出進一步加工,并輸出最終結果。

下面以內置函數avg為例,結合HA3的運行流程,介紹該UDAF在這兩個階段的實現。

collect階段

collect階段會運行在Searcher上,引擎將根據本Searcher上的聚合結果,調用AggFunc的 collect 方法進行初步統計,并將結果保存在相應的Accumulator對象中。其中,每個分組會有自己對應的Accumulator,用于存放該分組統計過程中的狀態信息。

// 初始化一些`ColumnData`對象,用于訪問輸入表中數據
virtual bool initCollectInput(const TablePtr &inputTable);
// 初始化一些`ColumnData`對象,用于將Accumulator序列化到輸出表
virtual bool initAccumulatorOutput(const TablePtr &outputTable) = 0;
// collect階段主要過程,將多行輸入數據的統計結果存儲在對應Accumulator對象上
virtual bool collect(Row inputRow, Accumulator *acc) = 0;
// 將Accumulator序列化到輸出表上(方便網絡傳輸),之后會由Searcher匯總到Qrs上
virtual bool outputAccumulator(Accumulator *acc, Row outputRow) const = 0;

引擎對每行數據調用collect方法前,會先計算出該行數據的GroupKey作為分組依據,取出該分組對應的Accumulator一同傳入。用戶在收到當前數據后,根據數據內容修改Accumulator的狀態。

對Avg函數來講,collect階段要做的事情就是記錄當前Group下的數據條目數以及數值總和,為未來均值的計算作準備。

template<typename InputType, typename AccumulatorType>
bool AvgAggFunc<InputType, AccumulatorType>::collect(Row inputRow, Accumulator *acc) {
    AvgAccumulator<AccumulatorType> *avgAcc = static_cast<AvgAccumulator<AccumulatorType> *>(acc);
    avgAcc->count++;
    avgAcc->sum += _inputColumn->get(inputRow);
    return true;
}

引擎通過訪問 AvgAggFuncCreator::createLocalFunction ,獲得當前階段的AggFunc。注意,這一階段的輸出AvgAccumulator 上的所有屬性。

AggFunc *AvgAggFuncCreator::createLocalFunction(
        const vector<ValueType> &inputTypes,
        const vector<string> &inputFields,
        const vector<string> &outputFields);

merge階段

本階段在Qrs上執行,用于將各個Searcher上返回的初步統計結果進行處理,輸出最終的結果。主要的處理過程在AggFunc的 merge 函數內

// 初始化一些`ColumnData`對象,用于訪問輸入表中數據,包括Accumulator數據
virtual bool initMergeInput(const TablePtr &inputTable);
// 初始化一些`ColumnData`對象,用于輸出統計結果輸出結果
virtual bool initResultOutput(const TablePtr &outputTable);
// merge階段主要過程,將來自多個Searcher的Accumulator信息整合
virtual bool merge(Row inputRow, Accumulator *acc);
// 計算最終結果并輸出
virtual bool outputResult(Accumulator *acc, Row outputRow) const;

例如單次查詢中,多個Searcher中都有GroupKey=Apple的Accumulator上報,則在merge階段引擎會為GroupKey=Apple新生成一個Accumulator,用于將各個Searcher傳入的相關Accumulator信息聚合。

可以注意到Avg函數在merge階段,需要將所有collect階段的Accumulator.count累加,才能得到該Group在所有Searcher上的數據條目數。這與collect階段統計count的實現并不相同,這也是為什么要針對兩個階段分別設置處理函數的原因。

template<typename InputType, typename AccumulatorType>
bool AvgAggFunc<InputType, AccumulatorType>::merge(Row inputRow, Accumulator *acc) {
    AvgAccumulator<AccumulatorType> *avgAcc = static_cast<AvgAccumulator<AccumulatorType> *>(acc);
    avgAcc->count += _countColumn->get(inputRow);
    avgAcc->sum += _sumColumn->get(inputRow);
    return true;
}

當所有Group的Accumulator通過 merge 函數統計完畢后,引擎會依次將每個Group的Accumulator并傳入`outputResult`函數獲得最終輸出。Avg函數這個階段只需要執行 avg = sum/count 即可獲得該Group的均值

template<typename InputType, typename AccumulatorType>
bool AvgAggFunc<InputType, AccumulatorType>::outputResult(Accumulator *acc, Row outputRow) const {
    AvgAccumulator<AccumulatorType> *avgAcc = static_cast<AvgAccumulator<AccumulatorType> *>(acc);
    assert(avgAcc->count > 0);
    double avg = (double)avgAcc->sum / avgAcc->count;
    _avgColumn->set(outputRow, avg);
    return true;
}

引擎通過訪問 AvgAggFuncCreator::createGlobalFunction ,獲得當前階段的AggFunc。注意,這一階段的輸入是Accumulator字段。

AggFunc *AvgAggFuncCreator::createGlobalFunction(
        const vector<ValueType> &inputTypes,
        const vector<string> &inputFields,
        const vector<string> &outputFields);

特殊情況

為提升性能,當執行引擎發現 GROUP BY 的字段即為Searchers數據分列的依據時(即各個Searcher之間不會有數據的GroupKey相同),會將UDAF的merge階段也下沉到Searcher上進行。即在單次Query中,每個Searcher會依次執行UDAF的collect階段、merge兩個階段(但會跳過其中無用的Accumulator序列化和反序列化),Qrs僅僅將各個Searcher返回的聚合統計結果進行簡單粘貼,不再執行merge階段。

此優化可以有效降低某些場景下Qrs的負載,提升對Query的處理性能。該優化對UDAF的編寫者是無感知的,無需為此調整UDAF的設計。