開發與維運

有贊實時任務優化:Flink Checkpoint 異常解析與應用實踐

作者:沈磊(有贊大數據)

有贊實時任務主要以 Flink 為主,為了保證實時任務的容錯恢復以及停止重啟時的狀態恢復,幾乎所有的實時任務都會開啟 Checkpoint 或者觸發 Savepoint 進行狀態保存。由於 Savepoint 底層原理的實現和 Checkpoint 幾乎一致,本文結合 Flink 1.9 版本,重點講述 Flink Checkpoint 原理流程以及常見原因分析,讓用戶能夠更好的理解 Flink Checkpoint,從而開發出更健壯的實時任務。

一、 什麼是 Flink Checkpoint 和狀態

1.1 Flink Checkpoint 是什麼

Flink Checkpoint 是一種容錯恢復機制。這種機制保證了實時程序運行時,即使突然遇到異常或者機器問題時也能夠進行自我恢復。Flink Checkpoint 對於用戶層面來說,是透明的,用戶會感覺實時任務一直在運行。

Flink Checkpoint 是 Flink 自身的系統行為,用戶無法對其進行交互,用戶可以在程序啟動之前,設置好實時任務 Checkpoint 相關的參數,當任務啟動之後,剩下的就全交給 Flink 自行管理。

1.2 為什麼要開啟 Checkpoint

實時任務不同於批處理任務,除非用戶主動停止,一般會一直運行,運行的過程中可能存在機器故障、網絡問題、外界存儲問題等等,要想實時任務一直能夠穩定運行,實時任務要有自動容錯恢復的功能。而批處理任務在遇到異常情況時,在重新計算一遍即可。實時任務因為會一直運行的特性,如果在從頭開始計算,成本會很大,尤其是對於那種運行時間很久的實時任務來說。

實時任務開啟 Checkpoint 功能,也能夠減少容錯恢復的時間。因為每次都是從最新的 Chekpoint 點位開始狀態恢復,而不是從程序啟動的狀態開始恢復。舉個列子,如果你有一個運行一年的實時任務,如果容錯恢復是從一年前啟動時的狀態恢復,實時任務可能需要運行很久才能恢復到現在狀態,這一般是業務方所不允許的。

1.3 Flink 任務狀態是什麼

Flink Checkpoint 會將實時任務的狀態存儲到遠端存儲,比如 HDFS ,亞馬遜的 S3 等等。Flink 任務狀態可以理解為實時任務計算過程中,中間產生的數據結果,同時這些計算結果會在後續實時任務處理時,能夠繼續進行使用。實時任務的狀態可以是一個聚合結果值,比如 WordCount 統計的每個單詞的數量,也可以是消息流中的明細數據。

Flink 任務狀態整體可以劃分兩種:Operator 狀態和 KeyedState。常見的 Operator 狀態,比如 Kafka Topic 每個分區的偏移量。KeyedState 是基於 KeyedStream 來使用的,所以在使用前,你需要對你的流通過 keyby 來進行分區,常見的狀態比如有 MapState、ListState、ValueState 等等。

下面是一個實時計算奇數和偶數的任務的示例:

640.jpeg

在上圖中,假如輸入的流來自於 Kafka ,那麼 Kafka Topic 分區的偏移量是狀態,所有奇數的和、所有偶數的和也都是狀態。

二、 Flink Checkpoint 流程和原理

2.1 開啟 Checkpoint 功能

想要使用 Flink Checkpoint 功能,首先是要在實時任務開啟 Checkpoint。Flink 默認情況下是關閉 Checkpoint 功能,下面代碼是開啟 Checkpoint :

640-2.jpeg

上述代碼中,設置了 Flink Checkpoint 的間隔 3 秒,設置的 Checkpoint 的語義為 EXACTLY_ONCE。Flink 默認的 Checkpoint 語義為 EXACTLY_ONCE。上述代碼也使用 RocksDBStateBackend 進行狀態存儲。用戶也可以自己設置 Flink Checkpoint 的參數,通過 CheckpointConfig 這個類進行設置,代碼如下:

CheckpointConfig chkConfig = env.getCheckpointConfig();
/** 調用 CheckpointConfig 各種 set 方法 */
chkConfig.setX

2.2 Flink 一次 Checkpoint 的參與者

Flink 整體作業採用主從架構,Master 為 JobManager,Slave 為 TaskManager,Client 則是負責提交用戶實時任務的代碼邏輯 ,Flink 整體框架圖如下圖所示:

640-3.jpeg

JobManager 主要負責實時任務的調度以及對 Checkpoint 的觸發,TaskManager 負責真正用戶的代碼執行邏輯,具體表現形式則是 Task 在 TaskManager上面進行運行,一個 Task 對應一個線程,它可能運行一個算子的 SubTask,也可能是運行多個 Chain 起來的算子的 SubTask。

Flink 實時任務一次 Checkpoint 的參與者主要包括三塊:JobManager、TaskManager以及 Zookeeper。JobManager 定時會觸發執行 Checkpoint,具體則是在 JobManager 中運行的 CheckpointCoordinator 中觸發所有 Source 的 SubTask 向下遊廣播 CheckpointBarrier。

TaskManager 收到 CheckpointBarrier 後,根據 Checkpoint 的語義,決定是否在進行 CheckpointBarrier 對齊時,緩衝後續的數據記錄,當收到所有上游輸入的 CheckpointBarrier 後,開始做 Checkpoint。TaskManager Checkpoint 完成後,會向 JobManager 發送確認完成的消息。只有當所有 Sink 算子完成 Checkpoint 且發送確認消息後,該次 Checkpoint 才算完成。

在高可用模式下,ZooKeeper 主要存儲最新一次 Checkpoint 成功的目錄,當Flink 任務容錯恢復時,會從最新成功的 Checkpoint 恢復。Zookeeper 同時也存儲著 Flink 作業的元數據信息。比如在高可用模式下,Flink 會將 JobGraph 以及相關 Jar 包存儲在 HDFS 上面,Zookeeper 記錄著該信息。再次容錯重啟時,讀取這些信息,進行任務啟動。

下圖是一次 Checkpoint 的參與者:

640-4.jpeg

2.3 Checkpoint 協調者:CheckpointCoordinator

CheckpointCoordinator,是 Checkpoint 中最重要的類,協調著實時任務整個 Checkpoint 的執行。下圖是 CheckpointCoordinator 中的方法:

640-5.jpeg

Flink CheckpointCoordinator 中有幾個比較重要的方法:

  1. triggerCheckpoint,觸發 Flink 任務進行 Checkpoint 的方法
  2. triggerSavepoint,觸發 Flink 任務 Savepoint 的方法
  3. restoreSavepoint,Flink 任務從 Savepoint 狀態恢復
  4. restoreLatestCheckpointedState,從最新一次 Checkpoint 點位狀態恢復
  5. receiveAcknowledgeMessage,接受 Operator SubTask Checkpoint 完成的消息並處理

Flink CheckpointCoordinator 類是在 ExecutionGraph 形成時進行初始化的,具體則是在 ExecutionGraph 創建之後,調用 enableCheckpointing 方法,然後在該方法中,CheckpointCoordinator 進行創建。以下是 Flink Checkpoint 觸發的時序圖:

640-6.jpeg

當 Flink 作業狀態由創建到運行時,CheckpointCoordinator 中的 ScheduledThreadPoolExecutor 會定時執行 ScheduledTrigger 中的邏輯。ScheduledTrigger 本質就是一個 Runnable,run 方法中執行 triggerCheckpoint 方法。

2.4 Flink Checkpoint 流程與原理

一次 Flink Checkpoint 的流程是從 CheckpointCoordinator 的 triggerCheckpoint 方法開始,下面來看看一次 Flink Checkpoint 涉及到的主要內容:

  1. Checkpoint 開始之前先進行預檢查,比如檢查最大併發的 Checkpoint 數,最小的 Checkpoint 之間的時間間隔。默認情況下,最大併發的 Checkpoint 數為 1,最小的 Checkpoint 之間的時間間隔為 0.
  2. 判斷所有 Source 算子的 Subtask (Execution) 是否都處於運行狀態,有則直接報錯。同時檢查所有待確認的算子的 SubTask(Execution)是否是運行狀態,有則直接報錯。
  3. 創建 PendingCheckpoint,同時為該次 Checkpoint 創建一個 Runnable,即超時取消線程,默認 Checkpoint 十分鐘超時。
  4. 循環遍歷所有 Source 算子的 Subtask(Execution),最底層調用 Task 的triggerCheckpointBarrier, 廣播 CheckBarrier 到下游 ,同時 Checkpoint 其狀態。
  5. 下游的輸入中有 CheckpointBarrierHandler 類來處理 CheckpoinBarrier,然後會調用 notifyCheckpoint 方法,通知 Operator SubTask 進行 Checkpoint。
  6. 每當 Operator SubTask 完成 Checkpoint 時,都會向 CheckpointCoordoritor 發送確認消息。CheckpointCoordinator 的 receiveAcknowledgeMessage 方法會進行處理。
  7. 在一次 Checkpoint 過程中,當所有從 Source 端到 Sink 端的算子 SubTask 都完成之後,CheckpointCoordoritor 會通知算子進行 notifyCheckpointCompleted 方法,前提是算子的函數實現 CheckpointListener 接口。

Flink 會定時在任務的 Source 算子的 SubTask 觸發 CheckpointBarrier,CheckpointBarrier 是一種特殊的消息事件,會隨著消息通道流入到下游的算子中。只有當最後 Sink 端的算子接收到 CheckpointBarrier 並確認該次 Checkpoint 完成時,該次 Checkpoint 才算完成。所以在某些算子的 Task 有多個輸入時,會存在 Barrier 對齊時間,我們可以在 Flink Web UI上面看到各個 Task 的 CheckpointBarrier 對齊時間。

下圖是一次 Flink Checkpoint 實例流程示意圖:

640-7.jpeg

Flink Checkpoint 保存的任務狀態在程序取消停止時,默認會進行清除。Checkpoint 狀態保留策略主要有兩種:

DELETE_ON_CANCELLATION,RETAIN_ON_CANCELLATION

DELETE_ON_CANCELLATION 表示當程序取消時,刪除 Checkpoint 存儲的狀態文件。RETAIN_ON_CANCELLATION 表示當程序取消時,保存之前的 Checkpoint 存儲的狀態文件 用戶可以結合業務情況,設置 Checkpoint 保留模式:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/** 開啟 checkpoint */
env.enableCheckpointing(10000);
/** 設置 checkpoint 保留策略,取消程序時,保留 checkpoint 狀態文件 */
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

2.5 Flink Checkpoint 語義

Flink Checkpoint 支持兩種語義:Exactly_Once 和 At_least_Once,默認的 Checkpoint 語義是 Exactly_Once。具體語義含義如下:

Exactly_Once 含義是:保證每條數據對於 Flink 任務的狀態結果隻影響一次。打個比方,比如 WordCount 程序,目前實時統計的 "hello" 這個單詞數為 5,同時這個結果在這次 Checkpoint 成功後,保存在了 HDFS。在下次 Checkpoint 之前, 又來 2 個 "hello" 單詞,突然程序遇到外部異常自動容錯恢復,會從最近的 Checkpoint 點開始恢復,那麼會從單詞數為 5 的這個狀態點開始恢復,Kafka 消費的數據點位也是狀態為 5 這個點位開始計算,所以即使程序遇到外部異常自動恢復時,也不會影響到 Flink 狀態的結果計算。

At_Least_Once 含義是:每條數據對於 Flink 任務的狀態計算至少影響一次。比如在 WordCount 程序中,你統計到的某個單詞的單詞數可能會比真實的單詞數要大,因為同一條消息,當 Flink 任務容錯恢復後,可能將其計算多次。

Flink 中 Exactly_Once 和 At_Least_Once 具體是針對 Flink 任務狀態而言的,並不是 Flink 程序對消息記錄只處理一次。舉個例子,當前 Flink 任務正在做 Checkpoint,該次 Checkpoint 還沒有完成,這次 Checkpoint 時間段的數據其實已經進入 Flink 程序處理,只是程序狀態沒有最終存儲到遠程存儲。當程序突然遇到異常,進行容錯恢復時,那麼就會從最新的 Checkpoint 進行狀態恢復重啟,上一次 Checkpoint 成功到這次 Checkpoint 失敗的數據還會進入 Flink 系統重新處理,具體實例如下圖:

640.png

上圖中表示一個 WordCount 實時任務的 Checkpoint,在進行 chk-5 Checkpoint 時,突然遇到程序異常,那麼實時任務會從 chk-4 進行恢復,那麼之前 chk-5 處理的數據,Flink 系統會再次進行處理。不過這些數據的狀態沒有 Checkpoint 成功,所以 Flink 任務容錯恢復再次運行時,對於狀態的影響還是隻有一次。

Exactly_Once 和 At_Least_Once 具體在底層實現大致相同,具體差異表現在 CheckpointBarrier 對齊方式的處理:

640-8.jpeg

如果是 Exactly_Once 模式,某個算子的 Task 有多個輸入通道時,當其中一個輸入通道收到 CheckpointBarrier 時,Flink Task 會阻塞該通道,其不會處理該通道後續數據,但是會將這些數據緩存起來,一旦完成了所有輸入通道的 CheckpointBarrier 對齊,才會繼續對這些數據進行消費處理。

對於 At_least_Once,同樣針對某個算子的 Task 有多個輸入通道的情況下,當某個輸入通道接收到 CheckpointBarrier 時,它不同於 Exactly Once,即使沒有完成所有輸入通道 CheckpointBarrier 對齊,At Least Once 也會繼續處理後續接收到的數據。所以使用 At Least Once 不能保證數據對於狀態計算只有一次的計算影響。

三、 Flink Checkpoint 常見失敗原因和注意點

3.1 Flink Checkpoint 常見失敗原因分析

Flink Checkpoint 失敗有很多種原因,常見的失敗原因如下:

  1. 用戶代碼邏輯沒有對於異常處理,讓其直接在運行中拋出。比如解析 Json 異常,沒有捕獲,導致 Checkpoint失敗,或者調用 Dubbo 超時異常等等。
  2. 依賴外部存儲系統,在進行數據交互時,出錯,異常沒有處理。比如輸出數據到 Kafka、Redis、HBase等,客戶端拋出了超時異常,沒有進行捕獲,Flink 任務容錯機制會再次重啟。
  3. 內存不足,頻繁GC,超出了 GC 負載的限制。比如 OOM 異常
  4. 網絡問題、機器不可用問題等等。

從目前的具體實踐情況來看,Flink Checkpoint 異常覺大多數還是用戶代碼邏輯的問題,對於程序異常沒有正確的處理導致。所以在編寫 Flink 實時任務時,一定要注意處理程序可能出現的各種異常。這樣,也會讓實時任務的邏輯更加的健壯。

當自己的 Flink 實時任務 Checkpoint 失敗時,用戶可以先通過 Flink Web UI 進行快速定位 Checkpoint 失敗的原因,如果在 Flink Web UI 上面沒有看到異常信息,可以去看任務的具體日誌進行定位,如下是 Flink Web UI 查看錯誤原因示意圖:

640-9.jpeg

3.2 Flink Checkpoint 參數配置及注意點

下面是設置 Flink Checkpoint 參數配置的建議及注意點:

  1. 當 Checkpoint 時間比設置的 Checkpoint 間隔時間要長時,可以設置 Checkpoint 間最小時間間隔。這樣在上次 Checkpoint 完成時,不會立馬進行下一次 Checkpoint,而是會等待一個最小時間間隔,之後再進行 Checkpoint。否則,每次 Checkpoint 完成時,就會立馬開始下一次 Checkpoint,系統會有很多資源消耗 Checkpoint 方面,而真正任務計算的資源就會變少。
  2. 如果Flink狀態很大,在進行恢復時,需要從遠程存儲上讀取狀態進行恢復,如果狀態文件過大,此時可能導致任務恢復很慢,大量的時間浪費在網絡傳輸方面。此時可以設置 Flink Task 本地狀態恢復,任務狀態本地恢復默認沒有開啟,可以設置參數 state.backend.local-recovery 值為 true 進行激活。
  3. Checkpoint 保存數,Checkpoint 保存數默認是1,也就是隻保存最新的 Checkpoint 的狀態文件,當進行狀態恢復時,如果最新的 Checkpoint 文件不可用時(比如 HDFS 文件所有副本都損壞或者其他原因),那麼狀態恢復就會失敗,如果設置 Checkpoint 保存數 2,即使最新的Checkpoint恢復失敗,那麼Flink 會回滾到之前那一次 Checkpoint 的狀態文件進行恢復。考慮到這種情況,用戶可以增加 Checkpoint 保存數。
  4. 建議設置的 Checkpoint 的間隔時間最好大於 Checkpoint 的完成時間。

下圖是不設置 Checkpoint 最小時間間隔示例圖,可以看到,系統一致在進行 Checkpoint,大量的資源使用在 Flink Chekpoint 上,可能對運行的任務產生一定影響:

640-2.png

還有一種特殊的情況,Flink 端到端 Sink 的 EXACTLYONCE 的問題,也就是數據從 Flink 端到外部消息系統的消息一致性。打個比方,Flink 輸出數據到 Kafka 消息系統中,如果使用 Kafka 0.10 的版本,Flink 不支持端到端的 EXACTLYONCE,可能存在消息重複輸入到 Kafka。

640-3.png

如上圖所示,當做 chk-5 Checkpoint 的時候,chk-5 失敗,然後從 chk-4 來進行恢復,但是 chk-5 的部分數據在 Chekpoint 失敗之前就有部分進入到 Kafka 消息系統,再次恢復時,該部分數據可能再次重放到 Kafka 消息系統中。
Flink 中解決端到端的一致性有兩種方法:做冪等以及事務寫,冪等的話,可以使用 KV 存儲系統來做冪等,因為 KV 存儲系統的多次操作結果都是相同的。Flink 內部目前支持二階段事務提交,Kafka 0.11 以上版本支持事務寫,所以支持 Flink 端到 Kafka 端的 EXACTLY_ONCE。

四、 有讚的優化實踐

有贊實時計算對於 Flink 任務的 Checkpoint 和 Savepoint 做了兩個方面工作,第一個工作是對於 Flink Checkpoint 失敗的情況,如果 Checkpoint 失敗過於頻繁,同時 Flink Checkpoint 失敗次數如果達到平臺默認的失敗閾值,平臺會及時給用戶報警提示。我們會每 5 分鐘檢查一次實時任務,統計實時任務近 15 分鐘內,Flink Checkpoint 失敗次數的最大值和最小值的差值達到平臺默認的閾值,則會立馬給用戶報警,讓用戶能夠及時的處理問題。

當然,並不是所有的 Flink 實時任務 Checkpoint 失敗平臺都能發現,因為 Checkpoint 失敗次數的檢查,首先與用戶配置的 Checkpoint 的時間間隔有關。舉個例子,如果用戶配置的 Checkpoint 間隔為 1 小時,其實平臺默認 Checkpoint 邏輯檢查根本就無法發現實時任務 Checkpoint 失敗。

針對這種情況,實時平臺也支持用戶自定義設置 Checkpoint 失敗閾值,目前支持兩種 Checkpoint 失敗邏輯檢查,一個是 實時任務的 Checkpoint 失敗次數的總和達到閾值,另一個則是近 10 分鐘內,Flink Checkpoint 次數的最大值和最小值的差值的計算邏輯,用戶可以根據實時任務的敏感度,設置具體的參數。

第二個方面則是針對 Flink 任務的狀態恢復,為了防止實時任務的狀態丟失,實時計算平臺會定期的對實時任務進行 Savepoint 觸發,當任務由於外界因素導致任務失敗時,這種失敗是任務直接掛掉,Yarn 任務的狀態直接為 Killed,這種情況下,如果用戶開啟自動拉起功能,實時平臺自動拉起實時任務,同時從最新的 Savepoint 進行狀態恢復,以至於狀態不丟失。同時,實時計算平臺也支持用戶停止任務時,觸發 Savepoint,再次重啟實時任務時,還是從停止時的任務狀態進行恢復。

五、 總結

目前,有贊在實時計算方面,還有很長的路要走。在滿足業務的同時,可能也會有很多的坑需要踩。後面有贊實時計算會重點在實時數倉方面進行投入,同時會基於 Flink SQL 進行功能擴展和開發。

為了用戶開發實時任務的便利性,後面有贊會開始進行在線實時計算平臺的設計開發。未來也會將實時任務遷移到 K8S 上面,這樣在大促場景下,能夠更方便的進行資源的擴容和縮容。

未來,有贊實時計算平臺會為用戶帶來更好的開發體驗,降低用戶開發實時任務的難度,讓我們一起拭目以待。

Leave a Reply

Your email address will not be published. Required fields are marked *