本文介紹了 SmartNews 利用 Flink 加速 Hive 日表的生產,將 Flink 無縫地集成到以 Airflow 和 Hive 為主的批處理系統的實踐。詳細介紹過程中遇到的技術挑戰和應對方案,以供社區分享。主要內容為:
- 項目背景
- 問題的定義
- 項目的目標
- 技術選型
- 技術挑戰
- 整體方案及挑戰應對
- 項目成果和展望
- 後記
GitHub 地址
https://github.com/apache/flink
歡迎大家給 Flink 點贊送 star~
一、項目背景
SmartNews 是一家機器學習驅動的互聯網公司。自 2012 年於日本東京成立,並在美國和中國設有辦公室。經過 8 年多的發展,SmartNews 已經成長為日本排名第一,美國成長最快的新聞類應用,覆蓋全球超過 150 多個國家市場。據 2019 年初統計,SmartNews 的 iOS 和 Android 版本全球累計下載量已經超過 5000 萬次。
SmartNews 在過去 9 年的時間,基於 Airflow, Hive, EMR 等技術棧構建了大量的數據集。隨著數據量的增長,這些離線表的處理時間在逐漸拉長。另外,隨著業務方迭代節奏的加快,對錶的實時性也提出了更高的要求。因此,SmartNews 內部發起了 Speedy Batch 的項目,以加快現有離線表生產效率。
本次分享便是 Speedy Batch 項目中的一個例子,加速用戶行為 (actions) 表的實踐。
APP 端上報的用戶行為日誌,每日通過 Hive 作業生成日表,這個表是許多其他表的源頭,至關重要。這個作業需要運行 3 個小時,進而拉高了許多下游表的延遲 (Latency),明顯影響數據科學家、產品經理等用戶的使用體驗。因此我們需要對這些作業進行提速,讓各個表能更早可用。
公司業務基本上都在公有云上,服務器的原始日誌以文件形式上傳至雲存儲,按日分區;目前的作業用 Airflow 調度到 EMR 上運行,生成 Hive 日表,數據存儲在雲存儲。
二、問題的定義
1. 輸入
新聞服務器每隔 30 秒上傳一個原始日誌文件,文件上傳至相應日期和小時的雲存儲目錄。
2. 輸出
原始日誌經過 ETL 處理之後,按日 (dt) 和行為 (action) 兩級分區輸出。action 種類約 300 個,不固定,常有增減。
3. 用戶
對這個表的使用是廣泛的,多途徑的。有從 Hive 裡查詢,也有從 Presto,Jupyter 和 Spark 裡查詢,我們甚至不能確定以上就是全部的訪問途徑。
三、項目的目標
- 將 actions 表的時延從 3 小時縮短至 30 分鐘;
-
對下游用戶保持透明。透明又分兩個方面:
- 功能方面:用戶無需修改任何代碼,做到完全無感
- 性能方面:新項目產生的表,不應該導致下游讀取時的性能下降
四、技術選型
在本項目之前,同事已經對該作業做了多輪次改進,效果不是很顯著。
嘗試過的方案包括增加資源,投入更多的機器,但遇到了雲存儲的 IOPS 限制:每個 prefix 最多支持 3000 個併發讀寫,這個問題在輸出階段尤為明顯,即多個 reducer 同時向同一個 action 子目錄輸出的時候,容易碰到這個限制。另外還嘗試了按小時預處理,然後到每日凌晨再合併成日表,但合併過程亦耗時較多,整體時延還是在 2.5 小時左右,效果不夠顯著。
鑑於服務器端的日誌是近實時上傳至雲存儲,團隊提出了流式處理的思路,摒棄了批作業等待一天、處理 3 小時的模式,而是把計算分散在一整天,進而降低當天結束後的處理用時。團隊對 Flink 有比較好的背景,加上 Flink 近期對 Hive 的改進較多,因此決定採用基於 Flink 的方案。
五、技術挑戰
挑戰是多方面的。
1. 輸出 RC 文件格式
當前 Hive 表的文件格式為 RCFile,為了保證對用戶的透明,我們只能在現有的 Hive 表上做 in-place 的 upgrade,也就是我們得重用當前表,那麼 Flink 輸出的文件格式也得符合 RCFile 格式,因為一張 Hive 表只能有一個格式。
RCFile 屬於 bulk format (相對應的是 row format),在每次 checkpoint 時必須一次性輸出。如果我們選擇 5 分鐘一次 checkpoint,那麼每個 action 每 5 分鐘必須輸出一個文件,這會大量增加結果文件數,進而影響下游的讀取性能。特別是對於低頻 action,文件數會上百倍的增加。我們瞭解了 Flink 的文件合併功能,但那是在一個 checkpoint 內多個 sink 數據的合併,這並不能解決我們的問題,我們需要的是跨 checkpoint 的文件合併。
團隊考慮過以 row format (e.g. CSV) 輸出,然後實現自定義的 Hive SerDe,使之兼容 RCFile 和 CSV。但很快我們放棄了這個設想,因為那樣的話,需要為每個查詢場景實現這個 Hybrid 的 SerDe,例如需要為 Presto 實現,為 Spark 實現,等等。
- 一方面我們沒法投入這麼多資源;
- 另一方面那種方案也是用戶有感的,畢竟用戶還是需要安裝這個自定義的 SerDe。
我們之前提出了生成一個新格式的表,但也因為對用戶不夠透明而被否決。
2. Partition 的可感知性和完整性
如何讓下游作業能感知到當天這個 partition 已經 ready?actions 表分兩級 partition, dt 和 action。action 屬於 Hive 的 dynamic partition,數量多且不固定。當前 Airflow 下游作業是等待 insert_actions 這個 Hive 任務完成後,再開始執行的。這個沒問題,因為 insert_actions 結束時,所有 action 的 partition 都已經 ready 了。但對於 Flink 作業來說,沒有結束的信號,它只能往 Hive 裡面提交一個個的 partition,如 dt=2021-05-29/action=refresh。因為 action 數量多,提交 partition 的過程可能持續數分鐘,因此我們也不能讓 Airflow 作業去感知 dt 級別的 partition,那樣很可能在只有部分 action 的情況下觸發下游。
3. 流式讀取雲存儲文件
項目的輸入是不斷上傳的雲存儲文件,並非來自 MQ (message queue)。Flink 支持 FileStreamingSource,可以流式的讀入文件,但那是基於定時 list 目錄以發現新的文件。但這個方案不適合我們的場景,因為我們的目錄太大,雲存儲 list 操作根本無法完成。
4. Exactly Once 保證
鑑於 actions 表的重要性,用戶無法接受任何的數據丟失或者重複,因此整個方案需要保證恰好一次的處理。
六、整體方案及挑戰應對
1. 輸出 RCFile 並且避免小文件
我們最終選擇的方案是分兩步走,第一個 Flink 作業以 json (row format) 格式輸出,然後用另外一個 Flink 作業去做 Json 到 RC 格式的轉化。以此解決 Flink 不能愉快的輸出合適大小 RC 文件的問題。
輸出 json 的中間結果,這樣我們可以通過 Rolling Policy 控制輸出文件的大小,可以跨多個 checkpoint 攢成足夠大,或者時間足夠長,然後再輸出到雲存儲。這裡 Flink 其實利用的是雲存儲的 Multi Part Upload (MPU) 的功能,即每次 checkpoint Flink 也是把當前 checkpoint 攢下來的數據上傳至 雲存儲,但輸出的不是文件,而是一個 part。最後當多個 part 達到大小或者時間要求,就可以調用雲存儲的接口將多個 part 合併成一個文件,這個合併操作在雲存儲端完成,應用端無需再次讀取這個 part 到本地合併然後再上傳。而 Bulk format 均需要一次性全局處理,因此無法分段上傳然後合併,必須一次性全部上傳。
當第二個作業感知到一個新的 json 文件上傳後,加載它,轉化成 RCFile,然後上傳到最終的路徑。這個過程帶來的延遲較小,一個文件可以控制在 10s 以內,這是可以接受的。
2. 優雅的感知輸入文件
輸入端,沒有采用 Flink 的 FileStreamingSource,而是採用雲存儲的 event notification 來感知新文件的產生,接受到這個通知後再主動去加載文件。
3. Partition 的可感知性和完整性
輸出端,我們輸出 dt 級別的 success file,來讓下游可靠地感知日表的 ready。我們實現自定義的 StreamingFileWriter,使之輸出 partitionCreated 和 partitionInactive 的信號,並且通過實現自定義的 PartitionCommitter,來基於上述信號判斷日表的結束。
其機制如下,每個雲存儲 writer 開始寫某個 action,會發出一個 partitionCreated 信號,當它結束時又發出 partitionInactive 信號。PartitionCommitter 判斷某一天之內是否所有的 partittion 都 inactive 了,如果是,則一天的數據都處理了,輸出 dt 級別的 success file,在 Airflow 通過感知這個文件來判斷 Flink 是否完成了日表的處理。
4. Exactly Once
雲存儲的 event notification 提供 At Least once 保證。Flink 作業內對文件級別進行去重,作業採用 Exactly Once 的 checkpoint 設定,雲存儲文件輸出基於 MPU 機制等價於支持 truncate,因此雲存儲輸出等價於冪等,因此等價於端到端的 Exactly Once。
七、項目成果和展望
項目已經上線,時延維持在 34 分鐘上下,其中包括 15 分鐘的等待遲到文件。
- 第一個 Flink 作業需要 8 分鐘左右完成 checkpoint 和輸出,json 轉 rc 作業需要 12 分鐘完成全部處理。我們可以把這個時間繼續壓縮,但是綜合時效性和成本,我們選擇當前的狀態。
- json 轉 rc 作業耗時比當初的預想的要大,因為上游作業最後一個 checkpoint 輸出太多的文件,導致整體耗時長,這個可以通過增加作業的併發度線性的下降。
- 輸出的文件數比批作業輸出的文件數有所增加,增加 50% 左右。這是流式處理於批處理的劣勢,流式處理需要在時間到達時就輸出一個文件,而此時文件大小未必達到預期。好在這個程度的文件數增加不明顯影響下游的性能。
- 做到了下游的完全透明,整個上線前後,沒有收到任何用戶異常反饋。
該項目讓我們在生產環境驗證了利用流式處理框架 Flink 來無縫介入批處理系統,實現用戶無感的局部改進。將來我們將利用同樣的技術,去加速更多其他的 Hive 表的生產,並且廣泛提供更細粒度 Hive 表示的生產,例如小時級。另一方面,我們將探索利用 data lake 來管理批流一體的數據,實現技術棧的逐步收斂。
八、後記
由於採用完全不同的計算框架,且需要與批處理系統完全保持一致,團隊踩過不少的坑,限於篇幅,無法一一列舉。因此我們挑選幾個有代表的問題留給讀者思考:
- 為了驗證新作業產出的結果與原來 Hive 產出一致,我們需要對比兩者的輸出。那麼,如何才能高效的比較兩個 Hive 表的一致性呢?特別是每天有百億級數據,每條有數百個字段,當然也包含複雜類型 (array, map, array等)。
- 兩個 Flink 作業的 checkpoint 模式都必須是 Exactly Once 嗎?哪個可以不是,哪個必須是?
- StreamFileWriter 只有在 checkpoint 時才接受到 partitionCreated 和 partitionInactive 信號,那麼我們可以在它的 snapshotState() 函數裡面輸出給下游 (下游會保存到 state) 嗎?
- 最後一問:你們有更好的方案可供我們參考嗎?
更多 Flink 相關技術問題,可掃碼加入社區釘釘交流群
第一時間獲取最新技術文章和社區動態,請關注公眾號~
活動推薦
阿里雲基於 Apache Flink 構建的企業級產品-實時計算Flink版現開啟活動:
99 元試用 實時計算Flink版(包年包月、10CU)即有機會獲得 Flink 獨家定製T恤;另包 3 個月及以上還有 85 折優惠!
瞭解活動詳情:https://www.aliyun.com/product/bigdata/sc