大數據

字節跳動基於 Flink 的 MQ-Hive 實時數據集成

在數據中臺建設過程中,一個典型的數據集成場景是將 MQ (Message Queue,例如 Kafka、RocketMQ 等)的數據導入到 Hive 中,以供下游數倉建設以及指標統計。由於 MQ-Hive 是數倉建設第一層,因此對數據的準確性以及實時性要求比較高。

本文主要圍繞 MQ-Hive 場景,針對目前字節跳動內已有解決方案的痛點,提出基於 Flink 的實時解決方案,並介紹新方案在字節跳動內部的使用現狀。

已有方案及痛點

字節跳動內已有解決方案如下圖所示,主要分了兩個步驟:

  1. 通過 Dump 服務將 MQ 的數據寫入到 HDFS 文件
  2. 再通過 Batch ETL 將 HDFS 數據導入到 Hive 中,並添加 Hive 分區

1.jpg

痛點

  1. 任務鏈較長,原始數據需要經過多次轉換最終才能進入 Hive
  2. 實時性比較差,Dump Service、Batch ETL 延遲都會導致最終數據產出延遲
  3. 存儲、計算開銷大,MQ 數據重複存儲和計算
  4. 基於原生 Java 打造,數據流量持續增長後,存在單點故障和機器負載不均衡等問題
  5. 運維成本較高,架構上無法複用公司內 Hadoop/Flink/Yarn 等現有基礎設施
  6. 不支持異地容災

基於 Flink 實時解決方案

優勢

針對目前公司傳統解決方案的痛點,我們提出基於 Flink 的實時解決方案,將 MQ 的數據實時寫入到 Hive,並支持事件時間以及 Exactly Once 語義。相比老方案,新方案優勢如下所示:

  1. 基於流式引擎 Flink 開發,支持 Exactly Once 語義
  2. 實時性更高,MQ 數據直接進入 Hive,無中間計算環節
  3. 減少中間存儲,整個流程數據只會落地一次
  4. 支撐 Yarn 部署模式,方便用戶遷移
  5. 資源管理彈性,方便擴容以及運維
  6. 支持雙機房容災

整體架構

整體架構如下圖所示,主要包括 DTS(Data Transmission Service) Source、DTS Core、DTS Sink 三大模塊,具體功能如下:

  1. DTS Source 接入不同 MQ 數據源,支持 Kafka、RocketMQ 等
  2. DTS Sink 將數據輸出到目標數據源,支持 HDFS、Hive 等
  3. DTS Core 貫穿整個數據同步流程,通過 Source 讀取源端數據,經過 DTS Framework 處理,最後通過 Sink 將數據輸出到目標端。
  4. DTS Framework 集成類型系統、文件切分、Exactly Once、任務信息採集、事件時間、髒數據收集等核心功能
  5. 支持 Yarn 部署模式,資源調度、管理比較彈性

2.jpg
(DTS Dump架構圖)

Exactly Once

Flink 框架通過 Checkpoint 機制,能夠提供 Exactly Once 或者 At Least Once 語義。為了實現 MQ-Hive 全鏈路支持 Exactly-once 語義,還需要 MQ Source、Hive Sink 端支持 Exactly Once 語義。本文通過 Checkpoint + 2PC 協議實現,具體過程如下:

  1. 數據寫入時,Source 端從上游 MQ 拉取數據併發送到 Sink 端;Sink 端將數據寫入到臨時目錄中
  2. Checkpoint Snapshot 階段,Source 端將 MQ Offset 保存到 State 中;Sink 端關閉寫入的文件句柄,並保存當前 Checkpoint ID 到 State 中;
  3. Checkpoint Complete 階段,Source 端 Commit MQ Offset;Sink 端將臨時目錄中的數據移動到正式目錄下
  4. Checkpoint Recover 階段,加載最新一次成功的 Checkpoint 目錄並恢復 State 信息,其中 Source 端將 State 中保存的 MQ Offset 作為起始位置;Sink 端恢復最新一次成功的 Checkpoint ID,並將臨時目錄的數據移動到正式目錄下

■ 實現優化

在實際使用場景中,特別是大併發場景下,HDFS 寫入延遲容易有毛刺,因為個別 Task Snapshot 超時或者失敗,導致整個 Checkpoint 失敗的問題會比較明顯。因此針對 Checkpoint 失敗,提高系統的容錯性以及穩定性就比較重要。

這裡充分利用 Checkpoint ID 嚴格單調遞增的特性,每一次做 Checkpoint 時,當前 Checkpoint ID 一定比以前大,因此在 Checkpoint Complete 階段,可以提交小於等於當前 Checkpoint ID 的臨時數據。具體優化策略如下:

  1. Sink 端臨時目錄為{dump_path}/{next_cp_id},這裡 next_cp_id 的定義是當前最新的 cp_id + 1
  2. Checkpoint Snapshot 階段,Sink 端保存當前最新 cp_id 到 State,同時更新 next_cp_id 為 cp_id + 1
  3. Checkpoint Complete 階段,Sink 端將臨時目錄中所有小於等於當前 cp_id 的數據移動到正式目錄下
  4. Checkpoint Recover 階段,Sink 端恢復最新一次成功的 cp_id,並將臨時目錄中小於等於當前 cp_id 的數據移動到正式目錄下

類型系統

由於不同數據源支持的數據類型不一樣,為了解決不同數據源間的數據同步以及不同類型轉換兼容的問題,我們支持了 DTS 類型系統,DTS 類型可細化為基礎類型和複合類型,其中複合類型支持類型嵌套,具體轉換流程如下:

  1. 在 Source 端,將源數據類型,統一轉成系統內部的 DTS 類型
  2. 在 Sink 端,將系統內部的 DTS 類型轉換成目標數據源類型
  3. 其中 DTS 類型系統支持不同類型間的相互轉換,比如 String 類型與 Date 類型的相互轉換

3.jpg
(DTS Dump架構圖)

Rolling Policy

Sink 端是併發寫入,每個 Task 處理的流量不一樣,為了避免生成太多的小文件或者生成的文件過大,需要支持自定義文件切分策略,以控制單個文件的大小。目前支持三種文件切分策略:文件大小、文件最長未更新時間、Checkpoint。

■ 優化策略

Hive 支持 Parquet、Orc、Text 等多種存儲格式,不同的存儲格式數據寫入過程不太一樣,具體可以分為兩大類:

  1. RowFormat:基於單條寫入,支持按照 Offset 進行 HDFS Truncate 操作,例如 Text 格式
  2. BulkFormat:基於 Block 寫入,不支持 HDFS Truncate 操作,例如 Parquet、ORC 格式

為了保障 Exactly Once 語義,並同時支持 Parquet、Orc、Text 等多種格式,在每次 Checkpoint 時,強制做文件切分,保證所有寫入的文件都是完整的,Checkpoint 恢復時不用做 Truncate 操作。

容錯處理

理想情況下流式任務會一直運行不需要重啟,但實際不可避免會遇到以下幾個場景:

  • Flink 計算引擎升級,需要重啟任務
  • 上游數據增加,需要調整任務併發度
  • Task Failover

■ 併發度調整

目前 Flink 原生支持 State Rescale。具體實現中,在 Task 做 Checkpoint Snapshot 時,將 MQ Offset 保存到 ListState 中;Job 重啟後,Job Master 會根據 Operator 併發度,將 ListState 平均分配到各個 Task 上。

■ Task Failover

由於網絡抖動、寫入超時等外部因素的影響,Task 不可避免會出現寫入失敗,如何快速、準確的做 Task Failover 就顯得比較重要。目前 Flink 原生支持多種 Task Failover 策略,本文使用 Region Failover 策略,將失敗 Task 所在 Region 的所有 Task 都重啟。

異地容災

■ 背景

大數據時代,數據的準確性和實時性顯得尤為重要。本文提供多機房部署及異地容災解決方案,當主機房因為斷網、斷電、地震、火災等原因暫時無法對外提供服務時,能快速將服務切換到備災機房,並同時保障 Exactly Once 語義。

■ 容災組件

整體解決方案需要多個容災組件一起配合實現,容災組件如下圖所示,主要包括 MQ、YARN、HDFS,具體如下:

  1. MQ 需要支持多機房部署,當主機房故障時,能將 Leader 切換到備機房,以供下游消費
  2. Yarn 集群在主機房、備機房都有部署,以便 Flink Job 遷移
  3. 下游 HDFS 需要支持多機房部署,當主機房故障時,能將 Master 切換到備機房
  4. Flink Job 運行在 Yarn 上,同時任務 State Backend 保存到 HDFS,通過 HDFS 的多機房支持保障 State Backend 的多機房

4.jpg

■ 容災過程

整體容災過程如下所示:

  1. 正常情況下,MQ Leader 以及 HDFS Master 部署在主機房,並將數據同步到備機房。同時 Flink Job 運行在主機房,並將任務 State 寫入到 HDFS 中,注意 State 也是多機房部署模式
  2. 災難情況下,MQ Leader 以及 HDFS Master 從主機房遷移到備災機房,同時 Flink Job 也遷移到備災機房,並通過 State 恢復災難前的 Offset 信息,以提供 Exactly Once 語義

5.jpg
6.jpg

事件時間歸檔

■ 背景

在數倉建設中,處理時間(Process Time)和事件時間(Event Time)的處理邏輯不太一樣,對於處理時間會將數據寫到當前系統時間所對應的時間分區下;對於事件時間,則是根據數據的生產時間將數據寫到對應時間分區下,本文也簡稱為歸檔。

在實際場景中,不可避免會遇到各種上下游故障,並在持續一段時間後恢復,如果採用 Process Time 的處理策略,則事故期間的數據會寫入到恢復後的時間分區下,最終導致分區空洞或者數據漂移的問題;如果採用歸檔的策略,會按照事件時間寫入,則沒有此類問題。

由於上游數據事件時間會存在亂序,同時 Hive 分區生成後就不應該再繼續寫入,因此實際寫入過程中不可能做到無限歸檔,只能在一定時間範圍內歸檔。歸檔的難點在於如何確定全局最小歸檔時間以及如何容忍一定的亂序。

■ 全局最小歸檔時間

Source 端是併發讀取,並且一個 Task 可能同時讀取多個 MQ Partition 的數據,對於 MQ 的每一個 Parititon 會保存當前分區歸檔時間,取分區中最小值作為 Task 的最小歸檔時間,最終取 Task 中最小值,作為全局最小歸檔時間。

7.jpg

■ 亂序處理

為了支持亂序的場景,會支持一個歸檔區間的設置,其中 Global Min Watermark 為全局最小歸檔時間,Partition Watermark 為分區當前歸檔時間,Partition Min Watermark 為分區最小歸檔時間,只有當事件時間滿足以下條件時,才會進行歸檔:

  1. 事件時間大於全局最小歸檔時間
  2. 事件時間大於分區最小歸檔時間

8.jpg

Hive 分區生成

■ 原理

Hive 分區生成的難點在於如何確定分區的數據是否就緒以及如何添加分區。由於 Sink 端是併發寫入,同時會有多個 Task 寫同一個分區數據,因此只有當所有 Task 分區數據寫入完成,才能認為分區數據是就緒,本文解決思路如下:

  1. 在 Sink 端,對於每個 Task 保存當前最小處理時間,需要滿足單調遞增的特性
  2. 在 Checkpoint Complete 時,Task 上報最小處理時間到 JM 端
  3. JM 拿到所有 Task 的最小處理時間後,可以得到全局最小處理時間,並以此作為 Hive 分區的最小就緒時間
  4. 當最小就緒時間更新時,可判斷是否添加 Hive 分區

10.jpg

■ 動態分區

動態分區是根據上游輸入數據的值,確定數據寫到哪個分區目錄,而不是寫到固定分區目錄,例如 date={date}/hour={hour}/app={app}的場景,根據分區時間以及 app 字段的值確定最終的分區目錄,以實現每個小時內,相同的 app 數據在同一個分區下。

在靜態分區場景下,每個 Task 每次只會寫入一個分區文件,但在動態分區場景下,每個 Task 可能同時寫入多個分區文件。對於 Parque 格式的寫入,會先將數據寫到做本地緩存,然後批次寫入到 Hive,當 Task 同時處理的文件句柄過多時,容易出現 OOM。為了防止單 Task OOM,會週期性對文件句柄做探活檢測,及時釋放長時間沒有寫入的文件句柄。

11.jpg

Messenger

Messenger 模塊用於採集 Job 運行狀態信息,以便衡量 Job 健康度以及大盤指標建設。

■ 元信息採集

元信息採集的原理如下所示,在 Sink 端通過 Messenger 採集 Task 的核心指標,例如流量、QPS、髒數據、寫入 Latency、事件時間寫入效果等,並通過 Messenger Collector 彙總。其中髒數據需要輸出到外部存儲中,任務運行指標輸出到 Grafana,用於大盤指標展示。

12.jpg

■ 髒數據收集

數據集成場景下,不可避免會遇到髒數據,例如類型配置錯誤、字段溢出、類型轉換不兼容等場景。對於流式任務來說,由於任務會一直運行,因此需要能夠實時統計髒數據流量,並且將髒數據保存到外部存儲中以供排查,同時在運行日誌中採樣輸出。

■ 大盤監控

大盤指標覆蓋全局指標以及單個 Job 指標,包括寫入成功流量和 QPS、寫入 Latency、寫入失敗流量和 QPS、歸檔效果統計等,具體如下圖所示:

13.jpg
14.jpg

未來規劃

基於 Flink 實時解決方案目前已在公司上線和推廣,未來主要關注以下幾個方面:

  1. 數據集成功能增強,支持更多數據源的接入,支持用戶自定義數據轉換邏輯等
  2. Data Lake 打通,支持 CDC 數據實時導入
  3. 流批架構統一,支持全量、增量場景數據集成
  4. 架構升級,支持更多部署環境,比如 K8S
  5. 服務化完善,降低用戶接入成本

總結

隨著字節跳動業務產品逐漸多元化快速發展,字節跳動內部一站式大數據開發平臺功能也越來越豐富,並提供離線、實時、全量、增量場景下全域數據集成解決方案,從最初的幾百個任務規模增長到數萬級規模,日處理數據達到 PB 級,其中基於 Flink 實時解決方案目前已在公司內部大力推廣和使用,並逐步替換老的 MQ-Hive 鏈路。

參考文獻:

  1. Real-time Exactly-once ETL with Apache Flink
    http://shzhangji.com/blog/2018/12/23/real-time-exactly-once-etl-with-apache-flink/
  2. Implementing the Two-Phase Commit Operator in Flink
    https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
  3. A Deep Dive into Rescalable State in Apache Flink
    https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
  4. Data Streaming Fault Tolerance
    https://ci.apache.org/projects/flink/flink-docs-release-1.9/internals/stream_checkpointing.html

Leave a Reply

Your email address will not be published. Required fields are marked *