大數據

Flink 消息聚合處理方案

微博機器學習平臺使用 Flink 實時處理用戶行為日誌和生成標籤,並且在生成標籤後寫入存儲系統。為了降低存儲系統的 IO 負載,有批量寫入的需求,同時對數據延遲也需要進行一定的控制,因此需要一種有效的消息聚合處理方案。

在本篇文章中我們將詳細介紹 Flink 中對消息進行聚合處理的方案,描述不同方案中可能遇到的問題和解決方法,並進行對比。

基於 flatMap 的解決方案

這是我們能夠想到最直觀的解決方案,即在自定義的 flatMap 方法中對消息進行聚合,偽代碼如下:

640 1.png

對應的作業拓撲和運行狀態如下:

640 2.png
640_3

該方案的優點如下:

  1. 邏輯簡單直觀,各併發間負載均勻。
  2. flatMap 可以和上游算子 chain 到一起,減少網絡傳輸開銷。
  3. 使用 operator state 完成 checkpoint,支持正常和改併發恢復。

與此同時,由於使用 operator state,因此所有數據都保存在 JVM 堆上,當數據量較大時有 GC/OOM 風險。

使用 Count Window 的解決方案

對於大規模 state 數據,Flink 推薦使用 RocksDB backend,並且只支持在 KeyedStream 上使用。與此同時,KeyedStream 支持通過 Count Window 來實現消息聚合,因此 Count Window 成為第二個可選方案。

由於需要使用 KeyedStream,我們面臨的第一個問題就是如何生成 key。一個比較自然的想法是直接使用隨機數,偽代碼示例如下:

640_4

對應的作業拓撲如下:

640_5

然而實際上線測試時出現了數據傾斜,不同併發間會出現負載不均,部分 task 接收不到數據從而 TPS 為 0:

640_6

在我們的場景下,除了有批量寫入降低 IO 的需求,對數據延遲也需要控制,當 key set 太大時,每個 key 累積指定數據條數的時間將增加,會導致數據寫入的延遲增大,因此我們需要控制 key set 的大小。經過分析,當 key set 較小時,Flink 默認的數據分發策略在併發間分佈不均,從而導致了上述數據傾斜的問題。下面我們從源碼級別對此進行說明。

首先,Flink 為了保證從 state 中恢復數據時產生最小的 IO,引入了 key group 的概念。Key group 數目等於最大併發數(max parallelism),取值範圍是 128-32768。當做數據分發的時候,key 會按照規則被分發到 key group 裡面,相關代碼如下所示:

keyGroup->KeyGroupRangeAssignment.assignToKeyGroup(key,maxParallelism);

640_7

然後,key group 會按照規則被分發到每個 task 上,代碼示例如下:

Task->String.valueOf(KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup));

640_8

通過 debug 可以發現,當 key 的數量較小時,該分發策略會導致 key 在 task 之間分配不均勻,測試代碼如下:

640_9

輸出結果如下:

{0=4, 1=4, 2=1}
{0=651, 1=686, 2=710}

可以看到,當只有 10 個 key 時,併發間分佈很不均勻;但當 key 的數量增加到 2048 時,就相對均勻了。

在瞭解了 key 的分發策略之後,我們可以相應的調整 key 的生成規則,來達到指定併發度和 key set 大小前提下的數據均勻,如下述代碼所示:

640_10
640_11

我們利用 maxParallelism 和 parallelism 生成 key,並將其存儲到一個大小為 parallelism 的 map 裡,以 taskid 作為 map key ,每個 task 對應的 key list 作為 value,來保證每個 taskid 對應的 list 都存儲了相同數量的 key。

最後,再將 map 打平,存儲到一個數組裡。在使用的時候,我們可以從該數組裡隨機取數來作為key,就能達到平均分配的目的了。

640_12

該方案的執行效果如下:

640_13

可以看到數據傾斜的問題得以解決,每個任務的負載都比較均勻。但需要注意的是由於引入了 key by,因此會有數據 shuffle,對比 flatmap 方案會有額外的網絡開銷。另外由於生成 key 的規則和實際併發度有關,因此該方案不支持改併發恢復,或者說如果修改併發,那麼在 restore 的時候會發生數據錯亂的問題,這一點需要尤為注意。

方案對比和總結

最後我們將兩種解決方案的優缺點對比總結如下:

640_14

在數據量不大且內存充足的情況下,建議使用 flatmap 方案;在數據量較大且可以保證不修改併發的情況下,建議使用 count window 方案並使用 RocksDB 進行 state數據存儲;在數據量較大且需要修改併發的情況下,當前給出的兩種方案都無法解決,需要尋求新的解決方案。

作者介紹:

曹富強、張穎,微博機器學習研發中心-系統工程師。現負責微博機器學習平臺數據計算模塊,主要涉及實時計算 Flink、Storm、Spark Streaming,離線計算 Hive、Spark 等。目前專注於 Flink 在微博機器學習場景的應用。

Leave a Reply

Your email address will not be published. Required fields are marked *