大數據

Flink 1.10 細粒度資源管理解析

相信不少讀者在開發 Flink 應用時或多或少會遇到在內存調優方面的問題,比如在我們生產環境中遇到最多的 TaskManager 在容器化環境下佔用超出容器限制的內存而被 YARN/Mesos kill 掉[1],再比如使用 heap-based StateBackend 情況下 State 過大導致 GC 頻繁影響吞吐。這些問題對於不熟悉 Flink 內存管理的用戶來說十分難以排查,而且 Flink 晦澀難懂的內存配置參數更是讓用戶望而卻步,結果是往往將內存調大至一個比較浪費的閾值以儘量避免內存問題。

對於作業規模不大的普通用戶而言,這些通常在可以接受的範圍之內,但對於上千並行度的大作業來說,浪費資源的總量會非常可觀,而且進程的不穩定性導致的作業恢復時間也會比普通作業長得多,因此阿里巴巴的 Blink 團隊針對內存管理機制做了大量的優化,並於近期開始合併到 Flink。本文的內容主要基於阿里團隊工程師宋辛童在 Flink Forward Beijing 的分享[2],以及後續相關的幾個 FLIP 提案。

Flink 目前(1.9)的內存管理

TaskManager 作為 Master/Slave 架構中的 Slave 提供了作業執行需要的環境和資源,最為重要而且複雜,因此 Flink 的內存管理也主要指 TaskManager 的內存管理。

TaskManager 的資源(主要是內存)分為三個層級,分別是最粗粒度的進程級(TaskManager 進程本身),線程級(TaskManager 的 slot)和 SubTask 級(多個 SubTask 共用一個 slot)。

1-640.png圖1.TaskManager 資源層級

在進程級,TaskManager 將內存劃分為以下幾塊:

  • Heap Memory: 由 JVM 直接管理的 heap 內存,留給用戶代碼以及沒有顯式內存管理的 Flink 系統活動使用(比如 StateBackend、ResourceManager 的元數據管理等)。
  • Network Memory: 用於網絡傳輸(比如 shuffle、broadcast)的內存 Buffer 池,屬於 Direct Memory 並由 Flink 管理。
  • Cutoff Memory: 在容器化環境下進程使用的物理內存有上限,需要預留一部分內存給 JVM 本身,比如線程棧內存、class 等元數據內存、GC 內存等。
  • Managed Memory: 由 Flink Memory Manager 直接管理的內存,是數據在 Operator 內部的物理表示。Managed Memory 可以被配置為 on-heap 或者 off-heap (direct memory)的,off-heap 的 Managed Memory 將有效減小 JVM heap 的大小並減輕 GC 負擔。目前 Managed Memory 只用於 Batch 類型的作業,需要緩存數據的操作比如 hash join、sort 等都依賴於它。

根據 Managed Memory 是 on-heap 或 off-heap 的不同,TaskManager 的進程內存與 JVM 內存分區關係分別如下:

640.png圖2.TaskManager 內存分區

在線程級別,TaskManager 會將其資源均分為若干個 slot (在 YARN/Mesos/K8s 環境通常是每個 TaskManager 只包含 1 個 slot),沒有 slot sharing 的情況下每個 slot 可以運行一個 SubTask 線程。除了 Managed Memory,屬於同一 TaskManager 的 slot 之間基本是沒有資源隔離的,包括 Heap Memory、Network Buffer、Cutoff Memory 都是共享的。所以目前 slot 主要的用處是限制一個 TaskManager 的 SubTask 數。

從作為資源提供者的 TaskManager 角度看, slot 是資源的最小單位,但從使用者 SubTask 的角度看,slot 的資源還可以被細分,因為 Flink 的 slot sharing 機制。默認情況下, Flink 允許多個 SubTask 共用一個 slot 的資源,前提是這些 SubTask 屬於同一個 Job 的不同 Task。以官網的例子來說,一個拓撲為 Source(6)-map(6)-keyby/window/apply(6)-sink(1) 的作業,可以運行在 2 個 slot 數為 3 的 TaskManager 上(見圖3)。

640-3.png
圖3.TaskManager Slot Sharing

這樣的好處是,原本一共需要 19 個 slot 的作業,現在只需要作業中與 Task 最大並行度相等的 slot, 即 6 個 slot 即可運行起來。此外因為不同 Task 通常有不同的資源需求,比如 source 主要使用網絡 IO,而 map 可能主要需要 cpu,將不同 Task 的 subtask 放到同一 slot 中有利於資源的充分利用。

可以看到,目前 Flink 的內存管理是比較粗粒度的,資源隔離並不是很完整,而且在不同部署模式下(Standalone/YARN/Mesos/K8s)或不同計算模式下(Streaming/Batch)的內存分配也不太一致,為深度平臺化及大規模應用增添了難度。

Flink 1.10 細粒度的資源管理

為了改進 Flink 內存管理機制,阿里巴巴的工程師結合 Blink 的優化經驗分別就進程、線程、SubTask(Operator)三個層面分別提出了 3 個 FLIP,均以 1.10 為目標 release 版本。下面將逐一介紹每個提案的內容。

FLIP-49: 統一 TaskExecutor 的內存配置

■ 背景

TaskExecutor 在不同部署模式下具體負責作業執行的進程,可以簡單視為 TaskManager。目前 TaskManager 的內存配置存在不一致以及不夠直觀的問題,具體有以下幾點:

  • 流批作業內容配置不一致。Managed Memory 只覆蓋 DataSet API,而 DataStream API 的則主要使用 JVM 的 heap 內存,相比前者需要更多的調優參數且內存消耗更難把控。
  • RocksDB 佔用的 native 內存並不在內存管理裡,導致使用 RocksDB 時內存需要很多手動調優。
  • 不同部署模式下,Flink 內存計算算法不同,並且令人難以理解。

針對這些問題,FLIP-49[4] 提議通過將 Managed Memory 的用途拓展至 DataStream 以解決這個問題。DataStream 中主要佔用內存的是 StateBackend,它可以從管理 Managed Memory 的 MemoryManager 預留部分內存或分配內存。通過這種方式同一個 Flink 配置可以運行 Batch 作業和 Streaming 作業,有利於流批統一。

■ 改進思路

對比1.jpg

可以看到目前 DataStream 作業的內存分配沒有經過 MemoryManager 而是直接向 JVM 申請,容易造成 heap OOM 或者物理內存佔用過大[3],因此直接的修復辦法是讓 MemoryManager 瞭解到 StateBackend 的內存佔用。這會有兩種方式,一是直接通過 MemoryManager 申請內存,二是仍使用隱式分配的辦法,但需要通知 MemoryManager 預留這部分內存。此外 MemoryManager 申請 off-heap 的方式也會有所變化,從 ByteBuffer#allocateDirect() 變為 Unsafe#allocateMemory(),這樣的好處是顯式管理的 off-heap 內存可以從 JVM 的 -XX:MaxDirectMemorySize 參數限制中分離出來。

另外 MemoryManager 將不只可以被配置為 heap/off-heap,而是分別擁有對應的內存池。這樣的好處是在同一個集群可以運行要求不同類型內存的作業,比如一個 FsStateBackend 的 DataStream 作業和一個 RocksDBStateBackend 的 DataStream 作業。heap/off-heap 的比例可以通過參數配置,1/0 則代表了完全的 on-heap 或者 off-heap。

改進之後 TaskManager 的各內存分區如下:

640-4.png

TaskManager 新內存結構

表格 1.jpg
表格2.jpg

值得注意的是有 3 個分區是沒有默認值的,包括 Framework Heap Memory、Total Flink Memory 和 Total Process Memory,它們是決定總內存的最關鍵參數,三者分別滿足不同部署模式的需要。比如在 Standalone 默認下,用戶可以配置 Framework Heap Memory 來限制用戶代碼使用的 heap 內存;而在 YARN 部署模式下,用戶可以通過配置 YARN container 的資源來間接設置 Total Process Memory。

FLIP-56: 動態 slot 分配

■ 背景

目前 Flink 的資源是預先靜態分配的,也就是說 TaskManager 進程啟動後 slot 的數目和每個 slot 的資源數都是固定的而且不能改變,這些 slot 的生命週期和 TaskManager 是相同的。Flink Job 後續只能向 TaskManager 申請和釋放這些 slot,而沒有對 slot 資源數的話語權。

640-5.png

圖5. 靜態 slot 分配

這種粗粒度的資源分配假定每個 SubTask 的資源需求都是大致相等的,優點是較為簡單易用,缺點在於如果出現 SubTask 的資源需求有傾斜的情況,用戶則需要按其中某個 SubTask 最大資源來配置總體資源,導致資源浪費且不利於多個作業複用相同 Flink 集群。

■ 改進思路

FLIP-56[5] 提議通過將 TaskManager 的資源改為動態申請來解決這個問題。TaskManager 啟動的時候只需要確定資源池大小,然後在有具體的 Flink Job 申請資源時再按需動態分配 slot。Flink Job 申請 slot 時需要附上資源需求,TaskManager 會根據該需求來確定 slot 資源。

640-6.png
圖6. 動態 slot 分配

值得注意的是,slot 資源需求可以是 unknown。提案引入了一個新的默認 slot 資源要求配置項,它表示一個 slot 佔總資源的比例。如果 slot 資源未知,TaskManager 將按照該比例切分出 slot 資源。為了保持和現有靜態 slot 模型的兼容性,如果該配置項沒有被配置,TaskManager 會根據 slot 數目均等分資源生成 slot。

目前而言,該 FLIP 主要涉及到 Managed Memory 資源,TaskManager 的其他資源比如 JVM heap 還是多個 slot 共享的。

FLIP-53: 細粒度的算子資源管理

■ 背景

FLIP-56 使得 slot 的資源可以根據實際需求確定,而 FLIP-53 則探討了 Operator (算子)層面如何表達資源需求,以及如何根據不同 Operator 的設置來計算出總的 slot 資源。

目前 DataSet API 以及有可以指定 Operator 資源佔比的方法(TaskConfig 和 ChainedDriver),因此這個 FLIP 只涉及到 DataStream API 和 Table/SQL API (先在 Blink Planner 實現)。不過提案並沒有包括用戶函數 API 上的變化(類似新增 dataStream.setResourceSpec() 函數),而是主要討論 DataStream 到 StreamGraph 的翻譯過程如何計算 slot 資源。改進完成後,這三個 API 的資源計算邏輯在底層會是統一的。

■ 改進思路

要理解 Flink 內部如何劃分資源,首先要對 Flink 如何編譯用戶代碼並部署到分佈式環境的過程有一定的瞭解。

640-7.jpg

圖7. Flink 作業編譯部署流程

以 DataStream API 為例,用戶為 DataStream 新增 Operator 時,Flink 在底層會將以一個對應的 Transform 來封裝。比如 dataStream.map(new MyMapFunc()) 會新增一個 OneInputTransformation 實例,裡面包括了序列化的 MyMapFunc 實例,以及 Operator 的配置(包括名稱、uid、並行度和資源等),並且記錄了它在拓撲中的前一個 Transformation 作為它的數據輸入。

當 env.execute() 被調用時,在 client 端 StreamGraphGenerator 首先會遍歷 Transformation 列表構造出 StreamGraph 對象(每個 Operator 對應一個 StreamNode),然後 StreamingJobGraphGenerator 再將 StreamGraph 翻譯成 DataStream/DataSet/Table/SQL 通用的 JobGraph(此時會應用 chaining policy 將可以合併的 Operator 合併為 OperatorChain,每個 OperatorChain 或不能合併的 Operator 對應一個 JobVertex),並將其傳給 JobManager。

JobManager 收到 JobGraph 後首先會將其翻譯成表示運行狀態的 ExecutionGraph,ExecutionGraph 的每個節點稱為 ExecutionJobVertex,對應一個 JobVertex。ExecutionJobVertex 有一個或多個並行度且可能被調度和執行多次,其中一個並行度的一次執行稱為 Execution,JobManager 的 Scheduler 會為每個 Execution 分配 slot。

細粒度的算子資源管理將以下面的方式作用於目前的流程:

  1. 用戶使用 API 構建的 Operator(以 Transformation 表示)會附帶 ResourceSpecs,描述該 Operator 需要的資源,默認為 unknown。
  2. 當生成 JobGraph 的時候,StreamingJobGraphGenerator 根據 ResourceSpecs 計算出每個 Operator 佔的資源比例(主要是 Managed Memory 的比例)。
  3. 進行調度的時候,Operator 的資源將被加總成為 Task 的 ResourceProfiles (包括 Managed Memory 和根據 Task 總資源算出的 Network Memory)。這些 Task 會被劃分為 SubTask 實例被部署到 TaskManager 上。
  4. 當 TaskManager 啟動 SubTask 的時候,會根據各 Operator 的資源佔比劃分 Slot Managed Memory。劃分的方式可以是用戶指定每個 Operator 的資源佔比,或者默認均等分。

值得注意的是,Scheduler 的調度有分 EAGER 模式和 LAZY_FROM_SOURCE 兩種模式,分別用於 Stream 作業和 Batch 作業,它們會影響到 slot 的資源計算。Stream 類型的作業要求所有的 Operator 同時運行,因此資源的需求是急切的(EAGER);而 Batch 類型的作業可以劃分為多個階段,不同階段的 Operator 不需要同時運行,可以等輸入數據準備好了再分配資源(LAZY_FROM_SOURCE)。這樣的差異導致如果要充分利用 slot,Batch 作業需要區分不同階段的 Task,同一時間只考慮一個階段的 Task 資源。

解決的方案是將 slot sharing 的機制拓展至 Batch 作業。默認情況下 Stream 作業的所有 Operator 都屬於 default sharing group,所以全部 Operator 都能共用都一個 slot。對於 Batch 作業而言,我們將整個 JobGraph 根據 suffle 劃分為一至多個 Region,每個 Region 屬於獨立的 sharing group,因而不會被放到同一個 slot 裡面。

640-7.png

圖8. 不同作業類型的 Slot Sharing Group

總結

隨著 Flink 的越來越大規模地被應用於各種業務,目前資源管理機制的靈活性、易用性不足的問題越發凸顯,新的細粒度資源管理機制將大大緩解這個問題。此外,新資源管理機制將統一流批兩者在 runtime 層資源管理,這也為將最終的流批統一打下基礎。對於普通用戶而言,這裡的大多數變動是透明的,主要的影響應該是出現新的內存相關的配置項需要了解一下。

參考資料:

1.[[FLINK-13477] Containerized TaskManager killed because of lack of memory overhead](https://issues.apache.org/jira/browse/FLINK-13477)

2.機遇與挑戰:Apache Flink 資源管理機制解讀與展望

3.[[FLINK-7289] Memory allocation of RocksDB can be problematic in container environments](https://issues.apache.org/jira/browse/FLINK-7289)

4.FLIP-49: Unified Memory Configuration for TaskExecutors

5.FLIP-56: Dynamic Slot Allocation

6.FLIP-53: Fine Grained Operator Resource Management

Leave a Reply

Your email address will not be published. Required fields are marked *