大數據

Flink 和 Iceberg 如何解決數據入湖面臨的挑戰

GitHub 地址
https://github.com/apache/flink
歡迎大家給 Flink 點贊送 star~

一、數據入湖的核心挑戰

數據實時入湖可以分成三個部分,分別是數據源、數據管道和數據湖(數倉),本文的內容將圍繞這三部分展開。

img

1. Case #1:程序 BUG 導致數據傳輸中斷

img

  • 首先,當數據源通過數據管道傳到數據湖(數倉)時,很有可能會遇到作業有 BUG 的情況,導致數據傳到一半,對業務造成影響;
  • 第二個問題是當遇到這種情況的時候,如何重起作業,並保證數據不重複也不缺失,完整地同步到數據湖(數倉)中。

2. Case #2:數據變更太痛苦

  • 數據變更

    當發生數據變更的情況時,會給整條鏈路帶來較大的壓力和挑戰。以下圖為例,原先是一個表定義了兩個字段,分別是 ID 和 NAME。此時,業務方面的同學表示需要將地址加上,以方便更好地挖掘用戶的價值。

    首先,我們需要把 Source 表加上一個列 Address,然後再把到 Kafka 中間的鏈路加上鍊,然後修改作業並重啟。接著整條鏈路得一路改過去,添加新列,修改作業並重啟,最後把數據湖(數倉)裡的所有數據全部更新,從而實現新增列。這個過程的操作不僅耗時,而且會引入一個問題,就是如何保證數據的隔離性,在變更的過程中不會對分析作業的讀取造成影響。

    img

  • 分區變更

    如下圖所示,數倉裡面的表是以 “月” 為單位進行分區,現在希望改成以 “天” 為單位做分區,這可能就需要將很多系統的數據全部更新一遍,然後再用新的策略進行分區,這個過程十分耗時。

    img

3. Case #3:越來越慢的近實時報表?

當業務需要更加近實時的報表時,需要將數據的導入週期,從 “天” 改到 “小時”,甚至 “分鐘” 級別,這可能會帶來一系列問題。

img

img

如上圖所示,首先帶來的第一個問題是:文件數以肉眼可見的速度增長,這將對外面的系統造成越來越大的壓力。壓力主要體現在兩個方面:

  • 第一個壓力是,啟動分析作業越來越慢,Hive Metastore 面臨擴展難題,如下圖所示。

    img

    • 隨著小文件越來越多,使用中心化的 Metastore 的瓶頸會越來越嚴重,這會造成啟動分析作業越來越慢,因為啟動作業的時候,會把所有的小文件原數據都掃一遍。
    • 第二是因為 Metastore 是中心化的系統,很容易碰到 Metastore 擴展難題。例如 Hive,可能就要想辦法擴後面的 MySQL,造成較大的維護成本和開銷。
  • 第二個壓力是掃描分析作業越來越慢。

    隨著小文件增加,在分析作業起來之後,會發現掃描的過程越來越慢。本質是因為小文件大量增加,導致掃描作業在很多個 Datanode 之間頻繁切換。

    img

4. Case #4:實時地分析 CDC 數據很困難

大家調研 Hadoop 裡各種各樣的系統,發現整個鏈路需要跑得又快又好又穩定,並且有好的併發,這並不容易。

  • 首先從源端來看,比如要將 MySQL 的數據同步到數據湖進行分析,可能會面臨一個問題,就是 MySQL 裡面有存量數據,後面如果不斷產生增量數據,如何完美地同步全量和增量數據到數據湖中,保證數據不多也不少。

    img

  • 此外,假設解決了源頭的全量跟增量切換,如果在同步過程中遇到異常,如上游的 Schema 變更導致作業中斷,如何保證 CDC 數據一行不少地同步到下游。

    img

  • 整條鏈路的搭建,需要涉及源頭全量跟同步的切換,包括中間數據流的串通,還有寫入到數據湖(數倉)的流程,搭建整個鏈路需要寫很多代碼,開發門檻較高。

    img

  • 最後一個問題,也是關鍵的一個問題,就是我們發現在開源的生態和系統中,很難找到高效、高併發分析 CDC 這種變更性質的數據。

    img

5. 數據入湖面臨的核心挑戰

  • 數據同步任務中斷

    • 無法有效隔離寫入對分析的影響;
    • 同步任務不保證 exactly-once 語義。
  • 端到端數據變更

    • DDL 導致全鏈路更新升級複雜;
    • 修改湖/倉中存量數據困難。
  • 越來越慢的近實時報表

    • 頻繁寫入產生大量小文件;
    • Metadata 系統壓力大, 啟動作業慢;
    • 大量小文件導致數據掃描慢。
  • 無法近實時分析 CDC 數據

    • 難以完成全量到增量同步的切換;
    • 涉及端到端的代碼開發,門檻高;
    • 開源界缺乏高效的存儲系統。

二、Apache Iceberg 介紹

1. Netflix:Hive 上雲痛點總結

Netflix 做 Iceberg 最關鍵的原因是想解決 Hive 上雲的痛點,痛點主要分為以下三個方面:

1.1 痛點一:數據變更和回溯困難

  1. 不提供 ACID 語義。在發生數據改動時,很難隔離對分析任務的影響。典型操作如:INSERT OVERWRITE;修改數據分區;修改 Schema;
  2. 無法處理多個數據改動,造成衝突問題;
  3. 無法有效回溯歷史版本。

1.2 痛點二:替換 HDFS 為 S3 困難

  1. 數據訪問接口直接依賴 HDFS API;
  2. 依賴 RENAME 接口的原子性,這在類似 S3 這樣的對象存儲上很難實現同樣的語義;
  3. 大量依賴文件目錄的 list 接口,這在對象存儲系統上很低效。

1.3 痛點三:太多細節問題

  1. Schema 變更時,不同文件格式行為不一致。不同 FileFormat 甚至連數據類型的支持都不一致;
  2. Metastore 僅維護 partition 級別的統計信息,造成不 task plan 開銷; Hive Metastore 難以擴展;
  3. 非 partition 字段不能做 partition prune。

2. Apache Iceberg 核心特性

  • 通用化標準設計

    • 完美解耦計算引擎
    • Schema 標準化
    • 開放的數據格式
    • 支持 Java 和 Python
  • 完善的 Table 語義

    • Schema 定義與變更
    • 靈活的 Partition 策略
    • ACID 語義
    • Snapshot 語義
  • 豐富的數據管理

    • 存儲的流批統一
    • 可擴展的 META 設計支持
    • 批更新和 CDC
    • 支持文件加密
  • 性價比

    • 計算下推設計
    • 低成本的元數據管理
    • 向量化計算
    • 輕量級索引

3. Apache Iceberg File Layout

img

上方為一個標準的 Iceberg 的 TableFormat 結構,核心分為兩部分,一部分是 Data,一部分是 Metadata,無論哪部分都是維護在 S3 或者是 HDFS 之上的。

4. Apache Iceberg Snapshot View

img

上圖為 Iceberg 的寫入跟讀取的大致流程。

可以看到這裡面分三層:

  • 最上面黃色的是快照;
  • 中間藍色的是 Manifest;
  • 最下面是文件。

每次寫入都會產生一批文件,一個或多個 Manifest,還有快照。

比如第一次形成了快照 Snap-0,第二次形成快照 Snap-1,以此類推。但是在維護原數據的時候,都是增量一步一步做追加維護的。

這樣的話可以幫助用戶在一個統一的存儲上做批量的數據分析,也可以基於存儲之上去做快照之間的增量分析,這也是 Iceberg 在流跟批的讀寫上能夠做到一些支持的原因。

5. 選擇 Apache Iceberg 的公司

img

上圖為目前在使用 Apache Iceberg 的部分公司,國內的例子大家都較為熟悉,這裡大致介紹一下國外公司的使用情況。

  • NetFlix 現在是有數百PB的數據規模放到 Apache Iceberg 之上,Flink 每天的數據增量是上百T的數據規模。
  • Adobe 每天的數據新增量規模為數T,數據總規模在幾十PB左右。
  • AWS 把 Iceberg 作為數據湖的底座。
  • Cloudera 基於 Iceberg 構建自己整個公有云平臺,像 Hadoop 這種 HDFS 私有化部署的趨勢在減弱,上雲的趨勢逐步上升,Iceberg 在 Cloudera 數據架構上雲的階段中起到關鍵作用。
  • 蘋果有兩個團隊在使用:

    • 一是整個 iCloud 數據平臺基於 Iceberg 構建;
    • 二是人工智能語音服務 Siri,也是基於 Flink 跟 Iceberg 來構建整個數據庫的生態。

三、Flink 和 Iceberg 如何解決問題

回到最關鍵的內容,下面闡述 Flink 和 Iceberg 如何解決第一部分所遇到的一系列問題。

1. Case #1:程序 BUG 導致數據傳輸中斷

img

首先,同步鏈路用 Flink,可以保證 exactly once 的語義,當作業出現故障時,能夠做嚴格的恢復,保證數據的一致性。

img

第二個是 Iceberg,它提供嚴謹的 ACID 語義,可以幫用戶輕鬆隔離寫入對分析任務的不利影響。

2. Case #2:數據變更太痛苦

img

img

如上所示,當發生數據變更時,用 Flink 和 Iceberg 可以解決這個問題。

Flink 可以捕捉到上游 Schema 變更的事件,然後把這個事件同步到下游,同步之後下游的 Flink 直接把數據往下轉發,轉發之後到存儲,Iceberg 可以瞬間把 Schema 給變更掉。

當做 Schema 這種 DDL 的時候,Iceberg 直接維護了多個版本的 Schema,然後老的數據源完全不動,新的數據寫新的 Schema,實現一鍵 Schema 隔離。

img

另外一個例子是分區變更的問題,Iceberg 做法如上圖所示。

之前按 “月” 做分區(上方黃色數據塊),如果希望改成按 “天” 做分區,可以直接一鍵把 Partition 變更,原來的數據不變,新的數據全部按 “天” 進行分區,語義做到 ACID 隔離。

3. Case #3:越來越慢的近實時報表?

img

img

第三個問題是小文件對 Metastore 造成的壓力。

首先對於 Metastore 而言,Iceberg 是把原數據統一存到文件系統裡,然後用 metadata 的方式維護。整個過程其實是去掉了中心化的 Metastore,只依賴文件系統擴展,所以擴展性較好。

img

img

另一個問題是小文件越來越多,導致數據掃描會越來越慢。在這個問題上,Flink 和 Iceberg 提供了一系列解決方案:

  • 第一個方案是在寫入的時候優化小文件的問題,按照 Bucket 來 Shuffle 方式寫入,因為 Shuffle 這個小文件,寫入的文件就自然而然的小。
  • 第二個方案是批作業定期合併小文件。
  • 第三個方案相對智能,就是自動增量地合併小文件。

4. Case #4:實時地分析CDC數據很困難

img

img

  • 首先是是全量跟增量數據同步的問題,社區其實已有 Flink CDC Connected 方案,就是說 Connected 能夠自動做全量跟增量的無縫銜接。

img

img

  • 第二個問題是在同步過程中,如何保證 Binlog 一行不少地同步到湖中, 即使中間碰到異常。

    對於這個問題,Flink 在 Engine 層面能夠很好地識別不同類型的事件,然後藉助 Flink 的 exactly once 的語義,即使碰到故障,它也能自動做恢復跟處理。

img

img

  • 第三個問題是搭建整條鏈路需要做不少代碼開發,門檻太高。

    在用了 Flink 和 Data Lake 方案後,只需要寫一個 source 表和 sink 表,然後一條 INSERT INTO,整個鏈路就可以打通,無需寫任何業務代碼。

img

  • 最後是存儲層面如何支持近實時的 CDC 數據分析。

四、社區 Roadmap

img

上圖為 Iceberg 的 Roadmap,可以看到 Iceberg 在 2019 年只發了一個版本, 卻在 2020 年直接發了三個版本,並在 0.9.0 版本就成為頂級項目。

img

上圖為 Flink 與 Iceberg 的 Roadmap,可以分為 4 個階段。

  • 第一個階段是 Flink 與 Iceberg 建立連接。
  • 第二階段是 Iceberg 替換 Hive 場景。在這個場景下,有很多公司已經開始上線,落地自己的場景。
  • 第三個階段是通過 Flink 與 Iceberg 解決更復雜的技術問題。
  • 第四個階段是把這一套從單純的技術方案,到面向更完善的產品方案角度去做。

活動推薦

阿里雲基於 Apache Flink 構建的企業級產品-實時計算 Flink 版現開啟6月限時活動:
0元試用實時計算 Flink 全託管版本(包年包月、10CU)即可有機會獲得 Flink 獨家定製T恤;另包3個月及以上還有85折優惠!
瞭解活動詳情:https://www.aliyun.com/product/bigdata/sc

image.png

Leave a Reply

Your email address will not be published. Required fields are marked *