微博機器學習平臺使用 Flink 實時處理用戶行為日誌和生成標籤,並且在生成標籤後寫入存儲系統。為了降低存儲系統的 IO 負載,有批量寫入的需求,同時對數據延遲也需要進行一定的控制,因此需要一種有效的消息聚合處理方案。
在本篇文章中我們將詳細介紹 Flink 中對消息進行聚合處理的方案,描述不同方案中可能遇到的問題和解決方法,並進行對比。
基於 flatMap 的解決方案
這是我們能夠想到最直觀的解決方案,即在自定義的 flatMap 方法中對消息進行聚合,偽代碼如下:
對應的作業拓撲和運行狀態如下:
該方案的優點如下:
- 邏輯簡單直觀,各併發間負載均勻。
- flatMap 可以和上游算子 chain 到一起,減少網絡傳輸開銷。
- 使用 operator state 完成 checkpoint,支持正常和改併發恢復。
與此同時,由於使用 operator state,因此所有數據都保存在 JVM 堆上,當數據量較大時有 GC/OOM 風險。
使用 Count Window 的解決方案
對於大規模 state 數據,Flink 推薦使用 RocksDB backend,並且只支持在 KeyedStream 上使用。與此同時,KeyedStream 支持通過 Count Window 來實現消息聚合,因此 Count Window 成為第二個可選方案。
由於需要使用 KeyedStream,我們面臨的第一個問題就是如何生成 key。一個比較自然的想法是直接使用隨機數,偽代碼示例如下:
對應的作業拓撲如下:
然而實際上線測試時出現了數據傾斜,不同併發間會出現負載不均,部分 task 接收不到數據從而 TPS 為 0:
在我們的場景下,除了有批量寫入降低 IO 的需求,對數據延遲也需要控制,當 key set 太大時,每個 key 累積指定數據條數的時間將增加,會導致數據寫入的延遲增大,因此我們需要控制 key set 的大小。經過分析,當 key set 較小時,Flink 默認的數據分發策略在併發間分佈不均,從而導致了上述數據傾斜的問題。下面我們從源碼級別對此進行說明。
首先,Flink 為了保證從 state 中恢復數據時產生最小的 IO,引入了 key group 的概念。Key group 數目等於最大併發數(max parallelism),取值範圍是 128-32768。當做數據分發的時候,key 會按照規則被分發到 key group 裡面,相關代碼如下所示:
keyGroup->KeyGroupRangeAssignment.assignToKeyGroup(key,maxParallelism);
然後,key group 會按照規則被分發到每個 task 上,代碼示例如下:
Task->String.valueOf(KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup));
通過 debug 可以發現,當 key 的數量較小時,該分發策略會導致 key 在 task 之間分配不均勻,測試代碼如下:
輸出結果如下:
{0=4, 1=4, 2=1}
{0=651, 1=686, 2=710}
可以看到,當只有 10 個 key 時,併發間分佈很不均勻;但當 key 的數量增加到 2048 時,就相對均勻了。
在瞭解了 key 的分發策略之後,我們可以相應的調整 key 的生成規則,來達到指定併發度和 key set 大小前提下的數據均勻,如下述代碼所示:
我們利用 maxParallelism 和 parallelism 生成 key,並將其存儲到一個大小為 parallelism 的 map 裡,以 taskid 作為 map key ,每個 task 對應的 key list 作為 value,來保證每個 taskid 對應的 list 都存儲了相同數量的 key。
最後,再將 map 打平,存儲到一個數組裡。在使用的時候,我們可以從該數組裡隨機取數來作為key,就能達到平均分配的目的了。
該方案的執行效果如下:
可以看到數據傾斜的問題得以解決,每個任務的負載都比較均勻。但需要注意的是由於引入了 key by,因此會有數據 shuffle,對比 flatmap 方案會有額外的網絡開銷。另外由於生成 key 的規則和實際併發度有關,因此該方案不支持改併發恢復,或者說如果修改併發,那麼在 restore 的時候會發生數據錯亂的問題,這一點需要尤為注意。
方案對比和總結
最後我們將兩種解決方案的優缺點對比總結如下:
在數據量不大且內存充足的情況下,建議使用 flatmap 方案;在數據量較大且可以保證不修改併發的情況下,建議使用 count window 方案並使用 RocksDB 進行 state數據存儲;在數據量較大且需要修改併發的情況下,當前給出的兩種方案都無法解決,需要尋求新的解決方案。
作者介紹:
曹富強、張穎,微博機器學習研發中心-系統工程師。現負責微博機器學習平臺數據計算模塊,主要涉及實時計算 Flink、Storm、Spark Streaming,離線計算 Hive、Spark 等。目前專注於 Flink 在微博機器學習場景的應用。