大數據

官宣 | 千呼萬喚,Apache Flink 1.11.0 正式發佈啦!

來源 | Apache Flink 官方博客
翻譯 | 高贇(雲騫)

Apache Flink 社區很榮幸的宣佈 Flink 1.11.0 版本正式發佈!超過 200 名貢獻者參與了 Flink 1.11.0 的開發,提交了超過 1300 個修復或優化。這些修改極大的提高了 Flink 的可用性,並且增強了各個 API 棧的功能。其中一些比較重要的修改包括:

  1. 核心引擎部分引入了非對齊的 Checkpoint 機制。這一機制是對 Flink 容錯機制的一個重要改進,它可以提高嚴重反壓作業的 Checkpoint 速度。
  2. 實現了一套新的 Source 接口。通過統一流和批作業 Source 的運行機制,提供常用的內部實現如事件時間處理,watermark 生成和空閒併發檢測,這套新的 Source 接口可以極大的降低實現新的 Source 時的開發複雜度。
  3. Flink SQL 引入了對 CDC(Change Data Capture,變動數據捕獲)的支持,它使 Flink 可以方便的通過像 Debezium 這類工具來翻譯和消費數據庫的變動日誌。Table API 和 SQL 也擴展了文件系統連接器對更多用戶場景和格式的支持,從而可以支持將流式數據從 Kafka 寫入 Hive 等場景。
  4. PyFlink 優化了多個部分的性能,包括對向量化的用戶自定義函數(Python UDF)的支持。這些改動使 Flink Python 接口可以與常用的 Python 庫(如 Pandas 和 NumPy)進行互操作,從而使 Flink 更適合數據處理與機器學習的場景。

Flink 1.11.0 的二進制發佈包和源代碼可以在 Flink 官網的下載頁面獲得,對應的 PyFlink 發佈包可以在 PyPI 網站下載。詳情可以參閱發佈說明,發佈功能更新與更新後的文檔。

我們希望您下載試用這一版本後,可以通過 Flink 郵件列表和 JIRA 網站和我們分享您的反饋意見。

▼ GitHub 下載地址 ▼

https://flink.apache.org/downloads.html#apache-flink-1110

新的功能和優化

非對齊的 Checkpoints(Beta 版本)

當 Flink 發起一次 Checkpoint 時, Checkpoint Barrier 會從整個拓撲的 Source 出發一直流動到 Sink。對於超過一個輸入的算子,來自各個輸入的 Barrier 首先需要對齊,然後這個算子才能進行 state 的快照操作以及將 Barrier 發佈給後續的算子。一般情況下對齊可以在幾毫秒內完成,但是當反壓時,對齊可能成為一個瓶頸:

  1. Checkpoint Barrier 在有反壓的輸入通道中傳播的速度非常慢(需要等待前面的數據處理完成),這將會阻塞對其它輸入通道的數據處理並最終進一步反壓上游的算子。
  2. Checkpoint Barrier 傳播慢還會導致 Checkpoint 時間過長甚至超時,在最壞的情況下,這可能導致整個作業進度無法更新。

為了提高 Checkpoint 在反壓情況下的性能,Flink 社區在 1.11.0 版本中初步實現了非對齊的 Checkpoint 機制(FLIP-76)。與對齊的 Checkpoint(圖1)相比,這種方式下算子不需要等待來自各個輸入通道的 Barrier 對齊,相反,這種方式允許 Barrier 越過前面的待處理的數據(即在輸出和輸入 Buffer 中的數據)並且直接觸發 Checkpoint 的同步階段。這一過程如圖2所示。

image1.gif
圖1. 對齊的Checkpoint

image2.png
圖2. 非對齊的Checkpoint

由於被越過的傳播中的數據必須作為快照的一部分被持久化,非對齊的 Checkpoint 機制會增加 Checkpoint 的大小。但是,好的方面是它可以極大的減少 Checkpoint 需要的時間,因此即使在非穩定的環境中,用戶也可以看到更多的作業進度。這是由於非對齊的 Checkpoint 可以減少 Recovery 的負載。關於非對齊的 Checkpoint 更詳細的信息以及未來的開發計劃,可以分別參考相關文檔和 FLINK-14551。

和其它 Beta 版本的特性一樣,我們非常期待和感謝您試用之後和社區分享您的感受。

注意:開啟這一特徵需要通過 Chekpoint 選項配置 enableUnalignedCheckpoints 參數。需要注意的是,非對齊的 Checkpoint 只有在 CheckpointMode 被設置為 CheckpointMode.EXACTLY_ONCE 的時候才有效。

統一的 Watermark 生成器

目前 Flink 的 Watermark 生成(也叫做分配)依賴於兩個接口:AssignerWithPunctuatedWatermarks 與 AssignerWithPeriodicWatermarks,這兩個接口與記錄時間戳提取的關係也比較混亂,從而使 Flink 難以實現一些用戶急需的功能,如支持空閒檢測;此外,這還會導致代碼重複且難以維護。通過 FLIP-126,現有的 watermark 生成接口被統一為一個單獨的接口,即 WatermarkGenerator,並且它和 TimestampAssigner 獨立。

這一修改使用戶可以更好的控制 watermark 的發送邏輯,並且簡化實現支持watermark 生成和時間戳提取的 Source 的難度(可以參考新的 Source 接口)。基於這一接口,Flink 1.11 中還提供了許多內置的 Watermark 生成策略(例如 forBoundedOutOfOrderness, forMonotonousTimestamps),並且用戶可以使用自己的實現。

■ 支持 Watermark 空閒檢測

WatermarkStrategy.withIdleness()方法允許用戶在配置的時間內(即超時時間內)沒有記錄到達時將一個流標記為空閒,從而進一步支持 Flink 正確處理多個併發之間的事件時間傾斜的問題,並且避免了空閒的併發延遲整個系統的事件時間。通過將 Kafka 連接器遷移至新的接口(FLINK-17669),用戶可以受益於針對單個併發的空閒檢測。

注意:這一 FLIP 的修改目前不會影響現有程序,但是我們推薦用戶後續儘量使用新的 Watermark 生成接口,避免後續版本禁用之前的 Watermark 生成器帶來的影響。

新的 Source 接口(Beta)

1.11 以編寫一個生產可用的 Flink Source 連接器並不是一個簡單的任務,它需要用戶對 Flink 內部實現有一定的瞭解,並且需要在連接器中自行實現事件時間提取、Watermark 生成和空閒檢測等功能。針對這一問題,Flink 1.11 引入了一套新的Source 接口 FLIP-27 來解決上述問題,並且同時解決了需要為批作業和流作業編寫兩套 Source 實現的問題。

image3.png

通過將分區發現和實現消費每一個分區的數據分成不同的組件(即 SplitEnumerator 和 SourceReader),新的 Source 接口允許將不同的分區發現策略和分區消費的具體實現任意組合。

例如,現有的 Kafka 連接器提供了多種不同的分區發現策略,這些策略的實現和其實代碼的實現耦合在一起。如果遷移到新的接口,Kafka Source 將可以使用相同的分區消費的實現(即 SourceReader),並且針對不同的分區發現策略編寫單獨的 SplitEnumerator 的實現。

■ 流批統一

使用新版 Source 接口的 Source 連接器將可以同時用於有限數據(批)作業和無限數據(流)作業。這兩種場景僅有一個很小的區別:在有限數據的情況下,分區發現策略將返回一個固定大小的分區並且每一個分區的數據都是有限的;在無限數據的情況下,要麼每個分區的數據量是無限的,要麼分區發現策略可以不斷的產生新的分區。

■ 內置的 Watermark 和事務時間處理

在新版 Source 接口中,TimestampAssigner 和 WatermarkGenerator 將透明的作為分區消費具體實現(SourceReader)的一部分,因此用戶不需要實現任何時間戳提取和 Watermark 生成的代碼。

注意:現有的 Source 連接器尚未基於新的 Source 接口重新實現,這將在後續版本中逐漸完成。如果想要基於新的 Source 接口實現自己的 Source,可以參考 Data Source 文檔和 Source 開發的一些建議。

Application 部署模式

在1.11之前,Flink 的作業有兩種部署模式,其中 Session 模式是將作業提交到一個長期運行的 Flink Session 集群,Job 模式是為每個作業啟動一個專門的 Flink 作業集群。這兩種模式下用戶作業的 main 方法都是客戶端執行的,但是這種方式存在一定的問題:如果客戶端是更大程序的一部分的話,生成 JobGraph 容易成為系統的瓶頸;其次,這種方式也不能很好的適應像 Docker 和 K8s 這樣的容器環境。

Flink 1.11 引入了一種新的部署模式,即 Application 模式(FLIP-85)。這種模式下用戶程序的 main 方法將在集群中而不是客戶端運行。這樣,作業提交就會變得非常簡單:用戶將程序邏輯和依賴打包進一人可執行的 jar 包裡,集群的入口程序(ApplicationClusterEntryPoint)負責調用其中的 main 方法來生成 JobGraph。

Flink 1.11 已經可以支持基於 K8s 的 Application 模式(FLINK-10934)。

其它功能修改

■ 統一 JM 的內存配置(FLIP-116)

在1.10中,Flink 統一了 TM 端的內存管理和配置,相應的在1.11中,Flink 進一步對JM 端的內存配置進行了修改,使它的選項和配置方式與 FLIP-49 中引入的 TM 端的配置方式保持一致。這一修改影響所有的部署類型,包括 standalone,Yarn,Mesos 和新引入的 K8s。

注意:複用之前的 Flink 配置將會得到不同的 JVM 參數,從而可能影響性能甚至導致異常。如果想要更新到 1.11 的話,請一定要參考遷移文檔。

■ Web UI 功能增強

在1.11中,社區對 Flink Web UI 進行了一系列的優化。首要的修改是優化了 TM 和 JM 的日誌展示(FLIP-103),其次,Flink Web UI 還引入了打印所有線程列表的工具(FLINK-14816)。在後續的版本中,Web UI 還將進一步優化,包括更好的反壓檢測,更靈活和可配置的異常展示以及對 Task 出錯歷史的展示。

■ 統一 Docker 鏡像

1.11 將所有 Docker 相關的資源都統一整理到了 apache/flink-docker項目中,並且擴展了入口腳本從而允許用戶在不同模式下使用默認的 docker 鏡像,避免了許多情況下用戶自己創建鏡像的麻煩。關於如何在不同環境和部署模式下使用和定製 Flink 官方 Docker 鏡像,請參考詳細文檔。

Table API/SQL:支持 CDC(Change Data Capture)

CDC 是數據庫中一種常用的模式,它捕獲數據庫提交的修改並且將這些修改廣播給其它的下游消費者。CDC 可以用於像同步多個數據存儲和避免雙寫導致的問題等場景。長期以來 Flink 的用戶都希望能夠將 CDC 數據通過 Table API/SQL 導入到作業中,而 Flink 1.11 實現了這一點。

為了能夠在 Table API / SQL 中使用 CDC,Flink 1.11 更新了 Table Source 與 Sink 的接口來支持 changelog 模式(參考新的 Table Source 與 Sink 接口)並且支持了 Debezium 與 Canal 格式(FLIP-105)。這一改動使動態 Table Source 不再只支持 append-only 的操作,而且可以導入外部的修改日誌(插入事件)將它們翻譯為對應的修改操作(插入,修改和刪除)並將這些操作與操作的類型發送到後續的流中。

image4.png

為了消費 CDC 數據,用戶需要在使用 SQL DDL 創建表時指指定“format=debezium-json“或者“format=canal-json”:

  CREATE TABLE my_table (
  ...
) WITH (
  'connector'='...', -- e.g. 'kafka'
  'format'='debezium-json',
  'debezium-json.schema-include'='true' -- default: false (Debezium can be configured to include or exclude the message schema)
  'debezium-json.ignore-parse-errors'='true' -- default: false
);

Flink 1.11 僅支持 Kafka 作為修改日誌的數據源以及 JSON 編碼格式的修改日誌;後續 Flink 將進一步支持 Avro(Debezium)和 Protobuf(Canal)格式。Flink 還計劃在未來支持 UDF MySQL 的 Binlog 以及 Kafka 的 Compact Topic 作為數據源,並且將對修改日誌的支持擴展到批作業。

注意:目前有一個已知的 BUG(FLINK-18461)會導致使用修改日誌的 Source 無法寫入到 Upsert Sink 中(例如,MySQL,HBase,ElasticSearch)。這個問題會在下一個版本(即 1.11.1)中修復。

Table API/SQL:支持 JDBC Catalog 和 Postgres Catalog

Flink 1.11 支持了一種通用的 JDBC Catalog 接口(FLIP-93),這一接口允許 Table API/SQL 的用戶自動的從通過 JDBC 連接的關係數據庫中導出表結構。這一功能避免了之前用戶需要手動複製表結構以及進行類型映射的麻煩,並且允許 Flink 在編譯時而不是運行時對錶結構進行檢查。

首先在1.11中實現的是 Postgres Catalog。

Table API/SQL:支持 Avro,ORC 和 Parquet 格式的文件系統連接器

為了提高用戶使用 Flink 進行端到端的流式 ETL 的體驗,Flink 1.11 在 Table API/SQL 中引入了新的文件系統連接器。它基於 Flink 自己的文件系統抽象和 StreamingFileSink 來實現,從而保證和 DataStream API 有相同的能力和一致的行為。

這也意味著 Table API/SQL 的用戶可以使用 StreamingFileSink 現在已經支持的文件格式,例如 (Avro) Parquet,以及在這1.11中新增加的文件格式,例如 Avro 和 ORC。

CREATE TABLE my_table (
  column_name1 INT,
  column_name2 STRING,
  ...
  part_name1 INT,
  part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
  'connector' = 'filesystem',         
  'path' = 'file:///path/to/file,
  'format' = '...',  -- supported formats: Avro, ORC, Parquet, CSV, JSON         
  ...
);

新的全能的文件系統連接器可以透明的支持流作業和批作業,提供 Exactly-once 語義並且提供了完整的分區的支持,從而相對於之前的 Connector 極大的擴展了可以支持的場景。例如,用戶可以容易的實現將流式數據從 Kafka 寫入 Hive 的場景。

後續的文件系統連接器的優化可以參考 FLINK-17778。

Table API/SQL:支持 Python UDF

在1.11之前 Table API/SQL 的用戶只能通過 Java 或 Scala 來實現 UDF。在1.11中,Flink 擴展了 Python 語言的應用範圍,除了 PyFlink 外,Flink 1.11 還在 SQL DDL 語法(FLIP-106)和 SQL Client(FLIP-114)中支持了 Python UDF。用戶還可以在系統 Catalog 中通過 SQL DDL 或者 Java Catalog API 來註冊 Python UDF,這樣這些 UDF 可以在作業中共享。

其它的 Table API/SQL 優化

■ Hive Connect 兼容 Hive DDL 和 DML(FLIP-123)

從1.11開始,用戶可以在 Table API/SQL 和 SQL Client 中使用 Hive 語法(HiveQL)來編寫 SQL 語句。為了支持這一特性,Flink 引入了一種新的 SQL 方言,用戶可以動態的為每一條語句選擇使用Flink(default)或Hive(hive)方法。對於所支持的 DDL 和 DML 的完整列表,請參考 Hive 方言的文檔。

■ Flink SQL 語法的擴展和優化

  • Flink 1.11 引入了主鍵約束的概念,從而可以在 Flink SQL DDL 的運行時優化中使用(FLIP-87)。
  • 視圖對象已經在 SQL DDL 中完整支持,可以通過 CREATE/ALTER/DROP VIEW 等語句使用(FLIP-71)。
  • 用戶可以在 DQL 和 DML 中使用動態表屬性動態指定或覆蓋 Table 的選項(FLIP-113)。
  • 為了簡化 connector 參數的配置,提高異常處理的能力,Table API/SQL 修改了一些配置項的名稱(FLIP-122)。這一改動不會破壞兼容性,用戶仍然可以使用老的名稱。

■ 新的 Table Source 和 Sink 接口(FLIP-95)

Flink 1.11 引入了新的 Table Source 和 Sink 接口(即 DynamicTableSource 和 DynamicTableSink),這一接口可以統一批作業和流作業,在使用 Blink Planner 時提供更高效的數據處理並且可以支持修改日誌的處理(參考支持修改日誌)。新的接口簡化了用戶實現新的自定義的連接器和修改現有連接器的複雜度。一個基於支持修改日誌語義的數據解析格式來實現定製表掃描的Source的案例請參考這一文檔。

注意:儘管這一修改不會破壞兼容性,但是我們推薦 Table API/SQL 的用戶儘快將現有的Source和Sink升級到新的接口上。

■ 重構 Table Env 接口(FLIP-84)

1.11之前 TableEnvironment 和 Table 上相似的接口的行為並不完全相同,這導致了接口的不一致並使用戶感到困惑。為了解決這一問題並使基於 Table API/SQL 的編寫程序更加流暢,Flink 1.11 引入了新的方法來統一這些不一致的行為,例如執行觸發的時機(即executeSql()),結果展示(即 print(),collecto())並且為後續版本的重要功能(如多語句執行)打下了基礎。

注意:在 FLIP-84 中被標記為過期的方法不會被立刻刪掉,但是我們建議用戶採用新的方法。對於新的方法和過期方法的完整列表,可以查看 FLIP-84 的總結部分。

■ 新的類型推斷和 Table API UDF(FLIP-65)

在 Flink 1.9 中,社區開始在 Table API 中支持一種新的類型系統來提高與標準 SQL 的一致性(FLIP-37)。在1.11中這一工作接近完成,通過支持在 Table API UDF 中使用新的類型系統(目前支持 scalar 函數與 table 函數,計劃下一版本也支持 aggregate 函數)。

PyFlink:支持 Pandas UDF

在1.11之前,PyFlink 中的 Python UDF 僅支持標準的 Python 標量類型。這帶來了一些限制:

  1. 在 JVM 和 Python 進程之間傳遞數據會導致較大序列化、反序列化開銷。
  2. 難以集成常用的高性能 Python 數值計算框架,例如 Pandas 和 NumPy。

為了克服這些限制,社區引入了對基於 Pandas 的(標量)向量 Python UDF 的支持(FLIP-97)。由於可以通過利用 Apache Arrow 來最小化序列化/反序列化的開銷,向量 UDF 的性能一般會非常好;此外,將 pandas.Series 作為輸入輸出的類型可以充分複用 Pandas 和 NumPy 庫。這些特點使 Pandas UDF 特別適合並行機器學習和其它大規模、分佈式的數據科學的計算作業(例如特徵提取或分佈式模式服務)。

@udf(input_types=[DataTypes.BIGINT(),DataTypes.BIGINT()],result_type=DataTypes.BIGINT(),udf_type="pandas")
defadd(i,j):
  returni+j

為了使 UDF 變為 Pandas UDF,需要在 udf 的裝飾器中添加額外的參數 udf_type=”pandas”,如文檔所示。

PyFlink 的其它優化

■ 支持轉換器 fromPandas/toPandas(FLIP-120)

Arrow 還被用來優化 PyFlink Table 和 pandas.DataFrame 之間的轉換,從而使用戶可以在不同的處理引擎之間無縫切換,而不需要編寫特殊的連接器進行中轉。使用 fromPandas()和toPandas() 方法的安例,可以參考相關文檔。

■ 支持用戶自定義的 Table Function(User-defined Table Function,UDTF)(FLINK-14500)

從1.11開始,用戶可以在 PyFlink 定義和註冊自定義的 UDTF。與 Python UDF 類似,UDTF 可以接受0個,一個或多個標量值作為參數,但是可以返回任意多行數據作為輸出而不是隻能返回單個值。

■ 基於 Cython 對 UDF 的性能進行優化(FLIP-121)

Cython 是一個 Python 語言預編譯的超集,它經常被用來提高大規模數據計算函數的性能,因為它可以將代碼執行速度優化到機器指令級別,並且可以很好的與常用的基於 C 語言實現的庫配合,例如 NumPy。從 Flink 1.11 開始,用戶可以構造包括 Cython支持的 PyFlink[60]並且可以通過 Cython 來優化 Python UDF。這種優化可以極大的提升代碼的性能(與 1.10 的 Python UDF 相比最高能有 30 倍的提升)。

■ Python UDF 支持用戶自定義的 Metrics(FLIP-112)

為了使用戶可以更容易的監控和調試 Python UDF 的執行,PyFlink 現在支持收集和輸出 Metric 的值到外部系統中,並且支持自定義域和變量。用戶可以在 UDF 的 open 方法中通過調用 function_context.get_metric_group() 來訪問一個 Metric 系統,如文檔所示。

其它重要優化

  • [FLINK-17339] 從1.11開始,Blink Planner 將變為 Table API/SQL 的默認 Planner。實際上,在1.10中 SQL Client 的默認 Planner 已經變為 Blink Planner。老的 Planner 仍然將會支持,但是後續不會再有大的變更。
  • [FLINK-5763] Savepoints 將所有的狀態寫入到單個目錄下(包括元數據和程序狀態)。這使得用戶可以容易的看出每個 Savepoint 的 State 包含哪些文件,並且允許用戶直接通過移動目錄來實現 Savepoint 的重定位。
  • [FLINK-16408] 為了減少 JVM 元數據空間的壓力,Flink 1.11 中對於單個 TaskExecutor 只要上面還有某個作業的 Slot,該作業的 ClassLoader 就會被複用。這一改動會改變 Flink 錯誤恢復的行為,因為 static 字段不會被重新初始化。
  • [FLINK-11086] Flink 現在可以支持 Hadoop 3.0.0 以上的版本。注意 Flink 項目並未提供任何更新的“flink-shaded-hadoop-*”的 jar 包,而是需要用戶自己將相應的 Hadoop 依賴加入 HADOOP_CLASSPATH 環境變量(推薦的方式)或者將 Hadoop 依賴加入到 lib/目錄下。
  • [FLINK-16963] 所有 Flink 內置的 Metric Report 現在被修改為 Flink 的插件。如果要使用它們,不應該放置到 lib/目錄下(會導致類衝突),而是要放置到 plugins/目錄下。
  • [FLINK-12639] 社區正在對 Flink 文檔進行重構,從1.11開始,您可能會注意到文檔的導航和內容組織發生了一些變化。

詳細發佈說明

如果你想要升級到1.11的話,請詳細閱讀詳細發佈說明。與之前所有1.x版本相比,1.11可以保證所有標記為@Public的接口的兼容。

點擊「閱讀原文」即可查看原版官方博客~

Leave a Reply

Your email address will not be published. Required fields are marked *