PolarDB IMCI執行器默認用行號表示執行的中間結果,當大查詢所需數據量無法完全存放于內存時則可能會引發大量隨機且重復IO,從而影響執行效率。為了解決上述問題,IMCI執行器實現了基于中間結果物化的算子集合,本文介紹了HashJoin算子的物化版本HashMatch的實現細節。
設計方案
HashMatch實現主要分為build與probe兩個階段,其中build階段將左表每一行按join謂詞作為key構建出散列表,而probe階段則遍歷右表每一行并根據其對應的join謂詞查找散列表,最終針對不同join類型依匹配結果輸出對應結果集。
對于build階段,可以將左表所有數據構建出一個散列表,但只用一個散列表存放全部數據會導致該散列表比較大,在構建過程中可能存在相對嚴重沖突和不斷擴容。為避免此問題,build階段可以將左表數據按一定規則進行分區,每一個分區各自構建獨立散列表,而probe階段則根據右表每一行所在分區查找對應分區上的散列表進行相應處理。
Build階段
在IMCI中HashMatch的build功能是在DoOpen中完成,實際分為DoBuild與DoMerge兩階段,每一階段均采用線程組并發處理。
DoBuild
DoBuild階段線程組Workers各自向左表取數據,并按照數據分區Partition來構建每一分區的獨立散列表:
Worker\Partition | Partition0 | Partition1 | ... | PartitionN |
Worker0 | HashMap00 | HashMap01 | ... | HashMap0N |
Worker1 | HashMap10 | HashMap11 | ... | HashMap1N |
... | ... | ... | ... | ... |
WorkerM | HashMapM0 | HashMapM1 | ... | HashMapMN |
即每一個Worker在每一個Partition均構建一個散列表HashMap。其實除HashMap外,還保存著一組chunk對象,其保存物化后真正結果,而HashMap的uint64類型value只標記當前key所對應chunk位置,其中uint64按位分拆為uint16/uint16/uint32三部分,分別表示所屬Worker/chunk內偏移/chunk數組索引等。 每一Worker并行從左表中取到元組,并按分區規則將該元組無須加鎖直接插入到該Worker和Partition所對應HashMap中,不斷重復該build步驟直到所有Worker取完左表為止。
DoMerge
DoBuild階段完成后,每一Worker在每一個Partition均構建出一張散列表HashMap。 Build階段構建出散列表主要用于Probe階段進行查找判斷是否匹配,既然Build階段數據是按分區構建,那Probe階段也需要根據分區規則到指定分區的散列表中查找。 而目前DoBuild構建出來的每一個分區均有Worker個散列表,當然Probe時可以依次查找該Partition的所有Worker散列表,但為了后期Probe階段的便利性和查找性能,HashMatch在DoBuild后進行DoMerge,即將每一Partition上所有Worker散列表合成一個散列表。
Build\Partition | Partition0 | Partition1 | ... | PartitionN |
Merge | HashMap0 | HashMap1 | ... | HashMapN |
DoMerge由線程組來完成,為了避免無意義鎖同步操作,采用每一線程獨自合并一個分區方案,由于Partition數目往往遠大于Worker數目,DoMerge階段各線程承擔工作量基本一致。
Build落盤
由于每一個Worker處理比較均衡,因此可以假設每個Worker處理數據量大致相同,直接將總內存均分值作為Worker內存配額。
限于內存容量,HashMatch并非總能將所有分區的HashMap與chunks維持在內存中,需要能夠按一定規則進行落盤。由于HashMap與chunks均按分區隔開,因此當內存不足時按分區落盤比較直觀。
當出現內存不足時,需要按一定規則將一些分區數據落盤,以便內存中分區能夠正常進行Build與Probe階段。目前HashMatch采用從最高分區開始整區落盤,直到能夠完成處理前面分區,若出現連一個分區均無法處理時則直接拋出OOM。
在DoBuild不斷構建的過程中,若當前Worker出現內存不足導致HashMap無法插入KV或不能保存chunk數據時,需要將該Worker內存中編號最高分區的數據進行落盤,即將chunks集合按chunk寫入臨時文件中并釋放chunks內存,同時直接刪除HashMap而不需要落盤,后面處理該分區時再從臨時文件中加載chunks集合并通過chunks數據構建出該分區的HashMap。 對于一個Worker在內存中的最高分區號,其它Worker也是可見的。當一個Worker看到其它Worker的內存最高分區號比自己的小時,該Worker也會更新自己的最高分區號,并在適當時機進行內存釋放,在DoBuild階段也會不再構建大于最高分區號的分區中HashMap,但還是會將數據保存到chunk中,當chunk滿后直接落盤。
Probe階段
Build階段讀取左表并構建出散列表,而Probe階段讀取右表數據后查找散列表并根據匹配情況進行輸出, 既然Build階段已經將數據進行分區構建,那Probe階段也需要按Build階段所采用的數據分區規則來進行分區處理。
DoFetch
Probe階段同樣采用線程組處理方式,由父結點的Fetch操作來驅動。在DoFetch過程中,HashMatch的每個Worker同樣不斷fetch右表數據,對于fetch到的每一元組按分區規則到指定分區的HashMap中查找,然后根據匹配情況進行處理,不斷重復該probe步驟直到所有Worker取完右表為止。
Probe落盤
若Build階段中內存無法保存所有分區時,Probe階段也需要針對內存分區和磁盤分區進行分別處理。
在DoFetch過程中,當Worker取到右表數據后,若該元組對應的分區在內存中則直接查找HashMap進行匹配處理,若該分區在磁盤中,則需要將該元組保存到該Worker所屬Partition的chunk中,當該chunk滿時則需要刷盤并釋放chunk內存。當Worker取完右表并probe完成后,則表示內存中分區數據已經處理完成,可以釋放內存中所有分區。
當全部處理完內存中的分區后,開始處理磁盤中的分區,由于磁盤中分區的數據已經按分區保存在不同臨時文件中,為了避免鎖同步,probe階段仍采用一個磁盤分區由單獨Worker獨立完成,由于Partition數目往往遠大于Worker數目,因此一般不會存在Worker處理不均問題。
當Worker開始處理磁盤中分區時,主要也是分為build與probe兩階段:
build階段:先從該分區的臨時文件中不斷讀取左表數據并序列化出chunk,然后根據左表chunk數據不斷構建HashMap,不斷重復該build步驟直到該Worker讀取完左表數據為止。
probe階段:從該分區的臨時文件中不斷讀取右表數據并序列化出chunk,然后對其每一元組在該HashMap中查找根據匹配情況進行處理,不斷重復該probe步驟直到該Worker讀取完右表數據為止。
當所有Worker處理完所有磁盤分區后則整個HashMatch結束。雖然文檔中按內存分區與磁盤分區進行不同處理說明,但實現時統一到了一套代碼中。
Probe流程
HashMatch中probe主要由ProbeMem、ProbeLeft與ProbeDisk等三個步驟組成,但其真正probe處理均由Probe函數完成:
ProbeMem用于從右表讀取數據并根據數據分區在內存或磁盤分別進行處理。若在內存中直接調用Probe處理,否則將數據保存到臨時文件,以便ProbeDisk處理指定磁盤分區時重新加載后再調用Probe處理。
ProbeLeft主要用于LeftOuter/LeftSemi/LeftAntiSemi等Left類型的Join,其遍歷整個HashMap所有KV并過濾出已匹配或未匹配過的元組。
ProbeDisk用于磁盤分區的probe操作,按分區來處理,處理指定磁盤分區時先從該分區的臨時文件中加載chunk,然后直接調用Probe處理,若為Left類型的Join,還需要調用ProbeLeft對該分區進行處理。
Join邏輯
HashMatch實現Inner/LeftOuter/RightOuter/LeftSemi/LeftAntiSemi/RightSemi/RightAntiSemi及其PostFilter功能。所有join類型主體邏輯均可分為build與probe兩個階段,其中build階段基本相同(區別在于對null的處理),主要區別在于probe階段。在此只簡單描述不同join類型的處理邏輯。
Inner
對于右表每一元組,若該元組非null且匹配左表HashMap中元組則輸出該左表和右表元組。
LeftOuter
對于右表每一元組,若該元組非null且匹配左表HashMap中的元組,則輸出該左表和右表元組。 遍歷完右表后,對左表中所有均沒被匹配過的元組輸出該左表元組,而右表元組位置為null。 對于存在PostFilter的LeftOuter,若匹配左表HashMap后還需要經過PostFilter來判斷其是否真正匹配。
RightOuter
對于右表每一元組,若該元組非null且匹配左表HashMap中元組則輸出該左表和右表元組,若不匹配則輸出右表元組,而左表元組位置置null。對于存在PostFilter的RightOuter,若匹配左表HashMap后還需要經過PostFilter來判斷其是否真正匹配。
LeftSemi
LeftSemi流程類似LeftOuter,但其并不真正輸出左表和右表元組,而根據下面真值表輸出左表元組和NULL/TRUE/FALSE值,或僅輸出左表元組,或不輸出等。 對于存在PostFilter的LeftSemi,若匹配左表HashMap后還需要經過PostFilter來判斷其是否真正匹配。
//+------------------------------+--------------+----------------+
//| mathched | semi_probe_ | ! semi_probe_ |
//+------------------------------+--------------+----------------+
//| normal true | (left, TRUE) | (left, ONLY) |
//+------------------------------+--------------+----------------+
//+------------------------------+--------------+----------------+
//| ! mathched | semi_probe_ | ! semi_probe_ |
//+------------------------------+--------------+----------------+
//|NULL v.s. (empty set) | | |
//|e.g., NULL IN (empty set) | (left, FALSE)| NO_OUTPUT |
//+------------------------------+--------------+----------------+
//|NULL v.s. (set) | | |
//|e.g., NULL IN (1, 2, 3) | (left, NULL) | NO_OUTPUT |
//+------------------------------+--------------+----------------+
//|left_row v.s. (set with NULL) | | |
//|e.g., 10 IN (1, NULL, 3) | (left, NULL) | NO_OUTPUT |
//+------------------------------+--------------+----------------+
//|normal false | | |
//|e.g., 10 IN (1, 2, 3) | (left, FALSE)| NO_OUTPUT |
//+------------------------------+--------------+----------------+
LeftAntiSemi
LeftAntiSemi流程類似LeftOuter,但其并不真正輸出左表和右表元組,而根據下面真值表僅輸出左表元組,或不輸出等。 對于存在PostFilter的LeftAntiSemi,若匹配左表HashMap后還需要經過PostFilter來判斷其是否真正匹配。
//+------------------------------+----------------+
//| ! mathched | ! semi_probe_ |
//+------------------------------+----------------+
//|NULL v.s. (empty set) | |
//|e.g., NULL NOT IN (empty set) | (left, ONLY) |
//+------------------------------+----------------+
//|NULL v.s. (set) | |
//|e.g., NULL NOT IN (1, 2, 3) | (left, ONLY) |
//+------------------------------+----------------+
//|left_row v.s. (set with NULL) | |
//|e.g., 10 NOT IN (1, NULL, 3) | (left, ONLY) |
//+------------------------------+----------------+
//|normal false | |
//|e.g., 10 NOT IN (1, 2, 3) | (left, ONLY) |
//+------------------------------+----------------+
RightSemi
RightSemi流程類似RightOuter,但其并不真正輸出左表和右表元組,根據下面真值表輸出右表元組和NULL/TRUE/FALSE值,或僅輸出右表元組,或不輸出等。 對于存在PostFilter的RightSemi,若匹配左表HashMap后還需要經過PostFilter來判斷其是否真正匹配。
//+------------------------------+--------------+----------------+
//| mathched | semi_probe_ | ! semi_probe_ |
//+------------------------------+--------------+----------------+
//| normal true | (right, TRUE)| (right, ONLY) |
//+------------------------------+--------------+----------------+
//+------------------------------+--------------+----------------+
//| ! mathched | semi_probe_ | ! semi_probe_ |
//+------------------------------+--------------+----------------+
//|NULL v.s. (empty set) | | |
//|e.g., NULL IN (empty set) |(right, FALSE)| NO_OUTPUT |
//+------------------------------+--------------+----------------+
//|NULL v.s. (set) | | |
//|e.g., NULL IN (1, 2, 3) |(right, NULL) | NO_OUTPUT |
//+------------------------------+--------------+----------------+
//|left_row v.s. (set with NULL) | | |
//|e.g., 10 IN (1, NULL, 3) |(right, NULL) | NO_OUTPUT |
//+------------------------------+--------------+----------------+
//|normal false | | |
//|e.g., 10 IN (1, 2, 3) |(right, FALSE)| NO_OUTPUT |
//+------------------------------+--------------+----------------+
RightAntiSemi
RightAntiSemi流程類似RightOuter,但其并不真正輸出左表和右表元組,而根據下面真值表僅輸出右表元組,或不輸出等。 對于存在PostFilter的RightAntiSemi,若匹配左表HashMap后還需要經過PostFilter來判斷其是否真正匹配。
//+------------------------------+----------------+
//| ! mathched | ! semi_probe_ |
//+------------------------------+----------------+
//|NULL v.s. (empty set) | |
//|e.g., NULL NOT IN (empty set) | (right, ONLY) |
//+------------------------------+----------------+
//|NULL v.s. (set) | |
//|e.g., NULL NOT IN (1, 2, 3) | (right, ONLY) |
//+------------------------------+----------------+
//|left_row v.s. (set with NULL) | |
//|e.g., 10 NOT IN (1, NULL, 3) | (right, ONLY) |
//+------------------------------+----------------+
//|normal false | |
//|e.g., 10 NOT IN (1, 2, 3) | (right, ONLY) |
//+------------------------------+----------------+
實現
HashMatch將內存與磁盤處理、不同join類型與是否帶PostFilter功能均抽象為一套處理流程。
HashMap
HashMap為散列表實現,主要提供插入和查找兩個接口:
size_t PutValue(uint64_t hash_code, const char *key_buf, uint64_t key_len, const uint64_t tuple);
ValueIterator FindValue(uint64_t hash_code, const char *key_data, const uint64_t key_len, const bool need_mark = false);
同時提供兩個迭代器用于遍歷整張散列表:
enum IteratorType { Normal = 0, NoneMark = 1, Mark = 2, END };
class TableIterator {
public:
void Next();
bool IsValid() const { return valid_; }
ValueIterator GetIterator(IteratorType type);
private:
IteratorType type_ = IteratorType::END;
};
class ValueIterator {
struct Listener {
virtual void BlockEvent() {}
};
void SetListener(Listener *listener) { listener_ = listener; }
void Next();
bool IsValid() const { return valid_; }
private:
IteratorType type_ = IteratorType::Normal;
Listener *listener_ = nullptr;
};
TableIterator迭代器用于遍歷HashMap的全部KV;而ValueIterator迭代器用于遍歷當前KV的全部數據塊;其中TableIterator與ValueIterator均提供Normal/NoneMark/Mark三種迭代模型,用于不同join類型。
由于TableIterator用于遍歷全部HashMap,其主要用于LEFT_OUTER/LEFT_ANTI_SEMI/LEFT_SEMI等。
Info
HMInfo主要用于存放所有Worker共享全局數據,如內存分區號與分區對象,其中分區中保存當前分區合并后的HashMap、左右表chunks集合與左右表臨時文件對象。每一個分區HMPartition均有單獨臨時文件,通過pread/pwrite函數與offset原子變量來提供給所有Worker進行原子讀寫操作。
Local Info
HMLocalInfo主要用于存放當前Worker的私有數據,如當前Worker的內存分區號與左右分區對象HMLocalPartitions,其中每一分區對象HMLocalPartition保存當前Worker當前Partition的HashMap、chunks集合與正在寫待完整的chunk對象等。
Fetcher
HashMatch存在向左表取數據、右表取數據、從Info/LocalInfo內存chunks集合中讀取chunk對象、從臨時文件中讀取并序列化出chunk對象等多種不同fetch方式,雖然方式各異但其fetch數據均為chunk對象,其用于Build與Probe階段。
class HashMatchFetcher final {
bool Fetch(Context &context, TupleChunk *&mat_chunk);
// fetch from left or right child
bool FetchMem(Context &context);
// fetch from info chunks (include load from temp files)
bool FetchDisk(Context &context, TupleChunk *&mat_chunk);
size_t part_index_ = 0;
TupleChunk chunk_;
};
Builder
Builder類用于Build階段DoBuild操作,統一處理內存分區與磁盤分區的build功能。
左表構建(MemBuilder類):從左表中fetch到數據保存到chunks集合中,若該元組屬于內存分區的則插入HashMap,屬于磁盤分區則將chunk數據落盤。
磁盤構建(DiskBuilder類):從臨時表中讀取chunks集合,并構建該分區的HashMap。
class HashMatchBuilder {
void Build();
virtual void ChunkResult(const size_t offset, const bool is_null,
const size_t part_index, const uint64_t hash_val,
const char *key_data, const size_t key_len) = 0;
virtual void ChunkDone() = 0;
HashMatchFetcher fetcher_;
};
class HashMatchMemBuilder final : public HashMatchBuilder {
void ChunkResult(const size_t offset, const bool is_null,
const size_t part_index, const uint64_t hash_val,
const char *key_data, const size_t key_len) override;
void ChunkDone() override;
TupleChunk origin_chunk_;
};
class HashMatchDiskBuilder final : public HashMatchBuilder {
void ChunkResult(const size_t offset, const bool is_null,
const size_t part_index, const uint64_t hash_val,
const char *key_data, const size_t key_len) override;
void ChunkDone() override;
const size_t part_index_ = 0;
};
Prober
Probe階段ProbeMem/ProbeLeft/ProbeDisk操作均由Prober類完成,其統一處理內存分區與磁盤分區的probe功能。
class HashMatchProber final {
public:
void ProbeResult(TupleChunk *tpchunk, size_t &chunk_off, const size_t chunk_size);
bool ProbeIter(Context &context, TupleChunk *tpchunk, size_t &chunk_off, const size_t chunk_size);
bool Probe(Context &context, TupleChunk *tpchunk, size_t &chunk_off, const size_t chunk_size, const bool disk);
private:
const HashMatch &join_;
HMInfo *info_ = nullptr;
HMLocalInfo *local_info_ = nullptr;
size_t part_index_ = 0;
PostFilter filter_;
LeftIterator lit_;
RightIterator rit_;
TraverseIterator tit_; // used for probe left
};
HashMatchProber::PostFilter類主要對帶PostFilter的Join類型的Probe后期進行處理,即經過Probe得到結果集還需要再由PostFilter處理后才能確定其是否真正匹配。
struct PostFilter final {
bool Evaluate();
bool Probe(TupleChunk *tpchunk, size_t &chunk_off, const size_t chunk_size);
const HashMatchProber &prober_;
const RTExprTreePtr &post_expr_;
const HashMatchExpr &left_expr_;
const HashMatchExpr &right_expr_;
std::shared_ptr<Expressions::ExprEnv> post_env_ = nullptr;
};
Prober提供LeftIterator/RightIterator/TraverseIterator等三種類型迭代器。
struct Iterator {
virtual void InitExpr() {}
virtual void FiniExpr() {}
virtual void Init(const size_t part_index);
virtual void Fini();
virtual bool Valid(Context &context) { return false; }
virtual void Next() = 0;
HashMatchProber &prober_;
PostFilter &filter_;
};
HashMatchProber::LeftIterator類使用HashMap::ValueIterator來遍歷HashMap指定key的所有value并根據value定位到指定chunk元組,即對外提供指定key的所有元組功能,統一處理不同join類型和PostFilter功能。
// for Probe
struct LeftIterator final : public Iterator, public ValueIterator::Listener {
void BlockEvent() override;
bool Valid(Context &context) override;
void Next() override;
bool Find(const size_t part_index, const uint64_t hash_val,
const char *key_data, const uint64_t key_len);
ValueIterator it_;
};
HashMatchProber::RightIterator類不斷使用HashMatchFetcher從右表或臨時文件中獲取chunk集合并對外提供遍歷所有chunk所有元組的功能。 所有類型Join均由ProbeMem/ProbeDisk使用RightIterator獲取chunk,然后在該分區中查找HashMap,若找到則構造LeftIterator對象來遍歷該key的所有元組,另外Left類型Join還需要ProbeLeft處理。
// for Probe
struct RightIterator : public Iterator {
bool Valid(Context &context) override;
void Next() override;
HashMatchFetcher fetcher_;
TupleChunk origin_chunk_;
size_t chunk_size_ = 0;
};
HashMatchProber::TraverseIterator類主要用于Left類型Join,如LeftOuter/LeftSemi/LeftAntiSemi等,其使用HashMap::TableIterator遍歷整個HashMap并過濾出已匹配或未匹配的key,然后使用LeftIterator來遍歷該key的所有元組。 Left類型Join處理流程是先使用ProbeMem/ProbeDisk查找HashMap并進行匹配處理,若匹配則在HashMap中標記該KV,然后由ProbeLeft來使用TraverseIterator來整個HashMap并過濾出已匹配或未匹配的KV處理即可。
// for ProbeLeft
struct TraverseIterator final : public Iterator {
bool Valid(Context &context) override;
void Next() override;
TableIterator tit_;
LeftIterator lit_;
IteratorType it_type_ = IteratorType::END;
};
測試
在TPC-H,1 TB數據集,Q14上對比HashJoin與HashMatch的性能,其中HashJoin算法與HashMatch基本一致,主要區分在于中間結果是否物化。
select
100.00 * sum(case
when p_type like 'PROMO%'
then l_extendedprice * (1 - l_discount)
else 0
end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue
from
lineitem,
part
where
l_partkey = p_partkey
and l_shipdate >= date '1995-09-01'
and l_shipdate < date '1995-09-01' + interval '1' month;
在LRU緩存與執行器內存均為100 GB配置下進行查詢:
Query(TPCH1T)
HashJoin
HashMatch
Q14
23.96秒
12.56秒
在LRU緩存與執行器內存均為32 GB配置下進行查詢:
Query(TPCH1T)
HashJoin
HashMatch
Q14
>10分
35.73秒