大數據

自適應查詢執行AQE:在運行時加速SparkSQL

一、自適應查詢執行AQE簡介

關於自適應查詢執行,在數據庫領域早有充分研究。在Spark社區,最早在Spark 1.6版本就已經提出發展自適應執行(Adaptive Query Execution,下文簡稱AQE);到了Spark 2.x時代,Intel大數據團隊進行了相應的原型開發和實踐;到了Spark 3.0時代,Databricks和Intel一起為社區貢獻了新的AQE。

什麼是AQE呢?簡單來說就是根據在運行時統計信息(runtime statistics)在查詢執行的過程中進行動態(Dynamic)的查詢優化。那麼我們為什麼需要AQE呢?在Spark 2.x時代,為了選擇最佳執行計劃,我們引入了CBO(Cost-based optimization),但是在一些場景下,效果非常不好,缺點明顯,比如:

  • 統計信息過期或者缺失導致估計錯誤;
  • 收集統計信息代價較大(比如column histograms);;
  • 某些謂詞使用自定義UDF導致無法預估;
  • 手動指定執行hint跟不上數據變化。

而在Spark 3.0時代,AQE完全基於精確的運行時統計信息進行優化,引入了一個基本的概念Query Stages,並且以Query Stage為粒度,進行運行時的優化,其工作原理如下所示:
image.png

整個AQE的工作原理以及流程為:

  1. 運行沒有依賴的stage;
  2. 在一個stage完成時再依據新的統計信息優化剩餘部分;
  3. 執行其他已經滿足依賴的stage;
  4. 重複步驟(2)(3)直至所有stage執行完成。

二、Spark 3.0中主要的AQE特性

Spark 3.0中主要的AQE特性包括:

  • 動態合併shuffle分區;
  • 動態轉換join策略;
  • 動態優化join中的數據傾斜。

(一)動態合併shuffle分區

Shuffle分區數量和大小對查詢性能很關鍵。在Spark 3.0以前,Shuffle分區是一個固定值,存在著明顯的缺點,如果分區過小會導致I/O低效、調度開銷和任務啟動開銷,但是如果分區過大又會帶來GC壓力和溢寫硬盤等問題。另一方面,在Spark 3.0之前,整個查詢執行過程中使用統一的分區數,而在查詢執行的不同階段,數據規模會發生明顯變化,如果保持統一的分區數,則大大降低了效率。基於以上,動態合併Shuffle分區是非常必要的。

AQE解決上面問題的具體做法是設置較大的初始分區數來滿足整個查詢執行過程中最大的分區數,並且在每個Query stage結束的時候按需自動合併分區,其具體的流程如下圖所示:
image.png

具體來說,動態合併Shuffle分區的原理如下:

對於普通的Shuffle來說,沒有自動合併的過程,每個MAP讀取Shuffle後,會根據指定分區數進行分區,比如下圖為5:
image.png

進行上圖所示的分區後發現,REDUCE1和REDUCE5要處理的數據量明顯高於其餘三個REDUCE,而我們理想的情況下是每個REDUCE處理的數據量是相當的,所以AQE進行了動態合併分區,將相鄰的小分區2,3,4進行合併,輸出三個REDUCE,大大提高了後續的效率,如下圖所示:

image.png

(二)動態轉換join策略
在Spark中,我們希望當Join的某一邊可以完全放入內存時,Spark選擇Broadcast Hash Join,但是實際上會出現預估可能不夠準確,導致本來可以優化為BHJ的沒有被優化的情況,原因也很多,比如;

  • 統計信息不夠準確;
  • 子查詢太複雜;
  • 黑盒的謂詞,比如自定義UDF。

對於以上問題,AQE的解決方法就是使用運行時數據大小重新選擇執行計劃,其整個流程與原理如下圖所示:
image.png

(三)動態優化join中的數據傾斜

在Join中的數據傾斜會導致一系列的問題,比如性能下降、某一個task影響整個stage的運行等,處理數據量比較大的partitions時候還可能會出現溢寫磁盤的情況。AQE針對上述問題使用運行時的統計信息自動優化查詢執行,動態的發現傾斜數據的數量,並且把傾斜的分區分成更小的子分區來處理。其做法如下圖所示:
image.png

具體來說其原理如下:
對於普通的sort merge join來說,沒有傾斜優化,可能會造成某個Shuffle分區的數據數量明顯高於其他分區,如下圖中的PART.A0,這種情況會造成A0和B0的這個Join執行速度明顯慢於其他的Join。

image.png

有了AQE之後,根據數據傾斜優化後的sort merge join,使用skew Shuffle reader,如下圖所示將A0分成三個子分區,並將對應的B0複製三份,整個Join任務的運行效率大大提升。

image.png

上述的幾個特性可以在Demo中查看https://docs.databricks.com/_static/notebooks/aqe-demo.html

三、TPC-DS性能測試

進行TPC-DS性能測試的集群配置如下圖所示:
image.png

測試結果顯示,2條Query獲得了1.5倍的性能提升,37條Query獲得了1.1倍的性能提升。

image.png

下面兩張圖是關於分區合併和Join策略的性能測試結果,可以看出AQE對於性能的提升還是非常明顯的。

image.png
image.png

除了在TPC-DS的測試中AQE表現優秀,在實際生產環境中AQE對於性能的提升也非常優秀,比如某電商公司分享在某些典型的傾斜查詢中使用了AQE之後獲得了十幾倍的性能提升,某互聯網巨頭使用了AQE之後發現在2個典型的查詢中性能分別有了5倍和1.38倍的提升等等。

四、Q&A

Q1:Shuffle是如何對大量小文件進行優化的?
A1:如果是初試輸入的小文件,會在MAP階段進行合併;Shuffle的數據是寫本地的文件,並且Spark會進行合併,對本地的數據壓力不是很大;在Shuffle之後由於動態分區產生的小文件,如果使用的是Hive,可以在後續的Stage進行合併,如果是Spark,可以使用AQE來進行合併,也可以讓Spark顯式的做一次Shuffle,避免產生大量的小文件。

Q2:AQE是否支持外部的Shuffle Service?
A2:當前開源的Shuffle Service都是支持的。


關鍵詞:Spark 3.0 、AQE、自適應查詢執行、Join、小文件

Leave a Reply

Your email address will not be published. Required fields are marked *