本文以流式數據入庫的場景為基礎,介紹引入 Iceberg 作為落地格式和嵌入 Flink sink 的收益,並分析了當前可實現的框架及要點。
應用場景
流式數據入庫,是大數據和數據湖的典型應用場景。上游的流式數據,如日誌,或增量修改,通過數據總線,經過必要的處理後,匯聚並存儲於數據湖,供下游的應用(如報表或者商業智能分析)使用。
上述的應用場景通常有如下的痛點,需要整個流程不斷的優化:
- 支持流式數據寫入,並保證端到端的不重不丟(即 exactly-once);
- 儘量減少中間環節,能支持更實時(甚至是 T+0)的讀取或導出,給下游提供更實時更準確的基礎數據;
- 支持 ACID,避免髒讀等錯誤發生;
- 支持修改已落地的數據,雖然大數據和數據湖長於處理靜態的或者緩慢變化的數據,即讀多寫少的場景,但方便的修改功能可以提升用戶體驗,避免用戶因為極少的修改,手動更換整個數據文件,甚至是重新導出;
- 支持修改表結構,如增加或者變更列;而且變更不要引起數據的重新組織。
引入 Iceberg 作為 Flink sink
為了解決上述痛點,我們引入了 Iceberg 作為數據落地的格式。Iceberg 支持 ACID 事務、修改和刪除、獨立於計算引擎、支持表結構和分區方式動態變更等特性,很好的滿足我們的需求。
同時,為了支持流式數據的寫入,我們引入 Flink 作為流式處理框架,並將 Iceberg 作為 Flink sink。
下文主要介紹 Flink Iceberg sink 的實現框架和要點。但在這之前,需要先介紹一些實現中用到的 Flink 基本概念。
Flink 基本概念
從 Flink 的角度如何理解"流"和"批"
Flink 使用 DataFrame API 來統一的處理流和批數據。
Stream, Transformation 和 Operator
一個 Flink 程序由 stream 和 transformation 組成:
- Stream: Transformation 之間的中間結果數據;
- Transformation:對(一個或多個)輸入 stream 進行操作,輸出(一個或多個)結果 stream。
當 Flink 程序執行時,其被映射成 Streaming Dataflow,由如下的部分組成:
- Source (operator):接收外部輸入給 Flink;
- Transformation (operator):中間對 stream 做的任何操作;
- Sink (operator):Flink 輸出給外部。
下圖為 Flink 官網的示例,展示了一個以 Kafka 作為輸入 Source,經過中間兩個 transformation,最終通過 sink 輸出到 Flink 之外的過程。
State, Checkpoint and Snapshot
Flink 依靠 checkpoint 和基於 snapshot 的恢復機制,保證程序 state 的一致性,實現容錯。
Checkpoint 是對分佈式的數據流,以及所有 operator 的 state,打 snapshot 的過程。
■ State
一個 operator 的 state,即它包含的所有用於恢復當前狀態的信息,可分為兩類:
- 系統 state:如 operator 中對數據的緩存。
- 用戶自定義 state:和用戶邏輯相關,可以利用 Flink 提供的 managed state,如 ValueState、ListState,來存儲。
State 的存儲位置,可以分為:
- Local:內存,或者本地磁盤
- State backend:遠端的持久化存儲,如 HDFS。
如下圖所示:
■ Checkpoint
Flink 做 checkpoint 的過程如下:
- Checkpoint coordinator 首先發送 barrier 給 source。
- Source 做 snapshot,完成後向 coordinator 確認。
- Source 向下遊發送 barrier。
- 下游 operator 收到所有上游的 barrier 後,做 snapshot,完成後向 coordinator 確認。
- 繼續往下游發送 barrier,直到 sink。
- Sink 通知 coordinator 自己完成 checkpoint。
- Coordinator 確認本週期 snapshot 做完。
如下圖所示:
■ Barrier
Barrier 是 Flink 做分佈式 snapshot 的重要概念。它作為一個系統標記,被插入到數據流中,隨真實數據一起,按照數據流的方向,從上游向下遊傳遞。
由於每個 barrier 唯一對應 checkpoint id,所以數據流中的 record 實際被 barrier 分組,如下圖所示,barrier n 和 barrier n-1 之間的 record,屬於 checkpoint n。
Barrier 的作用是在分佈式的數據流中,將 operator 的多個輸入流按照 checkpoint對齊(align),如下圖所示:
Flink Iceberg sink
瞭解了上述 Flink 的基本概念,這些概念又是如何被應用和映射到 Flink Iceberg sink 當中的呢?
總體框架
如圖,Flink Iceberg sink 有兩個主要模塊和兩個輔助模塊組成:
實現要點
■ Writer
- 在當前的實現中,Java 的 Map 作為每條記錄,輸入給 writer。內部邏輯先將其轉化為作為中間格式的 Avro IndexedRecord,而後通過 Iceberg 裡的 Parquet 相關 API,累積的寫入 DataFile。
- 使用 Avro 作為中間格式是一個臨時方案,為簡化適配,並最大限度的利用現有邏輯。但長期來看,使用中間格式會影響處理效率,社區也在試圖通過 ISSUE-870 來去掉 Avro,進而使用 Iceberg 內建的數據類型作為輸入,同時也需要加入一個到 Flink 內建數據類型的轉換器。
- 在做 checkpoint 的過程中,發送 writer 自己的 barrier 到下游的 committer 之前,關閉單個 Parquet 文件,構建 DataFile,併發送 DataFile 的信息給下游。
■ Committer
- 全局唯一的 Committer 在收到上游所有 writer 的 barrier 以後,將收到的 DataFile 的信息填入 manifest file,並使用 ListState 把 manifest file 作為用戶自定義的 state,保存於 snapshot 中。
- 當 checkpoint 完成以後,通過 merge append 將 manifest file 提交給 Iceberg。Iceberg 內部通過後續的一系列操作完成 commit。最終讓新加入的數據對其他的讀任務可見。
試用 Flink Iceberg sink
社區上 https://github.com/apache/incubator-iceberg/pull/856 提供了可以試用的原型代碼。下載該 patch 放入 master 分支,編譯並構建即可。如下的程序展示瞭如何將該 sink 嵌入到 Flink 數據流中:
// Configurate catalog
org.apache.hadoop.conf.Configuration hadoopConf =
new org.apache.hadoop.conf.Configuration();
hadoopConf.set(
org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS.varname,
META_STORE_URIS);
hadoopConf.set(
org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
META_STORE_WAREHOUSE);
Catalog icebergCatalog = new HiveCatalog(hadoopConf);
// Create Iceberg table
Schema schema = new Schema(
...
);
PartitionSpec partitionSpec = builderFor(schema)...
TableIdentifier tableIdentifier =
TableIdentifier.of(DATABASE_NAME, TABLE_NAME);
// If needed, check the existence of table by loadTable() and drop it
// before creating it
icebergCatalog.createTable(tableIdentifier, schema, partitionSpec);
// Obtain an execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing
env.enableCheckpointing(...);
// Add Source
DataStream<Map<String, Object>> dataStream =
env.addSource(source, typeInformation);
// Configure Ieberg sink
Configuration conf = new Configuration();
conf.setString(
org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
META_STORE_URIS);
conf.setString(IcebergConnectorConstant.DATABASE, DATABASE_NAME);
conf.setString(IcebergConnectorConstant.TABLE, TABLE_NAME);
// Append Iceberg sink to data stream
IcebergSinkAppender<Map<String, Object>> appender =
new IcebergSinkAppender<Map<String, Object>>(conf, "test")
.withSerializer(MapAvroSerializer.getInstance())
.withWriterParallelism(1);
appender.append(dataStream);
// Trigger the execution
env.execute("Sink Test");
後續規劃
Flink Iceberg sink 有很多需要完善的地方,例如:上文中提到的去掉 Avro 作為中間格式;以及在各種失敗的情況下是否仍能保證端到端的 exactly-once;按固定時長做 checkpoint,在高低峰時生成不同大小的 DataFile,是否對後續讀不友好等。這些問題都在我們的後續規劃中,也會全數貢獻給社區。
參考資料:
[1] Iceberg 官網:
https://iceberg.apache.org/
[2] Flink 1.10文 檔:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/
[3] Neflix 提供的 Flink Iceberg connector 原型:
https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg
[4] Flink Iceberg sink 設計文檔:
https://docs.google.com/document/d/19M-sP6FlTVm7BV7MM4Om1n_MVo1xCy7GyDl_9ZAjVNQ/edit?usp=sharing
[5] Flink 容錯機制(checkpoint) :
https://www.cnblogs.com/starzy/p/11439988.html
# 社區活動推薦 #
普惠全球開發者,這一次,格外與眾不同!首個 Apache 頂級項目在線會議 Flink Forward 全球直播中文精華版來啦,聚焦 Alibaba、Google、AWS、Uber、Netflix、新浪微博等海內外一線廠商,經典 Flink 應用場景,最新功能、未來規劃一覽無餘。點擊下方鏈接可瞭解更多大會詳情:https://developer.aliyun.com/live/2594?spm=a2c6h.14242504.J_6074706160.2.3fca361f4cYyQx