大數據

Flink 新場景:OLAP 引擎性能優化及應用案例

摘要:本文由阿里巴巴技術專家賀小令(曉令)分享,主要介紹 Apache Flink 新場景 OLAP 引擎,內容分為以下四部分:

  1. 背景介紹
  2. Flink OLAP 引擎
  3. 案例介紹
  4. 未來計劃

一、背景介紹

1.OLAP 及其分類

640.png

OLAP 是一種讓用戶可以用從不同視角方便快捷的分析數據的計算方法。主流的 OLAP 可以分為3類:多維 OLAP ( Multi-dimensional OLAP )、關係型 OLAP ( Relational OLAP ) 和混合 OLAP ( Hybrid OLAP ) 三大類。

(1)多維 OLAP ( MOLAP )

傳統的 OLAP 分析方式
數據存儲在多維數據集中

(2)關係型 OLAP ( ROLAP )

以關係數據庫為核心,以關係型結構進行多維數據的表示
通過 SQL 的 where 條件以呈現傳統 OLAP 的切片、切塊功能

(3)混合 OLAP ( HOLAP )

將 MOLAP 和 ROLPA 的優勢結合起來,以獲得更快的性能

以下將詳細介紹每種分類的具體特徵。

■ 多維 OLAP ( MOLAP )

MOLAP 的典型代表是 Kylin 和 Druid。

640 1.png

  • MOLAP 處理流程

首先,對原始數據做數據預處理;然後,將預處理後的數據存至數據倉庫,用戶的請求通過 OLAP server 即可查詢數據倉庫中的數據。

640 2.png

  • MOLAP 的優點和缺點

MOLAP 的優點和缺點都來自於其數據預處理 ( pre-processing ) 環節。數據預處理,將原始數據按照指定的計算規則預先做聚合計算,這樣避免了查詢過程中出現大量的臨時計算,提升了查詢性能,同時也為很多複雜的計算提供了支持。

但是這樣的預聚合處理,需要預先定義維度,會限制後期數據查詢的靈活性;如果查詢工作涉及新的指標,需要重新增加預處理流程,損失了靈活度,存儲成本也很高;同時,這種方式不支持明細數據的查詢。

因此,MOLAP 適用於對性能要求非常高的場景。

■ 關係型 OLAP ( ROLAP )

ROLAP 的典型代表是 Presto 和 Impala。

640 3.png

  • 處理流程

ROLAP 的處理流程上,用戶的請求直接發送給 OLAP server,然後 OLAP server 將用戶的請求轉換成關係型操作算子,再通過 SCAN 掃描原始數據,在原始數據基礎上做過濾、聚合、關聯等處理,最後將計算結果返回給用戶。

640 4.png

  • ROLAP 的優點和缺點

ROLAP 不需要進行數據預處理 ( pre-processing ),因此查詢靈活,可擴展性好。這類引擎使用 MPP 架構 ( 與Hadoop相似的大型並行處理架構,可以通過擴大併發來增加計算資源 ),可以高效處理大量數據。

但是當數據量較大或 query 較為複雜時,查詢性能也無法像 MOLAP 那樣穩定。所有計算都是臨時發生 ( 沒有預處理 ),因此會耗費更多的計算資源。

因此,ROLAP 適用於對查詢靈活性高的場景。

■ 混合 OLAP ( HOLAP )

混合 OLAP,是 MOLAP 和 ROLAP 的一種融合。當查詢聚合性數據的時候,使用MOLAP 技術;當查詢明細數據時,使用 ROLAP 技術。在給定使用場景的前提下,以達到查詢性能的最優化。

2.Apache Flink 介紹

■ Flink 支持的應用場景

640 5.png

Apache Flink 支持的3種典型應用場景:

(1)事件驅動的應用

  • 反欺詐
  • 基於規則的監控報警

(2)流式 Pipeline

  • 數據 ETL
  • 實時搜索引擎的索引

(3)批處理 & 流處理分析

  • 網絡質量監控
  • 消費者實時數據分析

■ Flink 架構及優勢

640 6.png

Flink 的整體架構如上圖所示,在此架構下,Flink 的優勢也十分突出,主要分為6個方面:

(1)統一框架 ( 不區分流處理和批處理 )

  • 用戶 API 統一
  • 執行引擎統一

(2)多層次 API

  • 標準 SQL APL
  • Table API
  • DataStream API ( 靈活,無 schema 限制 )

(3)高性能

  • 支持內存計算
  • 支持代價模型優化
  • 支持代碼動態生成

(4)方便集成

  • 支持豐富的 Connectors
  • 方便對接現有 Catalog

(5)靈活的 Failover 策略

  • 在 Pipeline 下支持快速 failover
  • 類似 MapReduce、Spark 一樣支持 shuffle 數據落盤

(6)易部署維護

  • 靈活部署方案
  • 支持高可用

二、Apache Flink OLAP 引擎

1.為什麼 Flink 可以做 ROLAP 引擎?

640 7.png

  • Flink 的核心和基礎是流計算,支持高性能、低延遲的大規模計算。
  • Blink 將批看作有限流,批處理是針對有限數據集的優化,因此批處理引擎也是構建在流引擎上 ( 已開源 )。
  • OLAP 是響應時間要求更短的批處理,因此 OLAP 可以看作是一種特殊的批。OLAP 引擎也可以構建在現有的批引擎上。

注:Flink OLAP 引擎目前不帶存儲,只是一個計算框架。

2.Flink 做 OLAP 引擎的優勢

640 8.png

(1)統一引擎:流處理、批處理、OLAP 統一使用 Flink 引擎。

降低學習成本,僅需要學習一個引擎
提高開發效率,很多 SQL 是流批通用
提高維護效率,可以更集中維護好一個引擎

(2)既有優勢:利用 Flink 已有的很多特性,使 OLAP 使用場景更為廣泛。

使用流處理的內存計算、Pipeline
支持代碼動態生成
也可以支持批處理數據落盤能力

(3)相互增強:OLAP 能享有現有引擎的優勢,同時也能增強引擎能力

  • 無統計信息場景的優化
  • 開發更高效的算子
  • 使 Flink 同時兼備流、批、OLAP 處理的能力,成為更通用的框架

3.性能優化

OLAP 對查詢時間非常敏感,當前很多組件的性能不滿足要求,因此我們對 Flink 做了很多相關優化。

■ 服務架構的優化

  • 客戶端服務化

下圖介紹了一條 SQL 怎麼在客戶端一步一步變為 JobGraph,最終提交給 JM:

640 9.png

在改動之前,每次接受一個 query 時會啟動一個新的 JVM 進程來進行作業的編譯。其中 JVM 的啟動、Class 的加載、代碼的動態編譯 ( 如 Optimizer 模塊由於需要通過 Janino 動態編譯進行 cost 計算 ) 等操作都非常耗時 ( 需要約3~5s )。因此,我們將客戶端進行服務化,將整個 Client 做成 Service,當接收到用戶的 query 時,無需重複各項加載工作,可將延時降低至 100ms 左右。

  • 自定義 CollectionTableSink
    640 10.png

這部分優化,源於 OLAP 的一個特性:OLAP 會將最終計算結果發給客戶端,通過JobManager 轉發給 Client。假如某個 query 的結果數據量很大,會讓 JobManager OOM ( OutOfMemory );如果同時執行多個 query,也會相互影響。

因此,我們從新實現了一個 CollectionTableSink,限制數據的條數和數據大小,避免出現 OOM,保證多個 Query 同時運行時的穩定性。

  • 調度優化

640 11.png

在 Batch 模式下的調度存在以下問題:

  • 使用 Lazy_from_sources 模式調度,會導致整體運行時間較長,也可能造成死鎖。
  • RM ( Resource Manager ) 按 OnDemand 方式分配 Slot 需求,也會造成死鎖。
  • RM 以單線程同步模式向 TM ( Transaction Manager ) 分配 Slot 請求,會造成等待時間更長。

注:調度死鎖是指在資源有限的情況下,多個 Job 同時運行時,如果多個 Job都只申請到了部分資源並沒有剩餘資源可以申請,導致 Job 沒法繼續執行,新的 Job 也沒法提交。

針對上述問題,我們提出了以下幾點改動:

  • 採用 Eager 調度模式 ( 確保所有的資源都申請到後才開始運行 )。
  • 使用 FIFO ( 先進先出隊 ) 模式申請資源 ( 確保當前 Job 的資源分配結束後才開始下一個 Job 的資源分配 )。
  • 將單線程同步模式改為多線程異步模式,減少任務啟動時間和執行時間。

■ 針對 source 的優化

在 ROLAP 的執行場景中,所有數據都是通過掃描原始數據表後進行處理;因此,基於 Source 的讀取性能非常關鍵,直接影響 Job 的執行效率。

  • Project&Filter 下堆

640 12.png

像 Parquet 這類的列存文件格式,支持按需讀取相所需列,同時支持 RowGroup 級別的過濾。利用該特性,可以將 Project 和 Filter 下推到 TableSource,從而只需要掃描 Query 中涉及的字段和滿足條件的 RowGroup,大大提升讀取效率。

  • Aggregate 下堆

640 13.png

這個優化也是充分利用了 TableSource 的特性:例如 Parquet 文件的 metadata 中已經存儲了每個 RowGroup 的統計信息 ( 如 max、min等 ),因此在做 max、min 這類聚合統計時,可直接讀取 metadata 信息,而不需要先讀取所有原始數據再計算。

■ 在沒有統計信息場景下做的優化

  • 消除 CrossJoin

640 14.png

CrossJoin 是沒有任何 Join 條件,將 Join 的兩張表的數據做笛卡爾積,導致 Join 的結果膨脹非常厲害,這類 Join 應該儘量避免。我們對含有 CrossJoin 的 Plan 進行改寫:將有 join 條件的表格先做 join ( 通常會因為一些數據 Join 不上而減少數據 ),從而提高執行效率。這是一個確定性的改寫,即使在沒有統計信息的情況下,也可以使用該優化。

  • 自適應的 Local Aggregate

640 15.png

通常情況下,兩階段的 Aggregate 是非常高效的,因為 LocalAggregate 能聚合大量數據,導致 Shuffle 的數據量會變少。但是當 LocalAggregate 的聚合度很低的時候, Local 聚合操作的意義不大,反而會浪費 CPU。

在沒有任何統計信息的情況下,優化器沒法決定是否要產生 LocalAggregate 算子;因此,我們採用運行時採樣的方式來判斷聚合度,如果聚合度低於設定的閾值,我們將關閉聚合操作,改為僅做數據轉發;經我們測試,部分場景有 30% 的性能提升。

4.測試結果

640 16.png

上圖是 Flink 和 Presto基於 1T 數據做的 SSB ( Star Schema Benchmark ) 測試,從圖中可以看出 Flink 和 Presto 整體上不相上下,甚至有些 Query Flink 性能優於Presto。

注:Flink OLAP 從開始到嘉賓分享時,只有3個月時間。

案例介紹

1.Flink OLAP 在數據探查上的應用

640 17.png

上圖描述了一個數據湖應用的完整架構,Flink OLAP 主要用於"數據探查"。

數據探查是對數據結構做智能判斷,給出數據的探查結果,快速瞭解數據的信息和質量情況。即用戶可以在管控平臺上了解數據湖中任意一份數據的數據特性。用戶通過 Web 交互操作選擇相應的表和指標後立即展示相關結果指標,因此要求低延遲、實時反饋。而且數據湖中很多數據沒有任何統計信息;前述的各種查詢、聚合層面的優化,主要為這類場景服務。

2.整體架構

640 18.png

上圖是這類應用的整體架構。整套服務託管到 Kubernetes 上,最終訪問的數據是OSS。

未來計劃

當前,Flink OLAP 引擎性能優化及應用主要是基於內部 Flink,後續工作主要分為以下三塊:

  1. 推回社區:目前所有工作都是基於內部 Flink,希望推回社區;
  2. 資源隔離:後期很多功能的開發和優化會圍繞多 Query 運行時的"資源隔離";
  3. 優化&性能:圍繞 OLAP 的特性,在此場景下會進一步做優化和性能提升等方面的工作。

Leave a Reply

Your email address will not be published. Required fields are marked *