本文由社區志願者陳政羽整理,Apache Flink 社區在 5 月份發佈了 1.13 版本,帶來了很多新的變化。文章整理自徐榜江(雪盡) 5 月 22 日在北京的 Flink Meetup 分享的《深入解讀 Flink SQL 1.13》,內容包括:
- Flink SQL 1.13 概覽
- 核心 feature 解讀
- 重要改進解讀
- Flink SQL 1.14 未來規劃
- 總結
GitHub 地址
https://github.com/apache/flink
歡迎大家給 Flink 點贊送 star~
一、Flink SQL 1.13 概覽
Flink 1.13 是一個社區大版本,解決的 issue 在 1000 個以上,通過上圖我們可以看到,解決的問題大部分是關於 Table/SQL 模塊,一共 400 多個 issue 佔了總體的 37% 左右。這些 issue 主要圍繞了 5 個 FLIP 展開,在本文中我們也會根據這 5 個方面進行介紹,它們分別是:
下面我們對這些 FLIP 進行詳細解讀。
二、 核心 feature 解讀
1. FLIP-145:支持 Window TVF
社區的小夥伴應該瞭解,在騰訊、阿里巴巴、字節跳動等公司的內部分支已經開發了這個功能的基礎版本。這次 Flink 社區也在 Flink 1.13 推出了 TVF 的相關支持和優化。下面將從 Window TVF 語法、近實時累計計算場景、 Window 性能優化、多維數據分析,來分析這個新功能。
1.1 Window TVF 語法
在 1.13 版本前,window 的實現是通過一個特殊的 SqlGroupedWindowFunction:
SELECT
TUMBLE_START(bidtime,INTERVAL '10' MINUTE),
TUMBLE_END(bidtime,INTERVAL '10' MINUTE),
TUMBLE_ROWTIME(bidtime,INTERVAL '10' MINUTE),
SUM(price)
FROM MyTable
GROUP BY TUMBLE(bidtime,INTERVAL '10' MINUTE)
在 1.13 版本中,我們對它進行了 Table-Valued Function 的語法標準化:
SELECT WINDOW_start,WINDOW_end,WINDOW_time,SUM(price)
FROM Table(TUMBLE(Table myTable,DESCRIPTOR(biztime),INTERVAL '10' MINUTE))
GROUP BY WINDOW_start,WINDOW_end
通過對比兩種語法,我們可以發現:TVF 語法更加靈活,不需要必須跟在 GROUP BY 關鍵字後面,同時 Window TVF 基於關係代數,使得其更加標準。在只需要劃分窗口場景時,可以只用 TVF,無需用 GROUP BY 做聚合,這使得 TVF 擴展性和表達能力更強,支持自定義 TVF(例如實現 TOP-N 的 TVF)。
上圖中的示例就是利用 TVF 做的滾動窗口的劃分,只需要把數據劃分到窗口,無需聚合;如果後續需要聚合,再進行 GROP BY 即可。同時,對於熟悉批 SQL 的用戶來說,這種操作是非常自然的,我們不再需要像 1.13 版本之前那樣必須要用特殊的 SqlGroupedWindowFunction 將窗口劃分和聚合綁定在一起。
目前 Window TVF 支持 tumble window,hop window,新增了 cumulate window;session window 預計在 1.14 版本也會支持。
1.2 Cumulate Window
Cumulate window 就是累計窗口,簡單來說,以上圖裡面時間軸上的一個區間為窗口步長。
- 第一個 window 統計的是一個區間的數據;
- 第二個 window 統計的是第一區間和第二個區間的數據;
- 第三個 window 統計的是第一區間,第二個區間和第三個區間的數據。
累積計算在業務場景中非常常見,如累積 UV 場景。在 UV 大盤曲線中:我們每隔 10 分鐘統計一次當天累積用戶 UV。
在 1.13 版本之前,當需要做這種計算時,我們一般的 SQL 寫法如下:
INSERT INTO cumulative_UV
SELECT date_str,MAX(time_str),COUNT(DISTINCT user_id) as UV
FROM (
SELECT
DATE_FORMAT(ts,'yyyy-MM-dd') as date_str,
SUBSTR(DATE_FORMAT(ts,'HH:mm'),1,4) || '0' as time_str,
user_id
FROM user_behavior
)
GROUP BY date_str
先將每條記錄所屬的時間窗口字段拼接好,然後再對所有記錄按照拼接好的時間窗口字段,通過 GROUP BY 做聚合,從而達到近似累積計算的效果。
- 1.13 版本前的寫法有很多缺點,首先這個聚合操作是每條記錄都會計算一次。其次,在追逆數據的時候,消費堆積的數據時,UV 大盤的曲線就會跳變。
- 在 1.13 版本支持了 TVF 寫法,基於 cumulate window,我們可以修改為下面的寫法,將每條數據按照 Event Time 精確地分到每個 Window 裡面, 每個窗口的計算通過 watermark 觸發,即使在追數據場景中也不會跳變。
INSERT INTO cumulative_UV
SELECT WINDOW_end,COUNT(DISTINCT user_id) as UV
FROM Table(
CUMULATE(Table user_behavior,DESCRIPTOR(ts),INTERVAL '10' MINUTES,INTERVAL '1' DAY))
)
GROUP BY WINDOW_start,WINDOW_end
UV 大盤曲線效果如下圖所示:
1.3 Window 性能優化
Flink 1.13 社區開發者們對 Window TVF 進行了一系列的性能優化,包括:
- 內存優化:通過內存預分配,緩存 window 的數據,通過 window watermark 觸發計算,通過申請一些內存 buffer 避免高頻的訪問 state;
- 切片優化:將 window 切片,儘可能複用已計算結果,如 hop window,cumulate window。計算過的分片數據無需再次計算,只需對切片的計算結果進行復用;
- 算子優化:window 算子支持 local-global 優化;同時支持 count(distinct) 自動解熱點優化;
- 遲到數據:支持將遲到數據計算到後續分片,保證數據準確性。
基於這些優化,我們通過開源 Benchmark (Nexmark) 進行性能測試。結果顯示 window 的普適性能有 2x 提升,且在 count(distinct) 場景會有更好的性能提升。
1.4 多維數據分析
語法的標準化帶來了更多的靈活性和擴展性,用戶可以直接在 window 窗口函數上進行多維分析。如下圖所示,可以直接進行 GROUPING SETS、ROLLUP、CUBE 的分析計算。如果是在 1.13 之前的版本,我們可能需要對這些分組進行單獨的 SQL 聚合,再對聚合結果做 union 操作才能達到類似的效果。而現在,類似這種多維分析的場景,可以直接在 window TVF 上支持。
支持 Window Top-N
除了多維分析,Window TVF 也支持 Top-N 語法,使得在 Window 上取 Top-N 的寫法更加簡單。
2. FLIP-162:時區和時間函數
2.1 時區問題分析
大家在使用 Flink SQL 時反饋了很多時區相關的問題,造成時區問題的原因可以歸納為 3 個:
- PROCTIME() 函數應該考慮時區,但未考慮時區;
- CURRENT_TIMESTAMP/CURRENT_TIME/CURRENT_DATE/NOW() 函數未考慮時區;
- Flink 的時間屬性,只支持定義在 TIMESTAMP 這種數據類型上面,這個類型是無時區的,TIMESTAMP 類型不考慮時區,但用戶希望是本地時區的時間。
針對 TIMESTAMP 類型沒有考慮時區的問題,我們提議通過TIMESTAMP_LTZ類型支持 (TIMESTAMP_LTZ 是 timestamp with local time zone 的縮寫)。可以通過下面的表格來進行和 TIMESTAMP 的對比:
TIMESTAMP_LTZ 區別於之前我們使用的 TIMESTAMP,它表示絕對時間的含義。通過對比我們可以發現:
- 如果我們配置使用 TIMESTAMP,它可以是字符串類型的。用戶不管是從英國還是中國時區來觀察,這個值都是一樣的;
- 但是對於 TIMSTAMP_TLZ 來說,它的來源就是一個 Long 值,表示從時間原點流逝過的時間。同一時刻,從時間原點流逝的時間在所有時區都是相同的,所以這個 Long 值是絕對時間的概念。當我們在不同的時區去觀察這個值,我們會用本地的時區去解釋成 “年-月-日-時-分-秒” 的可讀格式,這就是 TIMSTAMP_TLZ 類型,TIMESTAMP_LTZ 類型也更加符合用戶在不同時區下的使用習慣。
下面的例子展示了 TIMESTAMP 和 TIMESTAMP_LTZ 兩個類型的區別。
2.2 時間函數糾正
訂正 PROCTIME() 函數
當我們有了 TIMESTAMP_LTZ 這個類型的時候,我們對 PROCTIME() 類型做了糾正:在 1.13 版本之前,它總是返回 UTC 的 TIMESTAMP;而現在,我們把返回類型變為了 TIMESTAMP_LTZ。PROCTIME 除了表示函數之外,也可以表示時間屬性的標記。
訂正 CURRENT_TIMESTAMP/CURRENT_TIME/CURRENT_DATE/NOW() 函數
這些函數在不同時區下出來的值是會發生變化的。例如在英國 UTC 時區時候是凌晨 2 點;但是如果你設置了時區是 UTC+8,時間就是在早上的 10 點。不同時區的實際時間會發生變化,效果如下圖:
解決 processing time Window 時區問題
大家都知道 proctime 可以表示一個時間屬性,對 proctime 的 window 操作:
- 在 1.13 版本之前,如果我們需要做按天的 window 操作,你需要手動解決時區問題,去做一些 8 小時的偏移然後再減回去;
- 在 FLIP-162 中我們解決了這個問題,現在用戶使用的時候十分簡單,只需要聲明 proctime 屬性,因為 PROCTIME() 函數的返回值是TIMESTAMP_LTZ,所以結果是會考慮本地的時區。下圖的例子顯示了在不同的時區下,proctime 屬性的 window 的聚合是按照本地時區進行的。
訂正 Streaming 和 Batch 模式下函數取值方式
時間函數其實在流和批上面的表現形式會有所區別,這次修正主要是讓其更加符合用戶實際的使用習慣。例如以下函數:
- 在流模式中是 per-record 計算,即每條數據都計算一次;
- 在 Batch 模式是 query-start 計算,即在作業開始前計算一次。例如我們常用的一些 Batch 計算引擎,如 Hive 也是在每一個批開始前計算一次。
2.3 時間類型使用
在 1.13 版本也支持了在 TIMESTAMP 列上定義 Event time,也就是說Event time 現在既支持定義在 TIMESTAMP 列上,也支持定義在 TIMESTAMP_ LTZ 列上。那麼作為用戶,具體什麼場景用什麼類型呢?
- 當作業的上游源數據包含了字符串的時間(如:2021-4-15 14:00:00)這樣的場景,直接聲明為 TIMESTAMP 然後把 Event time 定義在上面即可,窗口在計算的時候會基於時間字符串進行切分,最終會計算出符合你實際想要的預想結果;
- 當上遊數據源的打點時間屬於 long 值,表示的是一個絕對時間的含義。在 1.13 版本你可以把 Event time 定義在 TIMESTAMP_LTZ 上面。此時定義在 TIMESTAMP_LTZ 類型上的各種 WINDOW 聚合,都能夠自動的解決 8 小時的時區偏移問題,無需按照之前的 SQL 寫法額外做時區的修改和訂正。
小提示:Flink SQL 中關於時間函數,時區支持的這些提升,是版本不兼容的。用戶在進行版本更新的時候需要留意作業邏輯中是否包含此類函數,避免升級後業務受到影響。
2.4 夏令時支持
在 Flink 1.13 以前,對於國外夏令時時區的用戶,做窗口相關的計算操作是十分困難的一件事,因為存在夏令時和冬令時切換的跳變。
Flink 1.13 通過支持在 TIMESTAMP_LTZ 列上定義時間屬性,同時 Flink SQL 在 WINDOW 處理時巧妙地結合 TIMESTAMP 和 TIMESTAMP_LTZ 類型,優雅地支持了夏令時。這對國外夏令時時區用戶,以及有海外業務場景的公司比較有用。
三、重要改進解讀
1. FLIP-152:提升 Hive 語法兼容性
FLIP-152 主要是做了 Hive 語法的兼容性增強,支持了 Hive 的一些常用 DML 和 DQL 語法,包括:
通過 Hive dialect 支持 Hive 常用語法。Hive 有很多的內置函數,Hive dialect 需要配合 HiveCatalog 和 Hive Module 一起使用,Hive Module 提供了 Hive 所有內置函數,加載後可以直接訪問。
與此同時,我們還可以通過 Hive dialect 創建/刪除 Catalog 函數以及一些自定義的函數,這樣使得 Flink SQL 與 Hive 的兼容性得到了極大的提升,讓熟悉 Hive 的用戶使用起來會更加方便。
2. FLIP-163:改進 SQL Client
在 1.13 版本之前,大家覺得 Flink SQL Client 就是周邊的一個小工具。但是,FLIP-163 在 1.13 版本進行了重要改進:
- 通過 -i 的參數,提前把 DDL 一次性加載初始化,方便初始化表的多個 DDL 語句,不需要多次執行命令創建表,替代了之前用 yaml 文件方式創建表;
- 支持 -f 參數,其中 SQL 文件支持 DML(insert into)語句;
-
支持更多實用的配置:
- 通過 SET SQL-client.verbose = true , 開啟 verbose,通過開啟 verbose 打印整個信息,相對以前只輸出一句話更加容易追蹤錯誤信息;
- 通過 SET execution.runtime-mode=streaming / batch 支持設置批/流作業模式;
- 通過 SET pipline.name=my_Flink_job 設置作業名稱;
- 通過 SET execution.savepoint.path=/tmp/Flink-savepoints/savepoint-bb0dab 設置作業 savepoint 路徑;
- 對於有依賴的多個作業,通過 SET Table.dml-sync=true 去選擇是否異步執行,例如離線作業,作業 a 跑完才能跑作業 b ,通過設置為 true 實現執行有依賴關係的 pipeline 調度。
-
同時支持 STATEMENT SET語法:
有可能我們的一個查詢不止寫到一個 sink 裡面,而是需要輸出到多個 sink,比如一個 sink 寫到 jdbc,一個 sink 寫到 HBase。
- 在 1.13 版本之前需要啟動 2 個 query 去完成這個作業;
- 在 1.13 版本,我們可以把這些放到一個 statement 裡面,以一個作業的方式去執行,能夠實現節點的複用,節約資源。
3. FLIP-136:增強 DataStream 和 Table 的轉換
雖然 Flink SQL 大大降低了我們使用實時計算的一些使用門檻,但 Table/SQL 這種高級封裝也屏蔽了一些底層實現,如 timer,state 等。不少高級用戶希望能夠直接操作 DataStream 獲得更多的靈活性,這就需要在 Table 和 DataStream 之間進行轉換。FLIP-136 增強了 Table 和 DataStream 間的轉換,使得用戶在兩者之間的轉換更加容易。
- 支持 DataStream 和 Table 轉換時傳遞 EVENT TIME 和 WATERMARK;
Table Table = TableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByMetadata("rowtime","TIMESTMP(3)")
.watermark("rowtime","SOURCE_WATERMARK()")
.build());
)
- 支持 Changelog 數據流在 Table 和 DataStream 間相互轉換。
//DATASTREAM 轉 Table
StreamTableEnvironment.fromChangelogStream(DataStream<ROW>): Table
StreamTableEnvironment.fromChangelogStream(DataStream<ROW>,Schema): Table
//Table 轉 DATASTREAM
StreamTableEnvironment.toChangelogStream(Table): DataStream<ROW>
StreamTableEnvironment.toChangelogStream(Table,Schema): DataStream<ROW>
四、Flink SQL 1.14 未來規劃
1.14 版本主要有以下幾點規劃:
- 刪除 Legacy Planner:從 Flink 1.9 開始,在阿里貢獻了 Blink-Planner 之後,很多一些新的 Feature 已經基於此 Blink Planner 進行開發,以前舊的 Legacy Planner 會徹底刪除;
- 完善 Window TVF:支持 session window,支持 window TVF 的 allow -lateness 等;
- 提升 Schema Handling:全鏈路的 Schema 處理能力以及關鍵校驗的提升;
- 增強 Flink CDC 支持:增強對上游 CDC 系統的集成能力,Flink SQL 內更多的算子支持 CDC 數據流。
五、總結
本文詳細解讀了 Flink SQL 1.13 的核心功能和重要改進。
- 支持 Window TVF;
- 系統地解決時區和時間函數問題;
- 提升 Hive 和 Flink 的兼容性;
- 改進 SQL Client;
- 增強 DataStream 和 Table 的轉換。
同時還分享了社區關於 Flink SQL 1.14 的未來規劃,相信看完文章的同學可以對 Flink SQL 在這個版本中的變化有更多的瞭解,在實踐過程中大家可以多多關注這些新的改動和變化,感受它們所帶來的業務層面上的便捷。
活動推薦
阿里雲基於 Apache Flink 構建的企業級產品-實時計算 Flink 版現開啟6月限時活動:
0元試用實時計算 Flink 全託管版本(包年包月、10CU)即可有機會獲得 Flink 獨家定製T恤;另包3個月及以上還有85折優惠!
瞭解活動詳情:https://www.aliyun.com/product/bigdata/sc