前言
在"阿里體量"的大數據生態中,伏羲系統管理著彈內外多個物理集群,超十萬臺物理機, 以及數百萬的 CPU/GPU cores。每天運行在伏羲分佈式平臺上的作業數已經超過千萬, 是業界少有的,單天處理 EB 級別數據分佈式平臺。其中單個作業規模已經高達數十萬計算節點,管理著數百億的邊連接。在過去的十年中,阿里集團以及阿里雲上這樣的作業數目和規模,錘鍊了伏羲分佈式平臺;與此同時,今天平臺上作業的日益多樣化,以及向前再發展的需求,對於伏羲系統架構的進一步演化,也都帶來了巨大挑戰與機遇。
一 背景
1 伏羲 DAG/AM 組件
從較高的層面來看整個分佈式系統的體系架構,物理集群之上運行的分佈式系統,大概可以分成資源管理,作業分佈式調度執行,與多個計算節點的運行這三個層次,如同下圖所示。通常所說的 DAG 組件,指的是每個分佈式作業的中心管理點,也就是 application master (AM)。AM 之所以經常被稱為 DAG (Directional Acyclic Graph, 有向無環圖) 組件,是因為 AM 最重要的責任,就是負責協調分佈式作業的執行。而現代的分佈式系統中的作業執行流程,通常可以通過 DAG 上面的調度以及數據流來描述[1]。相對於傳統的 Map-Reduce[2] 執行模式, DAG 的模型能對分佈式作業做更精準的描述,也是當今各種主流大數據系統 (Hadoop 2.0+, SPARK, FLINK, TENSORFLOW 等) 的設計架構基礎,區別只在於 DAG 的語義是透露給終端用戶,還是計算引擎開發者。
與此同時,從整個分佈式系統 stack 來看, AM 肩負著除了運行 DAG 以外更多的責任。作為作業的中心管控節點,向下其負責與 Resource Manager 之間的交互,為分佈式作業申請計算資源;向上其負責與計算引擎進行交互,並將收集的信息反饋到 DAG 的執行過程中。作為唯一有能力對每一個分佈式作業的執行大局有最精準的瞭解的組件,在全局上對 DAG 的運行做準確的管控和調整,也是 AM 的重要職責。從上圖描述的分佈式系統 stack 圖中,我們也可以很直觀的看出,AM 是系統中唯一需要和幾乎所有分佈式組件交互的組件,在作業的運行中起了重要的承上啟下的作用。這一組件之前在伏羲系統中被稱為 JobMaster (JM), 在本文中我們統一用 DAG 或者 AM 來指代。
2 邏輯圖與物理圖
分佈式作業的 DAG,有兩種層面上的表述:邏輯圖與物理圖。簡單地來說 (over-simplified),終端用戶平時理解的 DAG 拓撲,大多數情況下描述的是邏輯圖範疇:比如大家平時看到的 logview 圖,雖然裡面包含了一些物理信息(每個邏輯節點的併發度),但整體上可以認為描述的就是作業執行流程的邏輯圖。
準確一點說:
- 邏輯圖描述了用戶想要實現的數據處理流程,從數據庫/SQL 的角度(其他類型引擎也都有類似之處,比如 TENSORFLOW) 來看,可以大體認為 DAG 的邏輯圖,是對優化器執行計劃的一個延續。
- 物理圖更多描述了執行計劃映射到物理分佈式集群的具體描述,體現的是執行計劃被物化到分佈式系統上,具備的一些特性:比如併發度,數據傳輸方式等等。
而每個邏輯圖的"物理化",可以有很多等效方式。選擇合適的方式來將邏輯圖變成物理化執行,並進行靈活的調整,是 DAG 組件的重要職責之一。從上圖的邏輯圖到物理圖的映射可以看到,一個圖的物理化過程,實際上就是在回答一系列圖節點以及各個連接邊物理特性的問題,一旦這些問題得到確認,就能得到在分佈式系統上實際執行物理圖。
3 為什麼需要 DAG 2.0 架構升級?
作為從阿里雲飛天系統創建伊始就開始研發的伏羲分佈式作業執行框架,DAG 1.0 在過去十年中支撐了阿里集團的大數據業務,在系統規模以及可靠性等方面都走在了業界領先。另外一方面,作為一個開發了十年的系統,雖然在這個期間不斷的演進,DAG 1.0 在基本架構上秉承了比較明顯的 Map-Reduce 執行框架的一些特點,邏輯圖和物理圖之間沒有清晰的分層,這導致在這個基本架構上要繼續向前走,支持更多 DAG 執行過程中的動態性,以及同時支持多種計算模式等方面,都比較困難。事實上今天在 MaxCompute SQL 線上,離線作業模式以及準實時作業模式 (smode) 兩種執行模式,使用了兩套完全分開的分佈式執行框架,這也導致對於優化性能和優化系統資源使用之間的取捨,很多情況下只能走兩個極端,而無法比較好的 tradeoff。
除此之外,隨著 MaxCompute 以及 PAI 引擎的更新換代以及新功能演進,上層的分佈式計算自身能力在不斷的增強。對於 AM 組件在作業管理,DAG 執行等方面的動態性,靈活性等方面的需求也日益強烈。在這樣的一個大的背景下,為了支撐計算平臺下個 10 年的發展,伏羲團隊啟動了 DAG 2.0 的項目,將從代碼和功能方面,完整替代 1.0 的 JobMaster 組件,實現完全的升級換代。在更好的支撐上層計算需求的同時,也同時對接伏羲團隊在 shuffle 服務 (shuffle service) 上的升級,以及 fuxi master (Resource Manager) 的功能升級。與此同時,站在提供企業化服務的角度來看,一個好的分佈式執行框架,除了支持阿里內部極致的大規模大吞吐作業之外,我們需要支持計算平臺的向外走,支持雲上各種規模和計算模式的需求。除了繼續錘鍊超大規模的系統擴展能力以外,我們需要降低大數據系統使用的門檻,通過系統本身的智能動態化能力,來提供自適應(各種數據規模以及處理模式)的大數據企業界服務,是 DAG 2.0 在設計架構中考慮的另一重要維度。
二 DAG 2.0 架構以及整體設計
DAG 2.0 項目,在調研了業界各個分佈式系統(包括SPARK/FLINK/Dryad/Tez/Tensorlow)DAG 組件之後,參考了 Dryad/Tez 的框架。新一代的架構上,通過邏輯圖和物理圖的清晰分層,可擴展的狀態機管理,插件式的系統管理,以及基於事件驅動的調度策略等基座設計,實現了對計算平臺上多種計算模式的統一管理,並更好的提供了作業執行過程中在不同層面上的動態調整能力。
1 作業執行的動態性
傳統的分佈式作業執行流程,作業的執行計劃是在提交之前確定的。以 SQL 執行為例,一個 SQL 語句,在經過編譯器和優化器後產生執行圖,並被轉換成分佈式系統(伏羲)的執行計劃。
這個作業流程在大數據系統中是比較標準的操作。然而在具體實現中,如果在 DAG 的執行缺乏自適應動態調整能力的話,整個執行計劃都需要事先確定,會使得作業的運行沒有太多動態調整的空間。放在 DAG 的邏輯圖與物理圖的背景中來說,這要求框架在運行作業前,必須事先了解作業邏輯和處理數據各種特性,並能夠準確回答作業運行過程,各個節點和連接邊的物理特性問題,來實現邏輯圖往物理圖的轉換。
然而在現實情況中,許多物理特性相關的問題,在作業運行前是無法被感知的。以數據特性為例,一個分佈式作業在運行前,能夠獲得的只有原始輸入的一些特性(數據量等), 對於一個較深的 DAG 執行而言,這也就意味著只有根節點的物理計劃(併發度選擇等) 是相對合理的,而下游的節點和邊的物理特性只能通過一些特定的規則來猜測。雖然在輸入數據有豐富的 statistics 的前提下,優化器有可能可以將這些 statistics,與執行 plan 中的各個 operator 特性結合起來,進行一些適度的演算:從而推斷在整個執行流程中,每一步產生的中間數據可能符合什麼樣的特性。但這種推斷在實現上,尤其在面對阿里大體量的實際生產環境中,面臨著巨大的挑戰,例如:
實際輸入數據的 statistics 的缺失
即便是 SQL 作業處理的結構化數據,也無法保證其源表數據特性擁有很好的統計。事實上今天因為數據落盤方式多樣化,以及精細化統計方式的缺失,大部分的源表數據都是沒有完整的 statistics 的。此外對於集群內部和外部需要處理的非結構化數據,數據的特性的統計更加困難。
分佈式作業中存在的大量用戶邏輯黑盒
作為一個通用的大數據處理系統,不可避免的需要支持用戶邏輯在系統中的運行。比如 SQL 中常用的 UDF/UDTF/UDJ/Extractor/Outputer 等等,這些使用 Java/Python 實現的用戶邏輯,計算引擎和分佈式系統並無法理解,在整個作業流程中是類似黑盒的存在。以 MaxCompute 為例,線上有超過 20% 的 SQL 作業,尤其是重點基線作業,都包含用戶代碼。這些大量用戶代碼的存在,也造成了優化器在很多情況下無法對中間產出數據的特性進行預判。
優化器預判錯誤代價昂貴
在優化器選擇執行計劃時,會有一些優化方法,在數據符合一定特殊特性的時候,被合理選中能帶來性能優化。但是一旦選擇的前提假設錯誤(比如數據特性不符合預期),會適得其反,甚至帶來嚴重的性能回退或作業失敗。在這種前提下,依據靜態的信息實現進行過多的預測經常得不到理想的結果。
這種種原因造成的作業運行過程中的非確定性,要求一個好的分佈式作業執行系統,需要能夠根據中間運行結果的特點,來進行執行過程中的動態調整。因為只有在中間數據已經在執行過程中產生後,其數據特性才能被最準確的獲得,動態性的缺失,可能帶來一系列的線上問題,比如:
- 物理資源的浪費:比如計算節點事先選擇的資源類型的不合理,或者大量的計算被消耗用於處理後繼會被丟棄的無效數據。
- 作業的嚴重長尾:比如中間數據分佈傾斜或不合理編排,導致一個 stage 上計算節點需要處理的數據量極端化。
- 作業的不穩定:比如由於優化器靜態計劃的錯判,導致不合理的執行計劃無法完成。
而 DAG/AM 作為分佈式作業唯一的中心節點和調度管控節點,是唯一有能力收集並聚合相關數據信息,並基於這些數據特性來做作業執行的動態調整,的分佈式組件。這包括簡單的物理執行圖調整(比如動態的併發度調整),也包括複雜一點的調整比如對 shuffle 方式和數據編排方式重組。除此以外,數據的不同特點也會帶來邏輯執行圖調整的需求:對於邏輯圖的動態調整,在分佈式作業處理中是一個全新的方向,也是我們在 DAG 2.0 裡面探索的新式解決方案。
點,邊,圖的清晰物理邏輯分層,和基於事件的數據收集和調度管理,以及插件式的功能實現,方便了 DAG 2.0 在運行期間的數據收集,以及使用這些數據來系統性地回答,邏輯圖向物理圖轉化過程中需要確定的問題。從而在必要的時候實現物理圖和邏輯圖的雙重動態性,對執行計劃進行合理的調整。在下文中提到幾個落地場景中,我們會進一步舉例說明基於 2.0 的這種強動態性能力,實現更加自適應,更加高效的分佈式作業的執行。
2 統一的 AM/DAG 執行框架
DAG 2.0 抽象分層的點,邊,圖架構上,也使其能通過對點和邊上不同物理特性的描述,對接不同的計算模式。業界各種分佈式數據處理引擎,包括 SPARK, FLINK, HIVE, SCOPE, TENSORFLOW 等等,其分佈式執行框架的本源都可以歸結於 Dryad[1] 提出的 DAG 模型。我們認為對於圖的抽象分層描述,將允許在同一個 DAG 系統中,對於離線/實時/流/漸進計算等多種模型都可以有一個好的描述。在 DAG 2.0 初步落地的過程中,首要目標是在同一套代碼和架構系統上,統一當前伏羲平臺上運行的幾種計算模式,包括 MaxCompute 的離線作業,準實時作業,以及 PAI 平臺上的 Tensorflow 作業和其他的非 SQL 類作業。對更多新穎計算模式的探索,也會有計劃的分步驟進行。
1)統一的離線作業與準實時作業執行框架
首先我們來看平臺上作業數佔到絕大多數的 SQL 線離線作業 (batch job) 與準實時作業 (smode)。前面提到過,由於種種歷史原因,之前 MaxCompompute SQL 線的這兩種模式的資源管理和作業執行,是搭建在兩套完全分開的代碼實現上的。這除了導致兩套代碼和功能無法複用以外,兩種計算模式的非黑即白,使得彼此在資源利用率和執行性能之間無法 tradeoff。而在 2.0 的 DAG 模型上,我們實現了這兩種計算模式比較自然的融合和統一,如下圖所示:
在通過對邏輯節點和邏輯邊上映射不同的物理特性,離線作業和準實時作業都能得到準確的描述:
- 離線作業:每個節點按需去申請資源,一個邏輯節點代表一個調度單位;節點間連接邊上傳輸的數據,通過落盤的方式來保證可靠性;
- 準實時作業:整個作業的所有節點都統一在一個調度單位內進行 gang scheduling;節點間連接邊上通過網絡/內存直連傳輸數據,並利用數據 pipeline來追求最優的性能。
今天在線上,離線模式因為其 on-demand 的資源申請以及中間數據落盤等特點,作業在資源利用率,規模性和穩定性方面都有明顯的優勢。而準實時模式則通過常駐的計算資源池以及 gang scheduling 這種 greedy 資源申請,降低了作業運行過程中的 overhead,並使得數據的 pipelined 傳輸處理成為可能,達到加速作業運行的效果,但其資源使用的特點,也使其無法在廣泛範圍內來支持大規模作業。DAG 2.0 的升級,不僅在同一套架構上統一了這兩種計算模式,更重要的是這種統一的描述方式,使得探索離線作業高資源利用率,以及準實時作業的高性能之間的 tradeoff 成為可能:當調度單位可以自由調整,就可以實現一種全新的混合的計算模式,我們稱之為 Bubble 執行模式。
這種混合 Bubble 模式,使得 DAG 的用戶,也就是上層計算引擎的開發者(比如 MaxCompute 的優化器),能夠結合執行計劃的特點,以及引擎終端用戶對資源使用和性能的敏感度,來靈活選擇在執行計劃中切出 Bubble 子圖。在 Bubble 內部充分利用網絡直連和計算節點預熱等方式提升性能,沒有切入 Bubble 的節點則依然通過傳統離線作業模式運行。回過頭來看,現有的離線作業模式和準實時作業模式,分別可以被描述成 Bubble 執行模式的兩個極端特例,而在統一的新模型之上,計算引擎和執行框架可以在兩個極端之間,根據具體需要,選擇不同的平衡點,典型的幾個應用場景包括:
Greedy Bubble
在可用的資源(集群規模,quota 等)受限,一個大規模作業無法實現 gang scheduling 時,如果用戶對資源利用率不敏感,唯一的目標是儘快跑完一個大規模作業。這種情況下,可以實現基於可用計算節點數目,實施 greedy 的 bubble 切割的策略, 儘量切出大的 bubble。
Efficient Bubble
在作業的運行過程中,節點間的運算可能存在天然的 barrier (比如 sort 運算, 建 hash 表等等)。如果把兩個通過 barrier 邊連接的節點切到一個 bubble 中,雖然作業 e2e 性能上還是會有調度 overhead 降低等帶來的提升,但是因為數據無法完全 pipeline 起來,資源的利用率達不到最高。那麼在對資源的利用率較為敏感時,可以避免 bubble 內部出現 barrier 邊。這同樣是計算引擎可以根據執行計劃做出決定的。
這裡只列舉了兩個簡單的策略,其中還有更多可以細化以及針對性優化的地方。在不同的場景上,通過 DAG 層面提供的這種靈活按照 bubble 執行計算的能力,允許上層計算可以在不同場景上挑選合適的策略,更好的支持各種不同計算的需求。
2)支持新型計算模式的描述
1.0 的執行框架的底層設計受 Map-Reduce 模式的影響較深,節點之間的邊連接,同時混合了調度順序,運行順序,以及數據流動的多種語義。通過一條邊連接的兩個節點,下游節點必須在上游節點運行結束,退出,併產生數據後才能被調度。這種描述對於新型的一些計算模式並不適用。比如對於 Parameter Server 計算模式,Parameter Server(PS) 與 Worker 在運行過程中有如下特點:
- PS 作為 parameter 的 serving entity, 可以獨立運行。
- Worker 作為 parameter 的 consumer 和 updater, 需要 PS 在運行後才能有效的運行,並且在運行過程中需要和 PS 持續的進行數據交互。
這種運行模式下,PS 和 worker 之間天然存在著調度上的前後依賴關係。但是因為 PS 與 worker 必須同時運行,不存在 PS 先退出 worker 才調度的邏輯。所以在 1.0 框架上, PS 與 worker 只能作為兩個孤立無聯繫的 stage 來分開調度和運行。此外所有 PS 與 worker 之間,也只能完全通過計算節點間直連通訊,以及在外部 entity (比如 zookeeper 或 nuwa) 協助來進行溝通與協調。這導致 AM/DAG 作為中心管理節點作用的缺失,作業的管理基本被下放計算引擎上,由計算節點之間自行試圖協調來完成。這種無中心化的管理,對稍微複雜的情況下 (failover 等) 無法很好的處理。
在 DAG 2.0 的框架上,為了更準確的描述節點之間的調度和運行關係,引入並且實現了 concurrent edge 的概念:通過 concurrent edge 連接的上下游節點,在調度上存在先後,但是可以同時運行。而調度的時機也可以靈活配置:可以上下游同步調度,也可以在上游運行到一定程度後,通過事件來觸發下游的調度。在這種靈活的描述能力上, PS 作業可以通過如下這種 DAG 來描述,這不僅使得作業節點間的關係描述更加準確,而且使得 AM 能夠理解作業的拓撲,進行更加有效的作業管理,包括在不同計算節點發生 failover 時不同的處理策略等。
此外,DAG 2.0 新的描述模型,也允許 PAI 平臺上的 Tensorflow/PS 作業實現更多的動態優化,並進行新的創新性工作。在上圖的 dynamic PS DAG 中,就引進了一個額外的 control 節點,這一節點可以在作業運行過程中(包括 PS workload 運行之前和之後),對作業的資源申請,併發度等進行動態的調整,確保作業的優化執行。
事實上 concurrent edge 這個概念,描述的是上下游節點運行/調度時機的物理特性,也是我們在清晰的邏輯物理分層的架構上實現的一個重要擴展。不僅對於 PS 作業模式,在之前描述過的對於通過 bubble 來統一離線與準實時作業計算模式,這個概念也有重要的作用。
三 DAG 2.0 與上層計算引擎的集成
DAG 2.0 作為計算平臺的分佈式運行基座,它的升級換代,為上層的各種計算引擎提供了更多靈活高效的執行能力,而這些能力的落地,需要通過與具體計算場景的緊密結合來實現。接下來通過 2.0 與上層各個計算引擎(包括 MaxCompute 以及 PAI 平臺等)的一些對接場景,具體舉例說明 2.0 新的調度執行框架,如何賦能平臺上層的計算與應用。
1 運行過程中的 DAG 動態調整
作為計算平臺上的作業大戶,MaxCompute 平臺上多種多樣的計算場景,尤其是離線作業中的各種複雜邏輯,為動態圖能力的落地提供了豐富多樣的場景,這裡從動態物理圖和邏輯圖幾個方面討論幾個例子。
1)動態併發度調整
基於作業運行期間中間數據大小進行動態併發度調整,是 DAG 動態調整中最基本的能力。以傳統 MR 作業為例,對於一個靜態 MR 作業而言,能根據讀取數據量來比較準確判斷 Mapper 的併發,但是對於 Reducer 的併發只能簡單推測,比如下圖中對於處理 1TB 的 MR 作業而言,提交作業時,只能根據 Mapper 1000 併發,來猜測給出 500 的 Reducer 併發度,而如果數據在 Mapper 經過大量過濾導致最終只產出 10MB 中間數據時,500 併發度 Redcuer 顯然是非常浪費的,動態的 DAG 必須能夠根據實際的 Mapper 產出來進行 Reducer 併發調整(500 -> 1)。
而實際實現中,最簡單的動態調整,會直接按照併發度調整比例來聚合上游輸出的 partition 數據,如下圖這個併發度從 10 調整到 5 的例子所示,在調整的過程中,可能產生不必要的數據傾斜。
DAG 2.0 基於中間數據的動態併發調整實現,充分考慮了數據 partition 可能存在傾斜的情況,對動態調整的策略進行了優化,使得動態調整的策略後數據的分佈更加均勻,可以有效避免由於動態調整可能引入的數據傾斜。
這種最常見下游併發調整方式是 DAG 2.0 動態物理圖能力的一個直觀展示。在 2.0 中項目中,結合計算引擎的數據處理的特點,還探索了基於源數據的動態併發調整。例如對於最常見的兩個原表數據的 join (M1 join M2 at J), 如果用節點大小來表示其處理數據的的多少,那對於下圖這麼一個例子,M1 處理的是中等的一個數據表(假設 M1 需要併發度為 10),M2 處理的是較大的數據表(併發度為1000),naïve 的執行方式會將按照 10 + 1000 的併發度調度,同時因為 M2 輸出需要全量 shuffle 到 J, J 需要的併發度也會較大 (~1000)。
而實際上,對於這種計算 pattern 而言,M2 需要讀取(並進行處理)的,應該只有能和 M1 的輸出 join 得上的數據,也就是說在考慮了整體執行 cost 後,在這種 M1 期望的輸出數據要比 M2 小的多的情況下,可以先行調度 M1 完成計算,將 M1 輸出數據的 statistics 在 AM/DAG 端進行聚合,然後只挑選出 M2 的有效數據進行處理。這裡 "M2 的有效數據"的選擇本質上是一個 predicate push down 的過程,可以由計算引擎的優化器和運行時聯合進行判斷。也就是說,這種情況下 M2 的併發度調整,是和上層計算緊密結合的。
一個最直觀的例子是,如果 M2 是一個 1000 個分區的分區表,並且分區的 key 和 join 的 key 相同,那麼可以只讀取 M2 能和 M1 輸出 join 上的有效數據的 partition 進行讀取處理。假如 M1 的輸出只包含了 M2 原表數據的 3 個 partition keys, 那麼在 M2 就只需要調度 3 個計算節點來處理這 3 個分區的數據。也就是說 M2 的併發度從默認的 1000,可以降低到 3,這在保證同樣的邏輯計算等效性與正確性的前提下,能大大降低計算資源的消耗,並數倍加速作業的運行。這裡的優化來自幾個方面:
- M2 的併發度 (1000 -> 3) 以及處理的數據量大大降低
- M2 需要 shuffle 到 J 的數據量以及 shuffle 需要的計算量大大降低
- J 需要處理的數據量以及其併發度能大大降低
從上圖這個例子中我們也可以看到,為了保證 M1 -> M2 的調度順序上,DAG 中在 M1 和 M2 間引入了一條依賴邊,而這條邊上是沒有數據流動的,是一條只表示執行先後的依賴邊。這與傳統 MR/DAG 執行框架裡,邊的連接與數據流動緊綁定的假設也有不同,是在 DAG 2.0 中對於邊概念的一個拓展之一。
DAG 執行引擎作為底層分佈式調度執行框架,其直接的對接"用戶"是上層計算引擎的開發團隊,其升級對於終端用戶除了性能上的提升,直接的體感可能會少一點。這裡我們舉一個終端用戶體感較強的具體例子,來展示 DAG 更加動態的執行能力,能夠給終端用戶帶來的直接好處。就是在 DAG 動態能力的基礎上,實現的 LIMIT 的優化。
對於 SQL 用戶來說,對數據進行一些基本的 at hoc 操作,瞭解數據表的特性,一個非常常見的操作是 LIMIT,比如:
SELECT * FROM tpch_lineitem WHERE l_orderkey > 0 LIMIT 5;
在分佈式執行框架上,這個操作對應的執行計劃,是通過將源表做切分後,然後調度起所需數目的 mapper 去讀取全部數據,再將 mapper 的輸出彙總到 reducer 後去做最後的 LIMIT 截斷操作。假設源表 (這裡的 tpch_lineitem) 是一個很大的表,需要 1000 個 mapper 才能讀取,那麼在整個分佈式執行過程中,涉及的調度代價就是要調度 1000 mapper + 1 reducer。這個過程中會有一些上層計算引擎可以優化的地方,比如每個 mapper 可以最多輸出 LIMIT 需要的 record 數目(這裡的 LIMIT 5)提前退出,而不必處理完所有分配給它的數據分片等等。但是在一個靜態的執行框架上,為了獲取這樣簡單的信息,整體 1001 個計算節點的調度無法避免。這給這種 ad hoc query 執行,帶來了巨大的 overhead, 在集群資源緊張的時候尤其明顯。
DAG 2.0 上, 針對這種 LIMIT 的場景,依託新執行框架的動態能力,實現了一些優化,這主要包括幾方面:
- 上游 Exponential start:對於這種大概率下上游 mapper 計算節點不需要全部運行的情況,DAG 框架將對 mapper 進行指數型的分批調度,也就是調度按照 1, 10 ... FULL 的分批執行
- 下游的 Early scheduling:上游產生的 record 數目作為執行過程中的統計數據上報給 AM, AM 在判斷上游已經產生足夠的 record 條數後,則提前調度下游 reducer 來消費上游的數據。
- 上游的 Early termination:下游 reducer 在判斷最終輸出的 LIMIT 條數已經滿足條件後,直接退出。這時候 AM 可以觸發上游 mapper 整個邏輯節點的提前退出(在這種情況下,大部分 mapper 可能都還沒有調度起來),整個作業也能提前完成。
這種計算引擎和 DAG 在執行過程中的靈活動態交互,能夠帶來大量的資源節省,以及加速作業的執行。在線下測試和實際上線效果上,基本上絕大多數作業在 mapper 執行完 1 個計算節點後就能提前退出,而無需全量調起 (1000 vs 1)。
下圖是在線下測試中,當 mapper 併發為 4000 時,上述 query 優化前後的區別:
可以看到,執行時間優化後增速了 5X+, 計算資源的消耗更是減小了數百倍。
這個線下測試結果作為比較典型的例子,稍微有些理想化。為了評估真實的效果,在 DAG 2.0 上線後,選取了 LIMIT 優化生效的線上作業,統計了一星期結果如下:這個優化平均為每個作業節省了 (254.5 cores x min CPU + 207.3 GB x min) 的計算資源,同時每個作業上,平均能節省 4349 個(無效)計算節點的調度。
LIMIT 執行上的改進,作為一個針對特殊場景上實現的優化,涉及了整個 DAG 執行不同策略的調整,這種細化的改進能力,能更直觀的體現 DAG 2.0 架構升級諸多好處:靈活的架構使得 DAG 的執行中擁有了更多的動態調整能力,也能和計算引擎在一起進行更多有針對性的優化。
不同情況下的動態併發度調整,以及具體調度執行策略的動態調整,只是圖的物理特性動態調整的幾個例子。事實上對於物理特性運行時的調整,在 2.0 的框架之上有各種各樣的應用,比如通過動態數據編排/shuffle 來解決各種運行期間的skew問題等,這裡不再做進一步的展開。接下來我們再來看看 DAG 2.0 上對於邏輯圖的動態調整做的一些探索。
2)動態邏輯圖的調整
分佈式 SQL 中,map join 是一個比較常見的優化,其實現原理是在 join 的兩個表中,如果有一個超小的表(可以 fit 到單個計算節點的內存中),那對於這個超小表可以不做 shuffle,而是直接將其全量數據 broadcast 到每個處理大表的分佈式計算節點上。通過在內存中直接建立 hash 表,完成 join 操作。map join 優化能大量減少 (大表) shuffle 和排序,非常明顯的提升作業運行性能。但是其侷限性也同樣顯著:如果"超小表"實際不小,無法 fit 進單機內存,那麼在試圖建立內存中的 hash 表時就會因為 OOM 而導致整個分佈式作業的失敗,而需要重跑。所以雖然 map join 在正確使用時,可以帶來較大的性能提升,但實際上優化器在產生 map join 的 plan 時需要偏保守,很多情況下需要用戶顯式的提供 map join hint 來產生這種優化。此外不管是用戶還是優化器的選擇,對於非源表的輸入都無法做很好的判斷,因為中間數據的大小往往需要在作業運行過程中才能準確得知。
而 map join 與默認 join 方式 (sorted merge join) 對應的其實是兩種不同優化器執行計劃,在 DAG 層面,其對應的是兩種不同的邏輯圖。要支持這種運行過程中根據中間數據特性的動態優化,就需要 DAG 框架具備動態邏輯圖的執行能力,這也是在 DAG 2.0 上開發的 conditional join 功能。
如同下圖展示,在對於 join 使用的算法無法被事先確定的時候,允許優化器提供一個 conditional DAG,這樣的 DAG 同時包括使用兩種不同 join 的方式對應的不同執行計劃支路。在實際執行時,AM 根據上游產出數據量,動態選擇一條支路執行 (plan A or plan B)。這樣子的動態邏輯圖執行流程,能夠保證每次作業運行時都能根據實際作業數據特性,選擇最優的執行計劃。
conditional join 是動態邏輯圖的第一個落地場景,在線上選擇一批適用性作業,動態的 conditional join 相比靜態的執行計劃,整體獲得了將近 3X 的性能提升。
2 混合 Bubble 模式
Bubble 模式是我們在 DAG 2.0 架構上探索的一種全新的作業運行方式,通過對於 bubble 大小以及位置的調整,可以獲取性能和資源利用率的不同 tradeoff 點。這裡通過一些更加直觀的例子,來幫助大家理解 Bubble 執行在分佈式作業中的實際應用。
在上圖的 TPCH Q21 上。比如在 Q21 上,我們看到了通過將作業被切分為三個 "bubble",數據能夠有效的在節點之間 pipeline 起來,並且通過熱點節點實現調度的加速。最終消耗的資源數 (cpu * time) 是準實時作業的 35%, 而性能則與一體化調度的準實時作業非常相近 (96%), 比離線作業性能提升 70% 左右。
在標準 TPCH 1TB 全量測試中,混合 bubble 模式體現出了相比離線和準實時的一體化模式 (gang scheduling) 更好的資源/性能 tradeoff。選用 Greedy Bubble(size = 500) 的策略,bubble 相比離線作業性能提升了 2X (資源消耗僅增加 17%,具體數值略)。同時與一體化調度的準實時作業比較,bubble 執行在只消耗了 40% 不到的資源 (cpu * time) 的前提下,其性能達到了準實時作業的 85% (具體數值略)。可以看到,這種新型的 bubble 執行模式,允許我們在實際應用中獲取很好的性能與資源的平衡,達到系統資源有效的利用。Bubble 執行模式目前正在阿里集團內部全量上線中,我們在實際線上的作業也看到了與 TPCH 測試非常相似的效果。
如同之前所述,混合 bubble 模式支持了不同切分策略,這裡提供的只是一種切分策略上的效果。在與上層計算引擎 (e.g., MaxCompute 優化器) 緊密結合時,這種 DAG 分佈式調度 bubble 執行的能力,能夠允許我們根據可用資源和作業計算特點,來尋找性能與資源利用率的最佳平衡點。
四 資源的動態配置和動態管理
傳統分佈式作業對於每個計算節點需要的資源類型 (CPU/GPU/Memory) 和大小都是預先確定下來的。然而在作業運行之前,對計算節點資源類型和大小的合理選擇,是比較困難的。即便對於計算引擎的開發者,也需要通過一些比較複雜的規則,才能預估出大概合理的配置。而對於需要將這些配置透明給終端用戶的計算模式,終端用戶要做出選擇就更加困難。
在這裡以 PAI 的 Tensorflow (TF) 作業為例,描述 DAG 2.0 的資源動態配置能力,怎樣幫助平臺的 TF 作業選擇合理的 GPU 類型資源以及提高 GPU 資源的利用率。相比 CPU 而言,GPU 作為一種較新的計算資源,硬件的更新換代較快,同時普通終端用戶對於其計算特點也相對不瞭解。因此終端用戶在指定 GPU 資源類型時,經常存在著不合理的情況。與此同時,GPU 在線上又是相對稀缺資源。今天在線上,GPU 申請量經常超過集群 GPU 總數,導致用戶需要花很長時間排隊等待資源。而另外一方面,集群中 GPU 的實際利用率卻偏低,平均只有 20% 左右。這種申請和實際使用之間存在的 Gap,往往是由於用戶作業配置中,事先指定的 GPU 資源配置不合理造成。
在 DAG 2.0 的框架上,PAI TF GPU 作業 (見 session 2.2.2 的 dynamic PS DAG) 引入了一個額外的"計算控制節點",可以通過運行 PAI 平臺的資源預測算法,來判斷當前作業實際需要的 GPU 資源類型,並在必要的時候,通過向 AM 發送動態事件,來請求修改下游 worker 實際申請的 GPU 類型。這其中資源預測算法,可以是根據算法的類型,數據的特點,以及歷史作業信息來做 HBO (history based optimization),也可以通過 dry-run 的方法來進行試運行,以此確定合理的資源類型。
具體實現上,這個場景中 control stage 與 Worker 之間通過 concurrent edge 連接,這條邊上的調度觸發條件是在 control stage 已經做出資源選擇決定之後,通過其發出的事件來觸發。這樣的作業運行期間的動態資源配置,在線上功能測試中,帶來了 40% 以上的集群 GPU 利用率提升。
作為物理特性一個重要的維度,對計算節點的資源特性在運行時的動態調整能力,在 PAI 以及 MaxCompute 上都能找到廣泛的應用。以 MaxCompute SQL 為例,對於下游節點的 CPU/Memory 的大小,可以根據上游數據的特點進行有效的預判;同時對於系統中發生的 OOM,可以嘗試自動調高 OOM 後重試的計算節點的內存申請,避免作業的失敗,等等。這些都是在 DAG 2.0 上新的架構上實現的一些新功能,在這裡不做具體的展開。
五 工程化與上線
作為分佈式系統的底座,DAG 本身的動態能力以及靈活度,在與上層計算引擎結合時,能夠支持上層計算實現更加高效準確的執行計劃,在特定場景上實現數倍的性能提升以及對資源利用率的提高。在上文中,也集中介紹了整個 DAG 2.0 項目工作中,開發實現的一些新功能與新的計算模式。除了對接計算引擎來實現更高效的執行計劃,調度本身的敏捷性,是 AM/DAG 執行性能的基本素質。DAG 2.0 的調度決策均基於事件驅動框架以及靈活的狀態機設計來實現,在這裡也交出 DAG 2.0 在基本工程素養和性能方面的成績單:
這裡選用了最簡單的 Map-Reduce (MR) 作業為例,對於這種作業,調度執行上並無太多可以取巧的地方,考驗的是調度系統本身的敏捷度和整個處理流程中的全面去阻塞能力。這個例子也凸顯了 DAG 2.0 的調度性能優勢,尤其作業規模越大,優勢越發明顯。此外,對於更接近線上的 work-load 的場景,在 TPCDS 標準 benchmark 中,當執行計劃和運行邏輯完全相同時,2.0 (未打開動態執行等功能)的高性能調度也給作業帶來了顯著的提升。
最後,對於一個從頭到尾完整替代原有系統的新一代的全新框架,怎樣無縫對接線上場景,實現大規模的上線,是一個同樣重要(甚至更重要)的話題,也是對一個實際生產系統進行升級,與小範圍的新系統 POC 之間最大的區別。今天的伏羲調度系統,每天支撐著阿里集團內外大數據計算平臺千萬的分佈式作業。DAG/AM 這一核心分佈式調度執行組件的更新換代,要完整替換線上已經支撐了大數據業務 10 年的分佈式生產系統,而不造成現有場景的失敗,這需要的不僅僅是架構和設計上的先進性。如何在"飛行中換引擎", 保質保量的實現系統升級,其挑戰完全不亞於新的系統架構本身的設計。要實現這樣的升級,擁有一個穩固的工程基座,以及測試/發佈框架,都是不可或缺的。沒有這樣子的底座,上層的動態功能與新計算模式,都無從談起。
目前 DAG 2.0 目前已全面覆蓋了阿里集團 MaxCompute 所有線上的 SQL 離線作業和所有準實時作業,以及 PAI 平臺的所有 Tensorflow 作業(CPU 和 GPU)+ PyTorch 作業。每天支撐數千萬分佈式作業的運行,並經受了 19 年雙11 /雙12 的考驗。在面對兩次大促創歷史記錄的數據洪峰(相比 18 年增長 50%+)壓力下,保障了集團重點基線在大促當天準時產出。與此同時,更多種類型的作業(例如跨集群復製作業等等)正在遷移到 DAG 2.0 的新架構,並且依託新的架構升級計算作業本身的能力。DAG 2.0 的框架基座的上線,為各條計算線上依託其實現新功能打下了堅實基礎。
六 展望
伏羲 DAG 2.0 核心架構的升級,旨在夯實阿里計算平臺長期發展的基礎,並支持上層計算引擎與分佈式調度方面結合,實現各種創新和創建新計算生態。架構的升級本身是向前邁出的重要一步,但也只是第一步。要支撐企業級的,各種規模,各種模式的全頻譜計算平臺,需要將新架構的能力和上層計算引擎,以及伏羲系統其他組件進行深度整合。依託阿里的應用場景,DAG 2.0 除了在作業規模等方面繼續在業界保持領先之外,架構和功能上也有許多創新, 比如前面我們已經介紹過的:
- 在業界首次在分佈式執行框架上,實現了執行過程中邏輯圖和物理圖的雙重動態可調;
- 通過 Bubble 機制實現了混合的計算模式,探索資源利用率和作業性能間的最佳平衡。
除此之外,2.0 更加清晰的系統封層架構帶來的一個重要改變就是能有利於新功能更快速開發,提速平臺和引擎向前創新。由於篇幅有限,本文只能由點及面地介紹一部分新功能與新計算模式,還有許許多多已經實現,或正在開發中的功能,在業界都是全新的探索,暫時不做進一步展開,比如:
- 準實時作業體系架構的整體升級: 資源管理與多作業管理的解耦,支持準實時作業場景上的動態圖功能。
- 常駐的單 container 多 slot 執行的 cache-aware 查詢加速服務 (MaxCompute 短查詢)
- 基於狀態機的作業節點管理以及失敗下的智能重跑機制。
- 動態可定義的 shuffle 方式:通過 recursive shuffle 等方式動態解決線上大規模作業中的 in-cast 問題。
- 基於 adaptive 的中間數據動態切分與聚合,解決實際分佈式作業中各種數據傾斜問題
- 支持 PAI TF GPU 作業的多執行計劃選項。
- 通過 DAG 執行過程中與優化器的交互,實現漸進式的交互式動態優化。
- 支持 Imperative 語言特性,通過 DAG 的動態自增長等能力,對接 IF/ELSE/LOOP 等語義。
核心調度底座能力的提升,能夠為上層的各種分佈式計算引擎提供真正企業級的服務能力,提供必須的彈藥。而這些計算調度能力提升帶來的紅利,最終會通過 MaxCompute 和 PAI 等引擎,透傳到終端的阿里雲計算服務的各個企業。在過去的十年,阿里業務由內向外的驅動,鍛造了業界規模最大的雲上分佈式平臺。而通過更好服務集團內部以及雲上的企業用戶,我們希望能夠提升平臺的企業級服務能力,可以完成由內向外,到由外至內的整個正向循環過程,推動計算系統螺旋式上升的不斷創新,並通過性能/規模,以及智能化自適應能力兩個維度方面的推進,降低分佈式計算服務的使用門檻,真正實現大數據的普惠。
免費下載大數據實戰電子書《領軍行業大數據及 AI 實戰》
雲上不同行業企業大數據及 AI 典型場景最佳實踐全揭祕。深度剖析大數據在直播、多媒體、新零售、物聯網、金融科技、社交、家居服務、互聯網、泛娛樂 9 個行業實戰場景,通過企業真實案例,助你速懂企業大數據實踐。
識別下方二維碼立即下載: