開發與維運

Apache Flink 誤用之痛

摘要:本文根據 Flink Forward 全球在線會議 · 中文精華版整理而成,圍繞著項目的開始、需求分析、開發,以及測試、上線、運維整個生命週期展開,介紹了 Apache Flink 實踐中的一些典型誤用情況,並給出了相應的更優實踐方案。

Flink 實踐中最首當其衝的誤用就是不按迭代開發的過程操作。最佳實踐應該遵循迭代開發的步驟進行,包含以下幾個階段:

  1. 項目開始
  2. 涉及分析
  3. 開發
  4. 測試
  5. 上線
  6. 維護

1. 項目開始

在開始開發前,我們需要選擇正確的切入方式,以下幾種往往是最糟糕的開始:

    a) 從一個具有挑戰性的用例開始(端對端的 Exactly-once、大狀態、複雜的業務邏輯、強實時SLA的組合)   
    b) 之前沒有流處理經驗   
    c) 不對團隊做相關的培訓   
    d) 不利用社區

在開發的過程中,其實要認認真真的來規劃我們的切入點,首先,要從簡單的任務開始循序漸進。要有一定的大數據和流處理的知識積累,儘量參加一些培訓,也要利用好社區資源。基於這樣的想法,我們就能很快找到切入點。

怎麼樣去做?社區提供了很多的培訓,包括 Flink Forward 和 Vererica 網站上有各種培訓課程,大家可以去看。同時,可以充分利用社區。社區還建立了中文的郵件列表,大家可以充分利用中文郵件列表來解決手頭的疑難雜症。另外,Stack Overflow 也是個提問的好地方,但在提問前儘量去看一看已有的提問,做到心中有數。

2. 設計分析

方案設計中的一些常見錯誤思維,往往是由於沒有充分思考需求導致的,比如:

    a) 不考慮數據一致性和交付保證   
    b) 不考慮業務升級和應用改進   
    c) 不考慮業務規模問題   
    d) 不深入思考實際業務需求

我們要認真分析需求,同時認真考慮實際交付情況。提到一致性和交付保障,其實可以通過幾個問題來引導大家完成這件事,如下圖所示:

圖1.jpg

第1個問題,是否在乎數據的丟失?

如果不在乎,你可以沒有 Checkpoint。

第2個問題,是否在乎結果的正確性?

在很多的場景裡面,我們非常關注結果的正確性,比如金融領域,但是另外一些場景比如監控或其他簡單的使用場景僅需要一個概要的數據統計。如果不在乎結果的正確性,可以考慮用 at-least-once 的模式配置並使用可回放的數據源。相反,如果結果的準確性十分重要,且下游不關心重複記錄,那麼僅需設置 exactly-once 模式並使用可回放的數據源。如果下游要求數據不能重複,哪怕數據正確也只能發送一次,這種時候就對 sink 有更進一步的限制,在 exactly-once 的模式下,使用可回放的數據源,並且 sink 需要支持事務。

帶著這樣的思維方式分析業務,才能非常清晰地知道,怎麼去使用 Flink,進而避免一些糟糕的事情發生。

完成分析之後,最終目的是什麼?我們為什麼要有這種選擇,而不是一上來就選一個最好的方案?

因為世界上永遠沒有“最好”,這裡的核心因素就是延遲,要根據業務的延遲和準確性需求來均衡去做選擇。

當需求都分析好之後,還需要去思考應用是否需要升級。從一個正常的 Flink 作業來講,我們有幾個問題要考慮。第一個,Flink 作業一般都有狀態讀取,做升級時需要有 savepoint 機制來保障,將狀態存儲保留在遠端,再恢復到新的作業上去。很多場景下都會有升級的需求,這簡單列了幾點:

a 升級集群版本   
b 業務 bug 的修復
c 業務邏輯(拓撲)的變更

在比較複雜的場景下,作業會有拓撲的變化,如下圖:

圖2.png

此處需要添加一個算子,去掉一個 sink 。對於這樣的變化,我們要考慮狀態的恢復。當 Flink 發現新作業有節點沒了,對應的狀態無法恢復,就會拋出異常導致升級失敗。這時候可以使用參數 --allowNonRestoreState 來忽略此類問題。

另外新作業中還有新建的節點,這個節點就用空狀態去初始化即可。除此之外,還需要注意,為了保證作業成功啟動並且狀態恢復不受影響,我們應該為算子設置 StreamAPI 中的 uid 。當然,如果狀態的結構發生了變化,Avro Types 和 POJO 的類型都是支持的,Kryo 是不支持的。最後建議所有 key 的類型儘量不要修改,因為這會涉及 shuffle 和 狀態的正確性。

資源的使用情況也是必須要考慮的因素之一,下面是一個評估內存和網絡 IO 使用的思路。這裡我們假設使用的是 Fs State,所有運行時狀態都在內存中。不恰當的資源配置可能會造成 OOM 等嚴重的問題。

圖3.png

完成資源評估後,還需要考慮事件時間和亂序問題。下面是一個具體的例子:

圖4.png

在這個例子中選擇哪種時間窗口、何時觸發計算,僅憑一句話的需求是無法描述清楚的。只有根據流處理的特性結合實際的業務去認真分析需求,才能將 Flink 技術進行恰當的運用。

還需要注意,Flink 是流批統一的計算引擎,不是所有的業務都能用流處理或者都能用批處理來實現,需要分析自己的場景適合用哪種方式來實現。

3. 開發

3.1 API 的選擇

在 DataStream API 和 Table API/SQL 的選擇上,如果有強烈的需求控制狀態和每條狀態到來的行為,要使用 DataStream API;如果是簡單的數據提取和關係代數的運算,可以選擇 Table API/SQL。在一些場景下,只能選擇 DataStream API:

a) 在升級過程中要改變狀態
b) 不能丟失遲到的數據
c) 在運行時更改程序的行為

3.2 數據類型

在開發過程中,關於數據類型,有兩種誤用場景:

a) 使用深度嵌套的複雜數據類型
b) KeySelector 中使用任意類型

正確的做法是選擇儘可能簡單的狀態類型,在 KeySelector 中不使用 Flink 不能自動識別的類型。

3.3 序列化

數據類型越簡單越好,基於序列化成本的考慮,儘量使用 POJO 和 Avro SpecificRecords。也鼓勵大家開發完使用 IDE 的工具本地調試一下,看一下性能瓶頸在哪。

表1.jpg

圖5中是一種效率較低的處理過程,我們應該先進行過濾和投影操作,防止不需要的數據進行多餘的處理。

圖5.png

3.4 併發性

兩種誤用場景及相應容易造成的問題:

  • 任務之間共享靜態變量

容易引起 bug;容易造成死鎖和競爭問題;帶來額外的同步開銷。

  • 在用戶函數中生成線程

檢查點變得複雜易錯。

對於想用線程的情況,如果是需要加速作業,可以調整並行度和資源,使用異步IO;如果是需要一些定時任務的觸發,可以使用 Flink 自帶的 Timer 定時調度任務。

3.5 窗口

儘量避免像圖6這樣自定義 Window,使用 KeyedProcessFunction 可以使得實現更加簡單和穩定。

圖6.png

另外,也要避免圖7中的這種滑動窗口,在圖7中每個記錄被50萬個窗口計算,無論是計算資源還是業務延遲都會非常糟糕。

圖7.png

3.6 可查詢狀態

Queryable State 目前還在不斷的完善中,可以用於監控和查詢,但在實際投產時還是有一些問題需要注意的,比如對於線程安全訪問,RocksDB 狀態後端是支持的,而 FS 狀態後端是不支持的,另外還有性能和一致性保障等問題需要注意。

3.7 DataStream API 的應用

對圖8這種場景,可以使用 DataStreamUtils#reinterpretAsKeyedStream 這個方法,避免面對相同的 key 進行多次 shuffle 。

圖8.png

對圖9這種場景,應該把一些初始化的邏輯寫在 RichFunction 的 open 方法裡。

圖9.png

4. 測試

圖10.png

除了系統測試和 UDF 的單元測試,還應該做 Mini Cluster 測試,在本機運行一個 Mini Cluster 把端到端的業務跑起來,可以及早地發現一些問題。

還有 Harness 測試,它可以精準地幫助完成有狀態的任務測試。它可以精準的控制 watermark、元素的 event time 等。可以參考:

https://github.com/knaufk/flink-testing-pyramid

5. 上線

很多場景會導致業務抖動,一種是實際業務本身就有抖動,其他的比如 Timer、CP 的對齊、GC 等正常現象的發生,還有追數據的場景,開始和追平的時候狀態是不一樣的,這種情況下也不用擔心,有意識地識別這種狀況,進而判斷這種是正常還是非預期狀況。

在線上監控時要注意,metrics 過多會對 JVM 造成很大壓力,上報的頻率不要選擇 subtask,這對資源的開銷是很高的。

在配置時要注意,一開始儘量不用 RocksDB 狀態後端,FS 狀態後端的部署成本低速度也更快。少用網絡的文件系統。SlotSharingGroups 的配置儘量使用默認的,避免引發欠機制的破壞,導致資源浪費。

6. 維護

像 Flink 這樣快節奏的項目,每個版本都有很多 bug 被修復,及時升級也很重要。

7.PyFlink/SQL/TableAPI 的補充

  1. 使用 TableEnvironment 還是 StreamTableEnvironment?推薦 TableEnvironment 。(分段優化)
  2. State TTL 未設置,導致 State 無限增長,或者 State TTL 設置不結合業務需求,導致數據正確性問題。
  • 不支持作業升級,例如增加一個 COUNT SUM 會導致作業 state 不兼容。
  • 解析 JSON 時,重複調度 UDF,嚴重影響性能,建議替換成 UDTF。
  • 多流 JOIN 的時候,先做小表 JOIN,再做大表 JOIN。目前,Flink 還沒有表的 meta 信息,沒法在 plan 優化時自動做 join reorder。

作者簡介:

本文由 Konstantin Knauf 分享,孫金城進行中文解說。

孫金城(金竹),Apache Member,阿里巴巴高級技術專家。2011 年加入阿里,9 年的阿里工作中,主導過很多內部核心系統,如,阿里集團行為日誌,阿里郎,雲轉碼,文檔轉換等。在 2016 年初開始瞭解 Apache Flink 社區,由初期的參與社區開發到後來逐漸主導具體模塊的開發,到負責 Apache Flink Python API(PyFlink) 的建設。 目前是 PMC member of Apache Flink and ALC(Beijing), 以及 Committer for Apache Flink, Apache Beam and Apache IoTDB。

Leave a Reply

Your email address will not be published. Required fields are marked *