
A Quick Start Guide to Hudi on Flink

Abstract: This article, shared by Chen Yuzhao of Alibaba, introduces the latest features of the Flink integration in Hudi, together with a hands-on quick start guide. It covers:

  1. Background
  2. Environment Preparation
  3. Reading and Writing in Batch Mode
  4. Streaming Read
  5. Summary

1. Background

Apache Hudi is one of the most popular data lake solutions today. Data Lake Analytics[1] integrates Hudi to serve data MERGE (UPDATE/DELETE) scenarios efficiently; AWS pre-installs[2] Apache Hudi in its EMR service, giving users efficient record-level updates/deletes and efficient data query management; Uber[3] has been running Apache Hudi stably for more than four years, providing low-latency database synchronization and high-efficiency queries[4]. Since going live in August 2016, its data lake storage has grown beyond 100 PB[5].

Apache Flink, the most popular stream computing framework today, has a natural advantage in streaming scenarios. The Flink community is actively embracing the Hudi community, bringing its streaming read/write strengths to bear while also adding support for batch reads and writes.

Hudi and Flink completed a large amount of integration work in the 0.8.0 release[6]. The core features include:

  • A new Flink streaming writer
  • Readers for both batch and streaming modes
  • Support for the Flink SQL API

The Flink streaming writer implements an efficient index scheme through Flink state, and Hudi's strong design for UPDATE/DELETE makes Flink + Hudi one of the most promising solutions for ingesting CDC data into a data lake. For reasons of space, that topic will be covered in a follow-up article.
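The state-backed index can be pictured as keyed state mapping each record key to the file group that holds it, so an update for a key is routed back to the same file group without scanning files. Below is a toy Python sketch of that routing idea; all class and field names are illustrative, and this is not Hudi's actual implementation:

```python
# Toy sketch of a state-backed index: route each record key to a stable
# "file group" so that updates for the same key always land in the same place.
# Illustrative only -- NOT Hudi's actual implementation.

class StateBackedIndex:
    def __init__(self):
        self.key_to_file_group = {}  # in Flink this would live in keyed state
        self.next_group = 0

    def locate(self, record_key):
        """Return the file group for a key, assigning a new one on first insert."""
        if record_key not in self.key_to_file_group:
            self.key_to_file_group[record_key] = f"fg-{self.next_group}"
            self.next_group += 1
        return self.key_to_file_group[record_key]

index = StateBackedIndex()
assert index.locate("id1") == index.locate("id1")  # updates hit the same group
```

Because the mapping is kept in state rather than rebuilt from files, lookups stay cheap even as the table grows.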

This article uses the Flink SQL Client to demonstrate operating Hudi tables through the Flink SQL API, covering reads and writes in batch mode and reads in streaming mode.

2. Environment Preparation

This article uses the Flink SQL Client[7] as the demo tool; the SQL CLI makes interactive SQL execution convenient.

Step 1: Download the Flink jar

Hudi integrates with Flink 1.11. You can refer to this page[8] to set up your Flink environment. The hudi-flink-bundle jar is an uber jar bundling the Flink-related jars; compiling it with Scala 2.11 is currently recommended.

Step 2: Set up the Flink cluster

Start a standalone Flink cluster. Before starting it, the following cluster configuration is recommended:

  • In $FLINK_HOME/conf/flink-conf.yaml, add the config option taskmanager.numberOfTaskSlots: 4
  • In $FLINK_HOME/conf/workers, repeat the entry localhost on 4 lines; the number of lines here is the number of workers started locally
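After these two changes, the relevant configuration files look like this (a sketch assuming a standard Flink distribution layout):

```
# $FLINK_HOME/conf/flink-conf.yaml -- add this line:
taskmanager.numberOfTaskSlots: 4

# $FLINK_HOME/conf/workers -- one local worker per line:
localhost
localhost
localhost
localhost
```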

Start the cluster:

# HADOOP_HOME is your hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
# Start the flink standalone cluster
./bin/start-cluster.sh

Step 3: Start the Flink SQL Client

The Hudi bundle jar should be loaded into the CLASSPATH when the SQL Client starts. You can build the jar manually under the path hudi-source-dir/packaging/hudi-flink-bundle, or download it from the Apache Official Repository[9].

Start the SQL CLI:

# HADOOP_HOME is your hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/sql-client.sh embedded -j .../hudi-flink-bundle_2.1?-*.*.*.jar shell

Notes:

  • Hadoop 2.9.x or later is recommended, since some object stores (aliyun-oss) are only supported from that version onward
  • flink-parquet and flink-avro are already shaded into the hudi-flink-bundle jar
  • Alternatively, you can copy the hudi-flink-bundle jar directly into the $FLINK_HOME/lib directory
  • This article uses the aliyun-oss object store; for convenience you can use a local path instead

The working directory structure for the demo looks like this:

/Users/chenyuzhao/workspace/hudi-demo
  /- flink-1.11.3
  /- hadoop-2.9.2

3. Reading and Writing in Batch Mode

Inserting Data

Create a Hudi table with the following DDL statement:

Flink SQL> create table t2(
>   uuid varchar(20),
>   name varchar(10),
>   age int,
>   ts timestamp(3),
>   `partition` varchar(20)
> )
> PARTITIONED BY (`partition`)
> with (
>   'connector' = 'hudi',
>   'path' = 'oss://vvr-daily/hudi/t2'
> );
[INFO] Table has been created.

The DDL declares the table's path; the record key takes the default value uuid and the pre-combine key takes the default value ts.

Then insert data into the table with a VALUES statement:

Flink SQL> insert into t2 values
>   ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
>   ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
>   ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
>   ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
>   ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
>   ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
>   ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
>   ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 59f2e528d14061f23c552a7ebf9a76bd

The Flink job has been submitted to the cluster successfully; you can open the local web UI to observe the job's execution.


Querying Data

After the job completes, query the table with a SELECT statement:

Flink SQL> set execution.result-mode=tableau;
[INFO] Session property has been set.
Flink SQL> select * from t2;
+-----+----------------------+----------------------+-------------+-------------------------+----------------------+
| +/- |                 uuid |                 name |         age |                      ts |            partition |
+-----+----------------------+----------------------+-------------+-------------------------+----------------------+
|   + |                  id3 |               Julian |          53 |     1970-01-01T00:00:03 |                 par2 |
|   + |                  id4 |               Fabian |          31 |     1970-01-01T00:00:04 |                 par2 |
|   + |                  id7 |                  Bob |          44 |     1970-01-01T00:00:07 |                 par4 |
|   + |                  id8 |                  Han |          56 |     1970-01-01T00:00:08 |                 par4 |
|   + |                  id1 |                Danny |          23 |     1970-01-01T00:00:01 |                 par1 |
|   + |                  id2 |              Stephen |          33 |     1970-01-01T00:00:02 |                 par1 |
|   + |                  id5 |               Sophia |          18 |     1970-01-01T00:00:05 |                 par3 |
|   + |                  id6 |                 Emma |          20 |     1970-01-01T00:00:06 |                 par3 |
+-----+----------------------+----------------------+-------------+-------------------------+----------------------+
Received a total of 8 rows

Executing set execution.result-mode=tableau; makes query results print directly in the terminal.

Prune partitions by adding a partition path predicate in the WHERE clause:

Flink SQL> select * from t2 where `partition` = 'par1';
+-----+----------------------+----------------------+-------------+-------------------------+----------------------+
| +/- |                 uuid |                 name |         age |                      ts |            partition |
+-----+----------------------+----------------------+-------------+-------------------------+----------------------+
|   + |                  id1 |                Danny |          23 |     1970-01-01T00:00:01 |                 par1 |
|   + |                  id2 |              Stephen |          33 |     1970-01-01T00:00:02 |                 par1 |
+-----+----------------------+----------------------+-------------+-------------------------+----------------------+
Received a total of 2 rows

Updating Data

Rows with the same record key automatically overwrite each other, so inserting data with an existing key updates it:

Flink SQL> insert into t2 values
>   ('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01','par1'),
>   ('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02','par1');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 944de5a1ecbb7eeb4d1e9e748174fe4c
Flink SQL> select * from t2;
+-----+----------------------+----------------------+-------------+-------------------------+----------------------+
| +/- |                 uuid |                 name |         age |                      ts |            partition |
+-----+----------------------+----------------------+-------------+-------------------------+----------------------+
|   + |                  id1 |                Danny |          24 |     1970-01-01T00:00:01 |                 par1 |
|   + |                  id2 |              Stephen |          34 |     1970-01-01T00:00:02 |                 par1 |
|   + |                  id3 |               Julian |          53 |     1970-01-01T00:00:03 |                 par2 |
|   + |                  id4 |               Fabian |          31 |     1970-01-01T00:00:04 |                 par2 |
|   + |                  id5 |               Sophia |          18 |     1970-01-01T00:00:05 |                 par3 |
|   + |                  id6 |                 Emma |          20 |     1970-01-01T00:00:06 |                 par3 |
|   + |                  id7 |                  Bob |          44 |     1970-01-01T00:00:07 |                 par4 |
|   + |                  id8 |                  Han |          56 |     1970-01-01T00:00:08 |                 par4 |
+-----+----------------------+----------------------+-------------+-------------------------+----------------------+
Received a total of 8 rows

As you can see, the age field of the rows with uuid id1 and id2 has been updated.
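The overwrite behavior above can be modeled as a key-based merge in which, of two rows sharing a record key, the one with the larger pre-combine field (ts here) wins. A minimal Python sketch of this rule follows; it is an illustration of upsert semantics, not Hudi's real code:

```python
def upsert(table, rows, key_field="uuid", precombine_field="ts"):
    """Merge rows into table: same key overwrites, larger pre-combine value wins.
    Toy model of upsert semantics -- not Hudi's actual implementation."""
    merged = dict(table)
    for row in rows:
        key = row[key_field]
        old = merged.get(key)
        if old is None or row[precombine_field] >= old[precombine_field]:
            merged[key] = row
    return merged

table = {"id1": {"uuid": "id1", "name": "Danny", "age": 23, "ts": 1}}
table = upsert(table, [{"uuid": "id1", "name": "Danny", "age": 24, "ts": 1}])
assert table["id1"]["age"] == 24  # same key: the record is replaced
```

This is why re-inserting id1 and id2 above updates them in place instead of producing duplicate rows.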

Insert new data again and observe the result:

Flink SQL> insert into t2 values
>   ('id4','Fabian',32,TIMESTAMP '1970-01-01 00:00:04','par2'),
>   ('id5','Sophia',19,TIMESTAMP '1970-01-01 00:00:05','par3');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: fdeb7fd9f08808e66d77220f43075720
Flink SQL> select * from t2;
+-----+----------------------+----------------------+-------------+-------------------------+----------------------+
| +/- |                 uuid |                 name |         age |                      ts |            partition |
+-----+----------------------+----------------------+-------------+-------------------------+----------------------+
|   + |                  id5 |               Sophia |          19 |     1970-01-01T00:00:05 |                 par3 |
|   + |                  id6 |                 Emma |          20 |     1970-01-01T00:00:06 |                 par3 |
|   + |                  id3 |               Julian |          53 |     1970-01-01T00:00:03 |                 par2 |
|   + |                  id4 |               Fabian |          32 |     1970-01-01T00:00:04 |                 par2 |
|   + |                  id1 |                Danny |          24 |     1970-01-01T00:00:01 |                 par1 |
|   + |                  id2 |              Stephen |          34 |     1970-01-01T00:00:02 |                 par1 |
|   + |                  id7 |                  Bob |          44 |     1970-01-01T00:00:07 |                 par4 |
|   + |                  id8 |                  Han |          56 |     1970-01-01T00:00:08 |                 par4 |
+-----+----------------------+----------------------+-------------+-------------------------+----------------------+
Received a total of 8 rows

4. Streaming Read

Create a new table and ingest data with the following statements:

Flink SQL> create table t1(
>   uuid varchar(20),
>   name varchar(10),
>   age int,
>   ts timestamp(3),
>   `partition` varchar(20)
> )
> PARTITIONED BY (`partition`)
> with (
>   'connector' = 'hudi',
>   'path' = 'oss://vvr-daily/hudi/t1',
>   'table.type' = 'MERGE_ON_READ',
>   'read.streaming.enabled' = 'true',
>   'read.streaming.check-interval' = '4'
> );
[INFO] Table has been created.
Flink SQL> insert into t1 values
>   ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
>   ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
>   ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
>   ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
>   ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
>   ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
>   ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
>   ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 9e1dcd37fd0f8ca77534c30c7d87be2c

Here the table option read.streaming.enabled is set to true, meaning the table's data is read in streaming mode; the option read.streaming.check-interval sets the interval at which the source checks for new commits to 4 s; and the option table.type sets the table type to MERGE_ON_READ. Currently only MERGE_ON_READ tables support streaming reads.
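The streaming source's behavior can be pictured as a loop that wakes every read.streaming.check-interval seconds, compares the table's commit timeline against the last instant it consumed, and emits only the newer commits. A toy Python sketch of this polling logic (function and variable names are invented for illustration, not taken from Hudi's source):

```python
def poll_new_commits(timeline, last_consumed):
    """Return commits newer than last_consumed, plus the new high-water mark.
    Toy model of the check-interval polling idea -- not Hudi's real source."""
    new = [c for c in timeline if c > last_consumed]
    return new, (max(new) if new else last_consumed)

# Commit instants are sortable timestamps, so string comparison suffices here.
timeline = ["20210322183000", "20210322183500"]
new, mark = poll_new_commits(timeline, "00000000000000")
assert new == timeline and mark == "20210322183500"

timeline.append("20210322184000")       # a new commit lands on the table
new, mark = poll_new_commits(timeline, mark)
assert new == ["20210322184000"]        # only the new commit is emitted
```

This is why the streaming query below keeps appending rows as batch writes commit in the other terminal.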

The operations above happen in one terminal, which we will call terminal_1.

From a new terminal (call it terminal_2), start the SQL Client again, recreate table t1, and query it:

Flink SQL> set execution.result-mode=tableau;
[INFO] Session property has been set.
Flink SQL> create table t1(
>   uuid varchar(20),
>   name varchar(10),
>   age int,
>   ts timestamp(3),
>   `partition` varchar(20)
> )
> PARTITIONED BY (`partition`)
> with (
>   'connector' = 'hudi',
>   'path' = 'oss://vvr-daily/hudi/t1',
>   'table.type' = 'MERGE_ON_READ',
>   'read.streaming.enabled' = 'true',
>   'read.streaming.check-interval' = '4'
> );
[INFO] Table has been created.
Flink SQL> select * from t1;
2021-03-22 18:36:37,042 INFO  org.apache.hadoop.conf.Configuration.deprecation             [] - mapred.job.map.memory.mb is deprecated. Instead, use mapreduce.map.memory.mb
+-----+----------------------+----------------------+-------------+-------------------------+----------------------+
| +/- |                 uuid |                 name |         age |                      ts |            partition |
+-----+----------------------+----------------------+-------------+-------------------------+----------------------+
|   + |                  id2 |              Stephen |          33 |     1970-01-01T00:00:02 |                 par1 |
|   + |                  id1 |                Danny |          23 |     1970-01-01T00:00:01 |                 par1 |
|   + |                  id6 |                 Emma |          20 |     1970-01-01T00:00:06 |                 par3 |
|   + |                  id5 |               Sophia |          18 |     1970-01-01T00:00:05 |                 par3 |
|   + |                  id8 |                  Han |          56 |     1970-01-01T00:00:08 |                 par4 |
|   + |                  id7 |                  Bob |          44 |     1970-01-01T00:00:07 |                 par4 |
|   + |                  id4 |               Fabian |          31 |     1970-01-01T00:00:04 |                 par2 |
|   + |                  id3 |               Julian |          53 |     1970-01-01T00:00:03 |                 par2 |

Back in terminal_1, execute another batch-mode INSERT:

Flink SQL> insert into t1 values
>   ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 2dad24e067b38bc48c3a8f84e793e08b

A few seconds later, the output in terminal_2 gains one row:

+-----+----------------------+----------------------+-------------+-------------------------+----------------------+
| +/- |                 uuid |                 name |         age |                      ts |            partition |
+-----+----------------------+----------------------+-------------+-------------------------+----------------------+
|   + |                  id2 |              Stephen |          33 |     1970-01-01T00:00:02 |                 par1 |
|   + |                  id1 |                Danny |          23 |     1970-01-01T00:00:01 |                 par1 |
|   + |                  id6 |                 Emma |          20 |     1970-01-01T00:00:06 |                 par3 |
|   + |                  id5 |               Sophia |          18 |     1970-01-01T00:00:05 |                 par3 |
|   + |                  id8 |                  Han |          56 |     1970-01-01T00:00:08 |                 par4 |
|   + |                  id7 |                  Bob |          44 |     1970-01-01T00:00:07 |                 par4 |
|   + |                  id4 |               Fabian |          31 |     1970-01-01T00:00:04 |                 par2 |
|   + |                  id3 |               Julian |          53 |     1970-01-01T00:00:03 |                 par2 |
|   + |                  id1 |                Danny |          27 |     1970-01-01T00:00:01 |                 par1 |

Execute one more INSERT in terminal_1:

Flink SQL> insert into t1 values
>   ('id4','Fabian',32,TIMESTAMP '1970-01-01 00:00:04','par2'),
>   ('id5','Sophia',19,TIMESTAMP '1970-01-01 00:00:05','par3');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: ecafffda3d294a13b0a945feb9acc8a5

Observe how the output in terminal_2 changes:

+-----+----------------------+----------------------+-------------+-------------------------+----------------------+
| +/- |                 uuid |                 name |         age |                      ts |            partition |
+-----+----------------------+----------------------+-------------+-------------------------+----------------------+
|   + |                  id2 |              Stephen |          33 |     1970-01-01T00:00:02 |                 par1 |
|   + |                  id1 |                Danny |          23 |     1970-01-01T00:00:01 |                 par1 |
|   + |                  id6 |                 Emma |          20 |     1970-01-01T00:00:06 |                 par3 |
|   + |                  id5 |               Sophia |          18 |     1970-01-01T00:00:05 |                 par3 |
|   + |                  id8 |                  Han |          56 |     1970-01-01T00:00:08 |                 par4 |
|   + |                  id7 |                  Bob |          44 |     1970-01-01T00:00:07 |                 par4 |
|   + |                  id4 |               Fabian |          31 |     1970-01-01T00:00:04 |                 par2 |
|   + |                  id3 |               Julian |          53 |     1970-01-01T00:00:03 |                 par2 |
|   + |                  id1 |                Danny |          27 |     1970-01-01T00:00:01 |                 par1 |
|   + |                  id5 |               Sophia |          19 |     1970-01-01T00:00:05 |                 par3 |
|   + |                  id4 |               Fabian |          32 |     1970-01-01T00:00:04 |                 par2 |

5. Summary

These simple demos show that the Hudi-Flink integration is already fairly complete, with both the read and write paths covered. For detailed configuration, see the Flink SQL Config Options[10].

The Hudi community is actively pushing deeper integration with Flink, including but not limited to:

  • Watermark support for the Flink streaming reader, enabling intermediate compute-layer pipelines on the data lake/warehouse
  • Flink materialized views on Hudi, providing minute-level incremental views for near-real-time online queries

We also hope more community members will join in and contribute. For more data lake discussions, you are welcome to join the community DingTalk group.


References:
[1] https://www.alibabacloud.com/help/zh/product/70174.htm
[2] https://aws.amazon.com/cn/emr/features/hudi/
[3] https://www.uber.com/
[4] http://www.slideshare.net/vinothchandar/hadoop-strata-talk-uber-your-hadoop-has-arrived/32
[5]https://eng.uber.com/uber-big-data-platform/
[6]https://issues.apache.org/jira/browse/HUDI-1521
[7]https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html
[8]https://flink.apache.org/downloads.html
[9]https://repo.maven.apache.org/maven2/org/apache/hudi/hudi-flink-bundle_2.11/
[10]https://hudi.apache.org/docs/configurations.html#flink-options

Event recommendation:

For just 99 RMB, you can try Realtime Compute for Apache Flink, Alibaba Cloud's enterprise-grade product built on Apache Flink! Click the link below for event details: https://www.aliyun.com/product/bigdata/sc?utm_content=g_1000250506

