大數據

Apache Flink 進階(十二):深度探索 Flink SQL

作者:賀小令(曉令)
整理:鄭仲尼

本文根據 Apache Flink 進階篇系列直播整理而成,由阿里巴巴技術專家賀小令分享,文章將從用戶的角度來講解 Flink 1.9 版本中 SQL 相關原理及部分功能變更,希望加深大家對 Flink 1.9 新功能的理解,在使用上能夠有所幫助。主要內容:

  1. 新 TableEnvironment 的設計與使用場景
  2. 新 Catalog 的設計以及 DDL 實踐
  3. Blink Planner 的幾點重要改進及優化

新 TableEnvironment

FLIP-32 中提出,將 Blink 完全開源,合併到 Flink 主分支中。合併後在 Flink 1.9 中會存在兩個 Planner:Flink Planner 和 Blink Planner。

在之前的版本中,Flink Table 在整個 Flink 中是一個二等公民。而 Flink SQL 具備的易用性,使用門檻低等特點深受用戶好評,也越來越被重視,Flink Table 模塊也因此被提升為一等公民。而 Blink 在設計之初就考慮到流和批的統一,批只是流的一種特殊形式,所以在將 Blink 合併到 Flink 主分支的過程中,社區也同時考慮了 Blink 的特殊設計。

新 TableEnvironment 整體設計

圖1.png

圖1 新 Table Environment 整體設計

從圖 1 中,可以看出,TableEnvironment 組成部分如下:

  • flink-table-common:這個包中主要是包含 Flink Planner 和 Blink Planner 一些共用的代碼。
  • flink-table-api-java:這部分是用戶編程使用的 API,包含了大部分的 API。
  • flink-table-api-scala:這裡只是非常薄的一層,僅和 Table API 的 Expression 和 DSL 相關。
  • 兩個 Planner:flink-table-planner 和 flink-table-planner-blink。
  • 兩個 Bridge:flink-table-api-scala-bridge 和 flink-table-api-java-bridge,從圖中可以看出,Flink Planner 和 Blink Planner 都會依賴於具體的 JAVA API,也會依賴於具體的 Bridge,通過 Bridge 可以將 API 操作相應的轉化為 Scala 的 DataStream、DataSet,或者轉化為 JAVA 的 DataStream 或者 Data Set。

新舊 TableEnvironment 對比

在 Flink 1.9 之前,原來的 Flink Table 模塊,有 7 個 Environment,使用和維護上相對困難。7 個 Environment 包括:StreamTableEnvironment,BatchTableEnvironment 兩類,JAVA 和 Scala 分別 2 個,一共 4 個,加上 3 個父類,一共就是 7 個。

在新的框架之下,社區希望流和批統一,因此對原來的設計進行精簡。首先,提供統一的 TableEnvironment,放在 flink-table-api-java 這個包中。然後,在 Bridge 中,提供了兩個用於銜接 Scala DataStream 和 Java DataStream 的 StreamTableEnvironment。最後,因為 Flink Planner 中還殘存在著 toDataSet() 類似的操作,所以,暫時保留 BatchTableEnvironment。這樣,目前一共是 5 個 TableEnvironment。

因為未來 Flink Planner 將會被移除,BatchTableEnvironment 就會被廢棄,這樣,未來就剩下 3 個 Environment 了,整個 TableEnvironment 的設計將更加簡潔明瞭。

新 TableEnvironment 的應用

本節中,將介紹新的應用場景以及相關限制。

下圖詳細列出了新 TableEnvironment 的適用場景:

圖2.png
圖2 新 Table Environment 適應場景

第一行,簡單起見,在後續將新的 TableEnvironment 稱為 UnifyTableEnvironment。在 Blink 中,Batch 被認為是 Stream 的一個特例,因此 Blink 的 Batch 可以使用 UnifyTableEnvironment。

UnifyTableEnvironment 在 1.9 中有一些限制,比如它不能夠註冊 UDAF 和 UDTF,當前新的 Type System 的類型推導功能還沒有完成,Java、Scala 的類型推導還沒統一,所以這部分的功能暫時不支持。可以肯定的是,這部分功能會在 1.10 中實現。此外,UnifyTableEnvironment 無法轉化為 DataStream 和 DataSet。

第二行,Stream TableEnvironment 支持轉化成 DataStream,也可以註冊 UDAF 和 UDTF。如果是 JAVA 寫的,就註冊到 JAVA 的 TableEnvironment,如果是用 Scala 寫的,就註冊到 Scala 的 TableEnvironment。

注意,Blink Batch 作業是不支持 Stream TableEnvironment 的,因為目前沒有 toAppendStream(),所以 toDataStream() 這樣的語義暫時不支持。從圖中也可以看出,目前操作只能使用 TableEnvironment。

最後一行,BatchTableEvironment 能夠使用 toDataSet() 轉化為 DataSet。

從上面的圖 2 中,可以很清晰的看出各個 TableEnvironment 能夠做什麼事情,以及他們有哪些限制。

接下來,將使用示例對各種情況進行說明。

示例1:Blink Batch

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv…
tEnv.execute(“job name”);

從圖 2 中可以看出,Blink Batch 只能使用 TableEnvironment(即UnifyTableEnvironment),代碼中,首先需要創建一個 EnvironmentSetting,同時指定使用 Blink Planner,並且指定用 Batch 模式。之所以需要指定 Blink Planner,是因為目前 Flink 1.9 中,將 Flink Planner 和 Blink Planner 的 jar 同時放在了 Flink 的 lib 目錄下。如果不指定使用的 Planner,整個框架並不知道需要使用哪個 Planner,所以必須顯示的指定。當然,如果 lib 下面只有一個 Planner 的 jar,這時不需要顯示指定使用哪個 Planner。

另外,還需要注意的是在 UnifyEnvironment 中,用戶是無法獲取到 ExecutionEnvironment 的,即用戶無法在寫完作業流程後,使用 executionEnvironment.execute() 方法啟動任務。需要顯式的使用 tableEnvironment.execute() 方法啟動任務,這和之前的作業啟動很不相同。

示例 2:Blink Stream

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment execEnv = …
StreamTableEnvironment tEnv =  StreamTableEnvironment.create(execEnv, settings);
tEnv…

Blink Stream 既可以使用 UnifyTableEnvironment,也可以使用 StreamTableEnvironment,與 Batch 模式基本類似,只是需要將 inBatchMode 換成 inStreamingMode。

示例 3:Flink Batch

ExecutionEnvironment execEnv = ...
BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
tEnv...

與之前沒有變化,不做過多介紹。

示例 4:Flink Stream

EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv…
tEnv.execute(“job name”);

Flink Stream 也是同時支持 UnifyEnvironment 和 StreamTableEnvironment,只是在指定 Planner 時,需要指定為 useOldPlanner,也即 Flink Planner。因為未來 Flink Planner 會被移除,因此,特意起了一個 OlderPlanner 的名字,而且只能夠使用 inStreamingMode,無法使用 inBatchMode。

新 Catalog 和 DDL

構建一個新的 Catalog API 主要是 FLIP-30 提出的,之前的 ExternalCatalog 將被廢棄,Blink Planner 中已經不支持 ExternalCatalog 了,Flink Planner 還支持 ExternalCatalog。

新 Catalog 設計

下圖是新 Catalog 的整體設計:

圖3.png
圖3 新 Catalog 設計

可以看到,新的 Catalog 有三層結構(..),最頂層是 Catalog 的名字,中間一層是 Database,最底層是各種 MetaObject,如 Table,Partition,Function 等。當前,內置了兩個 Catalog 實現:MemoryCatalog 和 HiveCatalog。當然,用戶也可以實現自己的 Catalog。

Catalog 能夠做什麼事情呢?首先,它可以支持 Create,Drop,List,Alter,Exists 等語句,另外它也支持對 Database,Table,Partition,Function,Statistics 等的操作。基本上,常用的 SQL 語法都已經支持。

CatalogManager 正如它名字一樣,主要是用來管理 Catalog,且可以同時管理多個 Catalog。也就是說,可以通過在一個相同 SQL 中,跨 Catalog 做查詢或者關聯操作。
例如,支持對 A Hive Catalog 和 B Hive Catalog 做相互關聯,這給 Flink 的查詢帶來了很大的靈活性。

CatalogManager 支持的操作包括:

  • 註冊 Catalog(registerCatalog)
  • 獲取所有的 Catalog(getCatalogs)
  • 獲取特定的 Catalog(getCatalog)
  • 獲取當前的 Catalog(getCurrentCatalog)
  • 設置當前的 Catalog(setCurrentCatalog)
  • 獲取當前的 Database(getCurrentDatabase)
  • 設置當前的 Database(setCurrentDatabase)

Catalog 雖然設計了三層結構,但在使用的時候,並不需要完全指定三層結構的值,可以只寫Table Name,這時候,系統會使用 getCurrentCatalog,getCurrentDatabase 獲取到默認值,自動補齊三層結構,這種設計簡化了對 Catalog 的使用。如果需要切換默認的 Catalog,只需要調用 setCurrentCatalog 就可以了。

在 TableEnvironment 層,提供了操作 Catalog 的方法,例如:

  • 註冊 Catalog(registerCatalog)
  • 列出所有的 Catalog(listCatalogs)
  • 獲取指定 Catalog(getCatalog)
  • 使用某個 Catalog(useCatalog)

在 SQL Client 層,也做了一定的支持,但是功能有一定的限制。用戶不能夠使用 Create 語句直接創建 Catalog,只能通過在 yarn 文件中,通過定義 Description 的方式去描述 Catalog,然後在啟動 SQL Client 的時候,通過傳入 -e +file_path 的方式,定義 Catalog。目前 SQL Client 支持列出已定義的 Catalog,使用一個已經存在的 Catalog 等操作。

DDL 設計與使用

有了 Catalog,就可以使用 DDL 來操作 Catalog 的內容,可以使用 TableEnvironment 的 sqlUpdate() 方法執行 DDL 語句,也可以在 SQL Client 執行 DDL 語句。

sqlUpdate() 方法中,支持 Create Table,Create View,Drop Table,Drop View 四個命令。當然,inset into 這樣的語句也是支持的。

下面分別對 4 個命令進行說明:

Create Table:可以顯示的指定 Catalog Name 或者 DB Name,如果缺省,那就按照用戶設定的 Current Catalog 去補齊,然後可以指定字段名稱,字段的說明,也可以支持 Partition By 語法。最後是一個 With 參數,用戶可以在此處指定使用的 Connector,例如,Kafka,CSV,HBase 等。With 參數需要配置一堆的屬性值,可以從各個 Connector 的 Factory 定義中找到。Factory 中會指出有哪些必選屬性,哪些可選屬性值。

需要注意的是,目前 DDL 中,還不支持計算列和 Watermark 的定義,後續的版本中將會繼續完善這部分。

Create Table [[catalog_name.]db_name.]table_name(
  a int comment 'column comment',
  b bigint,
  c varchar
)comment 'table comment'
[partitioned by(b)]
With(
    update-mode='append',
    connector.type='kafka',
    ...
)

Create View:需要指定 View 的名字,然後緊跟著的是 SQL。View 將會存儲在 Catalog 中。

CREATE VIEW view_name AS SELECT xxx 

Drop Table&Drop View:和標準 SQL 語法差不多,支持使用 IF EXISTS 語法,如果未加 IF EXISTS ,Drop 一個不存在的表,會拋出異常。

 DROP TABLE [IF EXISTS] [[catalog_name.]db_name.]table_name 

SQL Client中執行DDL:大部分都只支持查看操作,僅可以使用 Create View 和 Drop View。Catalog,Database,Table ,Function 這些只能做查看。用戶可以在 SQL Client 中 Use 一個已經存在的 Catalog,修改一些屬性,或者做 Description,Explain 這樣的一些操作。

CREATE VIEW
DROP VIEW
SHOW CATALOGS/DATABASES/TABLES/FUNCTIONS l USE CATALOG xxx
SET xxx=yyy
DESCRIBE table_name
EXPLAIN SELECT xxx 

DDL 部分,在 Flink 1.9 中其實基本已經成型,只是還有一些特性,在未來需要逐漸的完善。

Blink Planner

本節將主要從 SQL/Table API 如何轉化為真正的 Job Graph 的流程開始,讓大家對 Blink Planner 有一個比較清晰的認識,希望對大家閱讀 Blink 代碼,或者使用 Blink 方面有所幫助。然後介紹 Blink Planner 的改進及優化。

圖4.png
圖4 主要流程

從上圖可以很清楚的看到,解析的過程涉及到了三層:Table API/SQL,Blink Planner,Runtime,下面將對主要的步驟進行講解。

Table API&SQL 解析驗證:在 Flink 1.9 中,Table API 進行了大量的重構,引入了一套新的 Operation,這套 Operation 主要是用來描述任務的 Logic Tree。

當 SQL 傳輸進來後,首先會去做 SQL 解析,SQL 解析完成之後,會得到 SqlNode Tree(抽象語法樹),然後會緊接著去做 Validator(驗證),驗證時會去訪問 FunctionManger 和 CatalogManger,FunctionManger 主要是查詢用戶定義的 UDF,以及檢查 UDF 是否合法,CatalogManger 主要是檢查這個 Table 或者 Database 是否存在,如果驗證都通過,就會生成一個 Operation DAG(有向無環圖)。

從這一步可以看出,Table API 和 SQL 在 Flink 中最終都會轉化為統一的結構,即 Operation DAG。

生成RelNode:Operation DAG 會被轉化為 RelNode(關係表達式) DAG。

優化:優化器會對 RelNode 做各種優化,優化器的輸入是各種優化的規則,以及各種統計信息。當前,在 Blink Planner 裡面,絕大部分的優化規則,Stream 和 Batch 是共享的。差異在於,對 Batch 而言,它沒有 state 的概念,而對於 Stream 而言,它是不支持 sort 的,所以目前 Blink Planner 中,還是運行了兩套獨立的規則集(Rule Set),然後定義了兩套獨立的 Physical Rel:BatchPhysical Rel 和 StreamPhysical Rel。優化器優化的結果,就是具體的 Physical Rel DAG。

轉化:得到 Physical Rel Dag 後,繼續會轉化為 ExecNode,通過名字可以看出,ExecNode 已經屬於執行層的概念了,但是這個執行層是 Blink 的執行層,在 ExecNode 中,會進行大量的 CodeGen 的操作,還有非 Code 的 Operator 操作,最後,將 ExecNode 轉化為 Transformation DAG。

生成可執行 Job Graph:得到 Transformation DAG 後,最終會被轉化成 Job Graph,完成 SQL 或者 Table API 的解析。

Blink Planner 改進及優化

Blink Planner 功能方面改進主要包含如下幾個方面:

  • 更完整的 SQL 語法支持:例如,IN,EXISTS,NOT EXISTS,子查詢,完整的 Over 語句,Group Sets 等。而且已經跑通了所有的 TPCH,TPCDS 這兩個測試集,性能還非常不錯。
  • 提供了更豐富,高效的算子
  • 提供了非常完善的 cost 模型,同時能夠對接 Catalog 中的統計信息,使 cost 根據統計信息得到更優的執行計劃。
  • 支持 join reorder
  • shuffle service:對 Batch 而言,Blink Planner 還支持 shuffle service,這對 Batch 作業的穩定性有非常大的幫助,如果遇到 Batch 作業失敗,通過 shuffle service 能夠很快的進行恢復。

性能方面,主要包括以下部分:

  • 分段優化。
  • Sub-Plan Reuse。
  • 更豐富的優化 Rule:共一百多個 Rule ,並且絕大多數 Rule 是 Stream 和 Batch 共享的。
  • 更高效的數據結構 BinaryRow:能夠節省序列化和反序列化的操作。
  • mini-batch 支持(僅 Stream):節省 state 的訪問的操作。
  • 節省多餘的 Shuffle 和 Sort(Batch 模式):兩個算子之間,如果已經按 A 做 Shuffle,緊接著他下的下游也是需要按 A Shuffle 的數據,那中間的這一層 Shuffle,就可以省略,這樣就可以省很多網絡的開銷,Sort 的情況也是類似。Sort 和 Shuffle 如果在整個計算裡面是佔大頭,對整個性能是有很大的提升的。

深入性能優化及實踐

本節中,將使用具體的示例進行講解,讓你深入理解 Blink Planner 性能優化的設計。

分段優化

示例 5

create view MyView as select word, count(1) as freq from SourceTable group by word; insert into SinkTable1 select * from MyView where freq >10;
insert into SinkTable2 select count(word) as freq2, freq from MyView group by freq;  

上面的這幾個 SQL,轉化為 RelNode DAG,大致圖形如下:

圖5.png
圖5 示例5 RelNode DAG

如果是使用 Flink Planner,經過優化層後,會生成如下執行層的 DAG:

圖6.png
圖6 示例 5 Flink Planner DAG

可以看到,Flink Planner 只是簡單的從 Sink 出發,反向的遍歷到 Source,從而形成兩個獨立的執行鏈路,從上圖也可以清楚的看到,Scan 和第一層 Aggregate 是有重複計算的。

在 Blink Planner 中,經過優化層之後,會生成如下執行層的 DAG:

圖7.png
圖7 示例 5 Blink Planner DAG

Blink Planner 不是在每次調用 insert into 的時候就開始優化,而是先將所有的 insert into 操作緩存起來,等到執行前才進行優化,這樣就可以看到完整的執行圖,可以知道哪些部分是重複計算的。Blink Planner 通過尋找可以優化的最大公共子圖,找到這些重複計算的部分。經過優化後,Blink Planner 會將最大公共子圖的部分當做一個臨時表,供其他部分直接使用。

這樣,上面的圖可以分為三部分,最大公共子圖部分(臨時表),臨時表與 Filter 和 SinkTable1 優化,臨時表與第二個 Aggregate 和 SinkTable 2 優化。

Blink Planner 其實是通過聲明的 View 找到最大公共子圖的,因此在開發過程中,如果需要複用某段邏輯,就將其定義為 View,這樣就可以充分利用 Blink Planner 的分段優化功能,減少重複計算。

當然,當前的優化也不是最完美的,因為提前對圖進行了切割,可能會導致一些優化丟失,今後會持續地對這部分算法進行改進。

總結一下,Blink Planner 的分段優化,其實解的是多 Sink 優化問題(DAG 優化),單 Sink 不是分段優化關心的問題,單 Sink 可以在所有節點上優化,不需要分段。

Sub-Plan Reuse

示例 6

insert into SinkTabl
select freq from (select word, count(1) as freq from SourceTable group by word) t where word like 'T%'
union all
select count(word) as freq2 from (select word, count(1) as freq from SourceTable group by word) t group by freq; 

這個示例的 SQL 和分段優化的 SQL 其實是類似的,不同的是,沒有將結果 Sink 到兩個 Table 裡面,而是將結果 Union 起來,Sink 到一個結果表裡面。

下面看一下轉化為 RelNode 的 DAG 圖:

圖8.png
圖 8 示例 6 RelNode DAG

從上圖可以看出,Scan 和第一層的 Aggregate 也是有重複計算的,Blink Planner 其實也會將其找出來,變成下面的圖:

圖9.png
圖9 示例 6 Blink Planner DAG

Sub-Plan 優化的啟用,有兩個相關的配置:

  • table.optimizer.reuse-sub-plan-enabled (默認開啟)
  • table.optimizer.reuse-source-enabled(默認開啟)

這兩個配置,默認都是開啟的,用戶可以根據自己的需求進行關閉。這裡主要說明一下 table.optimizer.reuse-source-enabled 這個參數。在 Batch 模式下,join 操作可能會導致死鎖,具體場景是在執行 hash-join 或者 nested-loop-join 時一定是先讀 build 端,然後再讀 probe 端,如果啟用 reuse-source-enabled,當數據源是同一個 Source 的時候,Source 的數據會同時發送給 build 和 probe 端。這時候,build 端的數據將不會被消費,導致 join 操作無法完成,整個 join 就被卡住了。

為了解決死鎖問題,Blink Planner 會先將 probe 端的數據落盤,這樣 build 端讀數據的操作才會正常,等 build 端的數據全部讀完之後,再從磁盤中拉取 probe 端的數據,從而解決死鎖問題。但是,落盤會有額外的開銷,會多一次寫的操作;有時候,讀兩次 Source 的開銷,可能比一次寫的操作更快,這時候,可以關閉 reuse-source,性能會更好。當然,如果讀兩次 Source 的開銷,遠大於一次落盤的開銷,可以保持 reuse-source 開啟。需要說明的是,Stream 模式是不存在死鎖問題的,因為 Stream 模式 join 不會有選邊的問題。

總結而言,sub-plan reuse 解的問題是優化結果的子圖複用問題,它和分段優化類似,但他們是一個互補的過程。

注:Hash Join:對於兩張待 join 的表 t1, t2。選取其中的一張表按照 join 條件給的列建立hash 表。然後掃描另外一張表,一行一行去建好的 hash 表判斷是否有對應相等的行來完成 join 操作,這個操作稱之為 probe (探測)。前一張表叫做 build 表,後一張表的叫做 probe 表。

Agg 分類優化

Blink 中的 Aggregate 操作是非常豐富的:

  • group agg,例如:select count(a) from t group by b
  • over agg,例如:select count(a) over (partition by b order by c) from t
  • window agg,例如:select count(a) from t group by tumble(ts, interval '10' second), b
  • table agg ,例如:tEnv.scan('t').groupBy('a').flatAggregate(flatAggFunc('b' as ('c', 'd'))) 

下面主要對 Group Agg 優化進行講解,主要是兩類優化。

1. Local/Global Agg 優化

Local/Global Agg 主要是為了減少網絡 Shuffle。要運用 Local/Global 的優化,必要條件如下:

  • Aggregate 的所有 Agg Function 都是 mergeable 的,每個 Aggregate 需要實現 merge 方法,例如 SUM,COUNT,AVG,這些都是可以分多階段完成,最終將結果合併;但是求中位數,計算 95% 這種類似的問題,無法拆分為多階段,因此,無法運用 Local/Global 的優化。
  • table.optimizer.agg-phase-strategy 設置為 AUTO 或者 TWO_PHASE。
  • Stream 模式下,mini-batch 開啟 ;Batch 模式下 AUTO 會根據 cost 模型加上統計數據,選擇是否進行 Local/Global 優化。

示例 7

select count(*) from t group by color

沒有優化的情況下,下面的這個 Aggregate 會產生 10 次的 Shuffle 操作。

圖10.png
圖 10 示例 7 未做優化的 Count 操作

使用 Local/Global 優化後,會轉化為下面的操作,會在本地先進行聚合,然後再進行 Shuffle 操作,整個 Shuffle 的數據剩下 6 條。在 Stream 模式下,Blink 其實會以 mini-batch 的維度對結果進行預聚合,然後將結果發送給 Global Agg 進行彙總。

圖11.png
圖 11 示例 7 經過 Local/Global 優化的 Count 操作

2. Distinct Agg 優化

Distinct Agg 進行優化,主要是對 SQL 語句進行改寫,達到優化的目的。但 Batch 模式和 Stream 模式解決的問題是不同的:

  • Batch 模式下的 Distinct Agg,需要先做 Distinct,再做 Agg,邏輯上需要兩步才能實現,直接實現 Distinct Agg 開銷太大。
  • Stream 模式下,主要是解決熱點問題,因為 Stream 需要將所有的輸入數據放在 State 裡面,如果數據有熱點,State 操作會很頻繁,這將影響性能。

Batch 模式

第一層,求 distinct 的值和非 distinct agg function 的值,第二層求 distinct agg function 的值

示例 8

select color, count(distinct id), count(*) from t group by color 

手工改寫成:

select color, count(id), min(cnt) from (
   select color, id, count(*) filter (where $e=2) as cnt from (       
      select color, id, 1 as $e from t --for distinct id 
      union all
      select color, null as id, 2 as $e from t -- for count(*) 
  ) group by color, id, $e 
) group by color 

轉化的邏輯過程,如下圖所示:

圖12.png
圖 12 示例 8 Batch 模式 Distinct 改寫邏輯

Stream 模式

Stream 模式的啟用有一些必要條件:

  • 必須是支持的 agg function:avg/count/min/max/sum/first_value/concat_agg/single_value;
  • table.optimizer.distinct-agg.split.enabled(默認關閉)

示例 9

select color, count(distinct id), count(*) from t group by color 

手工改寫成:

select color, sum(dcnt), sum(cnt) from (
  select color, count(distinct id) as dcnt, count(*) as cnt from t 
  group by color, mod(hash_code(id), 1024)
) group by color

改寫前,邏輯圖大概如下:

圖13.png
圖 13 示例 9 Stream 模式未優化 Distinct

改寫後,邏輯圖就會變為下面這樣,熱點數據被打散到多箇中間節點上。

圖14.jpg
圖14 示例 9 Stream 模式優化 Distinct

需要注意的是,示例 5 的 SQL 中 mod(hash_code(id),1024)中的這個 1024 為打散的維度,這個值建議設置大一些,設置太小產生的效果可能不好。

總結

本文首先對新的 TableEnvironment 的整體設計進行了介紹,並且列舉了各種模式下TableEnvironment 的選擇,然後通過具體的示例,展示了各種模式下代碼的寫法,以及需要注意的事項。

在新的 Catalog 和 DDL 部分,對 Catalog 的整體設計、DDL 的使用部分也都以實例進行拆分講解。最後,對 Blink Planner 解析 SQL/Table API 的流程、Blink Planner 的改進以及優化的原理進行了講解,希望對大家探索和使用 Flink SQL 有所幫助。

Leave a Reply

Your email address will not be published. Required fields are marked *