大數據

Flink集成Iceberg在同程藝龍的實踐

本文由同城藝龍大數據開發工程師張軍分享,主要介紹同城藝龍 Flink 集成 Iiceberg 的生產實踐。內容包括:

  1. 背景及痛點
  2. Flink + Iceberg 的落地
  3. Iceberg 優化實踐
  4. 後續工作
  5. 收益及總結

一、背景及痛點

業務背景

同程藝龍是一個提供機票、住宿、交通等服務的在線旅遊服務平臺,目前我所在的部門屬於公司的研發部門,主要職責是為公司內其他業務部門提供一些基礎服務,我們的大數據系統主要承接的業務是部門內的一些大數據相關的數據統計、分析工作等。數據來源有網關日誌數據、服務器監控數據、K8s 容器的相關日誌數據,App 的打點日誌, MySQL 的 binlog 日誌等。我們主要的大數據任務是基於上述日誌構建實時報表,提供基於 Presto 的報表展示和即時查詢服務,同時也會基於 Flink 開發一些實時、批處理任務,為業務方提供準確及時的數據支撐。

原架構方案

由於我們所有的原始數據都是存儲在 Kafka 的,所以原來的技術架構就是首先是 Flink 任務消費 Kafka 的數據,經過 Flink SQL 或者 Flink jar 的各種處理之後實時寫入 Hive,其中絕大部分任務都是 Flink SQL 任務,因為我認為 SQL 開發相對代碼要簡單的多,並且維護方便、好理解,所以能用 SQL 寫的都儘量用 SQL 來寫。
提交 Flink 的平臺使用的是 Zeppelin,其中提交 Flink SQL 任務是 Zeppelin 自帶的功能,提交 jar 包任務是我自己基於 Application 模式開發的 Zeppelin 插件。
對於落地到 Hive 的數據,使用開源的報表系統 metabase (底層使用 Presto) 提供實時報表展示、定時發送郵件報表,以及自定義 SQL 查詢服務。由於業務對數據的實時性要求比較高,希望數據能儘快的展示出來,所以我們很多的 Flink 流式任務的 checkpoint 設置為 1 分鐘,數據格式採用的是 orc 格式。

痛點

由於採用的是列式存儲格式 ORC,無法像行式存儲格式那樣進行追加操作,所以不可避免的產生了一個大數據領域非常常見且非常棘手的問題,即 HDFS 小文件問題。

開始的時候我們的小文件解決方案是自己寫的一個小文件壓縮工具,定期去合併,我們的 Hive 分區一般都是天級別的,所以這個工具的原理就是每天凌晨啟動一個定時任務去壓縮昨天的數據,首先把昨天的數據寫入一個臨時文件夾,壓縮完,和原來的數據進行記錄數的比對檢驗,數據條數一致之後,用壓縮後的數據覆蓋原來的數據,但是由於無法保證事務,所以出現了很多問題:

  • 壓縮的同時由於延遲數據的到來導致昨天的 Hive 分區又有數據寫入了,檢驗就會失敗,導致合併小文件失敗。
  • 替換舊數據的操作是沒有事務保證的,如果替換的過程中舊分區有新的數據寫入,就會覆蓋新寫入的數據,造成數據丟失。
  • 沒有事務的支持,無法實時合併當前分區的數據,只能合併壓縮前一個分區的,最新的分區數據仍然有小文件的問題,導致最新數據查詢性能提高不了。

二、Flink+Iceberg 的落地

Iceberg 技術調研

所以基於以上的 HDFS 小文件、查詢慢等問題,結合我們的現狀,我調研了目前市面上的數據湖技術:Delta、Apache Iceberg 和 Apache Hudi,考慮了目前數據湖框架支持的功能和以後的社區規劃,最終我們是選擇了 Iceberg,其中考慮的原因有以下幾方面:

■ Iceberg 深度集成 Flink

前面講到,我們的絕大部分任務都是 Flink 任務,包括批處理任務和流處理任務,目前這三個數據湖框架,Iceberg 是集成 Flink 做的最完善的,如果採用 Iceberg 替代 Hive 之後,遷移的成本非常小,對用戶幾乎是無感知的,
比如我們原來的 SQL 是這樣的:

INSERT INTO hive_catalog.db.hive_table SELECT * FROM kafka_table

遷移到 Iceberg 以後,只需要修改 catalog 就行。

INSERT INTO iceberg_catalog.db.iIcebergceberg_table SELECT * FROM kafka_table

Presto 查詢也是和這個類似,只需要修改 catalog 就行了。

■Iceberg 的設計架構使得查詢更快

image.png

在 Iceberg 的設計架構中,manifest 文件存儲了分區相關信息、data files 的相關統計信息(max/min)等,去查詢一些大的分區的數據,就可以直接定位到所要的數據,而不是像 Hive 一樣去 list 整個 HDFS 文件夾,時間複雜度從 O(n) 降到了 O(1),使得一些大的查詢速度有了明顯的提升,在 Iceberg PMC Chair Ryan Blue 的演講中,我們看到命中 filter 的任務執行時間從 61.5 小時降到了 22 分鐘。

■使用 Flink SQL 將 CDC 數據寫入 Iceberg

Flink CDC 提供了直接讀取 MySQL binlog 的方式,相對以前需要使用 canal 讀取 binlog 寫入 Iceberg,然後再去消費 Iceberg 數據。少了兩個組件的維護,鏈路減少了,節省了維護的成本和出錯的概率。並且可以實現導入全量數據和增量數據的完美對接,所以使用 Flink SQL 將 MySQL binlog 數據導入 Iceberg 來做 MySQL->Iceberg 的導入將會是一件非常有意義的事情。

此外對於我們最初的壓縮小文件的需求,雖然 Iceberg 目前還無法實現自動壓縮,但是它提供了一個批處理任務,已經能滿足我們的需求。

■Hive 表遷移 Iceberg 表

遷移準備工作

目前我們的所有數據都是存儲在 Hive 表的,在驗證完 Iceberg 之後,我們決定將 Hive 的數據遷移到 Iceberg,所以我寫了一個工具,可以使用 Hive 的數據,然後新建一個 Iceberg 表,為其建立相應的元數據,但是測試的時候發現,如果採用這種方式,需要把寫入 Hive 的程序停止,因為如果 Iceberg 和 Hive 使用同一個數據文件,而壓縮程序會不斷地壓縮 Iceberg 表的小文件,壓縮完之後,不會馬上刪除舊數據,所以 Hive 表就會查到雙份的數據,故我們採用雙寫的策略,原來寫入 Hive 的程序不動,新啟動一套程序寫入 Iceberg,這樣能對 Iceberg 表觀察一段時間。還能和原來 Hive 中的數據進行比對,來驗證程序的正確性。

經過一段時間觀察,每天將近幾十億條數據、壓縮後幾個 T 大小的 Hive 表和 Iceberg 表,一條數據也不差。所以在最終對比數據沒有問題之後,把 Hive 錶停止寫入,使用新的 Iceberg 表。

遷移工具

我將這個 Hive 表遷移 Iceberg 表的工具做成了一個基於 Flink batch job 的 Iceberg Action,提交了社區,不過目前還沒合併:https://github.com/apache/iceberg/pull/2217。這個功能的思路是使用 Hive 原始的數據不動,然後新建一個 Iceberg table,再為這個新的 Iceberg table 生成對應的元數據,大家有需要的話可以先看看。

此外,Iceberg 社區,還有一個把現有的數據遷移到已存在的 Iceberg table 的工具,類似 Hive 的 LOAD DATA INPATH ... INTO TABLE ,是用 Spark 的存儲過程做的,大家也可以關注下:https://github.com/apache/iceberg/pull/2210

三、Iceberg 優化實踐

壓縮小文件

目前壓縮小文件是採用的一個額外批任務來進行的,Iceberg 提供了一個 Spark 版本的 action,我在做功能測試的時候發現了一些問題,此外我對 Spark 也不是非常熟悉,擔心出了問題不好排查,所以參照 Spark 版本的自己實現了一個 Flink 版本,並修復了一些 bug,進行了一些功能的優化。

由於我們的 Iceberg 的元數據都是存儲在 Hive 中的,也就是我們使用了 HiveCatalog,所以壓縮程序的邏輯是把 Hive 中所有的 Iceberg 表全部都查出來,依次壓縮。壓縮沒有過濾條件,不管是分區表還是非分區表,都進行全表的壓縮,這樣做是為了處理某些使用 eventtime 的 Flink 任務。如果有延遲的數據的到來,就會把數據寫入以前的分區,如果不是全表壓縮只壓縮當天分區的話,新寫入的其他天的數據就不會被壓縮。

之所以沒有開啟定時任務來壓縮,是因為比如定時五分鐘壓縮一個表,如果五分鐘之內這個壓縮任務沒完成,沒有提交新的 snapshot,下一個定時任務又開啟了,就會把上一個沒有完成的壓縮任務中的數據重新壓縮一次,所以每個表依次壓縮的策略可以保證某一時刻一個表只有一個任務在壓縮。

代碼示例參考:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Actions.forTable(env, table) .rewriteDataFiles() //.maxParallelism(parallelism) //.filter(Expressions.equal("day", day)) //.targetSizeInBytes(targetSizeInBytes) .execute();

目前系統運行穩定,已經完成了幾萬次任務的壓縮。

image.png

注意:
不過目前對於新發布的 Iceberg 0.11 來說,還有一個已知的 bug,即當壓縮前的文件大小大於要壓縮的大小(targetSizeInBytes)時,會造成數據丟失,其實這個問題我在最開始測試小文件壓縮的時候就發現了,並且提了一個 pr,我的策略是大於目標文件的數據文件不參與壓縮,不過這個 pr 沒有合併到 0.11 版本中,後來社區另外一個兄弟也發現了相同的問題,提交了一個 pr( https://github.com/apache/iceberg/pull/2196 ) ,策略是將這個大文件拆分到目標文件大小,目前已經合併到 master,會在下一個 bug fix 版本 0.11.1 中發佈。

查詢優化

■ 批處理定時任務

目前對於定時調度中的批處理任務,Flink 的 SQL 客戶端還沒 Hive 那樣做的很完善,比如執行 hive-f 來執行一個文件。而且不同的任務需要不同的資源,並行度等。

所以我自己封裝了一個 Flink 程序,通過調用這個程序來進行處理,讀取一個指定文件裡面的 SQL,來提交批任務。在命令行控制任務的資源和並行度等。

/home/flink/bin/fFlinklinklink run -p 10 -m yarn-cluster /home/work/iceberg-scheduler.jar my.sql

■ 優化

批任務的查詢這塊,我做了一些優化工作,比如 limit 下推,filter 下推,查詢並行度推斷等,可以大大提高查詢的速度,這些優化都已經推回給社區,並且在 Iceberg 0.11 版本中發佈。

運維管理

■ 清理 orphan 文件

  1. 定時任務刪除

在使用 Iceberg 的過程中,有時候會有這樣的情況,我提交了一個 Flink 任務,由於各種原因,把它停了,這個時候 Iceberg 還沒提交相應的快照。此外由於一些異常導致程序失敗,會產生一些不在 Iceberg 元數據裡面的孤立的數據文件,這些文件對 Iceberg 來說是不可達的,也是沒用的。所以我們需要像 jvm 的垃圾回收一樣來清理這些文件。

目前 Iceberg 提供了一個 Spark 版本的 action 來處理這些沒用的文件,我們採取的策略和壓縮小文件一樣,獲取 Hive 中的所有的 Iceberg 表。每隔一個小時執行一次定時任務來刪除這些沒用的文件。

SparkSession spark = ...... Actions.forTable(spark, table) .removeOrphanFiles() //.deleteWith(...) .execute();
  1. 踩坑

我們在程序運行過程中出現了正常的數據文件被刪除的問題,經過調研,由於快照保留設置是一小時,這個清理程序清理時間也是設置一個小時,通過日誌發現是這個清理程序刪除了正常的數據。查了查代碼,應該是設置了一樣的時間,在清理孤立文件的時候,有其他程序正在讀取要 expired 的 snapshot,導致刪除了正常的數據。最後把這個清理程序的清理時間改成默認的三天,沒有再出現刪除數據文件的問題。
當然,為了保險起見,我們可以覆蓋原來的刪除文件的方法,改成將文件到一個備份文件夾,檢查沒有問題之後,手工刪除。

■ 快照過期處理

我們的快照過期策略,是和壓縮小文件的批處理任務寫在一起的,壓縮完小文件之後,進行表的快照過期處理,目前保留的時間是一個小時。這是因為對於有一些比較大的表,分區比較多,而且 checkpoint 比較短,如果保留的快照過長的話,還是會保留過多小文件,我們暫時沒有查詢歷史快照的需求,所以我將快照的保留時間設置了一個小時。

long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);table.expireSnapshots()// .retainLast(20).expireOlderThan(olderThanTimestamp).commit();

■ 數據管理

寫入了數據之後,當想查看相應的快照有多少數據文件時,直接查詢 Spark 無法知道哪個是有用的,哪個是沒用的。所以需要有對應的管理工具。目前 Flink 這塊還不太成熟,我們可以使用 Spark3 提供的工具來查看。

  1. DDL

目前 create table 這些操作我們是通過 Flink SQL Client 來做的。其他相關的 DDL 的操作可以使用 Spark 來做:https://iceberg.apache.org/spark/#ddl-commands

  1. DML

一些相關的數據的操作,比如刪除數據等可以通過 MySQL 來實現,Presto 目前只支持分區級別的刪除功能。

  1. show partitions & show create table

在我們操作 Hive 的時候,有一些很常用的操作,比如 show partitions、 show create table 等,這些目前 Flink 還沒有支持,所以在操作 Iceberg 的時候就很不方便,我們自己基於 Flink 1.12 做 了修改,不過目前還沒有完全提交到社區,後續有時間會提交到 Flink 和 Iceberg 社區。

四、後續工作

  • Flink SQL 接入 CDC 數據到 Iceberg

目前在我們內部的版本中,我已經測試通過可以使用 Flink SQL 將 CDC 數據(比如 MySQL binlog)寫入 Iceberg,社區的版本中實現該功能還需要做一些工作,我也提交了一些相關的 PR 來推進這個工作。

  • 使用 SQL 進行刪除和更新

對於 copy-on-write 表,我們可以使用 Spark SQL 來進行行級的刪除和更新。具體的支持的語法可以參考源碼中的測試類:

org.apache.iceberg.spark.extensions.TestDelete & org.apache.iceberg.spark.extensions.TestUpdate,這些功能我在測試環境測試是可以的,但是還沒有來得及更新到生產。

  • 使用 Flink SQL 進行 streaming read

在工作中會有一些這樣的場景,由於數據比較大,Iceberg 的數據只存了較短的時間,如果很不幸因為程序寫錯了等原因,想從更早的時間來消費就無能為力了。
當引入了 Iceberg 的 streaming read 之後,這些問題就可以解決了,因為 Iceberg 存儲了所有的數據,當然這裡有一個前提就是對於數據沒有要求特別精確,比如達到秒級別,因為目前 Flink 寫入 Iceberg 的事務提交是基於 Flink Checkpoint 間隔的。

五、收益及總結

經過對 Iceberg 大概一個季度的調研,測試,優化和 bug 修復,我們將現有的 Hive 表都遷移到了 Iceberg,完美解決了原來的所有的痛點問題,目前系統穩定運行,而且相對 Hive 得到了很多的收益:

  • Flink 寫入的資源減少

舉一個例子,默認配置下,原來一個 flink 讀取 kafka 寫入 hive 的任務,需要60個並行度才不會讓 Kafka 產生積壓。改成寫入 iceberg 之後,只需要20個並行度就夠了。

  • 查詢速度變快

前面我們講到 Iceberg 查詢的時候不會像 Hive 一樣去 list 整個文件夾來獲取分區數據,而是先從 manifest 文件中獲取相關數據,查詢的性能得到了顯著的提升,一些大的報表的查詢速度從 50 秒提高到 30 秒。

  • 併發讀寫

由於 Iceberg 的事務支持,我們可以實現對一個表進行併發讀寫,Flink 流式數據實時入湖,壓縮程序同時壓縮小文件,清理過期文件和快照的程序同時清理無用的文件,這樣就能更及時的提供數據,做到分鐘級的延遲,查詢最新分區數據的速度大大加快了,並且由於 Iceberg 的 ACID 特性可以保證數據的準確性。

  • time travel

可以回溯查詢以前某一時刻的數據。

總結一下,我們目前可以實現使用 Flink SQL 對 Iceberg 進行批、流的讀寫,並可以對小文件進行實時的壓縮,使用 Spark SQL 做一些 delete 和 update 工作以及一些 DDL 操作,後續可以使用 Flink SQL 將 CDC 的數據寫入 Iceberg。目前對 Iceberg 的所有的優化和 bug fix,我已經貢獻給社區。由於筆者水平有限,有時候也難免有錯誤,還請大家不吝賜教。

作者介紹:
張軍,同程藝龍大數據開發工程師

活動推薦:

僅需99元即可體驗阿里雲基於 Apache Flink 構建的企業級產品-實時計算 Flink 版!點擊下方鏈接瞭解活動詳情:https://www.aliyun.com/product/bigdata/sc?utm_content=g_1000250506

image.png

Leave a Reply

Your email address will not be published. Required fields are marked *