雲計算

Flink + Iceberg + 對象存儲,構建數據湖方案

本文整理自 Dell 科技集團高級軟件研發經理孫偉在 4 月 17 日 上海站 Flink Meetup 分享的《Iceberg 和對象存儲構建數據湖方案》,文章內容為:

  1. 數據湖和 Iceberg 簡介
  2. 對象存儲支撐 Iceberg 數據湖
  3. 演示方案
  4. 存儲優化的一些思考

GitHub 地址
https://github.com/apache/flink
歡迎大家給 Flink 點贊送 star~

一、數據湖和 Iceberg 簡介

1. 數據湖生態

img

如上圖所示,對於一個成熟的數據湖生態而言:

  • 首先我們認為它底下應具備海量存儲的能力,常見的有對象存儲,公有云存儲以及 HDFS;
  • 在這之上,也需要支持豐富的數據類型,包括非結構化的圖像視頻,半結構化的 CSV、XML、Log,以及結構化的數據庫表;
  • 除此之外,需要高效統一的元數據管理,使得計算引擎可以方便地索引到各種類型數據來做分析。
  • 最後,我們需要支持豐富的計算引擎,包括 Flink、Spark、Hive、Presto 等,從而方便對接企業中已有的一些應用架構。

2. 結構化數據在數據湖上的應用場景

img

上圖為一個典型的數據湖上的應用場景。

數據源上可能會有各種數據,不同的數據源和不同格式。比如說事物數據,日誌,埋點信息,IOT 等。這些數據經過一些流然後進入計算平臺,這個時候它需要一個結構化的方案,把數據組織放到一個存儲平臺上,然後供後端的數據應用進行實時或者定時的查詢。

這樣的數據庫方案它需要具備哪些特徵呢?

  • 首先,可以看到數據源的類型很多,因此需要支持比較豐富的數據 Schema 的組織;
  • 其次,它在注入的過程中要支撐實時的數據查詢,所以需要 ACID 的保證,確保不會讀到一些還沒寫完的中間狀態的髒數據;
  • 最後,例如日誌這些有可能臨時需要改個格式,或者加一列。類似這種情況,需要避免像傳統的數倉一樣,可能要把所有的數據重新提出來寫一遍,重新注入到存儲;而是需要一個輕量級的解決方案來達成需求。

Iceberg 數據庫的定位就在於實現這樣的功能,於上對接計算平臺,於下對接存儲平臺。

3. 結構化數據在數據湖上的典型解決方案

img

對於數據結構化組織,典型的解決方式是用數據庫傳統的組織方式。

如上圖所示,上方有命名空間,數據庫表的隔離;中間有多個表,可以提供多種數據 Schema 的保存;底下會放數據,表格需要提供 ACID 的特性,也支持局部 Schema 的演進。

4. Iceberg 表數據組織架構

img

  • 快照 Metadata:表格 Schema、Partition、Partition spec、Manifest List 路徑、當前快照等。
  • Manifest List:Manifest File 路徑及其 Partition,數據文件統計信息。
  • Manifest File:Data File 路徑及其每列數據上下邊界。
  • Data File:實際表內容數據,以 Parque,ORC,Avro 等格式組織。

接下來具體看一下 Iceberg 是如何將數據組織起來的。如上圖所示:

  • 可以看到右邊從數據文件開始,數據文件存放表內容數據,一般支持 Parquet、ORC、Avro 等格式;
  • 往上是 Manifest File,它會記錄底下數據文件的路徑以及每列數據的上下邊界,方便過濾查詢文件;
  • 再往上是 Manifest List,它來鏈接底下多個 Manifest File,同時記錄 Manifest File 對應的分區範圍信息,也是為了方便後續做過濾查詢;

    Manifest List 其實已經表示了快照的信息,它包含當下數據庫表所有的數據鏈接,也是 Iceberg 能夠支持 ACID 特性的關鍵保障。

    有了快照,讀數據的時候只能讀到快照所能引用到的數據,還在寫的數據不會被快照引用到,也就不會讀到髒數據。多個快照會共享以前的數據文件,通過共享這些 Manifest File 來共享之前的數據。

  • 再往上是快照元數據,記錄了當前或者歷史上表格 Scheme 的變化、分區的配置、所有快照 Manifest File 路徑、以及當前快照是哪一個。

    同時,Iceberg 提供命名空間以及表格的抽象,做完整的數據組織管理。

5. Iceberg 寫入流程

img

上方為 Iceberg 數據寫入的流程圖,這裡用計算引擎 Flink 為例。

  • 首先,Data Workers 會從元數據上讀出數據進行解析,然後把一條記錄交給 Iceberg 存儲;
  • 與常見的數據庫一樣,Iceberg 也會有預定義的分區,那些記錄會寫入到各個不同的分區,形成一些新的文件;
  • Flink 有個 CheckPoint 機制,文件到達以後,Flink 就會完成這一批文件的寫入,然後生成這一批文件的清單,接著交給 Commit Worker;
  • Commit Worker 會讀出當前快照的信息,然後與這一次生成的文件列表進行合併,生成一個新的 Manifest List 以及後續元數據的表文件的信息,之後進行提交,成功以後就形成一個新的快照。

6. Iceberg 查詢流程

img

上方為 Iceberg 數據查詢流程。

  • 首先是 Flink Table scan worker 做一個 scan,scan 的時候可以像樹一樣,從根開始,找到當前的快照或者用戶指定的一個歷史快照,然後從快照中拿出當前快照的 Manifest List 文件,根據當時保存的一些信息,就可以過濾出滿足這次查詢條件的 Manifest File;
  • 再往下經過 Manifest File 裡記錄的信息,過濾出底下需要的 Data Files。這個文件拿出來以後,再交給 Recorder reader workers,它從文件中讀出滿足條件的 Recode,然後返回給上層調用。

這裡可以看到一個特點,就是在整個數據的查詢過程中沒有用到任何 List,這是因為 Iceberg 完整地把它記錄好了,整個文件的樹形結構不需要 List,都是直接單路徑指向的,因此查詢性能上沒有耗時 List 操作,這點對於對象存儲比較友好,因為對象存儲在 List 上面是一個比較耗資源的操作。

7. Iceberg Catalog 功能一覽

Iceberg 提供 Catalog 用良好的抽象來對接數據存儲和元數據管理。任何一個存儲,只要實現 Iceberg 的 Catalog 抽象,就有機會跟 Iceberg 對接,用來組織接入上面的數據湖方案。

img

如上圖所示,Catalog 主要提供幾方面的抽象。

  • 它可以對 Iceberg 定義一系列角色文件;
  • 它的 File IO 都是可以定製,包括讀寫和刪除;
  • 它的命名空間和表的操作 (也可稱為元數據操作),也可以定製;
  • 包括表的讀取 / 掃描,表的提交,都可以用 Catalog 來定製。

這樣可以提供靈活的操作空間,方便對接各種底下的存儲。

二、對象存儲支撐 Iceberg 數據湖

1. 當前 Iceberg Catalog 實現

目前社區裡面已經有的 Iceberg Catalog 實現可分為兩個部分,一是數據 IO 部分,二是元數據管理部分。

img

如上圖所示,其實缺少面向私有對象存儲的 Catalog 實現,S3A 理論上可以接對象存儲,但它用的是文件系統語義,不是天然的對象存儲語義,模擬這些文件操作會有額外的開銷,而我們想實現的是把數據和元數據管理全部都交給一個對象存儲,而不是分離的設計。

2. 對象存儲和 HDFS 的比較

這裡存在一個問題,在有 HDFS 的情況下,為什麼還要用對象存儲?

如下所示,我們從各個角度將對象存儲和 HDFS 進行對比。

img

總結下來,我們認為:

  • 對象存儲在集群擴展性,小文件友好,多站點部署和低存儲開銷上更加有優勢;
  • HDFS 的好處就是提供追加上傳和原子性 rename,這兩個優勢正是 Iceberg 需要的。

下面對兩個存儲各自的優勢進行簡單闡述。

1)比較之:集群擴展性

img

  • HDFS 架構是用單個 Name Node 保存所有元數據,這就決定了它單節點的能力有限,所以在元數據方面沒有橫向擴展能力。
  • 對象存儲一般採用哈希方式,把元數據分隔成各個塊,把這個塊交給不同 Node 上面的服務來進行管理,天然地它元數據的上限會更高,甚至在極端情況下可以進行 rehash,把這個塊切得更細,交給更多的 Node 來管理元數據,達到擴展能力。

2)比較之:小文件友好

img

如今在大數據應用中,小文件越來越常見,並逐漸成為一個痛點。

  • HDFS 基於架構的限制,小文件存儲受限於 Name Node 內存等資源,雖然 HDFS 提供了 Archive 的方法來合併小文件,減少對 Name Node 的壓力,但這需要額外增加複雜度,不是原生的。

    同樣,小文件的 TPS 也是受限於 Name Node 的處理能力,因為它只有單個 Name Node。對象存儲的元數據是分佈式存儲和管理,流量可以很好地分佈到各個 Node 上,這樣單節點就可以存儲海量的小文件。

  • 目前,很多對象存儲提供多介質,分層加速,可以提升小文件的性能。

3)比較之:多站點部署

img

  • 對象存儲支持多站點部署

    • 全局命名空間
    • 支持豐富的規則配置
  • 對象存儲的多站點部署能力適用於兩地三中心多活的架構,而 HDFS 沒有原生的多站點部署能力。雖然目前看到一些商業版本給 HDFS 增加了多站點負責數據的能力,但由於它的兩個系統可能是獨立的,因此並不能支撐真正的全局命名空間下多活的能力。

4)比較之:低存儲開銷

img

  • 對於存儲系統來說,為了適應隨機的硬件故障,它一般會有副本機制來保護數據。

    • 常見的如三副本,把數據存三份,然後分開保存到三個 Node 上面,存儲開銷是三倍,但是它可以同時容忍兩個副本遇到故障,保證數據不會丟失。
    • 另一種是 Erasure Coding,通常稱為 EC。以 10+2 舉例,它把數據切成 10 個數據塊,然後用算法算出兩個代碼塊,一共 12 個塊。接著分佈到四個節點上,存儲開銷是 1.2 倍。它同樣可以容忍同時出現兩個塊故障,這種情況可以用剩餘的 10 個塊算出所有的數據,這樣減少存儲開銷,同時達到故障容忍程度。
  • HDFS 默認使用三副本機制,新的 HDFS 版本上已經支持 EC 的能力。經過研究,它是基於文件做 EC,所以它對小文件有天然的劣勢。因為如果小文件的大小小於分塊要求的大小時,它的開銷就會比原定的開銷更大,因為兩個代碼塊這邊是不能省的。在極端情況下,如果它的大小等同於單個代碼塊的大小,它就已經等同於三副本了。

    同時,HDFS 一旦 EC,就不能再支持 append、hflush、hsync 等操作,這會極大地影響 EC 能夠使用的場景。對象存儲原生支持 EC,對於小文件的話,它內部會把小文件合併成一個大的塊來做 EC,這樣確保數據開銷方面始終是恆定的,基於預先配置的策略。

3. 對象存儲的挑戰:數據的追加上傳

img

在 S3 協議中,對象在上傳時需要提供大小。

以 S3 標準為例,對象存儲跟 Iceberg 對接時,S3 標準對象存儲不支持數據追加上傳的接口,協議要求上傳文件時提供文件大小。所以在這種情況下,對於這種流式的 File IO 傳入,其實不太友好。

1)解決方案一:S3 Catalog 數據追加上傳 - 小文件緩存本地/內存

img

對於一些小文件,流式傳入的時候就寫入到本地緩存 / 內存,等它完全寫完後,再把它上傳到對象存儲裡。

2)解決方法二:S3 Catalog 數據追加上傳 - MPU 分段上傳大文件

img

對於大文件,會用到 S3 標準定義的 MPU 分段上傳。

它一般分為幾個步驟:

  • 第一步先創建初始化的 MPU,拿到一個 Upload ID,然後給每一個分段賦予一個 Upload ID 以及一個編號,這些分塊就可以並行上傳;
  • 在上傳完成以後,還需要一步 Complete 操作,這樣相當於通知系統,它會把基於同一個 Upload ID 以及所有的編號,從小到大排起來,組成一個大文件;
  • 把機制運用到數據追加上傳場景,常規實現就是寫入一個文件,把文件緩存到本地,當達到分塊要求大小時,就可以把它進行初始化 MPU,把它的一個分塊開始上傳。後面每一個分塊也是一樣的操作,直到最後一個分塊上傳完,最後再調用一個完成操作來完成上傳。

MPU 有優點也有缺點:

  • 缺點是 MPU 的分片數量有上限,S3 標準裡可能只有 1 萬個分片。想支持大文件的話,這個分塊就不能太小,所以對於小於分塊的文件,依然是要利用前面一種方法進行緩存上傳;
  • MPU 的優點在於並行上傳的能力。假設做一個異步的上傳,文件在緩存達到以後,不用等上一個分塊上傳成功,就可以繼續緩存下一個,之後開始上傳。當前面注入的速度足夠快時,後端的異步提交就變成了並行操作。利用這個機制,它可以提供比單條流上傳速度更快的上傳能力。

4. 對象存儲的挑戰:原子提交

img

下一個問題是對象存儲的原子提交問題。

前面提到在數據注入的過程中,最後的提交其實分為幾步,是一個線性事務。首先它要讀到當前的快照版本,然後把這一次的文件清單合併,接著提交自己新的版本。這個操作類似於我們編程裡常見的 “i=i+1”,它不是一個原子操作,對象存儲的標準裡也沒有提供這個能力。

img

上圖是併發提交元信息的場景。

  • 這裡 Commit Worker 1 拿到了 v006 版本,然後合併自己的文件,提交 v007 成功。
  • 此時還有另一個 Commit Worker 2,它也拿到了 v006,然後合併出來,且也要提供 v007。此時我們需要一個機制告訴它 v007 已經衝突,不能上傳,然後讓它自己去 Retry。Retry 以後取出新的 v007 合併,然後提交給 v008。

這是一個典型的衝突場景,這裡需要一套機制,因為如果它不能檢測到自己是一個衝突的情況的話,再提交 v007 會把上面 v007 覆蓋,會導致上一次提交的所有數據都丟失。

img

如上圖所示,我們可以使用一個分佈式鎖的機制來解決上述問題。

  • 首先,Commit Worker 1 拿到 v006,然後合併文件,在提交之前先要獲取這一把鎖,拿到鎖以後判斷當前快照版本。如果是 v006,則 v007 能提交成功,提交成功以後再解鎖。
  • 同樣,Commit Worker 2 拿到 v006 合併以後,它一開始拿不到鎖,要等 Commit Worker 1 釋放掉這個鎖以後才能拿到。等拿到鎖再去檢查的時候,會發現當前版本已經是 v007,與自己的 v007 有衝突,因此這個操作一定會失敗,然後它就會進行 Retry。

這是通過鎖來解決併發提交的問題。

5. Dell EMC ECS 的數據追加上傳

img

基於 S3 標準的對象存儲和 Iceberg 問題的解決方案存在一些問題,例如性能損失,或者需要額外部署鎖服務等。

Dell EMC ECS 也是個對象存儲,基於這個問題有不一樣的解答,它基於 S3 的標準協議有一些擴展,可以支持數據的追加上傳。

它的追加上傳與 MPU 不同的地方在於,它沒有分塊大小的限制。分塊可以設置得比較小一點,上傳後內部就會串聯起來,依然是一個有效的文件。

追加上傳和 MPU 這兩者可以在一定程度上適應不同的場景。

MPU 有加速上傳能力,追加上傳在速度在不是很快的情況下,性能也是足夠用,而且它沒有 MPU 的初始化和合並的操作,所以兩者在性能上能夠適應不同場景進行使用。

6. Dell EMC ECS 在併發提交下的解決方案

img

ECS 對象存儲還提供了一個 If-Match 的語義,在微軟的雲存儲以及谷歌的雲存儲上都有這樣一個接口能力。

  • If-Match 就是說在 Commit Worker 1 提交拿到 v006 的時候,同時拿到了文件的 eTag。提交的時候會帶上 eTag,系統需要判斷要覆蓋文件的 eTag 跟當前這個文件真實 eTag 是否相同,如果相同就允許這次覆蓋操作,那麼 v007 就能提交成功;
  • 另一種情況,是 Commit Worker 2 也拿到了 v006 的 eTag,然後上傳的時候發現拿到 eTag 跟當前系統裡文件不同,則會返回失敗,然後觸發 Retry。

這個實現是和鎖機制一樣的效果,不需要外部再重新部署鎖服務來保證原子提交的問題。

7. S3 Catalog - 統一存儲的數據

img

回顧一下,上方我們解決了文件 IO 中上傳數據 IO 的問題,和解決了元數據表格的原子提交問題。

解決這些問題以後,就可以把數據以及元數據的管理全部都交到對象存儲,不再需要額外部署元數據服務,做到真正統一數據存儲的概念。

三、演示方案

img

如上所示,演示方案用到了 Pravega,可以簡單理解為 Kafka 的一個替代,但是對它進行了性能優化。

在這個例子中,我們會把數據注入 Pravega 的流裡,然後 Flink 會從 Pravega 中讀出數據進行解析,然後存入 Iceberg 組織。Iceberg 利用 ECS Catalog,直接對接對象存儲,這裡面沒有任何其他部署,最後用 Flink 讀出這個數據。

四、存儲優化的一些思考

img

上圖為當前 Iceberg 支持的數據組織結構,可以看到它直接 Parquet 文件存在存儲裡面。

我們的想法是如果這個湖跟元數據的湖其實是一個湖,有沒有可能生成的 Parquet 文件跟源文件存在很大的數據冗餘度,是否可以減少冗餘信息的存儲。

比如最極端的情況,源文件的一個信息記錄在 Iceberg 中,就不存這個 Parquet 數據文件。當要查詢的時候,通過定製 File IO,讓它根據原文件在內存中實時生成一個類似於 Parquet 的格式,提交給上層應用查詢,就可以達到一樣的效果。

但是這種方式,侷限於對存儲的成本有很高的要求,但是對查詢的性能要求卻不高的情況。能夠實現這個也要基於 Iceberg 好的抽象,因為它的文件元數據和 File IO 都是抽象出來的,可以把源文件拆進去,讓它以為這是一個 Parquet 文件。

進一步思考,能否優化查詢性能,同時節省存儲空間。

比如預計算一下,把源文件某些常用的列拿出來,然後統計信息到 Iceberg 中,在讀的時候利用源文件和雲計算的文件,可以很快查詢到信息,同時又節省了不常用的數據列存儲空間。

這是比較初步的想法,如果能夠實現,則用 Iceberg 不僅可以索引結構化的 Parquet 文件格式,甚至可以索引一些半結構化、結構化的數據,通過臨時的計算來解決上層的查詢任務,變成一個更完整的 Data Catalog。


活動推薦

阿里雲基於 Apache Flink 構建的企業級產品-實時計算 Flink 版現開啟活動:
99元試用實時計算 Flink 全託管版本(包年包月、10CU)即可得定製 Flink 獨家定製T恤;另包3個月及以上還有85折優惠!
瞭解活動詳情:https://www.aliyun.com/product/bigdata/sc

image.png

Leave a Reply

Your email address will not be published. Required fields are marked *