大數據

基於 Flink+Iceberg 構建企業級實時數據湖

Apache Flink 是大數據領域非常流行的流批統一的計算引擎,數據湖是順應雲時代發展潮流的新型技術架構。那麼當 Apache Flink 遇見數據湖時,會碰撞出什麼樣的火花呢?本次分享主要包括以下核心內容:

  1. 數據湖的相關背景介紹;
  2. 經典業務場景介紹;
  3. 為什麼選擇 Apache Iceberg;
  4. 如何通過 Flink+Iceberg 實現流式入湖
  5. 社區未來規劃工作。

視頻回顧:https://www.bilibili.com/video/BV14A411J7e6?p=4

數據湖的相關背景介紹

數據湖是個什麼概念呢?一般來說我們把一家企業產生的數據都維護在一個平臺內,這個平臺我們就稱之為“數據湖”。

看下面這幅圖,這個湖的數據來源多種多樣,有的可能是結構化數據,有的可能是非結構數據,有的甚至是二進制數據。有一波人站在湖的入口,用設備在檢測水質,這對應著數據湖上的流處理作業;有一批抽水機從湖裡面抽水,這對應著數據湖的批處理作業;還有一批人在船頭釣魚或者在岸上捕魚,這對應著數據科學家從數據湖中通過機器學習的手段來提取數據價值。

1.jpg

  1. 我們總結起來,其實數據湖主要有 4 個方面的特點。
  2. 第一個特點是存儲原始數據,這些原始數據來源非常豐富;
  3. 第二個特點是支持多種計算模型;
  4. 第三個特點是有完善的數據管理能力,要能做到多種數據源接入,實現不同數據之間的連接,支持 schema 管理和權限管理等;
  5. 第四個特點是靈活的底層存儲,一般用 ds3、oss、hdfs 這種廉價的分佈式文件系統,採用特定的文件格式和緩存,滿足對應場景的數據分析需求。

2.jpg

那麼開源數據湖架構一般是啥樣的呢?這裡我畫了一個架構圖,主要分為四層:

  1. 最底下是分佈式文件系統,雲上用戶 S3 和 oss 這種對象存儲會用的更多一些,畢竟價格便宜很多;非雲上用戶一般採用自己維護的 HDFS。
  2. 第二層是數據加速層。數據湖架構是一個存儲計算徹底分離的架構,如果所有的數據訪問都遠程讀取文件系統上的數據,那麼性能和成本開銷都很大。如果能把經常訪問到的一些熱點數據緩存在計算節點本地,這就非常自然的實現了冷熱分離,一方面能收穫到不錯的本地讀取性能,另一方面還節省了遠程訪問的帶寬。這一層裡面,我們一般會選擇開源的 alluxio,或者選擇阿里雲上的 Jindofs。
  3. 第三層就是 Table format 層,主要是把一批數據文件封裝成一個有業務意義的 table,提供 ACID、snapshot、schema、partition 等表級別的語義。一般對應這開源的 Delta、Iceberg、Hudi 等項目。對一些用戶來說,他們認為Delta、Iceberg、Hudi 這些就是數據湖,其實這幾個項目只是數據湖這個架構裡面的一環,只是因為它們離用戶最近,屏蔽了底層的很多細節,所以才會造成這樣的理解。
  4. 最上層就是不同計算場景的計算引擎了。開源的一般有 Spark、Flink、Hive、Presto、Hive MR 等,這一批計算引擎是可以同時訪問同一張數據湖的表的。

3.jpg

經典業務場景介紹

那麼,Flink 和數據湖結合可以有哪些經典的應用場景呢?這裡我們探討業務場景時默認選型了 Apache Iceberg 來作為我們的數據湖選型,後面一節會詳細闡述選型背後的理由。

4.jpg

首先,Flink+Iceberg 最經典的一個場景就是構建實時的 Data Pipeline。業務端產生的大量日誌數據,被導入到 Kafka 這樣的消息隊列。運用 Flink 流計算引擎執行 ETL後,導入到 Apache Iceberg 原始表中。有一些業務場景需要直接跑分析作業來分析原始表的數據,而另外一些業務需要對數據做進一步的提純。那麼我們可以再新起一個 Flink 作業從 Apache Iceberg 表中消費增量數據,經過處理之後寫入到提純之後的 Iceberg 表中。此時,可能還有業務需要對數據做進一步的聚合,那麼我們繼續在iceberg 表上啟動增量 Flink 作業,將聚合之後的數據結果寫入到聚合表中。

有人會想,這個場景好像通過 Flink+Hive 也能實現。 Flink+Hive 的確可以實現,但寫入到 Hive 的數據更多地是為了實現數倉的數據分析,而不是為了做增量拉取。一般來說,Hive 的增量寫入以 partition 為單位,時間是 15min 以上,Flink 長期高頻率地寫入會造成 partition 膨脹。而 Iceberg 容許實現 1 分鐘甚至 30秒的增量寫入,這樣就可以大大提高了端到端數據的實時性,上層的分析作業可以看到更新的數據,下游的增量作業可以讀取到更新的數據。

5.jpg

第二個經典的場景,就是可以用 Flink+Iceberg 來分析來自 MySQL 等關係型數據庫的 binlog 等。一方面,Apache Flink 已經原生地支持 CDC 數據解析,一條 binlog 數據通過 ververica flink-cdc-connector 拉取之後,自動轉換成 Flink Runtime 能識別的 INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER 四種消息,供用戶做進一步的實時計算。

另外一方面,Apache Iceberg 已經較為完善地實現了 equality delete 功能,也就是用戶定義好待刪除的 Record,直接寫到 Apache Iceberg 表內就可以刪除對應的行,本身就是為了實現數據湖的流式刪除。在 Iceberg 未來的版本中,用戶將不需要設計任何額外的業務字段,不用寫幾行代碼就可以完成 binlog 流式入湖到 Apache Iceberg(社區的這個 Pull Request 已經提供了一個 flink 寫入 CDC 數據的原型)。

此外,CDC 數據成功入湖 Iceberg 之後,我們還會打通常見的計算引擎,例如 Presto、Spark、Hive 等,他們都可以實時地讀取到 Iceberg 表中的最新數據。

6.jpg

第三個經典場景是近實時場景的流批統一。在常用的 lambda 架構中,我們有一條實時鏈路和一條離線鏈路。實時鏈路一般由 Flink、Kafka、HBase 這些組件構建而成,而離線鏈路一般會用到 Parquet、Spark 等組件構建。這裡面涉及到計算組件和存儲組件都非常多,系統維護成本和業務開發成本都非常高。有很多場景,他們的實時性要求並沒有那麼苛刻,例如可以放鬆到分鐘級別,這種場景我們稱之為近實時場景。那麼,我們是不是可以通過 Flink + Iceberg 來優化我們常用的 lambda 架構呢?

7.jpg

我們可以用 Flink+Iceberg 把整個架構優化成上圖所示。實時的數據通過 Flink 寫入到 Iceberg 表中,近實時鏈路依然可以通過flink計算增量數據,離線鏈路也可以通過 flink 批計算讀取某個快照做全局分析,得到對應的分析結果,供不同場景下的用戶讀取和分析。經過這種改進之後,我們把計算引擎統一成了 Flink,把存儲組件統一成了 Iceberg,整個系統的維護開發成本大大降低。

8.jpg

第四個場景,是採用 Iceberg 全量數據和 Kafka 的增量數據來 Bootstrap 新的 Flink 作業。我們現有的流作業在線上跑著,突然有一天某個業務方跑過來說,他們遇到一個新的計算場景,需要設計一個新的 Flink 作業,跑一遍去年一年的歷史數據,跑完之後再對接到正在產生的 Kafka 增量數據。那麼這時候應該怎麼辦呢?

我們依然可以採用常見的 lambda 架構,離線鏈路通過 kafka->flink->iceberg 同步寫入到數據湖,由於 Kafka 成本較高,保留最近 7 天數據即可,Iceberg 存儲成本較低,可以存儲全量的歷史數據(按照 checkpoint 拆分成多個數據區間)。啟動新 Flink 作業的時候,只需要去拉 Iceberg 的數據,跑完之後平滑地對接到 kafka 數據即可。

9.jpg

第五個場景和第四個場景有點類似。同樣是在 lambda 架構下,實時鏈路由於事件丟失或者到達順序的問題,可能導致流計算端結果不一定完全準確,這時候一般都需要全量的歷史數據來訂正實時計算的結果。而我們的 Iceberg 可以很好地充當這個角色,因為它可以高性價比地管理好歷史數據。

為什麼選擇 Apache Iceberg

回到上一節遺留的一個問題,為什麼當時 Flink 在眾多開源數據湖項目中會選擇 Apache Iceberg 呢?

10.jpg

我們當時詳細地調研了 Delta、Hudi、Iceberg 三個開源項目,並寫了一篇調研報告。我們發現 Delta 和 Hudi 跟 Spark 的代碼路徑綁定太深,尤其是寫入路徑。畢竟當時這兩個項目設計之初,都多多少少把 Spark 作為的他們默認的計算引擎了。而Apache Iceberg 的方向非常堅定,宗旨就是要做一個通用化設計的 Table Format。因此,它完美地解耦了計算引擎和底下的存儲系統,便於接入多樣化計算引擎和文件格式,可以說正確地完成了數據湖架構中的 Table Format 這一層的實現。我們認為它也更容易成為 Table Format 層的開源事實標準。

另外一方面,Apache Iceberg 正在朝著流批一體的數據湖存儲層發展,manifest 和snapshot 的設計,有效地隔離不同 transaction 的變更,非常方便批處理和增量計算。而我們知道 Apache Flink 已經是一個流批一體的計算引擎,可以說這二者的長遠規劃完美匹配,未來二者將合力打造流批一體的數據湖架構。

最後,我們還發現 Apache Iceberg 這個項目背後的社區資源非常豐富。在國外, Netflix、Apple、Linkedin、Adobe 等公司都有 PB 級別的生產數據運行在 Apache Iceberg 上;在國內,騰訊這樣的巨頭也有非常龐大的數據跑在 Apache Iceberg 之上,他們最大的一個業務每天有幾十T的增量數據寫入到 Apache Iceberg。社區成員同樣非常資深和多樣化,擁有來自其他項目的 7 位 Apache PMC,1 為 VP。體現在代碼和設計的 review 上,就變得非常苛刻,一個稍微大一點的 PR 涉及 100+ 的comment 很常見。在我個人看來,這些都使得 Apache Iceberg 的設計+代碼質量比較高。

正式基於以上考慮,Apache Flink 最終選擇了 Apache Iceberg 作為第一個數據湖接入項目。

如何通過 Flink+Iceberg 實現流式入湖

目前,我們已經在 Apache Iceberg 0.10.0 版本上實現 Flink 流批入湖功能,同時還支持 Flink 批作業查詢 Iceberg 數據湖的數據。具體關於 Flink 如何讀寫 Apache Iceberg 表,可以參考 Apache Iceberg 社區的使用文檔,這裡不再贅述。

https://github.com/apache/iceberg/blob/master/site/docs/flink.md

下面來簡要闡述下 Flink iceberg sink 的設計原理:由於 Iceberg 採用樂觀鎖的方式來實現 Transaction 的提交,也就是說兩個人同時提交更改事務到 Iceberg 時,後開始的一方會不斷重試,等先開始的一方順利提交之後再重新讀取 metadata 信息提交 transaction。考慮到這一點,採用多個併發算子去提交 transaction 是不合適的,容易造成大量事務衝突,導致重試。

所以,我們把 Flink 寫入流程拆成了兩個算子,一個叫做 IcebergStreamWriter,主要用來寫入記錄到對應的 avro、parquet、orc 文件,生成一個對應的 Iceberg DataFile,併發送給下游算子;另外一個叫做 IcebergFilesCommitter,主要用來在 checkpoint 到來時把所有的 DataFile 文件收集起來,並提交 Transaction 到 Apache iceberg,完成本次 checkpoint 的數據寫入。

11.jpg

理解了 Flink Sink 算子的設計後,下一個比較重要的問題就是:如何正確地設計兩個算子的 state ?

首先,IcebergStreamWriter 的設計比較簡單,主要任務是把記錄轉換成 DataFile,並沒有複雜的 State 需要設計。IcebergFilesCommitter 相對複雜一點,它為每個checkpointId 維護了一個 DataFile 文件列表,即 map>,這樣即使中間有某個 checkpoint的transaction 提交失敗了,它的 DataFile 文件仍然維護在 State 中,依然可以通過後續的 checkpoint 來提交數據到 Iceberg 表中。

社區未來規劃工作等

Apache Iceberg 0.10.0 版本的發佈,已經拉開集成 Flink 和 Iceberg 的序幕。在未來的 Apache Iceberg 0.11.0 和 0.12.0 版本中,我們規劃了更多高級功能及特性。

對於 Apache 0.11.0 版本來說,主要解決兩個問題:

第一個事情是小文件合併的問題,當然 Apache Iceberg 0.10.0 版本已經支持了Flink 批作業定時去合併小文件,這個功能還相對較為初級。在 0.11.0 版本中,我們將設計自動合併小文件功能,簡單來說就是在 Flink checkpoint 到達,觸發 Apache Iceberg transaction 提交後,有一個專門的算子,專門負責處理小文件的合併工作。

第二個事情是 Flink streaming reader 的開發,目前我們已經在私有倉庫做了一些 PoC 工作,在未來的時間內我們將貢獻到 Apache Iceberg 社區。

對於 0.12.0 版本來說,主要解決 row-level delete 的問題。如前面提到,我們已經在 PR 1663 中實現 Flink UPSERT 更新數據湖的全鏈路打通。後續在社區達成一致之後,將逐步推動該功能到社區版本。到時候用戶將能通過 Flink 完成 CDC 數據的實時寫入和分析,也可以方便地把 Flink 的聚合結果 upsert 到 Apache Iceberg 內。

作者介紹:

胡爭(子毅),阿里巴巴技術專家,目前主要負責 Flink 數據湖方案的設計和開發工作,Apache Iceberg 及 Apache Flink 項目的長期活躍貢獻者,《HBase 原理與實踐》作者。

開發者社區二維碼.png

Leave a Reply

Your email address will not be published. Required fields are marked *