EMR團隊探索並開發了SparkSQL Native Codegen框架,為SparkSQL換了引擎,新引擎帶來最高4倍性能提升,為EMR再次獲取世界第一立下汗馬功勞。來自阿里雲EMR團隊的周克勇將詳細介紹Native Codegen框架。本文整理自視頻 https://developer.aliyun.com/live/43579
本次分享主要分為三部分,第一做這件事情的動機和背景,第二做的過程中解決的核心問題,最後是總結。
有些同學可能瞭解到,EMR團隊今年4月份打破了大數據領域Benchmark TBCDS的世界紀錄。在硬件完全相同的情況下,性能提升了一倍,從520萬分提高到1100多萬分。這個成績的背後主要依賴兩條技術,增強了Optimizer和Native Runtime。Optimizer層面,我們在之前工作的基礎上,又做了諸如 CTE 、動態分區裁剪、小表廣播、PK/FK、Fast Decimal等優化。
大家如果關注剛結束的SparkSummit,會發現一些類似的技術,如動態分區裁剪已經進了最新的Spark3.0,EMR版本的Spark在幾個月之前就支持了。另一條技術是Native Runtime,也是今天分享的主題,涵蓋的主要工作,包括 Native Codegen、統一內存佈局、Batch化執行框架,後續會詳細介紹。
大家都知道 Optimizer的目的是獲取最好的執行計劃,主要技術包括states收集和Cost Model,難點是靜態states不夠準確,無法在Plan階段準確預知Filter或join之後的數據量,因此對後續Plan的代價評估不夠準確。
今年SparkSummit發佈的adaptive Execution,就把動態stats收集和plan優化結合在一起來解決這個問題。相對應的 Runtime的目的是針對選定的plan,如何使它跑得更快,長期以來 Runtime的主要工作基本上都聚焦在解決當下的新硬件瓶頸。如MapReduce剛出來時,網絡帶寬是瓶頸,所以Google做了很多locality方面的優化。Spark剛出來時解決的硬件瓶頸是磁盤I/O,它通過內存緩存來提升性能。
再後來 CPU成了新的瓶頸,我們可以看到從10年到20年,磁盤I/O和網絡帶寬都有了每年數量級的提升,但是CPU的主頻基本上保持不變,因此CPU成了新的硬硬件瓶頸,提升CPU性能,成為近年來 Runtime領域重要的優化方向。優化CPU主要有兩條技術路線,向量化和Codegen。我們先看一下傳統的 SQL執行所應用的火山模型的問題所在,這是一個簡單的Select家,Filter加Project加Agg的例子。
在執行的過程中,在火山模型中,每個算子都是一個迭代器,下游的算子,調上游算子的next方法,next返回當前算子處理之後的中間結果。這個模型最大的問題是每條record在經過每一個算子的時候,都要經過一次虛函數調用,而虛函數調用的開銷是非常大的。
第二個問題就是在每個算子之間需要把中間的結果物化到內存。針對這個問題,向量化技術給出的解,是通過批量執行加列式存儲,加小循環,來更好的利用 SIMD的指令和CPU的亂序執行,從而最大化數據並行度和指令並行度,從而分攤掉虛函數調用的開銷,並提升執行性能。
例如上面例子裡Agg 算子計算過程,他把輸入 column1,column2以及 Agg的輸出結果sum都存在數組裡,然後通過一個很緊湊的for循環進行計算。由於循環足夠簡單,編譯器會做循環展開和SIMD的優化。從截圖中我們可以看到,編譯器生成了很多向量化的指令,此外,由於for循環足夠簡單,然後for循環內部基本上都是訪存指令,如訪問colum1的第i個數據,colum2的第i個數據,所以每次放循環最主要的時間都是在進行訪存,而因為 for循環足夠的短,所以CPU的亂序執行的窗口裡,可以同時發射多條漏斗指令,從而解決了 Memory Wall的問題。
這個技術的代表是MonetDB/X100(2005),以及今年SparkSummit宣佈的 photon(2020)技術,主要的缺點是中間緩存的數據量比較大,Codegen技術的給出的解釋算子融合,他打破了Stage內部算子間的界限,拼出來跟原來的邏輯保持一致的裸的代碼通常是一個大的for循環,然後把拼成的代碼編譯成可執行文件,這裡 面展示的跨越的第一個Stage拼出來的代碼,可以看到最外層是一個大的for循環,接下來是Filter,表達了 Filter算術的語義,然後在Filter的內部是Agg的語意,拼出的代碼完全不存在迭代器和額外的函數調用,就像是一個新手手寫的代碼,而這種代碼不存在任何框架上的Overhead,性能往往是最好的。
Spark的Codegen把拼成的代碼交給 Janino模塊做編譯,在運行的時候直接load即時編譯出來的class文件。Codegen技術的好處有幾點:
1.用for循環代替了迭代器,完全消除了虛函數調用;
2.沒有了霧化,中間數據都保存在寄存器裡。它的缺點就是因為 for循環比較大,而且每次迭代執行的邏輯非常的複雜,所以很難應用SIMD的優化。這個技術的代表是Hyper和Apache Spark,儘管Spark的Java Codegen,相比之前有了數量級的提升,但依然有一些不足。首先是Java的性能還是弱於Native Code,二是Java語義的限制,例如無法顯示使用 SIMD或Prefetch之類,並且由於機器的存在,無法自主精細化控制內存。
3.NativeCode更容易跟新硬件進行交互。基於這個原因,我們決定使用 Native Runtime替換Java Runtime。同時我們不想對現有的Spark做太多的改動,所以最終我們選擇了Codegen技術路線,結合起來就是Native Codegen。
接下來介紹我們做Native codegen解決的核心問題,集中在三個方面,我們要生成什麼代碼,怎麼生成這些代碼,以及怎麼樣跟Spark做集成。
第一個問題,生成什麼?
如今的NativeCode有很多,C/C++。Go Rest,LLVM等。基於我們自己的技能點,其實可以選擇的就只有C/C++, C++實現起來相對直觀,只需要對照原來生成的Java代碼,替換成C++即可。但C++最大的問題是它在編譯時間過長,根據HyPer的論文,C++的編譯時間比LLVM高出了一個數量級。LLVM的編譯時間很短,而且執行的效率跟C++相當,看上去是一個很不錯的選擇。
其實很多Native Codegen這樣的系統都選擇了LLVM,包括HyPer,Impala以及阿里雲自研的MaxComputer,ADB等,但LLVM對我們來說還是過於複雜,它的語法接近彙編,是想用匯編重寫SQL算法的工作量會有多大,其實大多數引擎也不會用
LLVM寫全量的代碼,比如HyPer,解碼算子的核心邏輯,用LLVM生成其他通用的功能,包括spill複雜數據結構的管理等,實際上是用C++提前編寫好並進行編譯。 即便如此,LLVM對我們來說依然過於複雜,在廣泛調研之後,另外一種可能性出現了 Weld。
先介紹一下Weld
這個是Spark的作者matei的學生的作品,他提供了包括Language+Compiler+Runner的工具鏈,最終會轉化成LLVM,然後用LLVM的工具鏈編譯執行, Weld最初想解決的問題是不同lib之間相互調用時數據傳輸的開銷,例如要在pandas裡調用numpy的接口,首先pandas把數據寫入內存,然後numpy讀取內存進行計算。
對於極度優化的Library來說,內存的寫入和讀取的時間可能會遠超計算本身。針對這個問題,Weld開發了Common Runtime,並配套提供了一組IR,再加上惰性求值的特性,只需要簡單修改Library,使其符合Weld的語法規範,便可以做到不同Library共用Weld的Runtime,再利用惰性求值實現快Library的Pipeline,從而省去數據物化的開銷。Weld Runtime還做了若干優化,如循環融合循環展開,向量化自適應執行等。
此外Weld支持調用C代碼,可以方便的調用三方庫。我們感興趣的是Weld提供的IR和對應的Runtime。
Weld IR語法是針對關係代數進行設計的,非常適合表達SQL語句。數據結構層面,Weld IR最核心的數據結構是vec和struct,對應C語言裡的數組和struct,能較好的表達Spark SQL的 Row Batch基於struct和vec,可以構造字典數據結構,能夠比較好的表達SQL裡面重度使用Hash結構,操作層面,Weld IR提供了類函數式語言的語義,如Map,Filter,Iterator等配合Builder語義,能方便地表達Project、Filter、Agg、Broadcast join等算子語義,例如 select加Filter的例子,用Weld IR的表達如下,第一行是函數簽名,表示入參是一個數組,數組的元素是一個struct,strut包含兩個int32的成員。
接下來就是一個大的 for表達式,跟常見的語法不同,for表達式包含三個參數
1.需要遍歷的數組;
2.Build,用來生成最終的結果。 Build類型也決定了最終生成的結果的。用什麼數據結構來存儲。
3.lambda,用來定義針對每個元素的操作,在這個例子裡面,第一個參數就是這個函數的入參v第二個參數是append,表示最終構造的結果,存在一個數組裡面。第三個,lambda參數是一個if表達式, if的語義跟我們常見的也不太相同,它實際上是把 if的true和false的兩個分支都作為參數表達,其中第一個參數是condition,第二個參數是當condition為true的時候,所執行的邏輯。
第三個參數肯定是認為false的時候執行的邏輯,在這個裡面可以看到當第二個成員它是從0開始計數,當第二個成員大於10的時候,會把第一個成員 merge到 appender裡面。否則的話就什麼都不做,直接返回原來的build。Weld的IR。通過 weld_module_compile和weld_module_run,兩個接口,分別做編譯和執行。由於Weld同時兼顧了語法簡潔,編譯時間短的特性,因此我們選擇Weld作為生成的目標。
第二個問題就是怎麼生成?
我們複用了Spark Codegen框架。我們知道 Spark Codegen包含Expression和Stage兩個級別,在Expression級別,我們對照原來的doGenCode()的接口,增加了doGenNativeCode(),裡面拼出來的是Weld的語法,例如之前可能Java的代碼裡面就直接是兩個變量的相加,然後改造了以後就成了一個struct的兩個成員的相加。在WholeStage級別,我們複用了producer/ consumer的框架,熟悉Spark源碼的同學應該瞭解到,在producer/consumer框架下,每個算子都提供了produce和consume接口,produce的職責是生成為下游提供數據的代碼,consumer的職責是生成消費上游數據的代碼,Spark 中並非所有的算子都支持Codegen,例如outjoin就不支持支持Codegen的算子,繼承了CodeGenSupport的接口,我們對整個producer/consumer的框架並沒有改動,在他們旁邊又新增加了一系列的接口,包括 NativeCodeGenSupport/doProduceNative/doConsumeNative。
以一個具體的例子加以說明,還是一個相比較簡單的select加Filter加Project的例子, query包含三個算子,Scan、Filter、Project。
然後 query他的代碼生成的過程是右上角的這張圖。首先 project就是最下游的算子,它的produce方法會返回最終生成的代碼的字符串。然後它這個是怎麼生成的呢?Project 的doProduce。直接調用了 Filter的doProduce方法,然後Filter的doProduce方法直接調用了Scan的doProduce,然後Scan的 doProduce會生成一個框架代碼,在框架代碼的內部會調用Scan的 doConsume。Scan的doConsume。直接調用Filter的doConsume。Filter的doConsume會生成Filter的邏輯,並在內部調用Project的doConsume,Project的 doConsume。會把最終的數據輸出 append的到 output中。
我們看下面這三張圖,Scan的 doProduce會生成for循環的一個架子。然後在for循環的每個迭代裡面調用 Filter的doConsume方法, Filter的doConsume會生成一個if的表達式的框架,然後在判斷為true,也就是if的內部的話,調用的是project的 doConsume。 最後project的doConsume拼成一段append 的方法把column1 append到 output裡面到此為止。一個完整的Java的Codegen過程就結束了,然後我們就拿得到了直接可以編譯的Java代碼,當然這個是簡化的過程。
對於Native Codegen的話,我們是複用了這個邏輯,只是把生成的Java代碼替換成了Weld的IR,如底下三張圖所示,具體的Weld,語法我就不詳細展開了。
感興趣的同學可以到Weld官網上看語法定義,代碼生成還有一個問題就是Fallback機制,由於人力有限,我們無法覆蓋所有的算子,因此需要實現Fallback機制。這裡需要做的決定是應該做算子級別的Fallback,還是Stage級別的Fallback。直觀上算子粒力度的Fallback好像更加合理,實際上卻會導致更嚴重的問題。它會導致Stage內部Pipeline的斷裂。前面講了Codegen的一個優勢是整個過程不存在物化,而算子力度的Fallback則會導致Stage內部一部分算是走Native Runtime,另一部分走Java Runtime,兩者的連接數無可避免存在數據物化,開銷通常會大於Native Runtime帶來的收益。
基於這個原因,我們選擇Stage級別的Fallback,一旦有任何算子不支持Native Codegen,在整個Stage都Fallback到Java Codegen,代碼也已經生成了。
最後的問題,如何跟Spark集成 。
task的執行可以理解為一個黑盒,它的輸入是Row Batch或者Row Iterator我們知道在Scan Stage Spark用了向量化讀的優化,讀出來的是列式存儲的 column batch,每一列本質上都用一個數組進行存儲,而在Shuffle Stage,Shuffle Fetch回來的數據結構是行式存儲的 unsaferowbatch。每個Stage的輸出會封裝成會封裝成Row Iterator。
我們前面講到既然選擇了Stage級別的Fallback,意味著黑盒要麼是Java Runtime要麼是Native Runtime,不存在混合的情況。因此我們關心的如何把輸入轉化為Weld認識的內存佈局,以及如何把Weld的輸出包裝成Row Iterator。針對列存數據,打開offheap開關,數據天生就是指針數組,Weld可以直接操作。對於行存數據,主要問題是變長數據難以映射到 Weld的 struct右上方的圖展示了Spark Row Batch的內存佈局,首先是固定長度的,null bitmap,然後是固定長度的列數據,最後是變長數據,由於變長數據的存在,無法直接把一條record映射成 strut。
我們的做法是把定長部分和變長部分分別拷貝出來,並有offset和length來標誌變量部分的位置和長度。這樣一來record就能映射到strut結構了,而整個Row Batch就映射成了一個 vec strut。例如這個例子,每個record包括兩個long和一個String,null bit用一個long表示,緊接著是兩個long表示兩個列的數據。第三個long保存變長數據的 offset和length,最後是變成部分,我們把變成拷貝出去之後,根據原先的offset和length,計算新的offset和length,最終我們用1個包含5個long的strut表示 record,分別是 null bitmap原先的兩個long offset和length。這樣一來我們就完成了統一內存佈局,並且當且僅當有變長數據存在的時候才需要拷貝,否則的話是不需要拷貝的。Weld輸出轉換成Row Batch是剛才所說的過逆向過程,這裡就不再贅述了,完成了數據轉換,最後是Spark的執行流程。首先我們嘗試走Native Codegen,若有異常發生,則切換到Java Codegen。若沒有異常,則執行StageInit做初始化工作,包括初始化Weld,加載編譯好的Weld module,拉取Broadcast數據等。
接著是一個循環,每個循環會讀取一個Row Batch,給Native Runtime來執行,輸出結果包裝成Row Iterator,給Shuffle Write。以上就是EMR團隊在Native Runtime上做的探索。總結下來,我們採用Weld的IR作為代碼生成的目標語言,複用了Spark Codegen框架,進行代碼生成,採用了Stage級別的Fallback機制,並通過統一內存佈局跟Spark做了集成。
由於時間有限,一些工作沒有包含在今天的分享中,例如 Weld的不好表達的算子,如SortMergejoin、Partitionby ,我們其實也用了Native的技術進行了優化。再例如 Weld本身的字典的實現效率比較低,我們也對此進行了比較大的優化。除了Native Runtime,EMR團隊在Spark很多技術點都做了工作,歡迎大家交流溝通。