開發與維運

ClickHouse內核分析-MergeTree的Merge和Mutation機制

引言

ClickHouse內核分析系列文章,繼上一篇文章 MergeTree查詢鏈路 之後,這次我將為大家介紹MergeTree存儲引擎的異步Merge和Mutation機制。建議讀者先補充上一篇文章的基礎知識,這樣會比較容易理解。

MergeTree Mutation功能介紹

在上一篇系列文章中,我已經介紹過ClickHouse內核中的MergeTree存儲一旦生成一個Data Part,這個Data Part就不可再更改了。所以從MergeTree存儲內核層面,ClickHouse就不擅長做數據更新刪除操作。但是絕大部分用戶場景中,難免會出現需要手動訂正、修復數據的場景。所以ClickHouse為用戶設計了一套離線異步機制來支持低頻的Mutation(改、刪)操作。

Mutation命令執行

ALTER TABLE [db.]table DELETE WHERE filter_expr;

ALTER TABLE [db.]table UPDATE column1 = expr1 [, ...] WHERE filter_expr;

ClickHouse的方言把Delete和Update操作也加入到了Alter Table的範疇中,它並不支持裸的Delete或者Update操作。當用戶執行一個如上的Mutation操作獲得返回時,ClickHouse內核其實只做了兩件事情:

  1. 檢查Mutation操作是否合法;
  2. 保存Mutation命令到存儲文件中,喚醒一個異步處理merge和mutation的工作線程;

兩者的主體邏輯分別在MutationsInterpreter::validate函數和StorageMergeTree::mutate函數中。

MutationsInterpreter::validate函數dry run一個異步Mutation執行的全過程,其中涉及到檢查Mutation是否合法的判斷原則是列值更新後記錄的分區鍵和排序鍵不能有變化。因為分區鍵和排序鍵一旦發生變化,就會導致多個Data Part之間之間Merge邏輯的複雜化。剩餘的Mutation執行過程可以看做是打開一個Data Part的BlockInputStream,在這個BlockStream的基礎上封裝刪除操作的FilterBlockInputStream,再加上更新操作的ExpressionBlockInputStream,最後把數據通過BlockOutputStream寫回到新的Data Part中。這裡簡單介紹一下ClickHouse的計算層實現,整體上它是一個火山模型的計算引擎,數據的各種filer、投影、join、agg都是通過BlockStrem抽象實現,在BlockStream中數據是按照Block進行傳輸處理的,而Block中的數據又是按照列模式組織,這使得ClickHouse在單列的計算上可以批量化並使用一些SIMD指令加速。BlockOutputStream承擔了MergeTree Data Part列存寫入和索引構建的全部工作,我會在後續的文章中會詳細展開介紹ClickHouse計算層中各類功能的BlockStream,以及BlockOutputStream中構建索引的實現細節。

在Mutation命令的執行過程中,我們可以看到MergeTree會把整條Alter命令保存到存儲文件夾下,然後創建一個MergeTreeMutationEntry對象保存到表的待修改狀態中,最後喚醒一個異步處理merge和 mutation的工作線程。這裡有一個關鍵的問題,因為Mutation的實際操作是異步發生的,在用戶的Alter命令返回之後仍然會有數據寫入,系統如何在異步訂正的過程中排除掉Alter命令之後寫入的數據呢?下一節中我會介紹MergeTree中Data Part的Version機制,它可以在Data Part級別解決上面的問題。但是因為ClickHouse寫入鏈路的異步性,ClickHouse仍然無法保證Alter命令前Insert的每條紀錄都被更新,只能確保Alter命令前已經存在的Data Part都會被訂正,推薦用戶只用來訂正T+1場景的離線數據。

異步Merge&Mutation

Batch Insert和Mutation的數據一致性

struct MergeTreePartInfo
{
    String partition_id;
    Int64 min_block = 0;
    Int64 max_block = 0;
    UInt32 level = 0;
    Int64 mutation = 0;   /// If the part has been mutated or contains mutated parts, is equal to mutation version number.
    ...
    /// Get block number that can be used to determine which mutations we still need to apply to this part
    /// (all mutations with version greater than this block number).
    Int64 getDataVersion() const { return mutation ? mutation : min_block; }
    ...    
    bool operator<(const MergeTreePartInfo & rhs) const
    {
        return std::forward_as_tuple(partition_id, min_block, max_block, level, mutation)
            < std::forward_as_tuple(rhs.partition_id, rhs.min_block, rhs.max_block, rhs.level, rhs.mutation);
    }
}

在具體展開MergeTree的異步merge和mutation機制之前,先需要詳細介紹一下MergeTree中對Data Part的管理方式。每個Data Part都有一個MergeTreePartInfo對象來保存它的meta信息,MergeTreePartInfo類的結構如上方代碼所示。

  1. partition_id:表示所屬的數據分區id。
  2. min_block、max_block:blockNumber是數據寫入的一個版本信息,在上一篇系列文章中講過,用戶每次批量寫入的數據都會生成一個Data Part。同一批寫入的數據會被assign一個唯一的blockNumber,而這個blockNumber是在MergeTree表級別自增的。以及MergeTree在merge多個Data Part的時候會準守一個原則:在同一個數據分區下選擇blockNumber區間相鄰的若干個Data Parts進行合併,不會出現在同一個數據分區下Data Parts之間的blockNumber區間出現重合。所以Data Part中的min_block和max_block可以表示當前Data Part中數據的版本範圍。
  3. level:表示Data Part所在的層級,新寫入的Data Part都屬於level 0。異步merge多個Data Part的過程中,系統會選擇其中最大的level + 1作為新Data Part的level。這個信息可以一定程度反映出當前的Data Part是經歷了多少次merge,但是不能準確表示,核心原因是MergeTree允許多個Data Part跨level進行merge的,為了最終一個數據分區內的數據merge成一個Data Part。
  4. mutation:和批量寫入數據的版本號機制類似,MergeTree表的mutation命令也會被assign一個唯一的blockNumber作為版本號,這個版本號信息會保存在MergeTreeMutationEntry中,所以通過版本號信息我們可以看出數據寫入和mutation命令之間的先後關係。Data Part中的這個mutation表示的則是當前這個Data Part已經完成的mutation操作,對每個Data Part來說它是按照mutation的blockNumber順序依次完成所有的mutation。

解釋了MergeTreePartInfo類中的信息含義,我們就可以理解上一節中遺留的異步Mutation如何選擇哪些Data Parts需要訂正的問題。系統可以通過MergeTreePartInfo::getDataVersion() { return mutation ? mutation : min_block }函數來判斷當前Data Part是否需要進行某個mutation訂正,比較兩者version即可。

Merge&Mutation工作任務

ClickHouse內核中異步merge、mutation工作由統一的工作線程池來完成,這個線程池的大小用戶可以通過參數background_pool_size進行設置。線程池中的線程Task總體邏輯如下,可以看出這個異步Task主要做三塊工作:清理殘留文件,merge Data Parts 和 mutate Data Part。


BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask()
{
    ....
    try
    {
        /// Clear old parts. It is unnecessary to do it more than once a second.
        if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1))
        {
            {
                /// TODO: Implement tryLockStructureForShare.
                auto lock_structure = lockStructureForShare(false, "");
                clearOldPartsFromFilesystem();
                clearOldTemporaryDirectories();
            }
            clearOldMutations();
        }
        ///TODO: read deduplicate option from table config
        if (merge(false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/))
            return BackgroundProcessingPoolTaskResult::SUCCESS;
        if (tryMutatePart())
            return BackgroundProcessingPoolTaskResult::SUCCESS;
        return BackgroundProcessingPoolTaskResult::ERROR;
    }
   ...
}```
需要清理的殘留文件分為三部分:過期的Data Part,臨時文件夾,過期的Mutation命令文件。如下方代碼所示,MergeTree Data Part的生命週期包含多個階段,創建一個Data Part的時候分兩階段執行Temporary->Precommitted->Commited,淘汰一個Data Part的時候也可能會先經過一個Outdated狀態,再到Deleting狀態。在Outdated狀態下的Data Part仍然是可查的。異步Task在收集Outdated Data Part的時候會根據它的shared_ptr計數來判斷當前是否有查詢Context引用它,沒有的話才進行刪除。清理臨時文件的邏輯較為簡單,在數據文件夾中遍歷搜索"tmp_"開頭的文件夾,並判斷創建時長是否超過temporary_directories_lifetime。臨時文件夾主要在ClickHouse的兩階段提交過程可能造成殘留。最後是清理數據已經全部訂正完成的過期Mutation命令文件。

enum class State

{
    Temporary,       /// the part is generating now, it is not in data_parts list
    PreCommitted,    /// the part is in data_parts, but not used for SELECTs
    Committed,       /// active data part, used by current and upcoming SELECTs
    Outdated,        /// not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes
    Deleting,        /// not active data part with identity refcounter, it is deleting right now by a cleaner
    DeleteOnDestroy, /// part was moved to another disk and should be deleted in own destructor
};
###Merge邏輯
StorageMergeTree::merge函數是MergeTree異步Merge的核心邏輯,Data Part Merge的工作除了通過後臺工作線程自動完成,用戶還可以通過Optimize命令來手動觸發。自動觸發的場景中,系統會根據後臺空閒線程的數據來啟發式地決定本次Merge最大可以處理的數據量大小,max_bytes_to_merge_at_min_space_in_pool和max_bytes_to_merge_at_max_space_in_pool參數分別決定當空閒線程數最大時可處理的數據量上限以及只剩下一個空閒線程時可處理的數據量上限。當用戶的寫入量非常大的時候,應該適當調整工作線程池的大小和這兩個參數。當用戶手動觸發merge時,系統則是根據disk剩餘容量來決定可處理的最大數據量。

接下來介紹merge過程中最核心的邏輯:如何選擇Data Parts進行merge?為了方便理解,這裡先介紹一下Data Parts在MergeTree表引擎中的管理組織方式。上一節中提到的MergeTreePartInfo類中定義了比較操作符,MergeTree中的Data Parts就是按照這個比較操作符進行排序管理,排序鍵是(partition_id, min_block, max_block, level, mutation),索引管理結構如下圖所示:
![image.png](https://ucc.alicdn.com/pic/developer-ecology/e4b61e469cb74eaa9ba78f7556c6caa2.png)
自動Merge的處理邏輯,首先是通過MergeTreeDataMergerMutator::selectPartsToMerge函數篩選出本次merge要合併的Data Parts,這個篩選過程需要準守三個原則:
1. 跨數據分區的Data Part之間不能合併;
2. 合併的Data Parts之間必須是相鄰(在上圖的有序組織關係中相鄰),只能在排序鏈表中按段合併,不能跳躍;
3. 合併的Data Parts之間的mutation狀態必須是一致的,如果Data Part A 後續還需要完成mutation-23而Data Part B後續不需要完成mutation-23(數據全部是在mutation命令之後寫入或者已經完成mutation-23),則A和B不能進行合併;

所以我們上面的Data Parts組織關係邏輯示意圖中,相同顏色的Data Parts是可以合併的。雖然圖中三個不同顏色的Data Parts序列都是可以合併的,但是合併工作線程每次只會挑選其中某個序列的一小段進行合併(如前文所述,系統會限定每次合併的Data Parts的數據量)。對於如何從這些序列中挑選出最佳的一段區間,ClickHouse抽象出了IMergeSelector類來實現不同的邏輯。當前主要有兩種不同的merge策略:TTL數據淘汰策略和常規策略。
1. TTL數據淘汰策略:TTL數據淘汰策略啟用的條件比較苛刻,只有當某個Data Part中存在數據生命週期超時需要淘汰,並且距離上次使用TTL策略達到一定時間間隔(默認1小時)。TTL策略也非常簡單,首先挑選出TTL超時最嚴重Data Part,把這個Data Part所在的數據分區作為要進行數據合併的分區,最後會把這個TTL超時最嚴重的Data Part前後連續的所有存在TTL過期的Data Part都納入到merge的範圍中。這個策略簡單直接,每次保證優先合併掉最老的存在過期數據的Data Part。

2. 常規策略:這裡的選舉策略就比較複雜,基本邏輯是枚舉每個可能合併的Data Parts區間,通過啟發式規則判斷是否滿足合併條件,再有啟發式規則進行算分,選取分數最好的區間。啟發式判斷是否滿足合併條件的算法在SimpleMergeSelector.cpp::allow函數中,其中的主要思想分為以下幾點:系統默認對合並的區間有一個Data Parts數量的限制要求(每5個Data Parts才能合併);如果當前數據分區中的Data Parts出現了膨脹,則適量放寬合併數量限制要求(最低可以兩兩merge);如果參與合併的Data Parts中有很久之前寫入的Data Part,也適量放寬合併數量限制要求,放寬的程度還取決於要合併的數據量。第一條規則是為了提升寫入性能,避免在高速寫入時兩兩merge這種低效的合併方式。最後一條規則則是為了保證隨著數據分區中的Data Part老化,老齡化的數據分區內數據全部合併到一個Data Part。中間的規則更多是一種保護手段,防止因為寫入和頻繁mutation的極端情況下,Data Parts出現膨脹。啟發式算法的策略則是優先選擇IO開銷最小的Data Parts區間完成合並,儘快合併掉小數據量的Data Parts是對在線查詢最有利的方式,數據量很大的Data Parts已經有了很較好的數據壓縮和索引效率,合併操作對查詢帶來的性價比較低。
Mutation邏輯
StorageMergeTree::tryMutatePart函數是MergeTree異步mutation的核心邏輯,主體邏輯如下。系統每次都只會訂正一個Data Part,但是會聚合多個mutation任務批量完成,這點實現非常的棒。因為在用戶真實業務場景中一次數據訂正邏輯中可能會包含多個Mutation命令,把這多個mutation操作聚合到一起訂正效率上就非常高。系統每次選擇一個排序鍵最小的並且需要訂正Data Part進行操作,本意上就是把數據從前往後進行依次訂正。
Mutation功能是MergeTree表引擎最新推出一大功能,從我個人的角度看在實現完備度上還有一下兩點需要去優化:
1. mutation沒有實時可見能力。我這裡的實時可見並不是指在存儲上立即原地更新,而是給用戶提供一種途徑可以立即看到數據訂正後的最終視圖確保訂正無誤。類比在使用CollapsingMergeTree、SummingMergeTree等高級MergeTree引擎時,數據還沒有完全merge到一個Data Part之前,存儲層並沒有一個數據的最終視圖。但是用戶可以通過Final查詢模式,在計算引擎層實時聚合出數據的最終視圖。這個原理對mutation實時可見也同樣適用,在實時查詢中通過FilterBlockInputStream和ExpressionBlockInputStream完成用戶的mutation操作,給用戶提供一個最終視圖。
2. mutation和merge相互獨立執行。看完本文前面的分析,大家應該也注意到了目前Data Part的merge和mutation是相互獨立執行的,Data Part在同一時刻只能是在merge或者mutation操作中。對於MergeTree這種存儲徹底Immutable的設計,數據頻繁merge、mutation會引入巨大的IO負載。實時上merge和mutation操作是可以合併到一起去考慮的,這樣可以省去數據一次讀寫盤的開銷。對數據寫入壓力很大又有頻繁mutation的場景,會有很大幫助。

for (const auto & part : getDataPartsVector())

    {
        ...
        size_t current_ast_elements = 0;
        for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
        {
            MutationsInterpreter interpreter(shared_from_this(), it->second.commands, global_context);
            size_t commands_size = interpreter.evaluateCommandsSize();
            if (current_ast_elements + commands_size >= max_ast_elements)
                break;
            current_ast_elements += commands_size;
            commands.insert(commands.end(), it->second.commands.begin(), it->second.commands.end());
        }
        auto new_part_info = part->info;
        new_part_info.mutation = current_mutations_by_version.rbegin()->first;
        future_part.parts.push_back(part);
        future_part.part_info = new_part_info;
        future_part.name = part->getNewName(new_part_info);
        tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, true);
        break;
    }
最後在經過後臺工作線程一輪merge和mutation操作之後,上一節中展示的MergeTree表引擎中的Data Parts可能發生的變化如下圖所示,2020-05-10數據分區下的頭兩個Data Parts被merge到了一起,並且完成了Mutation 37和Mutation 39的數據訂正,新產生的Data Part如紅色所示:
![image.png](https://ucc.alicdn.com/pic/developer-ecology/a9bdd694228a43cc853a00880ed98466.png)

Leave a Reply

Your email address will not be published. Required fields are marked *