開發與維運

Apache Spark 3.0中的SQL性能改進概覽

今天主要跟大家分享一下spark 3.0在SQL方向上的一些優化工作。從spark 2.4開始,大概有超過一年半的時間。對於一個比較活躍的開源項目來說,這個時間是非常長的。所以裡面包含了大量的這種功能增強,性能優化,等各方面的新的feature在裡面。大概超過50%的相關的issue都是和SQL相關的。在SQL這個方向上主要做的工作,大概分成四個方面。第一方面是工具類的。就是說基於spark的一個開發者怎麼去和spark交互,提供一些更多的工具。第二個是dynamic optimization。簡單來說就是運行時的優化。在這裡面,包含了幾個重大的性能改進。第三個是在spark的catalyst優化器方面有很多新的改進。第四個是基礎依賴的更新。主要在語言層面引入了一些新的支持和依賴。

image.png

Spark 3.0是一個時間跨度非常長的release,包含了非常多的社區的工作。統計下來有接近3400多個issue在spark 3.0裡面進行了處理。針對這麼多的issue,我們用spark 3.0的時候,需要考慮有哪些東西對於實際的生產環境可能有好處,有哪些新的特性。

image.png

總結下來,大概可以把在SQL方向上的這種大的改動分成七個部分,分屬於上文中提到的四個類別。

image.png

第一部分是new explain format。當我們想去改進,去優化一個spark SQL的性能的時候,首先需要去了解SQL的查詢計劃大概是一個什麼樣子,有針對性的去進行這種SQL的重寫,或其他的一些改進。前提就是我的查詢計劃可讀性比較強,是非常容易去看的。

image.png

對於之前2.4的版本,可以通過explain SQL去展示。只不過是這種展示的方式看起來繁雜一點。我們可以看到針對於SQL,這麼一個物理查詢計劃,是一個樹狀的結構。也是可以去看的,但是可讀性相對來說不夠好。

image.png

在3.0裡面,針對查詢計劃的這種展示進行了一定的優化,以簡要的格式展示。根據節點的編號,可以找到對應的更詳細的信息。而且對於每一個節點展示的信息也做了一些歸類和整理,整理成input,output,condition等。通過這種方式,用戶可以更加清晰的看到整個的查詢計劃。

image.png

第二部分是all type of join hints。在spark 2.4只支持broadcast。而spark 3.0除了支持broadcast,還支持sort merge,shuffle hash和cartesian。

image.png

第三部分是adaptive query execution。社區為什麼要去做它,最主要的原因就是說,對於一些查詢計劃,在運行時能夠拿到更準確的數據統計信息,可以選擇最優的這種計劃,對數據進行處理,從而提升spark處理數據的性能。主要包括三種場景。第一種是調整reducer的數量,從而避免額外的內存和IO的開銷。第二種是說,選擇最合適的join的策略。第三種是說,針對傾斜數據,在join的時候提供更好的處理方式。上述場景都是自動的,根據運行時的情況,自動地收集相關的信息,然後去做判斷。

image.png

怎麼去動態的調整reducer的數量。在spark 2.4,默認指定partition數量,每一個partition經過shuffle之後,對應的要處理的數據的大小可能是不一樣的。這是由數據本身的特性來決定的,它的分佈可能本來就是不均衡的。

image.png

在spark 3.0中,在shuffle的時候,每一個partition有不同的數據量大小,需要把小的partition數據進行合併,給同一個reducer去處理,從而使得每一個reducer它所處理的數據量大小是相近的。

image.png

針對有數據傾斜的這種join,在spark 2.4中帶來的主要的問題就是說,在處理最大的partition時,要花費很長的時間,影響整個join。

image.png

在spark 3.0中,有數據傾斜的join,比在spark 2.4中更快。如圖所示,對於表A和表B,我把大表的數據做切分,小表的數據做全量的分發。第一個,滿足join的語義要求。第二個,在傾斜的這些key上面,它是被切成多分,然後在多個task裡面去處理。

image.png

第四部分是dynamic partitioning pruning。在join操作中,要避免讀取不必要的partition。而dynamic filter能夠避免讀取不必要的partition。

image.png

如下圖所示,在spark 2.4中,大表中的所有數據都被讀取。

image.png

而在spark 3.0中,通過pushdown with dynamic filter,能夠減少大表中需要被讀取的數據量。

image.png

如下圖所示,是一個dynamic partitioning pruning的例子。

image.png

第五部分是Enhanced nested column pruning & pushdown,是針對於這種嵌套的數據結構的支持。在spark 2.4裡面,其實已經提供了部分的這種支持。如下圖所示的表裡面,有column 1和column 2,而後者是一個嵌套的數據結構,它裡面有兩個字段。比如說,我查詢的時候只查了column 2裡面的第1個字段。去訪問這個數據的時候,我只需要把column 2的第1個字段拿出來就行了,而不需要把整個column 2都拿出來。但是在spark 2.4裡面它的支持是有限的。就是說,只能穿透有限的幾個算子,比如說LIMIT這種算子,對於其他的一些算子是沒辦法的。

image.png

而在spark 3.0裡面,對這一塊進行了進一步的優化,能夠支持把column pruning推到穿透所有的算子。

image.png

另外一種場景,就是說filter過濾的條件是根據嵌套字段裡面的某一個子字段去做過濾,是不是支持把過濾條件也推到table scan裡面。在spark 2.4裡面也是不能夠完全支持的。

image.png

而在spark 3.0裡面,針對嵌套字段的filter,也是一直可以往下推到具體訪問數據的table scan裡面。

image.png

第六部分是Improved aggregation code generation,針對aggregation擴件的一個優化。

image.png

就是說,在spark裡面我們去支持這種擴件,但是擴件會有一個限制。針對每個方法,如果大於8000 Java bytecode,HotSpot編譯器就rollback,放棄生成native code。所以,如果你的這種SQL比較複雜,可能會沒辦法利用到擴件的這種特性。

image.png

在spark 3.0裡面,針對這種情況做一些優化。簡單來說,把一個方法拆分成多個方法,從而避免碰到8000 Java bytecode的限制。

image.png

具體的例子如下圖所示。

image.png

第七部分是New Scala and Java,針對新的語言版本的支持。支持了新的Java 11這個版本,以及Scala 2.12版本。

image.png


關鍵詞:Spark 3.0,SQL性能改進,Interactions with developers,Dynamic optimizations,Catalyst improvements,Infrastructure updates

Leave a Reply

Your email address will not be published. Required fields are marked *