大數據

EMR Spark-SQL性能極致優化揭祕 Native Codegen Framework

作者:周克勇,花名一錘,阿里巴巴計算平臺事業部EMR團隊技術專家,大數據領域技術愛好者,對Spark有濃厚興趣和一定的瞭解,目前主要專注於EMR產品中開源計算引擎的優化工作。


背景和動機

SparkSQL多年來的性能優化集中在Optimizer和Runtime兩個領域。前者的目的是為了獲得最優的執行計劃,後者的目的是針對既定的計劃儘可能執行的更快。

相比於Runtime,Optimizer是更加通用的、跟實現無關的優化。無論是Java世界(Spark, Hive)還是C++世界(Impala, MaxCompute),無論是Batch-Based(Spark, Hive)還是MPP-Based(Impala, Presto),甚至無論是大數據領域還是傳統數據庫領域亦或HTAP領域(HyPer, ADB),在Optimizer層面考慮的都是非常類似的問題: Stats收集,Cost評估以及計劃選擇;採用的優化技術也比較類似,如JoinReorder, CTE, GroupKey Elimination等。儘管因為上下文不同(如是否有索引)在Cost Model的構造上會有不同,或者特定場景下采用不同的空間搜索策略(如遺傳算法 vs. 動態規劃),但方法大體是相同的。

長期以來,Runtime的優化工作基本聚焦在解決當時的硬件瓶頸。如MapReduce剛出來時網絡帶寬是瓶頸,所以Google做了很多Locality方面的優化;Spark剛出來時解決的問題是磁盤IO,內存緩存的設計使得性能相比MapReduce有了數量級的提升;後來CPU成為了新的瓶頸[1],因此提升CPU性能成了近年來Runtime領域重要的優化方向。

提升CPU性能的兩個主流技術是以MonetDB/X100[2](如今演化為VectorWise[3])為代表的向量化(Vectorized Processing)技術和以HyPer[5][6]為代表的代碼生成(CodeGen)技術(其中Spark跟進的是CodeGen[9])。簡單來說,向量化技術沿用了火山模型,但與其讓SQL算子每次計算一條Record,向量化技術會積攢一批數據後再執行。逐批計算相比於逐條計算有了更大的優化空間,例如虛函數的開銷分攤,SIMD優化,更加Cache友好等。這個技術的劣勢在於算子之間傳遞的數據從條變成了批,因此增大了中間數據的物化開銷。CodeGen技術從另外一個角度解決虛函數開銷和中間數據物化問題:算子融合。簡單來說,CodeGen框架通過打破算子之間的界限把火山模型“壓平”了,把原來迭代器鏈壓縮成了大的for循環,同時生成語義相同的代碼(Java/C++/LLVM),緊接著用對應的工具鏈編譯生成的代碼,最後用編譯後的class(Java)或so(C++,LLVM)去執行,從而把解釋執行轉變成了編譯執行。此外,儘管還是逐條執行,由於抹去了函數調用,一條Record從(Stage內的)初始算子一直執行到結束算子都基本處於寄存器中,不會物化到內存。CodeGen技術的劣勢在於難以應用SIMD等優化。

兩個門派相愛相殺,在經歷了互相發論文驗證自家優於對方後[4][8]兩家走向了合作,合作產出了一系列項目和論文,而目前學界的主流看法也是兩者融合是最優解,一些採用融合做法的項目也應運而生,如進化版HyPer[6], Pelonton[7]等。

儘管學界已走到了融合,業界主流卻沒有很強的動力往融合的路子走,探究其主要原因一是目前融合的做法相比單獨的優化並沒有質的提升;二是融合技術目前沒有一個廣為接受的最優做法,還在探索階段;三是業界在單一的技術上還沒有發揮出最大潛力。以SparkSQL為例,從2015年SparkSQL首次露面自帶的Expression級別的Codegen,到後來參考HyPer實現的WholeStage Codegen,再經過多年的打磨,SparkSQL的Codegen技術已趨成熟,性能也獲得了兩次數量級的躍升。然而,也許是出於可維護性或開發者接受度的考慮,SparkSQL的Codegen一直限制在生成Java代碼,並沒有嘗試過NativeCode(C/C++, LLVM)。儘管Java的性能已經很優,但相比於Native Code還是有一定的Overhead,並缺乏SIMD(Java在做這方面feature),Prefetch等語義,更重要的是,Native Code直接操作裸金屬,易於極致壓榨硬件性能,對一些加速器(如GPU)或新硬件(如AEP)的支持也更方便。

基於以上動機,EMR團隊探索並開發了SparkSQL Native Codegen框架,為SparkSQL換了引擎,新引擎帶來20%左右的性能提升,為EMR再次獲取世界第一立下汗馬功勞,本文講詳細介紹Native Codegen框架。

核心問題

做Native Codegen,核心問題有三個:
1.生成什麼?
2.怎麼生成?
3.如何集成到Spark?

生成什麼

針對生成什麼代碼,結合調研的結果以及開發同學的技術棧,有三個候選項:C/C++, LLVM, Weld IR。C/C++的優勢是實現相對簡單,只需對照Spark生成的Java代碼邏輯改寫即可,劣勢是編譯時間過長,下圖是HyPer的測評數據,C++的編譯時間比LLVM高了一個數量級。compile time.jpg
編譯時間過長對小query很不友好,極端case編譯時間比運行時間還要長。基於這個考慮,我們排除了C/C++選項。上圖看上去LLVM的編譯時間非常友好,而且很多Native CodeGen的引擎,如HyPer, Impala, 以及阿里雲自研大數據引擎MaxCompute,ADB等,均採用了LLVM作為目標代碼。LLVM對我們來說(對你們則不一定:D)最大的劣勢就是過於底層,語法接近於彙編,試想用匯編重寫SparkSQL算子的工作量會有多酸爽。大多數引擎也不會用LLVM寫全量代碼,如HyPer僅把算子核心邏輯用LLVM生成,其他通用功能(如spill,複雜數據結構管理等)用C++編寫並提前編譯好。即使LLVM+C++節省了不少工作量,對我們來說依然不可接受,因此我們把目光轉向了第三個選項: Weld IR(Intermediate Representation)。

首先簡短介紹以下Weld。Weld的作者Shoumik Palkar是 Matei Zaharia的學生,後者大家一定很熟悉,Spark的作者。Weld最初想解決的問題是不同lib之間互相調用時數據傳輸的開銷,例如要在pandas裡調用numpy的接口,首先pandas把數據寫入內存,然後numpy讀取內存進行計算,對於極度優化的lib來說,內存的寫入和讀取的時間可能會遠超計算本身。針對這個問題,Weld開發了Common Runtime並配套提供了一組IR,再加上惰性求值的特性,只需(簡單)修改lib使其符合Weld的規範,便可以做到不同lib共用Weld Runtime,Weld Runtime利用惰性求值實現跨lib的Pipeline,從而省去數據物化的開銷。Weld Runtime還做了若干優化,如循環融合,循環展開,向量化,自適應執行等。此外,Weld支持調用C代碼,可以方便調用三方庫。

我們感興趣的是Weld提供的IR和對應的Runtime。Weld IR面向數據分析進行設計,因此語義上跟SQL非常接近,能較好的表達算子。數據結構層面,Weld IR最核心的數據結構是vec和struct,能較好地表達SparkSQL的UnsafeRow Batch;基於struct和vec可以構造dict,能較好的表達SQL裡重度使用的Hash結構。操作層面,Weld IR提供了類函數式語言的語義,如map, filter, iterator等,配合builder語義,能方便的表達Project, Filter, Agg, BroadCastJoin等算子語義。例如,以下IR表達了Filter + Project語義,具體含義是若第二列大於10,則返回第一列:

|v: vec[{i32,i32}]| for(v,appender,|b,i,n| if(n.$1 > 10, merge(b,n.$0), b))

以下IR表達了groupBy的語義,具體含義是按照第一列做groupBy來計算第二列的sum:

|v: vec[{i32,i32}]| for(v,dictmerger[i32,i32,+],|b,i,n| merge(b,{n.$0,n.$1}))

具體的語法定義請參考Weld文檔(https://github.com/weld-project/weld/blob/master/docs/language.md)。
Weld 開發者API提供了兩個核型接口:

  1. weld_module_compile, 把Weld IR編譯成可執行模塊(module)。
  2. weld_module_run, 執行編譯好的模塊。

基本流程如下圖所示,最終也是生成LLVM代碼。
weld pipeline.png

由此,Weld IR的優勢就顯然易見了,既兼顧了性能(最終生成LLVM代碼),又兼顧了易用性(CodeGen Weld IR相比LLVM, C++方便很多)。基於這些考慮,我們最終選擇Weld IR作為目標代碼。

怎麼生成

SparkSQL原有的CodeGen框架之前簡單介紹過了,詳見https://developer.aliyun.com/article/727277。我們參考了Spark原有的做法,支持了表達式級別,算子級別,以及WholeStage級別的Codegen。複用Producer-Consumer框架,每個算子負責生成自己的代碼,最後由WholeStageCodeGenExec負責組裝。

這個過程有兩個關鍵問題:

1.算子之間傳輸的介質是什麼?
2.如何處理Weld不支持的算子?

傳輸介質

不同於Java,Weld IR不提供循環結構,取而代之的是vec結構和其上的泛迭代器操作,因此Weld IR難以借鑑Java Codegen在Stage外層套個大循環,然後每個算子處理一條Record的模式,取而代之的做法是每個算子處理一批數據,IR層面做假物化,然後依賴Weld的Loop-Fusion優化去消除物化。例如前面提到的Filter後接Project,Filter算子生成的IR如下,過濾掉第二列<=10的數據:

|v:vec[{i32,i32}]| let res_fil = for(v,appender,|b,i,n| if(n.$1>10, merge(b,n), b)

Project算子生成的IR如下,返回第一列數據:

let res_proj = for(res_fil,appender,|b,i,n| merge(b,n.$0))

表面上看上去Filter算子會把中間結果做物化,實際上Weld的Loop-Fusion優化器會消除此次物化,優化後代碼如下:

|v: vec[{i32,i32}]| for(v,appender,|b,i,n| if(n.$1 > 10, merge(b,n.$0), b))

儘管依賴Weld的Loop-Fusion優化可以極大簡化CodeGen的邏輯,但開發中我們發現Loop-Fusion過程非常耗時,對於複雜SQL(嵌套3層以上)甚至無法在有限時間給出結果。當時面臨兩個選擇:修改Weld的實現,或者修改CodeGen直接生成Loop-Fusion之後的代碼,我們選擇了後者。重構後生成的代碼如下,其中1,2,11行由Scan算子生成,3,4,5,6,8,9,10行由Filter算子生成,7行由Project算子生成。

|v: vec[{i32,i32}]|
    for(v,appender,|b,i,n|
        if(
            n.$1 > 10, 
            merge(
            b,
            n.$0
            ), 
            b
        )
    )

這個優化使得編譯時間重回亞秒級別。

Fallback機制

受限於Weld當前的表達能力,一些算子無法用Weld實現,例如SortMergeJoin,Rollup等。即使是原版的Java CodeGen,一些算子如Outter Join也不支持CodeGen,因此如何做好Fallback是保證正確性的前提。我們採用的策略很直觀:若當前算子不支持Native CodeGen,則由Java CodeGen接管。這裡涉及的關鍵問題是Fallback的粒度:是算子級別還是Stage級別?

拋去實現難度不談,雖然直觀上算子粒度的Fallback更加合理,但實際上卻會導致更嚴重的問題:Stage內部Pipeline的斷裂。如上文所述,CodeGen的一個優勢是把整個Stage的邏輯Pipeline化,打破算子之間的界限,單條Record從初始算子執行到結束算子,整個過程不存在物化。而算子粒度的Fallback則會導致Stage內部一部分走Native Runtime,另一部分走Java Runtime,則兩者連接處無可避免存在中間數據物化,這個開銷通常會大於Native Runtime帶來的收益。

基於以上考慮,我們選擇了Stage級別的Fallback,在CodeGen階段一旦遇到不支持的算子,則整個Stage都Fallback到Java CodeGen。統計顯示,整個TPCDS Benchmark,命中Native CodeGen的Stage達到80%。

Spark集成

完成了代碼生成和Fallback機制,最後的問題就是如何跟Spark集成了。Spark的WholeStageCodegenExec的執行可以理解為一個黑盒,無論上游是Table Scan,Shuffle Read,還是BroadCast,給到黑盒的輸入類型只有兩種: RowBatch(上游是Table Scan)或Row Iterator(上游非Table Scan),而黑盒的輸出固定為Row Iterator,如下圖所示:
Spark Task.png

上文介紹我們選擇了Stage級別的Fallback,也就決定了黑盒要麼是Java Runtime,要麼是Native Runtime,不存在混合的情況,因此我們只需要關心如何把Row Batch/Row Iterator轉化為Weld認識的內存佈局,以及如何把Weld的輸出轉化成Row Iterator即可。為了進一步簡化問題,我們注意到,儘管Shuffle Reader/BroadCast的輸入是Row Iterator,但本質上遠端序列化的數據結構是Row Batch,只不過Spark反序列化後轉換成Row Iterator後再餵給CodeGen Module,RowBatch包裝成Row Iterator非常簡易。因此Native Runtime的輸入輸出可以統一成RowBatch。

解決辦法呼之欲出了:把RowBatch轉換成Weld vec!但我們更進了一步,何不直接把Row Batch餵給Weld從而省去內存轉換呢?本質上Row Batch也是滿足某種規範的字節流而已,Spark也提供了OffHeap模式把內存直接存堆外(僅針對Scan Stage。Shuffle數據和Broadcast數據需要讀到堆外),Weld可以直接訪問。Spark UnsafeRow的內存佈局大致如下:
Unsaferow.png
針對確定的schema,null bitmap和fixed-length data的結構是固定的,可以映射成struct,而針對var-length data我們的做法是把這些數據copy到連續的內存地址中。如此一來,針對無變長數據的RowBatch,我們直接把內存塊餵給Weld;針對有變長部分的數據,我們也只需做大粒度的內存拷貝(把定長部分和變長部分分別拷出來),而無需做列級別的細粒度拷貝轉換。
unsaferow.jpg
繼續舉前文的Filter+Project的例子,一條Record包含兩個int列,其UnsafeRow的內存佈局如下(為了對齊,Spark裡定長部分最少使用8字節)。

顯而易見,這個結構可以很方便映射成Weld struct:

{i64,i64,i64}

而整個Row Batch便映射成Weld vec:

vec[{i64,i64,i64}]

如此便解決了Input的問題。而Weld Output轉RowBatch本質是以上過程的逆向操作,不再贅述。

解決了Java和Native之間的數據轉換問題,剩下的就是如何執行了。首先我們根據當前Stage的Mode來決定走Java Runtime還是Native Runtime。在Native分支,首先會執行StageInit做Stage級別的初始化工作,包括初始化Weld,加載編譯好的Weld Module,拉取Broadcast數據(若有)等;接著是一個循環,每個循環讀取一個RowBatch(來自Scan或Shuffle Reader)餵給Native Runtime執行,Output轉換並餵給Shuffle Writer。如下圖所示:
native execution.png

總結

本文介紹了EMR團隊在Spark Native Codegen方向的探索實踐,限於篇幅若干技術點和優化沒有展開,後續可另開文詳解,例如:

1.極致Native算子優化
2.數據轉換詳解
3.Weld Dict優化

大家感興趣的任何內容歡迎溝通: )

[1] Making Sense of Performance in Data Analytics Frameworks. Kay Ousterhout
[2] MonetDB/X100: Hyper-Pipelining Query Execution. Peter Boncz
[3] Vectorwise: a Vectorized Analytical DBMS. Marcin Zukowski
[4] Efficiently Compiling Efficient Query Plans for Modern Hardware. Thomas Neumann
[5] HyPer: A Hybrid OLTP&OLAP Main Memory Database System Based on Virtual Memory Snapshots. Alfons Kemper
[6] Data Blocks: Hybrid OLTP and OLAP on Compressed Storage using both Vectorization and Compilation. Harald Lang
[7] Relaxed Operator Fusion for In-Memory Databases: Making Compilation, Vectorization, and Prefetching Work Together At Last. Prashanth Menon
[8] Vectorization vs. Compilation in Query Execution. Juliusz Sompolski
[9] https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html


相關閱讀推薦:
EMR Spark-SQL性能極致優化揭祕 RuntimeFilter Plus
EMR Spark-SQL性能極致優化揭祕 概覽篇


阿里巴巴開源大數據技術團隊成立Apache Spark中國技術社區,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學在線提問答疑,只為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!
image.png

對開源大數據和感興趣的同學可以加小編微信(下圖二維碼,備註“進群”)進入技術交流微信群。

image.png

Leave a Reply

Your email address will not be published. Required fields are marked *