摘要:本文由阿里巴巴技術專家賀小令(曉令)分享,主要介紹 Apache Flink 新場景 OLAP 引擎,內容分為以下四部分:
- 背景介紹
- Flink OLAP 引擎
- 案例介紹
- 未來計劃
一、背景介紹
1.OLAP 及其分類
OLAP 是一種讓用戶可以用從不同視角方便快捷的分析數據的計算方法。主流的 OLAP 可以分為3類:多維 OLAP ( Multi-dimensional OLAP )、關係型 OLAP ( Relational OLAP ) 和混合 OLAP ( Hybrid OLAP ) 三大類。
(1)多維 OLAP ( MOLAP )
傳統的 OLAP 分析方式
數據存儲在多維數據集中
(2)關係型 OLAP ( ROLAP )
以關係數據庫為核心,以關係型結構進行多維數據的表示
通過 SQL 的 where 條件以呈現傳統 OLAP 的切片、切塊功能
(3)混合 OLAP ( HOLAP )
將 MOLAP 和 ROLPA 的優勢結合起來,以獲得更快的性能
以下將詳細介紹每種分類的具體特徵。
■ 多維 OLAP ( MOLAP )
MOLAP 的典型代表是 Kylin 和 Druid。
- MOLAP 處理流程
首先,對原始數據做數據預處理;然後,將預處理後的數據存至數據倉庫,用戶的請求通過 OLAP server 即可查詢數據倉庫中的數據。
- MOLAP 的優點和缺點
MOLAP 的優點和缺點都來自於其數據預處理 ( pre-processing ) 環節。數據預處理,將原始數據按照指定的計算規則預先做聚合計算,這樣避免了查詢過程中出現大量的臨時計算,提升了查詢性能,同時也為很多複雜的計算提供了支持。
但是這樣的預聚合處理,需要預先定義維度,會限制後期數據查詢的靈活性;如果查詢工作涉及新的指標,需要重新增加預處理流程,損失了靈活度,存儲成本也很高;同時,這種方式不支持明細數據的查詢。
因此,MOLAP 適用於對性能要求非常高的場景。
■ 關係型 OLAP ( ROLAP )
ROLAP 的典型代表是 Presto 和 Impala。
- 處理流程
ROLAP 的處理流程上,用戶的請求直接發送給 OLAP server,然後 OLAP server 將用戶的請求轉換成關係型操作算子,再通過 SCAN 掃描原始數據,在原始數據基礎上做過濾、聚合、關聯等處理,最後將計算結果返回給用戶。
- ROLAP 的優點和缺點
ROLAP 不需要進行數據預處理 ( pre-processing ),因此查詢靈活,可擴展性好。這類引擎使用 MPP 架構 ( 與Hadoop相似的大型並行處理架構,可以通過擴大併發來增加計算資源 ),可以高效處理大量數據。
但是當數據量較大或 query 較為複雜時,查詢性能也無法像 MOLAP 那樣穩定。所有計算都是臨時發生 ( 沒有預處理 ),因此會耗費更多的計算資源。
因此,ROLAP 適用於對查詢靈活性高的場景。
■ 混合 OLAP ( HOLAP )
混合 OLAP,是 MOLAP 和 ROLAP 的一種融合。當查詢聚合性數據的時候,使用MOLAP 技術;當查詢明細數據時,使用 ROLAP 技術。在給定使用場景的前提下,以達到查詢性能的最優化。
2.Apache Flink 介紹
■ Flink 支持的應用場景
Apache Flink 支持的3種典型應用場景:
(1)事件驅動的應用
- 反欺詐
- 基於規則的監控報警
(2)流式 Pipeline
- 數據 ETL
- 實時搜索引擎的索引
(3)批處理 & 流處理分析
- 網絡質量監控
- 消費者實時數據分析
■ Flink 架構及優勢
Flink 的整體架構如上圖所示,在此架構下,Flink 的優勢也十分突出,主要分為6個方面:
(1)統一框架 ( 不區分流處理和批處理 )
- 用戶 API 統一
- 執行引擎統一
(2)多層次 API
- 標準 SQL APL
- Table API
- DataStream API ( 靈活,無 schema 限制 )
(3)高性能
- 支持內存計算
- 支持代價模型優化
- 支持代碼動態生成
(4)方便集成
- 支持豐富的 Connectors
- 方便對接現有 Catalog
(5)靈活的 Failover 策略
- 在 Pipeline 下支持快速 failover
- 類似 MapReduce、Spark 一樣支持 shuffle 數據落盤
(6)易部署維護
- 靈活部署方案
- 支持高可用
二、Apache Flink OLAP 引擎
1.為什麼 Flink 可以做 ROLAP 引擎?
- Flink 的核心和基礎是流計算,支持高性能、低延遲的大規模計算。
- Blink 將批看作有限流,批處理是針對有限數據集的優化,因此批處理引擎也是構建在流引擎上 ( 已開源 )。
- OLAP 是響應時間要求更短的批處理,因此 OLAP 可以看作是一種特殊的批。OLAP 引擎也可以構建在現有的批引擎上。
注:Flink OLAP 引擎目前不帶存儲,只是一個計算框架。
2.Flink 做 OLAP 引擎的優勢
(1)統一引擎:流處理、批處理、OLAP 統一使用 Flink 引擎。
降低學習成本,僅需要學習一個引擎
提高開發效率,很多 SQL 是流批通用
提高維護效率,可以更集中維護好一個引擎
(2)既有優勢:利用 Flink 已有的很多特性,使 OLAP 使用場景更為廣泛。
使用流處理的內存計算、Pipeline
支持代碼動態生成
也可以支持批處理數據落盤能力
(3)相互增強:OLAP 能享有現有引擎的優勢,同時也能增強引擎能力
- 無統計信息場景的優化
- 開發更高效的算子
- 使 Flink 同時兼備流、批、OLAP 處理的能力,成為更通用的框架
3.性能優化
OLAP 對查詢時間非常敏感,當前很多組件的性能不滿足要求,因此我們對 Flink 做了很多相關優化。
■ 服務架構的優化
- 客戶端服務化
下圖介紹了一條 SQL 怎麼在客戶端一步一步變為 JobGraph,最終提交給 JM:
在改動之前,每次接受一個 query 時會啟動一個新的 JVM 進程來進行作業的編譯。其中 JVM 的啟動、Class 的加載、代碼的動態編譯 ( 如 Optimizer 模塊由於需要通過 Janino 動態編譯進行 cost 計算 ) 等操作都非常耗時 ( 需要約3~5s )。因此,我們將客戶端進行服務化,將整個 Client 做成 Service,當接收到用戶的 query 時,無需重複各項加載工作,可將延時降低至 100ms 左右。
- 自定義 CollectionTableSink
這部分優化,源於 OLAP 的一個特性:OLAP 會將最終計算結果發給客戶端,通過JobManager 轉發給 Client。假如某個 query 的結果數據量很大,會讓 JobManager OOM ( OutOfMemory );如果同時執行多個 query,也會相互影響。
因此,我們從新實現了一個 CollectionTableSink,限制數據的條數和數據大小,避免出現 OOM,保證多個 Query 同時運行時的穩定性。
- 調度優化
在 Batch 模式下的調度存在以下問題:
- 使用 Lazy_from_sources 模式調度,會導致整體運行時間較長,也可能造成死鎖。
- RM ( Resource Manager ) 按 OnDemand 方式分配 Slot 需求,也會造成死鎖。
- RM 以單線程同步模式向 TM ( Transaction Manager ) 分配 Slot 請求,會造成等待時間更長。
注:調度死鎖是指在資源有限的情況下,多個 Job 同時運行時,如果多個 Job都只申請到了部分資源並沒有剩餘資源可以申請,導致 Job 沒法繼續執行,新的 Job 也沒法提交。
針對上述問題,我們提出了以下幾點改動:
- 採用 Eager 調度模式 ( 確保所有的資源都申請到後才開始運行 )。
- 使用 FIFO ( 先進先出隊 ) 模式申請資源 ( 確保當前 Job 的資源分配結束後才開始下一個 Job 的資源分配 )。
- 將單線程同步模式改為多線程異步模式,減少任務啟動時間和執行時間。
■ 針對 source 的優化
在 ROLAP 的執行場景中,所有數據都是通過掃描原始數據表後進行處理;因此,基於 Source 的讀取性能非常關鍵,直接影響 Job 的執行效率。
- Project&Filter 下堆
像 Parquet 這類的列存文件格式,支持按需讀取相所需列,同時支持 RowGroup 級別的過濾。利用該特性,可以將 Project 和 Filter 下推到 TableSource,從而只需要掃描 Query 中涉及的字段和滿足條件的 RowGroup,大大提升讀取效率。
- Aggregate 下堆
這個優化也是充分利用了 TableSource 的特性:例如 Parquet 文件的 metadata 中已經存儲了每個 RowGroup 的統計信息 ( 如 max、min等 ),因此在做 max、min 這類聚合統計時,可直接讀取 metadata 信息,而不需要先讀取所有原始數據再計算。
■ 在沒有統計信息場景下做的優化
- 消除 CrossJoin
CrossJoin 是沒有任何 Join 條件,將 Join 的兩張表的數據做笛卡爾積,導致 Join 的結果膨脹非常厲害,這類 Join 應該儘量避免。我們對含有 CrossJoin 的 Plan 進行改寫:將有 join 條件的表格先做 join ( 通常會因為一些數據 Join 不上而減少數據 ),從而提高執行效率。這是一個確定性的改寫,即使在沒有統計信息的情況下,也可以使用該優化。
- 自適應的 Local Aggregate
通常情況下,兩階段的 Aggregate 是非常高效的,因為 LocalAggregate 能聚合大量數據,導致 Shuffle 的數據量會變少。但是當 LocalAggregate 的聚合度很低的時候, Local 聚合操作的意義不大,反而會浪費 CPU。
在沒有任何統計信息的情況下,優化器沒法決定是否要產生 LocalAggregate 算子;因此,我們採用運行時採樣的方式來判斷聚合度,如果聚合度低於設定的閾值,我們將關閉聚合操作,改為僅做數據轉發;經我們測試,部分場景有 30% 的性能提升。
4.測試結果
上圖是 Flink 和 Presto基於 1T 數據做的 SSB ( Star Schema Benchmark ) 測試,從圖中可以看出 Flink 和 Presto 整體上不相上下,甚至有些 Query Flink 性能優於Presto。
注:Flink OLAP 從開始到嘉賓分享時,只有3個月時間。
案例介紹
1.Flink OLAP 在數據探查上的應用
上圖描述了一個數據湖應用的完整架構,Flink OLAP 主要用於"數據探查"。
數據探查是對數據結構做智能判斷,給出數據的探查結果,快速瞭解數據的信息和質量情況。即用戶可以在管控平臺上了解數據湖中任意一份數據的數據特性。用戶通過 Web 交互操作選擇相應的表和指標後立即展示相關結果指標,因此要求低延遲、實時反饋。而且數據湖中很多數據沒有任何統計信息;前述的各種查詢、聚合層面的優化,主要為這類場景服務。
2.整體架構
上圖是這類應用的整體架構。整套服務託管到 Kubernetes 上,最終訪問的數據是OSS。
未來計劃
當前,Flink OLAP 引擎性能優化及應用主要是基於內部 Flink,後續工作主要分為以下三塊:
- 推回社區:目前所有工作都是基於內部 Flink,希望推回社區;
- 資源隔離:後期很多功能的開發和優化會圍繞多 Query 運行時的"資源隔離";
- 優化&性能:圍繞 OLAP 的特性,在此場景下會進一步做優化和性能提升等方面的工作。