大數據

MaxCompute執行引擎核心技術DAG揭祕

作為業界少有的EB級別數據分佈式平臺,MaxCompute系統每天支撐上千萬個分佈式作業的運行。在這個量級的作業數目上,毫無疑問平臺需要支撐的作業特點也多種多樣:既有在"阿里體量"的大數據生態中獨有的包含數十萬計算節點的超大型作業,也有中小規模的分佈式作業。同時不同用戶對於不同規模/特點的作業,在運行時間,資源使用效率,數據吞吐率等方面,也有著不同的期待。

image.png

Fig.1 MaxCompute線上數據分析

基於作業的不同規模,當前MaxCompute平臺提供了兩種不同的運行模式,下表對於這兩種模式做了總結對比:

image.png

Fig.2 離線(batch)模式 vs 一體化調度準實時(smode)模式

從上圖可以看到,離線作業和一體化調度的準實時作業,在調度方式,數據傳輸,使用資源來源等多個方面,都有非常顯著的區別。可以說,這兩種運行方式分別代表了在海量數據場景上按需申請資源來優化吞吐量和資源利用率,以及在處理中等(少量)數據時通過計算節點的全量預拉起來(以及數據直傳等手段加速)降低執行時延的兩個極端。而這些區別,最終會通過執行時間和作業資源利用率等方面體現出來。很顯然,以高Throughput為主要優化目標的離線模式,和以追求低Latency的準實時模式,在各方面的性能指標會有很大的區別。比如以1TB-TPCH標準benchmark為例,此報告執行時間(性能)和資源消耗兩個維度來做比較。可以看到,準實時的(SMODE)在性能上有著非常明顯的優勢(2.3X),但是這樣的性能提升也並不是沒有代價的。在TPCH這個特定的場景上,一體化執行的SMODE模式,在獲取了2.3X性能提升的同時,也消耗了3.2X的系統資源(cpu * time)。

image.png

Fig.3 性能/資源消耗比較:離線(batch)模式 vs 一體化調度準實時(smode)模式

這個觀察結論其實並不意外,或者從某種程度上是by design的。拿下圖一個典型SQL產生的DAG來看,所有計算節點都在作業提交伊始就被拉起,雖然這樣的調度方式允許數據得以(在需要的時候)pipeline起來,從而可能加速數據的處理。但並不是所有的執行計劃裡的所有上下游計算節點都可以有理想化的pipelined dataflow。事實上對於許多作業而言,除了DAG的根節點(下圖中的M節點)以外,下游的計算節點在某種程度上都存在著一定程度的浪費。

image.png

Fig.4 一體化調度準實時(smode)模式下,可能的資源使用低效

這種空轉造成的資源使用的低效,在數據的處理流程上存在barrier算子而無法pipeline,以及在DAG圖比較深的情況下會尤為明顯。當然對於希望極致優化作業運行時間的場景而言,通過更多的資源消耗,來獲取極致的性能優化,在一些場景上是有其合理性的。 事實上,在一些business-critical的在線服務系統中,為了保證服務總是能迅速響應並處理峰值數據,平均個位數的CPU利用率也並非少見。但是對於計算平臺這種量級的分佈式系統,能否在極致性能以及高效的資源利用率之間,獲取一個更好的平衡呢

答案是肯定的。這就是我們在這裡要介紹的混合計算模式:Bubble Execution

1. Bubble Execution 概述

DAG框架的核心架構思想,在於對執行計劃的邏輯層與物理層的清晰分層設計。物理執行圖是通過對邏輯圖中的節點、邊等的物理特性(如數據傳輸介質,調度時機,資源特性等)的物化來實現的。對比在Fig.2中描述的batch模式和smode模式,DAG提供了在一套靈活的調度執行框架之上,統一離線模式和準實時一體化執行模式的實現。如同下圖所示,通過調整計算節點和數據連接邊的不同物理特性,不僅能對現有的兩種計算模式做清晰的表述,在對其進行更通用化的擴展後,還可以探索一種全新的混合運行模式,也就是Bubble Execution。

image.png

Fig.5 DAG框架上的多種計算模式

直觀上來理解,如果我們把一個Bubble當作一個大的調度單位,Bubble內部的資源一起申請運行,並且內部上下游節點的數據均通過網絡/內存直連傳輸。與之相對的,Bubbles之間連接邊上的數據傳輸,則通過落盤方式來傳輸。那麼離線和準實時作業執行,其實可以認為是Bubble執行的兩個極端場景:離線模式可以認為是每個stage都單獨作為single-bubble的特例,而準實時框架則是將作業所有計算節點都規劃到一個大Bubble內部,來做一體化調度執行的另一個極端。DAG AM已經將兩種計算模式統一到一套調度執行infra之上。使得在兩種模式上進行優點互補成為可能,為引入Bubble Execution奠定了基礎。

Bubble Execution通過靈活自適應的子圖(Bubble)切割,在現有的兩個極端之間,提供了一種選取更細粒度,更通用的調度執行方法,達到作業性能和資源利用率之間獲取優化的tradeoff的方法。在根據輸入數據量、算子特性、作業規模等信息進行分析後,DAG的Bubble執行模式可以將一個離線作業切分出多個Bubbles,在Bubble內部充分利用網絡/內存直連和計算節點預熱等方式提升性能。這種切分方式下,一個DAG運行圖中的計算節點,可以都被切入某個Bubble,根據所在DAG中的位置被切入不同Bubbles,還可以完全不被切入任何Bubble(依然以傳統離線作業模式運行)。這種高度靈活的混合運行模式,使整個作業的運行能更加靈活的自適應線上多種多樣作業的特點,在實際生產中具有重要的意義:

  • Bubble模式使更多作業的加速成為可能:一體化調度的準實時作業具有基於整體規模(線上默認2000)的"一刀切"式的准入條件。這一方面是出於有限資源的公平使用,另一方面也是為了控制節點failure帶來的cost。但對於中大型作業,雖然整體規模可能超過准入門限,但是其內部的不同子圖,有可能是規模合適,且可以通過數據pipeline等方法來加速的。此外線上部分計算節點由於其本身的特性(比如包含UDF等用戶邏輯需要安全沙箱),無法使用預熱的準實時資源池執行,而當前非黑即白的模式,會使得一個作業中,只要包含一個這種計算節點,整個作業都無法使用加速模式執行。Bubble模式能較好的解決這些問題。
  • Bubble模式將enable線上兩個資源池的打通:當前離線資源(cold)和準實時資源池(warm)作為兩種特性不同的線上資源,完全隔離,各自管理。這種分離的現狀,可能導致資源的浪費。比如對於大規模作業,因為完全無法利用準實時資源池,排隊等待離線資源,而同時準實時資源池可能正處於空閒狀態,反之亦然。Bubble模式能通過在作業內部拉通不同資源的混合使用,使得兩者各自補充,削峰填谷。
  • Bubble模式可以整體上提高資源的利用率:從資源利用的角度來看,對於可以滿足準實時模式准入的中型作業,由於準實時模式一體式調度拉起的運行模式,雖然運行速度能有所提升,但客觀上會造成一定程度資源的空轉與浪費(尤其是DAG圖較深以及計算邏輯有barrier時)。這種情況下,按照節點數目,計算barrier等條件,將一體化模式拆解成多個Bubble。這能夠有效的減少節點大量的空轉消耗,而且在拆分條件合理的情況下,性能方面的損失也可以做到較低。
  • Bubble模式能有效降低單個計算節點failure帶來的代價:一體化的準實時模式執行,由於其數據pipeline的特性,作業的容錯粒度和其調度粒度是緊密掛鉤的:都是all-in-one。也就是說,只要有一個節點運行失敗,整個作業都要重新運行。因為作業規模越大,運行過程中可能有節點失敗的概率也就越大,這樣的failover粒度無疑也限制了其能支持的最大作業規模。而Bubble模式則提供了一個更好的平衡點:單個計算節點的失敗,最多隻影響同處於一個Bubble的節點。此外Bubble模式對於各種failover做了細粒度的各種處理,我們將在下文描述。

我們可以通過標準的TPCH-1TB測試benchmark來直觀評測Bubble執行模式的效果。在上層計算引擎(MaxCompute優化器以及runtime等)保持不變,並且Bubble的大小維持在500(具體Bubble切分規則下文介紹)時,做一下Bubble執行模式與標準離線模式,以及準實時模式,在性能(Latency) 以及資源消耗(cpu * time)兩個方面的比較:

image.png

Fig.6.a 性能(Latency)比較:Bubble模式 vs 離線(batch)模式 vs 一體化調度準實時(smode)模式

從運行時間來看,Bubble模式顯然要遠優於離線模式(整體2X的性能提升),而較準實時的一體化調度模式而言,Bubble的執行性能也並沒有太明顯的下降。當然在一些數據可以非常有效利用pipeline處理的query(比如Q5, Q8等),準實時作業還是有一定的優勢。但SMODE作業在執行時間上的優勢並不是沒有代價的,如果同時考慮資源消耗,在下圖中,我們可以看到,準實時作業的性能提升是建立在資源消耗遠遠大於Bubble模式的前提之上的。而Bubble在性能遠優於離線模式的同時,其資源消耗,則整體上是相近的。image.png

Fig.6.b 資源消耗(cpu * time)比較:

Bubble模式 vs 離線(batch)模式 vs 一體化調度準實時(smode)模式

綜合起來看,Bubble Execution可以很好的結合batch模式和準實時模式的優點:

  • 在執行時間層面,對於TPCH測試集中的任意query,bubble模式的執行時間都比batch模式要短,整體上22個Queries總耗時縮減將近2X,接近service mode模式的耗時;
  • 在資源消耗層面,bubble模式基本上和batch模式相當,相比於service mode模式有大幅度的減少,整體縮減2.6X。

image.png

Fig.6.c Bubble模式與離線/準實時模式的整體比較

值得說明的是,在上面的TPCH Benchmark比較中,我們把Bubble切分條件簡單化了,也就是整體上之限制bubble的大小在500,而沒有充分考慮barrier等條件,如果在切分bubble的時候進一步調優,比如對於數據可以有效pipeline起來的節點,儘量保證切分在bubble內部,那作業的執行性能和資源利用率等方面都還可以進一步得到的提升,這是我們在實際生產系統上線過程中會注重考慮的。具體上線的效果見Section 3。

在瞭解了Bubble執行模式的整體設計思想與架構後,接下來展開來講一下具體Bubble模式的實現細節,以及將這種全新的混合執行模式推上線所需要的具體工作。

2. Bubble的切分與執行

採用Bubble Execution的作業(以下簡稱Bubble作業)和傳統的離線作業一樣,會通過一個DAG master(aka. Application Master)來管理整個DAG的生命週期。AM負責對DAG進行合理的bubble切分,以及對應的資源申請和調度運行。整體而言,Bubble內部的計算節點,將按照計算加速度原則,包括同時使用預拉起的計算節點以及數據傳輸通過內存/網絡直傳進行pipeline加速。而不切在bubble內部的計算節點則通過經典離線模式執行,不在bubble內部的連接邊(包括橫跨bubble boundary的邊)上的數據,均通過落盤方式進行傳輸。

image.png

Fig.7 混合Bubble執行模式

Bubble切分方法,決定了作業的執行時間和資源利用率。需要根據計算節點的併發規模,節點內部算子屬性等信息綜合考慮。而在切分出bubble之後,Bubble的執行則涉及到節點的執行,與數據pipeline/barrier的shuffle方式怎麼做到有機的結合,這裡分開做一下描述。

2.1 Bubble 切分原理

Bubble Execution的核心思想在於將一個離線作業拆分成多個Bubble來執行。為了切分出有利於作業整體高效運行的bubble,有幾個因素需要綜合考慮:

  • 計算節點內部算子特性:對於同時拉起bubble所有計算節點的調度模式而言,數據在bubble內部的上下游節點之間能否有效的進行pipeline處理,很大程度上決定了在bubble內部,下游節點是否會因處於空轉狀態帶來資源浪費。所以在切分bubble的邏輯中,當節點包含barrier特性的算子而可能阻塞數據的pipeline時,將考慮不將該節點與其下游切入同一個bubble。
  • 單個Bubble內部計算節點數目的多少:如同之前討論的,一體化的資源申請/運行,當包含的計算節點過多時,可能無法申請到資源,或者即使能申請到其failure代價也可能無法控制。限定Bubble的大小,可以避免過大的一體化運行帶來的負面作用。
  • 聚合計算節點,切割Bubble的迭代方向:考慮到bubble大小的限制,從上而下切分bubble與從下而上切分bubble兩種方式,可能導致切分的結果的不同。對於線上大部分作業而言,處理的數據往往呈倒三角型,對應的DAG也大多數是倒三角形態,所以默認採用自底向上的算法來切割bubble,也就是從距離root vertex最遠的節點開始迭代。

在上述的幾個因素中,算子的barrier屬性由上層計算引擎(e.g., MaxCompute的optimizer)給出。一般而言,依賴global sort操作的算子(比如MergeJoin, SorteAggregate等),會被認為會造成數據阻塞(barrier),而基於hash特性操作的算子則對於pipeline更加友好。對於單個Bubble內部允許的計算節點數目,根據我們對線上準實時作業特點的分析和Bubble作業的實際灰度實驗,選定的默認上限在500。這是一個在大多數場景下比較合理的值,既能保證比較快速的拿到全量資源,同時由於處理數據量和DoP基本成正相關關係,這個規模的bubble一般也不會出現內存超限的問題。當然這些參數和配置,均允許作業級別通過配置進行微調,同時Bubble執行框架也會後繼提供作業運行期間動態實時調整的能力。

在DAG的體系中,邊連接的物理屬性之一,就是邊連接的上下游節點,是否有運行上的前後依賴關係。對於傳統的離線模式,上下游先後運行,對應的是sequential的屬性,我們稱之為sequential edge。而對於bubble內部的上下游節點,是同時調度同時運行的,我們稱連接這樣的上下游節點的邊,為concurrent edge。可以注意到,這種concurrent/sequential的物理屬性,在bubble應用場景上,實際與數據的傳送方式(網絡/內存直傳 vs 數據落盤)的物理屬性是重合的(Note: 但這兩種依然是分開的物理屬性,比如在必要的時候concurrent edge上也可以通過數據落盤方式傳送數據)。

基於這樣的分層抽象,Bubble切分算法,本質上就是嘗試聚合DAG圖的節點,將不滿足bubble准入條件的concurrent edge還原成sequential edge的過程。最終,由concurrent edge聯通的子圖即為bubble。在這裡我們通過一個實際的例子來展示Bubble切分算法的工作原理。假設存在下圖所示的DAG圖,圖中的圓圈表示計算頂點(vertex),每個圓圈中的數字表示該vertex對應的實際計算節點併發度。其中V1和V3因為在作業提交初始,就因為其內部包含barrier算子,而被標註成barrier vertex。圓圈之間的連接線表示上下游的連接邊(edge)。橙色線代表(初始)concurrent edge,黑色線代表sequential edge,初始狀態圖中的sequential edge根據barrier vertex的輸出邊均為sequential edge的原則確定,其他邊默認均初始化為concurrent edge。

image.png

Fig.8 示例DAG圖(初始狀態)

在這個初始DAG基礎上,按照上面介紹過的整體原則,以及本章節最後描述的一些實現細節,上圖描述的初始狀態,可以經過多輪算法迭代,最終產生如下的Bubble切分結果。在這個結果中產生了兩個Bubbles: Bubble#0 [V2, V4, V7, V8],Bubble#1 [V6, V10], 而其他的節點則被判斷將使用離線模式運行。

image.png

Fig.9 示例DAG圖Bubble切分結果

在上圖的切分過程中,自底向上的遍歷vertex,並秉承如下原則:

若當前vertex不能加入bubble,將其輸入edge均還原為sequential edge(比如DAG圖中的V9);

若當前vertex能夠加入bubble,執行廣度優先遍歷算法聚合生成bubble,先檢索輸入edge連接的vertex,再檢索輸出edge連接的,對於不能聯通的vertex,將edge還原為sequential edge(比如DAG圖中遍歷V2的輸出vertex V5時會因為total task count超過500觸發edge還原)。

而對任意一個vertex,只有當滿足以下條件才能被添加到bubble中:

  • vertex和當前bubble之間不存在sequential edge連接;
  • vertex和當前bubble不存在循環依賴,即:
    • Case#1:該vertex的所有下游vertex中不存在某個vertex是當前bubble的上游
    • Case#2:該vertex的所有上游vertex中不存在某個vertex是當前bubble的下游
    • Case#3:該vertex的所有下游bubble中不存在某個vertex是當前bubble的上游
    • Case#4:該vertex的所有上游bubble中不存在某個vertex是當前bubble的下游

注:這裡的上游/下游不僅僅代表當前vertex的直接後繼/前驅,也包含間接後繼/前驅

image.png

Fig.10 切分Bubble過程可能存在循環依賴的幾種場景

而實際線上bubble的切分還會考慮到實際資源和預期運行時間等信息,比如計算節點的plan memory 是否超過一定數值,計算節點中是否包含UDF算子,生產作業中計算節點基於歷史信息(HBO)的預估執行時間是否超長,等等,這裡不再贅述。

2.2 Bubble的調度與執行

2.2.1 Bubble調度

為了實現計算的加速,Bubble內部的計算節點的來源默認均來自常駐的預熱資源池,這一點與準實時執行框架相同。與此同時我們提供了靈活的可插拔性,在必要的情況下,允許Bubble計算節點從Resource Manager當場申請(可通過配置切換)。

從調度時機上來看,一個Bubble內部的節點調度策略與其對應的輸入邊特性相關,可以分成下面幾種情況:

  • 不存在任何input edge的bubble root vertext(比如 Fig.9中的V2):作業一運行就被調度拉起。
  • 只有sequential edge輸入bubble root vertex(比如 Fig.9中的V6):等待上游節點完成度達到配置的min fraction比例(默認為100%,即所有上游節點完成)才被調度。
  • Bubble內部的vertex(即所有輸入邊都是concurrent edge,比如 Fig.9中的V4, V8, V10),因為其完全是通過concurrent edge進行連接的,會自然的被與上游同時觸發調度。
  • Bubble邊界上存在mixed-inputs的bubble root vertex(比如 Fig.9中的V7)。這種情況需要一些特殊處理,雖然V7與V4是通過concurrent edge鏈接,但是由於V7的調度同時被V3通過sequential edge控制,所以事實上需要等待V3完成min-fraction後才能調度V7。對於這種場景,可以將V3的min-fraction配置為較小(甚至0)來提前觸發;此外Bubble內部我們也提供了progressive調度的能力,對這種場景也會有幫助。

比如圖7中的Bubble#1,只有一條SequentialEdge外部依賴邊,當V2完成後,就會觸發V6 + V10(通過concurrent edge)的整體調度,從而將整個Bubble#1運行起來。

在Bubble被觸發調度後,會直接向SMODE Admin申請資源,默認使用的是一體化Gang-Scheduling(GS)的資源申請模式,在這種模式下,整個Bubble會構建一個request,發送給Admin。當Admin有足夠的資源來滿足這個申請時,會將,再包含預拉起worker信息的調度結果發送給bubble作業的AM。

image.png

Fig.11 Bubble與Admin之間的資源交互

為了同時支持緊張資源上以及Bubble內部動態調整的場景,Bubble同時還支持Progressive的資源申請模式。這種模式允許Bubble內的每個Vertex獨立申請資源和調度。對於這種申請,Admin只要有增量的資源調度即會將結果發送給AM,直到對應Vertex的request完全滿足。對於這種場景上的獨特應用這裡暫時不做展開。

在準實時執行框架升級後,SMODE服務中的資源管理(Admin)和多DAG作業管理邏輯(MultiJobManager)已經解耦,因此bubble模式中的資源申請邏輯,只需要和Admin進行交互,而不會對於正常準實時作業的DAG執行管理邏輯帶來任何影響。另外,為了支持線上灰度熱升級能力,Admin管理的資源池中的每個常駐計算節點均通過Agent+多Labor模式運行,在調度具體資源時,還會根據AM版本,進行worker版本的匹配,並調度滿足條件的labor給Bubble作業。

2.2.2 Bubble數據Shuffle

對於穿越Bubble bourndary上的sequential edge,其上傳輸的數據和普通離線作業相同,都是通過落盤的方式來進行數據傳輸。這裡我們主要討論在Bubble內部的數據傳輸方式。根據之前描述的作業bubble切分原則,bubble內部的通常具備充分的數據pipeline特性,且數據量不大。因此對於bubble內部concurrent edge上的數據,均採用執行速度最快的網絡/內存直傳方式來進行shuffle。

這其中網絡shuffle的方式和經典的準實時作業相同,通過上游節點和下游節點之間建立TCP鏈接,進行網絡直連發送數據。這種push-based的網絡傳送數據方式,要求上下游必須同時拉起,根據鏈式的依賴傳遞,這種網絡push模式強依賴於Gang-Scheduling,此外在容錯,長尾規避等問題上也限制了bubble的靈活性。

為了更好的解決以上問題,在Bubble模式上,探索了內存shuffle模式。在這一模式下,上游節點將數據直接寫到集群ShuffleAgent(SA)的內存中,而下游節點則從SA中讀取數據。內存shuffle模式的容錯,擴展,包括在內存不夠的時候將部分數據異步落盤保證更高的可用性等能力,由ShuffleService獨立提供。這種模式可以同時支持Gang-Scheduling/Progressive兩種調度模式,也使其具備了較強的可擴展性,比如可以通過SA Locality調度實現更多的Local數據讀取,通過基於血緣的instance level retry實現粒度更精細的容錯機制等等。

image.png

Fig.12 Network Shuffle VS Memory Shuffle

鑑於內存shuffle提供的諸多可擴展優勢,這也是線上Bubble作業選用的默認shuffle方式,而網絡直傳則作為備選方案,允許在容錯代價很小的超小規模作業上,通過配置使用。

2.3 Fault-Tolerance

作為一種全新的混合執行模式,Bubble執行探索了在離線作業和一體化調度的準實時作業間的各種細粒度平衡。在線上複雜的集群中,運行過程中各種各樣的失敗在所難免。而bubble這種全新模式下,為了保證失敗的影響最小,並在可靠性和作業性能之間取得最佳的平衡,其對於失敗處理的策略也更加的多樣化。

針對不同的異常問題,我們設計了各種針對性容錯策略,通過各種從細到粗的力度,處理執行過程中可能涉及的各種異常場景處理,比如:向admin申請資源失敗、bubble中的task執行失敗(bubble-rerun)、bubble多次執行失敗的回退(bubble-renew),執行過程中AM發生failover等等。

2.3.1 Bubble Rerun

目前Bubble在內部計算節點失敗時,默認採用的retry策略是rerun bubble。即當bubble內的某個節點的本次執行(attempt)失敗,會立即rerun整個bubble,取消正在執行的同一版本的attempt。在歸還資源的同時,觸發bubble重新執行。通過這種方式,保證bubble內所有計算節點對應的(retry) attempt版本一致。

觸發bubble rerun的場景有很多,比較常見的有以下幾種:

  • Instance Failed:計算節點執行失敗,通常由上層引擎的runtime錯誤觸發(比如拋出retryable-exception)。
  • Resource Revoked:在線上生產環境,有很多種場景會導致資源節點重啟。比如所在的機器整機oom、機器被加黑等。在worker被殺之後,重啟之後的worker會依照最初的啟動參數重新連回admin。此時,admin會將這個worker重啟的消息封裝成Resource Revoked發送給對應的AM,觸發bubble rerun。
  • Admin Failover: 由於Bubble作業所使用的計算資源來自於SMODE的admin資源池,當admin由於某些原因Failover,或者SMODE整體服務被重啟時,分配給AM的計算節點會被停止。Admin在Failover之後不感知當前各個節點被分配的AM信息,無法將這些重啟的消息發送給AM。目前的處理方法是,每個AM訂閱了admin對應的nuwa,在admin重啟之後會更新這個文件. AM感知到信息更新後,會觸發對應的taskAttempt Failed,從而rerun bubble。
  • Input Read Error:在計算節點執行時,讀不到上游數據是一個很常見的錯誤,對於bubble來說,這個錯誤實際上有三種不同的類型:
    • Bubble內的InputReadError:由於shuffle數據源也在bubble內,在rerun bubble時,對應上游task也會重跑。不需要再做針對性的處理。
    • Bubble邊界處的InputReadError: shuffle數據源是上游離線vertex(或也可能是另一個bubble)中的task產生,InputReadError會觸發上游的task重跑,當前bubble rerun之後會被delay住,直到上游血緣(lineage)的新版本數據全部ready之後再觸發調度。
    • Bubble下游的InputReadError: 如果bubble下游的task出現了InputReadError,這個事件會觸發bubble內的某個task重跑,此時由於該task依賴的內存shuffle數據已經被釋放,會觸發整個bubble rerun。

2.3.2 Bubble Renew

在Admin資源緊張時, Bubble從Admin的資源申請可能等因為等待而超時。在一些異常情況下,比如bubble申請資源時剛好onlinejob服務處於重啟的間隔,也會出現申請資源失敗的情況。在這種情況下,bubble內所有vertex都將回退成純離線vertex狀態執行。此外對於rerun次數超過上限的bubble,也會觸發bubble renew。在bubble renew發生後,其內部所有邊都還原成sequential edge,並在所有vertex重新初始化之後,通過回放內部所有調度狀態機觸發事件,重新以純離線的方式觸發這些vertex的內部狀態轉換。確保當前bubble內的所有vertex在回退後,均會以經典離線的模式執行,從而有效的保障了作業能夠正常terminated。

image.png

Fig. 13 Bubble Renew

2.3.3 Bubble AM Failover

對於正常的離線作業,在DAG框架中,每個計算節點相關的內部調度事件都會被持久化存儲,方便做計算節點級別的增量failover。但是對於bubble作業來說,如果在bubble執行過程發生了AM failover重啟,通過存儲事件的replay來恢復出的bubble,有可能恢復到running的中間狀態。然而由於內部shuffle數據可能存儲在內存而丟失,恢復成中間running狀態的bubble內未完成的計算節點,會因讀取不到上游shuffle數據而立刻失敗。

這本質上是因為在Gang-Scheduled Bubble的場景上,bubble整體是作為failover的最小粒度存在的,所以一旦發生AM的failover,恢復粒度也應該在bubble這個層面上。所以對於bubble相關的所有調度事件,在運行中都會被當作一個整體,同時當bubble開始和結束的時候分別刷出bubbleStartedEvent和bubbleFInishedEvent。一個bubble所有相關的events在failover後恢復時會被作為一個整體,只有結尾的bubbleFInishedEvent才表示這個bubble可以被認為完全結束,否則將重跑整個bubble。

比如在下圖這個例子中,DAG中包含兩個Bubble(Bubble#0: {V1, V2}, Bubble#1: {V3, V4}),在發生AM重啟時,Bubble#0已經TERMINATED,並且寫出BubbleFinishedEvent。而Bubble#1中的V3也已經Terminated,但是V4處於Running狀態,整個Bubble #1並沒有到達終態。AM recover之後,V1,V2會恢復為Terminated狀態,而Bubble#1會重頭開始執行。

image.png

Fig 14. AM Failover with Bubbles

3. 上線效果

當前Bubble模式已經在公共雲全量上線,SQL作業中34%執行Bubble,日均執行包含176K個Bubble。

我們針對signature相同的query在bubble execution關閉和打開時進行對比,我們發現在整體的資源消耗基本不變的基礎上,作業的執行性能提升了34%,每秒處理的數據量提升了54%。

image.png

image.png

Fig 15. 執行性能/資源消耗對比

除了整體的對比之外,我們針對VIP用戶也進行了針對性的分析,用戶Project在打開了Bubble開關之後(下圖中紅色標記的點為打開Bubble的時間點),作業的平均執行性能有非常明顯的提升。

image.png

Fig 16. VIP用戶開啟Bubble後平均執行時間對比

image.png

Leave a Reply

Your email address will not be published. Required fields are marked *