今天主要跟大家分享一下spark 3.0在SQL方向上的一些優化工作。從spark 2.4開始,大概有超過一年半的時間。對於一個比較活躍的開源項目來說,這個時間是非常長的。所以裡面包含了大量的這種功能增強,性能優化,等各方面的新的feature在裡面。大概超過50%的相關的issue都是和SQL相關的。在SQL這個方向上主要做的工作,大概分成四個方面。第一方面是工具類的。就是說基於spark的一個開發者怎麼去和spark交互,提供一些更多的工具。第二個是dynamic optimization。簡單來說就是運行時的優化。在這裡面,包含了幾個重大的性能改進。第三個是在spark的catalyst優化器方面有很多新的改進。第四個是基礎依賴的更新。主要在語言層面引入了一些新的支持和依賴。
Spark 3.0是一個時間跨度非常長的release,包含了非常多的社區的工作。統計下來有接近3400多個issue在spark 3.0裡面進行了處理。針對這麼多的issue,我們用spark 3.0的時候,需要考慮有哪些東西對於實際的生產環境可能有好處,有哪些新的特性。
總結下來,大概可以把在SQL方向上的這種大的改動分成七個部分,分屬於上文中提到的四個類別。
第一部分是new explain format。當我們想去改進,去優化一個spark SQL的性能的時候,首先需要去了解SQL的查詢計劃大概是一個什麼樣子,有針對性的去進行這種SQL的重寫,或其他的一些改進。前提就是我的查詢計劃可讀性比較強,是非常容易去看的。
對於之前2.4的版本,可以通過explain SQL去展示。只不過是這種展示的方式看起來繁雜一點。我們可以看到針對於SQL,這麼一個物理查詢計劃,是一個樹狀的結構。也是可以去看的,但是可讀性相對來說不夠好。
在3.0裡面,針對查詢計劃的這種展示進行了一定的優化,以簡要的格式展示。根據節點的編號,可以找到對應的更詳細的信息。而且對於每一個節點展示的信息也做了一些歸類和整理,整理成input,output,condition等。通過這種方式,用戶可以更加清晰的看到整個的查詢計劃。
第二部分是all type of join hints。在spark 2.4只支持broadcast。而spark 3.0除了支持broadcast,還支持sort merge,shuffle hash和cartesian。
第三部分是adaptive query execution。社區為什麼要去做它,最主要的原因就是說,對於一些查詢計劃,在運行時能夠拿到更準確的數據統計信息,可以選擇最優的這種計劃,對數據進行處理,從而提升spark處理數據的性能。主要包括三種場景。第一種是調整reducer的數量,從而避免額外的內存和IO的開銷。第二種是說,選擇最合適的join的策略。第三種是說,針對傾斜數據,在join的時候提供更好的處理方式。上述場景都是自動的,根據運行時的情況,自動地收集相關的信息,然後去做判斷。
怎麼去動態的調整reducer的數量。在spark 2.4,默認指定partition數量,每一個partition經過shuffle之後,對應的要處理的數據的大小可能是不一樣的。這是由數據本身的特性來決定的,它的分佈可能本來就是不均衡的。
在spark 3.0中,在shuffle的時候,每一個partition有不同的數據量大小,需要把小的partition數據進行合併,給同一個reducer去處理,從而使得每一個reducer它所處理的數據量大小是相近的。
針對有數據傾斜的這種join,在spark 2.4中帶來的主要的問題就是說,在處理最大的partition時,要花費很長的時間,影響整個join。
在spark 3.0中,有數據傾斜的join,比在spark 2.4中更快。如圖所示,對於表A和表B,我把大表的數據做切分,小表的數據做全量的分發。第一個,滿足join的語義要求。第二個,在傾斜的這些key上面,它是被切成多分,然後在多個task裡面去處理。
第四部分是dynamic partitioning pruning。在join操作中,要避免讀取不必要的partition。而dynamic filter能夠避免讀取不必要的partition。
如下圖所示,在spark 2.4中,大表中的所有數據都被讀取。
而在spark 3.0中,通過pushdown with dynamic filter,能夠減少大表中需要被讀取的數據量。
如下圖所示,是一個dynamic partitioning pruning的例子。
第五部分是Enhanced nested column pruning & pushdown,是針對於這種嵌套的數據結構的支持。在spark 2.4裡面,其實已經提供了部分的這種支持。如下圖所示的表裡面,有column 1和column 2,而後者是一個嵌套的數據結構,它裡面有兩個字段。比如說,我查詢的時候只查了column 2裡面的第1個字段。去訪問這個數據的時候,我只需要把column 2的第1個字段拿出來就行了,而不需要把整個column 2都拿出來。但是在spark 2.4裡面它的支持是有限的。就是說,只能穿透有限的幾個算子,比如說LIMIT這種算子,對於其他的一些算子是沒辦法的。
而在spark 3.0裡面,對這一塊進行了進一步的優化,能夠支持把column pruning推到穿透所有的算子。
另外一種場景,就是說filter過濾的條件是根據嵌套字段裡面的某一個子字段去做過濾,是不是支持把過濾條件也推到table scan裡面。在spark 2.4裡面也是不能夠完全支持的。
而在spark 3.0裡面,針對嵌套字段的filter,也是一直可以往下推到具體訪問數據的table scan裡面。
第六部分是Improved aggregation code generation,針對aggregation擴件的一個優化。
就是說,在spark裡面我們去支持這種擴件,但是擴件會有一個限制。針對每個方法,如果大於8000 Java bytecode,HotSpot編譯器就rollback,放棄生成native code。所以,如果你的這種SQL比較複雜,可能會沒辦法利用到擴件的這種特性。
在spark 3.0裡面,針對這種情況做一些優化。簡單來說,把一個方法拆分成多個方法,從而避免碰到8000 Java bytecode的限制。
具體的例子如下圖所示。
第七部分是New Scala and Java,針對新的語言版本的支持。支持了新的Java 11這個版本,以及Scala 2.12版本。
關鍵詞:Spark 3.0,SQL性能改進,Interactions with developers,Dynamic optimizations,Catalyst improvements,Infrastructure updates