大數據

Iceberg 在基於 Flink 的流式數據入庫場景中的應用

本文以流式數據入庫的場景為基礎,介紹引入 Iceberg 作為落地格式和嵌入 Flink sink 的收益,並分析了當前可實現的框架及要點。

應用場景

流式數據入庫,是大數據和數據湖的典型應用場景。上游的流式數據,如日誌,或增量修改,通過數據總線,經過必要的處理後,匯聚並存儲於數據湖,供下游的應用(如報表或者商業智能分析)使用。

640 1.jpg

上述的應用場景通常有如下的痛點,需要整個流程不斷的優化:

  • 支持流式數據寫入,並保證端到端的不重不丟(即 exactly-once);
  • 儘量減少中間環節,能支持更實時(甚至是 T+0)的讀取或導出,給下游提供更實時更準確的基礎數據;
  • 支持 ACID,避免髒讀等錯誤發生;
  • 支持修改已落地的數據,雖然大數據和數據湖長於處理靜態的或者緩慢變化的數據,即讀多寫少的場景,但方便的修改功能可以提升用戶體驗,避免用戶因為極少的修改,手動更換整個數據文件,甚至是重新導出;
  • 支持修改表結構,如增加或者變更列;而且變更不要引起數據的重新組織。

引入 Iceberg 作為 Flink sink

為了解決上述痛點,我們引入了 Iceberg 作為數據落地的格式。Iceberg 支持 ACID 事務、修改和刪除、獨立於計算引擎、支持表結構和分區方式動態變更等特性,很好的滿足我們的需求。

同時,為了支持流式數據的寫入,我們引入 Flink 作為流式處理框架,並將 Iceberg 作為 Flink sink。

下文主要介紹 Flink Iceberg sink 的實現框架和要點。但在這之前,需要先介紹一些實現中用到的 Flink 基本概念。

Flink 基本概念

從 Flink 的角度如何理解"流"和"批"

640 2.png

Flink 使用 DataFrame API 來統一的處理流和批數據。

Stream, Transformation 和 Operator

一個 Flink 程序由 stream 和 transformation 組成:

  • Stream: Transformation 之間的中間結果數據;
  • Transformation:對(一個或多個)輸入 stream 進行操作,輸出(一個或多個)結果 stream。

當 Flink 程序執行時,其被映射成 Streaming Dataflow,由如下的部分組成:

  • Source (operator):接收外部輸入給 Flink;
  • Transformation (operator):中間對 stream 做的任何操作;
  • Sink (operator):Flink 輸出給外部。

下圖為 Flink 官網的示例,展示了一個以 Kafka 作為輸入 Source,經過中間兩個 transformation,最終通過 sink 輸出到 Flink 之外的過程。

640 3.jpg

State, Checkpoint and Snapshot

Flink 依靠 checkpoint 和基於 snapshot 的恢復機制,保證程序 state 的一致性,實現容錯。

Checkpoint 是對分佈式的數據流,以及所有 operator 的 state,打 snapshot 的過程。

■ State

一個 operator 的 state,即它包含的所有用於恢復當前狀態的信息,可分為兩類:

  • 系統 state:如 operator 中對數據的緩存。
  • 用戶自定義 state:和用戶邏輯相關,可以利用 Flink 提供的 managed state,如 ValueState、ListState,來存儲。

State 的存儲位置,可以分為:

  • Local:內存,或者本地磁盤
  • State backend:遠端的持久化存儲,如 HDFS。

如下圖所示:

640 4.jpg

■ Checkpoint

Flink 做 checkpoint 的過程如下:

  1. Checkpoint coordinator 首先發送 barrier 給 source。
  2. Source 做 snapshot,完成後向 coordinator 確認。
  3. Source 向下遊發送 barrier。
  4. 下游 operator 收到所有上游的 barrier 後,做 snapshot,完成後向 coordinator 確認。
  5. 繼續往下游發送 barrier,直到 sink。
  6. Sink 通知 coordinator 自己完成 checkpoint。
  7. Coordinator 確認本週期 snapshot 做完。

如下圖所示:

640 5.jpg

■ Barrier

Barrier 是 Flink 做分佈式 snapshot 的重要概念。它作為一個系統標記,被插入到數據流中,隨真實數據一起,按照數據流的方向,從上游向下遊傳遞。

由於每個 barrier 唯一對應 checkpoint id,所以數據流中的 record 實際被 barrier 分組,如下圖所示,barrier n 和 barrier n-1 之間的 record,屬於 checkpoint n。

640 6.jpg

Barrier 的作用是在分佈式的數據流中,將 operator 的多個輸入流按照 checkpoint對齊(align),如下圖所示:

640 7.jpg

Flink Iceberg sink

瞭解了上述 Flink 的基本概念,這些概念又是如何被應用和映射到 Flink Iceberg sink 當中的呢?

總體框架

640 8.jpg

如圖,Flink Iceberg sink 有兩個主要模塊和兩個輔助模塊組成:

640 9.png

實現要點

■ Writer

  1. 在當前的實現中,Java 的 Map 作為每條記錄,輸入給 writer。內部邏輯先將其轉化為作為中間格式的 Avro IndexedRecord,而後通過 Iceberg 裡的 Parquet 相關 API,累積的寫入 DataFile。
  2. 使用 Avro 作為中間格式是一個臨時方案,為簡化適配,並最大限度的利用現有邏輯。但長期來看,使用中間格式會影響處理效率,社區也在試圖通過 ISSUE-870 來去掉 Avro,進而使用 Iceberg 內建的數據類型作為輸入,同時也需要加入一個到 Flink 內建數據類型的轉換器。
  3. 在做 checkpoint 的過程中,發送 writer 自己的 barrier 到下游的 committer 之前,關閉單個 Parquet 文件,構建 DataFile,併發送 DataFile 的信息給下游。

■ Committer

  1. 全局唯一的 Committer 在收到上游所有 writer 的 barrier 以後,將收到的 DataFile 的信息填入 manifest file,並使用 ListState 把 manifest file 作為用戶自定義的 state,保存於 snapshot 中。
  2. 當 checkpoint 完成以後,通過 merge append 將 manifest file 提交給 Iceberg。Iceberg 內部通過後續的一系列操作完成 commit。最終讓新加入的數據對其他的讀任務可見。

試用 Flink Iceberg sink

社區上 https://github.com/apache/incubator-iceberg/pull/856 提供了可以試用的原型代碼。下載該 patch 放入 master 分支,編譯並構建即可。如下的程序展示瞭如何將該 sink 嵌入到 Flink 數據流中:

// Configurate catalog
org.apache.hadoop.conf.Configuration hadoopConf =
    new org.apache.hadoop.conf.Configuration();
hadoopConf.set(
    org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS.varname,
    META_STORE_URIS);
hadoopConf.set(
    org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
    META_STORE_WAREHOUSE);
 
Catalog icebergCatalog = new HiveCatalog(hadoopConf);
 
// Create Iceberg table
Schema schema = new Schema(
   ...
);
PartitionSpec partitionSpec = builderFor(schema)...
TableIdentifier tableIdentifier =
    TableIdentifier.of(DATABASE_NAME, TABLE_NAME);
// If needed, check the existence of table by loadTable() and drop it
// before creating it
icebergCatalog.createTable(tableIdentifier, schema, partitionSpec);
 
// Obtain an execution environment
StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();
 
// Enable checkpointing
env.enableCheckpointing(...);
  
// Add Source
DataStream<Map<String, Object>> dataStream =
    env.addSource(source, typeInformation);
 
// Configure Ieberg sink
Configuration conf = new Configuration();
conf.setString(
    org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
    META_STORE_URIS);
conf.setString(IcebergConnectorConstant.DATABASE, DATABASE_NAME);
conf.setString(IcebergConnectorConstant.TABLE, TABLE_NAME);
 
// Append Iceberg sink to data stream
IcebergSinkAppender<Map<String, Object>> appender =
   new IcebergSinkAppender<Map<String, Object>>(conf, "test")
       .withSerializer(MapAvroSerializer.getInstance())
       .withWriterParallelism(1);
appender.append(dataStream);
 
// Trigger the execution
env.execute("Sink Test");

後續規劃

Flink Iceberg sink 有很多需要完善的地方,例如:上文中提到的去掉 Avro 作為中間格式;以及在各種失敗的情況下是否仍能保證端到端的 exactly-once;按固定時長做 checkpoint,在高低峰時生成不同大小的 DataFile,是否對後續讀不友好等。這些問題都在我們的後續規劃中,也會全數貢獻給社區。

參考資料:

[1] Iceberg 官網:
https://iceberg.apache.org/
[2] Flink 1.10文 檔:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/
[3] Neflix 提供的 Flink Iceberg connector 原型:
https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg
[4] Flink Iceberg sink 設計文檔:
https://docs.google.com/document/d/19M-sP6FlTVm7BV7MM4Om1n_MVo1xCy7GyDl_9ZAjVNQ/edit?usp=sharing
[5] Flink 容錯機制(checkpoint) :
https://www.cnblogs.com/starzy/p/11439988.html

# 社區活動推薦 #

普惠全球開發者,這一次,格外與眾不同!首個 Apache 頂級項目在線會議 Flink Forward 全球直播中文精華版來啦,聚焦 Alibaba、Google、AWS、Uber、Netflix、新浪微博等海內外一線廠商,經典 Flink 應用場景,最新功能、未來規劃一覽無餘。點擊下方鏈接可瞭解更多大會詳情:https://developer.aliyun.com/live/2594?spm=a2c6h.14242504.J_6074706160.2.3fca361f4cYyQx

Leave a Reply

Your email address will not be published. Required fields are marked *