一、簡介
通常來講,我們的streaming有多種模式。包括Message Source based stream,File Source based stream,等等。
我們可以看到,針對一個streaming系統,在structured streaming的運行環境下,有主要的兩個因素。一個是input size,一個是state size。這兩個size完全指導於我們整個streaming的全部的流程,並且與我們真正的性能是切切相關的。
對於這樣的場景,我們怎麼樣去進行一個合適的調優。我們的多個部分,全部用如下的這樣一個例子。
二、輸入參數
對於一個streaming系統來講,數據來源主要是input上游。對於input的size的限制,直接決定了我們整個的時間複雜度。我們整個streaming的處理流程,或者說處理的每一個batch當中的時間複雜度是n×m,如下圖所示。
輸入參數的重要性有三點,如下圖所示。
我們常指的輸入參數,其實主要也就是trigger的單位。整個的size的控制,以及整體與我們的shuffle,partition,還有cluster的資源的聯動,對於我們的整個性能調優是至關重要的。
另外,我們剛才也提到了關於join場景。比方說我們接下來給到的例子,streaming以及靜態數據的這樣一個join的場景當中。如果說我們對於每個分片的shuffle size的大小,以及如何把它能夠轉化為一個內存shuffle的場景,也是對於我們輸入參數進行一個合適調優的過程。
如果我們用系統默認的maxFilesPerTrigger來進行這樣一個計算,會觸發哪些問題。對於Delta來講,比方說,默認的選項就是一千個文件。假設我們的生產環境,每個文件有200MB。那麼這樣的環境,會看到我們的mini-batch就會越變越慢。也就是說,我們的消費者永遠大於生產者。我們持續不斷的有這個狀態,以及數據的不停的積累,導致我們的流latency越來越大。這個場景,我相信大家在生產環境中也會經常遇到。
接下來,需要用某種方式調整我們的maxFilesPerTrigger。這裡我們有兩個目標值。第一個目標值是說,我們希望每一個shuffle partition size在100~200MB。這個值是怎麼來的,某種程度上,某一個集群配置,或者說某一種集群配置下,這個值都是不一致的。需要大家按照自己的集群大小,包括memory,配置進行一個預估和調整。第二個,我們需要shuffle partition和core的值是相等的。
可以看到,我們通過maxFilesPerTrigger調整到6個files,這個時候就沒有shuffle再發生了。同時,我們的Processed Records/Seconds比原來的這樣一個比例調整上升了30%。
大家可以看到,我們屏蔽了shuffle spill這樣一個耗時操作之後,我們的調優工作就結束了嗎,其實並不是。我們還可以做進一步的調優。也就是我們剛才這個背景介紹的那樣,我們的一條動態流和一條靜態的DataFrame進行join,但是我們的靜態DataFrame其實完全意義上來講是可以做broadcast hash join。
三、狀態參數
這一部分是關於狀態參數。在我們剛才一開始提到的影響streaming的兩個維度,一個是input size,另一個是state size。我們需要進一步的對state size有一個很好的限制。否則,我們每一個input,需要在state store當中去做查找,或者說做匹配,做一系列的兩個batch之間的狀態互通的操作的時候,就會導致一系列的性能問題。
當前來講,我們對state的定義,主要是這兩個部分。第一部分是State Store backed operations,第二部分是Delta Lake table or external system。
狀態參數的重要性如下圖所示。因為每一個batch都需要對state當中的操作進行一個查詢,並且更新的這樣一個步驟。無限增長的state肯定會讓你的作業越跑越慢,並且到最後消費者跟不上生產者。而且與此同時,它也會帶來我們剛才遇到的問題,比方說shuffle spill,還有常見的out of memory的問題。
接下來,我們主要會給大家演示的是這兩類的state store。第一種就是我們剛才提到的跟operation強相關的,比方說,watermarking是我們需要保留的history的一個水位線。也就是說,我們當前的系統考慮多久之前的數據就算過期了,我們就可以不用再考慮。另外一個就是哪一種state store我們正在被使用。第二種完全跟具體操作無關,需要去看我們的query的predicate,具體的query實現是什麼樣的。然後看我們每一次參與計算的batch對應的量,以及state的一個修改情況。
接下來看一下狀態參數的例子。第一部分和第二部分跟上文中的例子保持一致,沒有什麼變化。這個例子著重於第3部分,Aggregate sales per item per hour。
四、輸出參數
接下來我們看一下輸出參數,相對於前幾種場景比較特殊。在上文中提到的input state對structured streaming的影響的二維象限裡面,其實並沒有output。Structured streaming框架本身並不會受到輸出參數的影響。但是它更多的影響,其實是在於下游。對於streaming系統來講,我們期望整個端到端的延遲也好,吞吐也好,達到一個整體最優化。
我們可以看到,如果輸出參數不是一個很合適的值,最常見的問題就是小文件問題。我們會spill,或者說我們會寫出大量的這種小的文件,導致下游的讀取文件的操作會變得很慢。與此同時,以這樣的方式去寫出的時候,其實對框架本身也是一種開銷,或者說一種代價。
其實商業系統當中對於輸出參數是有一系列的優化的。比方說,Delta Lake系統當中對於output有一個Auto-Optimize的功能,它是默認打開的一個feature。Delta Lake在每一個batch寫出的時候,他都會根據自己的合適的partition,以及file size進行一個自動的調優的輸出。
五、部署
最後來看一下,我們在線下調整好streaming的參數,如何保證一系列的性能指標在上線之後也有同樣好的性能。首先需要考慮的就是關於driver節點的部署。常見的情況來講,一開始覺得非重要的streaming流,我們會將它跟一系列的,比方說,線下的batch job與其他的streaming job
進行一個混合部署,用同一個spark cluster。但是這種情況下,我們就會發現不同的作業會互相影響,尤其是driver節點會有一個對應的性能影響。所以在一開始部署,或者說在集群治理的角度,我們需要關注對應的streaming流是否可以和其他的作業進行混合部署。
與此同時,有可能的彈性伸縮的需求。我們之前期望core和shuffle partition儘量一致,來儘可能的用到系統cluster當中的所有資源。但是如果說你後期會有彈性的部署需求,比方說,我希望有臨時的一個集群擴展,對於streaming的cluster。這個時候,其實是有一個限制在裡面。就是說如果你的shuffle partition一開始設置的比原有的core要小,這種情況下,集群的擴展對你原有的系統是沒有用的。因為你的shuffle partition對應的參數寫在checkpoint file當中去了,所以不會增加額外的資源。
下一點就是關於capacity planning,跟剛才我提到的也有關係。因為shuffle partition是一個fixed的checkpoint裡邊對應的一個值。我們如果說打破了對應的這樣一個限制,真正重啟作業的時候需要將checkpoint清掉。另外,我們需要在一開始的時候就考慮state恢復的機制。
我們一共講了四個方向,對於streaming性能有一定影響,如下圖所示,分別是輸入參數,狀態參數,輸出參數,以及部署。
關鍵詞:Spark Structured Streaming,input parameter, state parameter,output parameter,參數調優