大數據

EMR Spark-SQL性能極致優化揭祕 RuntimeFilter Plus

作者:陸路,花名世儀,阿里巴巴計算平臺事業部EMR團隊高級開發工程師,大數據領域技術愛好者,對Spark、Hive等有濃厚興趣和一定的瞭解,目前主要專注於EMR產品中開源計算引擎的優化工作。


背景介紹

TPC-DS 測試集採用星型和雪花型等多維數據模型,包含 7 張事實表和 17 張維度表,以 store channel 為例,事實表和維度表的關聯關係如下所示:

image.png

分析 TPC-DS 全部 99 個查詢語句不難發現,絕大部分語句的過濾條件都不是直接作用於事實表,而是通過過濾維度表並將結果集與事實表 join 來間接完成。因此,優化器很難直接利用事實表索引來減少數據掃描量。如何利用好查詢執行時的維度表過濾信息,並將這些信息下推至存儲層來完成事實表的過濾,對於性能提升至關重要。

在 2019 年的打榜測試中,我們基於 Spark SQL Catalyst Optimizer 開發的 RuntimeFilter 優化 對於 10TB 數據 99 query 的整體性能達到 35% 左右的提升。簡單來說,RuntimeFilter 包括兩點核心優化:

  1. 動態分區裁剪:事實表以日期列(date_sk)為分區列建表,當事實表與 date_dim 表 join 時,optimizer 在運行時收集 date_dim 過濾結果集的所有 date_sk 取值,並在掃描事實表前過濾掉所有未命中的分區文件。
  2. 非分區列動態過濾:當事實表與維度表的 join 列為非分區列時,optimizer 動態構建和收集維度表結果集中 join 列的 Min-Max Range 或 BloomFilter,並在掃描事實表時下推至存儲層,利用存儲層索引(如 Parquet、ORCFile 的 zone map 索引)來減少掃描數據量。

問題分析

為了進一步挖掘 RuntimeFilter 優化的潛力,我們選取了部分執行時間較長的 query 進行了細緻的性能剖析。這些 query 均包含大於一個事實表和多個維度表的複雜 join。在分析了 RuntimeFilter 對各個 query 的性能提升效果後,我們發現:

  1. 動態分區裁剪的性能提升效果明顯,但很難有進一步的優化空間
  2. 非分區列動態過濾對整體提升貢獻相比分區裁剪小很多,主要是因為很多下推至存儲層的過濾條件並沒有達到索引掃描的效果

聰明的同學應該已經發現,只有 date_dim 這一張維度表和分區列相關,那麼所有與其它維度表的 join 查詢從 RuntimeFilter 優化中受益都較為有限。對於這種情況,我們做了進一步的拆解分析:

  1. 絕大部分 join 列均為維度表的自增主鍵,且與過濾條件沒有相關性,因此結果集取值常常均勻稀疏地散佈在該列的整個取值空間中
  2. 對於事實表,考慮最常見的 Zone Map 索引方式,由於 load 階段沒有針對非分區列做任何聚集操作(Clustering),每個 zone 的取值一般也稀疏分散在各個列的值域中。
  3. 相比 BloomFilter,Min-Max Range 的構建開銷和索引查詢開銷要低得多,但由於信息粒度太粗,索引過濾命中的效果也會差很多

綜合以上幾點考慮,一種可能的優化方向是在 load 階段按照 join 列對事實表進行 Z-Order 排序。但是這種方式會顯著增加 load 階段執行時間,有可能導致 TPC-DS 評測總分反而下降。同時,由於建表階段優化的複雜性,實際生產環境的推廣使用也會比較受限。

RuntimeFilter Plus

基於上述分析,我們認為依賴過濾條件下推至存儲層這一方式很難再提升查詢性能,嘗試往其它方向進行探索:

  1. 不依賴存儲層索引
  2. 不僅優化事實表與維度表 join

最終我們提煉兩個新的運行時過濾優化點:維度表過濾廣播和事實表 join 動態過濾,並在原版 RuntimeFilter 優化的基礎上進行了擴展實現。

維度表過濾廣播

這一優化的思想來源於 Lookahead Information Passing(LIP),在論文《Looking Ahead Makes Query Plans Robust》中首次提出。其針對的場景如下圖所示:
image.png

當事實表(lineorder)連續與多個維度表過濾結果做 multi-join 時,可將所有維度表的過濾信息下推至 join 之前。該方法與我們的 RuntimeFilter 的主要不同在於下推時考慮了完整的 multi-join tree 而不是局部 binary-join tree。其優化效果是即使 join ordering 為 bad case,無用的事實表數據也能夠被儘早過濾掉,即讓查詢執行更加 robust。

我們參考論文算法實現了第一版過濾下推規則,但並沒有達到預期的性能提升,主要原因在於:

  1. Spark CBO Join-Reorder 結合我們的遺傳算法優化,已經達到了接近最優的 join ordering 效果
  2. 前置的 LIP filters 執行性能並沒有明顯優於 Spark BroadcastHashJoin 算子

基於過濾條件可以傳遞至複雜 multi-join tree 的任意節點這一思想去發散思考,我們發現,當 multi-join tree 中存在多個事實表時,可將維度表過濾條件廣播至所有的事實表 scan,從而減少後續事實表 SortMergeJoin 等耗時算子執行時所需處理的數據量。以一個簡化版的 query 64 為例:

with cs_ui as
(select cs_item_sk
,sum(cs_ext_list_price) as sale
from catalog_sales
,catalog_returns
where cs_item_sk = cr_item_sk
and cs_order_number = cr_order_number
group by cs_item_sk)
select i_product_name product_name
,i_item_sk item_sk
,sum(ss_wholesale_cost) s1
from store_sales
,store_returns
,cs_ui
,item
where ss_item_sk = i_item_sk and
ss_item_sk = sr_item_sk and
ss_ticket_number = sr_ticket_number and
ss_item_sk = cs_ui.cs_item_sk and
i_color in ('almond','indian','sienna','blue','floral','rosy') and
i_current_price between 19 and 19 + 10 and
i_current_price between 19 + 1 and 19 + 15
group by i_product_name
,i_item_sk

該查詢的 plan tree 如下圖所示:
image.png

考慮未實現維度表過濾廣播的執行流程,store_sales 數據經過 RuntimeFilter 和 BroadcastHashJoin 算子進行過濾,但由於過濾後數據仍然較大,後續的所有 join 都需要走昂貴的 SortMergeJoin 算子。但如果將 LIP filter 下推至 4 張事實表的 scan 算子(無需下推至存儲層),不僅減少了 join 數據量,也減少了 catalog_sales 和 catalog_returns 表 join 後的 group-by aggregation 數據量 。

LIP 實現

在 optimizer 層,我們在原版 RuntimeFilter 的 SyntheticJoinPredicate 規則後插入 PropagateDynamicValueFilter 規則,將合成的動態謂詞廣播至所有合法的 join 子樹中;同時結合原有的謂詞下推邏輯,保證動態謂詞最終傳播到所有相關的 scan 算子上。在算子層,LIP filters 的底層實現可以是 HashMap 或 BloomFilter,針對 TPC-DS 的數據特性,我們選擇 BitMap 作為廣播過濾條件的底層實現。由於 BitMap 本身是精確的(Exact Filter),可以結合主外鍵約束信息進一步做 semi-join 消除優化。基於主外鍵約束的優化規則將在系列後續文章做詳細介紹。

應用該優化後,query 64 執行時間由 177 秒降低至 63 秒,加速比達到 2.8 倍。

事實表 Join 動態過濾

使用 BloomFilter 來優化大表 join 是一種常見的查詢優化技術,比如在論文《Building a Hybrid Warehouse: Efficient Joins between Data Storedin HDFS and Enterprise Warehouse》中提出對 join 兩表交替應用 BloomFilter 的 zig-zag join 方法,降低分佈式 join 中的數據傳輸總量。對於 TPC-DS 測試集,以 query 93 為例,store_sales 與 store_returns join 後的結果集大小遠小於 store_sales 原始數據量,非常適合應用這一優化。

BloomFilter 的構建和應用都存在較高的計算開銷,對於 selectivity 較大的join,盲目使用這一優化可能反而導致性能回退。基於靜態 stats 的 join selectivity 估算往往誤差,Spark 現有的 CBO 優化規則難以勝任魯棒的 BloomFilter join 優化決策。因此,我們基於 Spark Adaptive Execution(AE) 運行時重優化機制來實現動態的 BloomFilter join 優化規則。AE 的基本原理是在查詢作業的每個 stage 執行完成後,允許優化器根據運行時採集的 stage stats 信息重新調整後續的物理執行計劃。目前主要支持三種優化:
(1)reduce stage 併發度調整;
(2)針對 skew 情況的 shuffle 數據均衡分佈;
(3)SortMergeJoin 轉換為 BroadcastHashJoin

基於 AE 的優化規則流程如下:

  1. 根據靜態 stats 判斷 join 的一端的 size 是否可能適合構建 BloomFilter( build side),如果是,則 build side 和 stream side 的 scan stage 會依次串行提交執行;否則這兩個 stage 將並行執行。
  2. 在 build side 的 scan stage 執行完成後,AE 根據運行時收集的 size 和 join 列 histogram 進行代價估算,並決定最終走 BroadcastHashJoin、BloomFilter-SortMergeJoinJoin 還是原本的 SortMergeJoin。
  3. 當物理執行計劃為 BloomFilter-SortMergeJoinJoin,優化器會插入一個新的作業並行掃描 build side 的 shuffle 數據來構建 BloomFilter,並下推至 stream side 的 scan stage 中。

BloomFilter 算子實現

為了減少 BloomFilter 帶來的額外開銷,我們重新實現了高效的 BuildBloomFiler 和 Native-InBloomFilter 的算子。在構建階段,使用 RDD aggregate 來合併各個數據分片的 BloomFiler 會導致 driver 成為數據傳輸和 bitmap 合併計算的性能瓶頸;使用 RDD treeAggregate 實現並行分層合併顯著降低了整體的構建延遲。在過濾階段,Native-InBloomFilter 的算子會被推入 scan 算子中合併執行。該算子直接訪問 Spark 列式讀取內存格式,按批量數據來調用 SIMD 優化的 native 函數,降低 CPU 執行開銷;同時,我們將原版算法替換為 Blocked BloomFilter 算法實現,該算法通過犧牲少量的 bitmap 存儲空間來換取訪存時更低的 CPU cache miss 率。

應用該優化後,query 93 執行時間由 225 秒降低至 50 秒,加速比達到 4.5 倍。


推薦閱讀:EMR Spark-SQL性能極致優化揭祕 概覽篇


阿里巴巴開源大數據技術團隊成立Apache Spark中國技術社區,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學在線提問答疑,只為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

image.png

對開源大數據和感興趣的同學可以加小編微信(下圖二維碼,備註“進群”)進入技術交流微信群。
image.png

Apache Spark技術交流社區公眾號,微信掃一掃關注

image.png

Leave a Reply

Your email address will not be published. Required fields are marked *