作者 | 伍翀(雲邪),阿里巴巴技術專家
整理 | 陳婧敏(清樾),阿里巴巴技術專家
摘要:本文整理自 Flink Forward 2020 全球在線會議中文精華版,由 Apache Flink PMC 伍翀(雲邪)分享,社區志願者陳婧敏(清樾)整理。旨在幫助大家更好地理解 Flink SQL 引擎的工作原理。文章主要分為以下四部分:
- Flink SQL Architecture
- How Flink SQL Works?
- Flink SQL Optimizations
- Summary and Futures
Tips:點擊下方鏈接可查看作者分享的原版視頻~
https://ververica.cn/developers/flink-forward-virtual-conference/
Apache Flink 社區在最近的兩個版本(1.9 & 1.10 )中為面向未來的統一流批處理在架構層面做了很多優化,其中一個重大改造是引入了 Blink Planner,開始支持 SQL & Table API 使用不同的 SQL Planner 進行編譯(Planner 的插件化)。
本文首先會介紹推動這些優化背後的思考,展示統一的架構如何更好地處理流式和批式查詢,其次將深入剖析 Flink SQL 的編譯及優化過程,包括:
- Flink SQL 利用 Apache Calcite 將 SQL 翻譯為關係代數表達式,使用表達式摺疊(Expression Reduce),下推優化(Predicate / Projection Pushdown )等優化技術生成物理執行計劃(Physical Plan),利用 Codegen 技術生成高效執行代碼。
- Flink SQL 使用高效的二進制數據存儲結構 BinaryRow 加速計算性能;使用 Mini-batch 攢批提高吞吐,降低兩層聚合時由 Retraction 引起的數據抖動;聚合場景下數據傾斜處理和 Top-N 排序的優化原理。
## Flink SQL 架構 & Blink Planner(1.9+ )
1.1 Old Planner 的限制
要想了解 Flink SQL 在1.9 版本引入新架構的動機,我們首先看下 1.9 版本之前的架構設計。
從圖中可以看出,雖然面向用戶的 Table API & SQL 是統一的,但是流式和批式任務在翻譯層分別對應了 DataStreamAPI 和 DataSetAPI,在 Runtime 層面也要根據不同的 API 獲取執行計劃,兩層的設計使得整個架構能夠複用的模塊有限,不易擴展。
1.2 統一的 Blink Planner
Flink 在設計之初就遵循“批是流的特例”的理念,在架構上做到流批統一是大勢所趨。在社區和阿里巴巴的共同努力下,1.9 版本引入了新的 Blink Planner,將批 SQL 處理作為流 SQL 處理的特例,儘量對通用的處理和優化邏輯進行抽象和複用,通過 Flink 內部的 Stream Transformation API 實現流 & 批的統一處理,替代原 Flink Planner 將流 & 批區分處理的方式。
此外,新架構通過靈活的插件化方式兼容老版本 Planner,用戶可自行選擇。不過在 1.11 版本 Blink Planner 會代替 Old Planner 成為默認的 Planner 來支持流 & 批進一步融合統一( Old Planner 將在之後逐步退出歷史舞臺)。
Flink SQL 工作流
Flink SQL 引擎的工作流總結如圖所示。
從圖中可以看出,一段查詢 SQL / 使用TableAPI 編寫的程序(以下簡稱 TableAPI 代碼)從輸入到編譯為可執行的 JobGraph 主要經歷如下幾個階段
- 將 SQL文本 / TableAPI 代碼轉化為邏輯執行計劃(Logical Plan)
- Logical Plan 通過優化器優化為物理執行計劃(Physical Plan)
- 通過代碼生成技術生成 Transformations 後進一步編譯為可執行的 JobGraph 提交運行
本節將重點對 Flink SQL 優化器的常用優化方法和 CodeGen 生成 Transformations 進行介紹。
2.1 Logical Planning
Flink SQL 引擎使用 Apache Calcite SQL Parser 將 SQL 文本解析為詞法樹,SQL Validator 獲取 Catalog 中元數據的信息進行語法分析和驗證,轉化為關係代數表達式(RelNode),再由 Optimizer 將關係代數表達式轉換為初始狀態的邏輯執行計劃。
備註:TableAPI 代碼使用 TableAPI Validator 對接 Catalog 後生成邏輯執行計劃。
E.g.1 考慮如下表達 JOIN 操作的一段 SQL。
SELECT
t1.id, 1 + 2 + t1.value AS v
FROM t1, t2
WHERE
t1.id = t2.id AND
t2.id < 1000
經過上述操作後得到了一個樹狀結構的邏輯執行計劃,根節點對應最上層的 Select 語句,葉子節點對應輸入表 t1 和 t2 的 TableScan 操作,Join 和 Where 條件過濾 分別對應了 Join 和 Filter 節點。
LogicalProject(id=[$0], v=[+(+(1, 2), $1)])
+- LogicalFilter(condition=[AND(=($0, $3), <($3, 1000))])
+- LogicalJoin(condition=[true], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default, t1]])
+- LogicalTableScan(table=[[default_catalog, default, t2]])
可視化後如圖所示,這是優化器開始工作的初始狀態。
下面開始介紹 Flink SQL 優化器常見的幾種優化方式。
■ 2.1.1 Expression Reduce
表達式(Expression) 是 SQL 中最常見的語法。比如 t1.id 是一個表達式, 1 + 2 + t1.value 也是一個表達式。優化器在優化過程中會遞歸遍歷樹上節點,儘可能預計算出每個表達式的值,這個過程就稱為表達式摺疊。這種轉換在邏輯上等價,通過優化後,真正執行時不再需要為每一條記錄都計算一遍 1 + 2。
■ 2.1.2 PushDown Optimization
下推優化是指在保持關係代數語義不變的前提下將 SQL 語句中的變換操作儘可能下推到靠近數據源的位置以獲得更優的性能,常見的下推優化有謂詞下推(Predicate Pushdown),投影下推(Projection Pushdown,有時也譯作列裁剪)等。
- Predicate Pushdown
回顧 E.g.1,我們發現 WHERE 條件表達式中 t2.id < 1000 這個過濾條件描述的是對錶 t2 的約束,跟表 t1 無關,完全可以下推到 JOIN 操作之前完成。假設表 t2 中有一百萬行數據,但是滿足 id < 1000 的數據只有 1,000 條,則通過謂詞下推優化後到達 JOIN 節點的數據量降低了1,000 倍,極大地節省了 I / O 開銷,提升了 JOIN 性能。
謂詞下推(Predicate Pushdown)是優化 SQL 查詢的一項基本技術,謂詞一詞來源於數學,指能推導出一個布爾返回值(TRUE / FALSE)的函數或表達式,通過判斷布爾值可以進行數據過濾。謂詞下推是指保持關係代數語義不變的前提下將 Filter 儘可能移至靠近數據源的位置(比如讀取數據的 SCAN 階段)來降低查詢和傳遞的數據量(記錄數)。
- Projection Pushdown
列裁剪是 Projection Pushdown 更直觀的描述方式,指在優化過程中去掉沒有使用的列來降低 I / O 開銷,提升性能。但與謂詞下推只移動節點位置不同,投影下推可能會增加節點個數。比如最後計算出的投影組合應該放在 TableScan 操作之上,而 TableScan 節點之上沒有 Projection 節點,優化器就會顯式地新增 Projection 節點來完成優化。另外如果輸入表是基於列式存儲的(如 Parquet 或 ORC 等),優化還會繼續下推到 Scan 操作中進行。
回顧 E.g.1,我們發現整個查詢中只用到了表 t1 的 id 和 value 字段,表 t2 的 id 字段,在 TableScan 節點之上分別增加 Projection 節點去掉多餘字段,極大地節省了 I / O 開銷。
簡要總結一下,謂詞下推和投影下推分別通過避免處理不必要的記錄數和字段數來降低 I / O 開銷提升性能。
2.2 Physical Planning on Batch
通過上述一系列操作後,我們得到了優化後的邏輯執行計劃。邏輯執行計劃描述了執行步驟和每一步需要完成的操作,但沒有描述操作的具體實現方式。而物理執行計劃會考慮物理實現的特性,生成每一個操作的具體實現方式。比如 Join 是使用 SortMergeJoin、HashJoin 或 BroadcastHashJoin 等。優化器在生成邏輯執行計劃時會計算整棵樹上每一個節點的 Cost,對於有多種實現方式的節點(比如 Join 節點),優化器會展開所有可能的 Join 方式分別計算。最終整條路徑上 Cost 最小的實現方式就被選中成為 Final Physical Plan。
回顧 E.g.1,當它以批模式執行,同時我們可以拿到輸入表的 Statistics 信息。在經過前述優化後,表 t2 到達 Join 節點時只有 1,000 條數據,使用 BroadcastJoin 的開銷相對最低,則最終的 Physical Plan 如下圖所示。
2.3 Translation & Code Generation
代碼生成(Code Generation) 在計算機領域是一種廣泛使用的技術。在 Physical Plan 到生成 Transformation Tree 過程中就使用了 Code Generation。
回顧 E.g.1,以 表 t2 之上的 Calc 節點 t2.id < 1000 表達式為例,通過 Code Generation 後生成了描述 Transformation Operator 的一段 Java 代碼,將接收到的 Row 中 id < 1000 的 Row 發送到下一個 Operator。
Flink SQL 引擎會將 Physical Plan 通過 Code Generation 翻譯為 Transformations,再進一步編譯為可執行的 JobGraph。
2.4 Physical Planning on Stream
以上介紹了 Flink SQL 引擎的整體工作流,上述例子是假定以批模式編譯的,下面我們來介紹一下以流模式編譯時,在生成 Physical Plan 過程中的一個重要機制:Retraction Mechanism (aka. Changelog Mechanism)。
■ 2.4.1 Retraction Mechanism
Retraction 是流式數據處理中撤回過早下發(Early Firing)數據的一種機制,類似於傳統數據庫的 Update 操作。級聯的聚合等複雜 SQL 中如果沒有 Retraction 機制,就會導致最終的計算結果與批處理不同,這也是目前業界很多流計算引擎的缺陷。
E.g.2 考慮如下統計詞頻分佈的 SQL。
SELECT cnt, COUNT(cnt) as freq
FROM (
SELECT word, COUNT(*) as cnt
FROM words
GROUP BY word)
GROUP BY cnt
假設輸入數據是:
則經過上面的計算後,預期的輸出結果應該是:
但與批處理不同,流處理的數據是一條條到達的,理論上每一條數據都會觸發一次計算,所以在處理了第一個 Hello 和第一個 World 之後,詞頻為 1 的單詞數已經變成了 2,此時再處理第二個 Hello 時,如果不能修正之前的結果,Hello 就會在詞頻等於 1 和詞頻等於 2 這兩個窗口下被同時統計,顯然這個結果是錯誤的,這就是沒有 Retraction 機制帶來的問題。
Flink SQL 在流計算領域中的一個重大貢獻就是首次提出了這個機制的具體實現方案。Retraction 機制又名 Changelog 機制,因為某種程度上 Flink 將輸入的流數據看作是數據庫的 Changelog,每條輸入數據都可以看作是對數據庫的一次變更操作,比如 Insert,Delete 或者 Update。以 MySQL 數據庫為例,其Binlog 信息以二進制形式存儲,其中 Update_rows_log_event 會對應 2 條標記 Before Image (BI) 和 After Image (AI),分別表示某一行在更新前後的信息。
在 Flink SQL 優化器生成流作業的 Physical Plan 時會判斷當前節點是否是更新操作,如果是則會同時發出 2 條消息 update_before 和 update_after 到下游節點,update_before 表示之前“錯誤”下發的數據,需要被撤回,update_after 表示當前下發的“正確”數據。下游收到後,會在結果上先減去 update_before,再加上 update_after。
回顧 E.g.2,下面的動圖演示了加入 Retraction 機制後正確結果的計算過程。
update_before 是一條非常關鍵的信息,相當於標記出了導致當前結果不正確的那個“元凶”。不過額外操作會帶來額外的開銷,有些情況下不需要發送 update_before 也可以獲得正確的結果,比如下游節點接的是 UpsertSink(MySQL 或者 HBase的情況下,數據庫可以按主鍵用 update_after 消息覆蓋結果)。是否發送 update_before 由優化器決定,用戶不需要關心。
■ 2.4.2 Update_before Decision
前面介紹了 Retraction 機制和 update_before,那優化器是怎樣決定是否需要發送update_before 呢?本節將介紹這一部分的工作。
Step1:確定每個節點對應的 Changelog 變更類型
數據庫中最常見的三種操作類型分別是 Insert (記為 [I]),Delete(記為 [D]),Update(記為 [U])。優化器首先會自底向上檢查每個節點,判斷它屬於哪(幾)種類型,分別打上對應標記。
回顧 E.g.2,第一個 Source 節點由於只產生新數據,所以屬於 Insert,記為 [I];第二個節點計算內層的聚合,所以會發出更新的消息,記為 [I,U];第三個節點裁掉 word 字段,屬於簡單計算,傳遞了上游的變更類型,記為 [I,U];第四個節點是外層的聚合計算,由於它收到了來自上游的 Update 消息,所以額外需要 Delete 操作來保證更新成功,記為 [I,U,D]。
Step2:確定每個節點發送的消息類型
在介紹 Step2 之前,我們先介紹下 Flink 中 Update 消息類型的表示形式。在 Flink 中 Update 由兩條 update_before(簡稱 UB)和 update_after (簡稱 UA)來表示,其中 UB 消息在某些情況下可以不發送,從而提高性能。
在 Step1 中優化器自底向上推導出了每個節點對應的 Changelog 變更操作,這一步裡會先自頂向下推斷當前節點需要父節點提供的消息類型,直到遇到第一個不需要父節點提供任何消息類型的節點,再往上回推每個節點最終的實現方式和需要的消息類型。
回顧 E.g.2,由於最上層節點是 UpsertSink 節點,只需要它的父節點提供 [UA] 即可。到了外層聚合的 Aggregate 節點,由於 Aggregate 節點的輸入有 Update 操作,所以需要父節點需要提供 [UB,UA],這樣才能正確更新自己的計算狀態。
再往下到 Calc 節點,它需要傳遞 [UB,UA] 的需求給它的父節點,也就是內層的 Aggregate 節點。而到了內層 Aggregation 節點,它的父節點是 Source 節點,不會產生 Update 操作,所以它不需要 Source 節點額外發送任何 [UB / UA ]。當優化器遍歷到 Source 節點,便開始進行回溯,如果當前節點能滿足子節點的 requirement,則將對應的標籤更新到節點上,否則便無法生成 plan。首先內層的 Aggregate 能產生 UB,所以能滿足子節點的 requirement,所以優化器會給內層的 Aggregate 節點打上 [UB,UA] 的標籤,然後向上傳遞到 Calc 節點,同樣打上 [UB,UA] ,再到外層的 Aggregate 節點,由於它的下游只需要接受更新後的消息,所以打上 [UA] 標籤,表示它只需要向下遊發送 update_after 即可。
這些標籤最終會影響算子的物理實現,比如外層的 Aggregate 節點,由於它會接收到來自上游的 [UB],所以物理實現會使用帶 Retract 的 Count,同時它只會向 Sink 發送 update_after。而內層的 Aggregate 節點,由於上游發送過來的數據沒有 [UB],所以可以採用不帶 Retract 的 Count 實現,同時由於帶有 [UB] 標籤,所以需要往下游發送 update_before。
Flink SQL Internal Optimization
前面介紹了 Flink SQL 引擎的工作原理,接下來會簡要概括一下 Flink SQL 內部的一些優化,更多資料可以在 Flink Forward Asia 2019 查看。
3.1 BinaryRow
在 Flink 1.9+ 前, Flink Runtime 層各算子間傳遞的數據結構是 Row,其內部實現是 Object[]。這種數據結構的問題在於不但需要額外開銷存 Object Metadata,計算過程中還涉及到大量序列化 / 反序列 (特別是只需要處理某幾個字段時需要反序列化整個 Row),primitive 類型的拆 / 裝箱等,都會帶來大量額外的性能開銷。
Flink 1.9 開始引入了 Blink Planner,使用二進制數據結構的 BinaryRow 來表示 Record。BinaryRow 作用於默認大小為 32K 的 Memory Segment,直接映射到內存。BinaryRow 內部分為 Header,定長區和變長區。Header 用於存儲 Retraction 消息的標識,定長區使用 8 個 bytes 來記錄字段的 Nullable 信息及所有 primitive 和可以在 8 個 bytes 內表示的類型。其它類型會按照基於起始位置的 offset 存放在變長區。
BinaryRow 作為 Blink Planner 的基礎數據結構,帶來的好處是顯而易見的:首先存儲上更為緊湊,去掉了額外開銷;其次在序列化和反序列化上帶來的顯著性能提升,可根據 offset 只反序列化需要的字段,在開啟 Object Reuse 後,序列化可以直接通過內存拷貝完成。
3.2 Mini-batch Processing
Flink 是純流式處理框架,在理論上每一條新到的數據都會觸發一次計算。然而在實現層面,這樣做會導致聚合場景下每處理一條數據都需要讀寫 State 及序列化 / 反序列化。如果能夠在內存中 buffer 一定量的數據,預先做一次聚合後再更新 State,則不但會降低操作 State 的開銷,還會有效減少發送到下游的數據量,提升 throughput,降低兩層聚合時由 Retraction 引起的數據抖動, 這就是 Mini-batch 攢批優化的核心思想。
3.3 Skew Processing
對於數據傾斜的優化,主要分為是否帶 DISTINCT 去重語義的兩種方式。對於普通聚合的數據傾斜,Flink 引入了 Local-Global 兩階段優化,類似於 MapReduce 增加 Local Combiner 的處理模式。而對於帶有去重的聚合,Flink 則會將用戶的 SQL 按原有聚合的 key 組合再加上 DISTINCT key 做 Hash 取模後改寫為兩層聚合來進行打散。
3.4 Top-N Rewrite
全局排序在流式的場景是很難實現的,但如果只需要計算到目前的 Top-N 極值,問題就變得可解。不過傳統數據庫求排序的 SQL 語法是通過 ORDER BY 加 LIMIT 限制條數,背後實現的機制也是通過掃描全表排序後再返回 LIMIT 條數的記錄。另外如果按照某些字段開窗排序,ORDER BY 也無法滿足要求。Flink SQL 借鑑了批場景下開窗求 Top-N 的語法,使用 ROW_NUMBER 語法來做流場景下的 Top-N 排序。
E.g.3 下面這段 SQL 計算了每個類目下銷量 Top3 的店鋪
SELECT*
FROM(
SELECT *, -- you can get like shopId or other information from this
ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS rowNum
FROM shop_sales )
WHERE rowNum <= 3
在生成 Plan 方面,ROW_NUMBER 語義對應 OverAggregate 窗口節點和一個過濾行數的 Calc 節點,而這個窗口節點在實現層面需要為每一個到達的數據重新將 State 中的歷史數據拿出來排序,這顯然不是最優解。
我們知道流式場景求解極大 / 小值的最優操作是通過維護一個 size 為 N 的 minHeap / maxHeap。由實現反推出我們需要在優化器上新增一條規則,在遇到 ROW_NUMBER 生成的邏輯節點後,將其優化為一個特殊的 Rank 節點,對應上述的最優實現方式(當然這只是特殊 Rank 對應的其中一種實現)。這便是 Top-N Rewrite 的核心思想。
Summary & Futures
本文內容回顧
- 簡要介紹 Flink 1.9 + 在 SQL & TableAPI 上引入新架構,統一技術棧,朝著流 & 批一體的方向邁進了一大步。
- 深入介紹 Flink SQL 引擎的內部運行機制,以及在對用戶透明的同時,Flink SQL 在優化方面做的許多工作。
未來工作計劃
- 在 Flink 1.11+ 後的版本,Blink Planner 將作為默認的 Planner 提供生產級別的支持。
- FLIP-95:重構 TableSource & TableSink 的接口設計,面向流批一體化,在 Source 端支持 changelog 消息流,從而支持 FLIP-105 的 CDC 數據源。
- FLIP-105:Flink TableAPI & SQL 對 CDC 的支持。
- FLIP-115:擴展目前只支持 CSV 的 FileSystem Connector,使其成為流批統一的 Generalized FileSystem Connector。
- FLIP-123:對 Hive DDL 和 DML 的兼容,支持用戶在 Flink 中運行 Hive DDL。