開發與維運

Flink 在有讚的實踐和應用

作者:沈磊

簡介:今天主要分享的內容是 Flink 在有讚的實踐和應用。內容包括:

  1. Flink 的容器化改造和實踐
  2. Flink SQL 的實踐和應用
  3. 未來規劃。

GitHub 地址
https://github.com/apache/flink
歡迎大家給 Flink 點贊送 star~

一、Flink 的容器化改造和實踐

1. 有讚的集群演進歷史

  • 2014 年 7 月,第一個 Storm 任務正式上線;
  • 2016 年,引入 Spark Streaming, 運行在 Hadoop Yarn;
  • 2018 年,引入了 Flink,作業模式為 Flink on Yarn Per Job;
  • 2020 年 6 月,實現了 100% Flink Jar 任務 K8s 化, K8s 作為 Flink Jar 默認計算資源,Flink SQL 任務 On Yarn,Flink 統一實時開發;
  • 2020 年 11 月,Storm 集群正式下線。原先的 storm 任務全部都遷移到了 Flink;
  • 2021 年,我們打算把所有的 Flink 任務 K8s 化。

圖片

2. Flink 在內部支持的業務場景

Flink 支持的業務場景有風控,埋點的實時任務,支付,算法實時特徵處理,BI 的實時看板,以及實時監控等等。目前的實時任務規模有 500+。

圖片

3. 有贊在 Flink on Yarn 的痛點

主要有三部分:

  • 第一,CPU 沒有隔離。Flink On Yarn 模式,CPU 沒有隔離,某個實時任務造成某臺機器 CPU 使用過高時, 會對該機器其他實時任務造成影響;
  • 第二,大促擴縮容成本高。Yarn 和 HDFS 服務使用物理機,物理機在大促期間擴縮容不靈活,同時需要投入一定的人力和物力;
  • 第三,需要投入人力運維。公司底層應用資源統一為 K8S,單獨再對 Yarn 集群運維,會再多一類集群的人力運維成本。

圖片

4. Flink on k8s 相對於 Yarn 的優勢

可以歸納為 4 點:

  • 第一,統一運維。公司統一化運維,有專門的部門運維 K8S;
  • 第二,CPU 隔離。K8S Pod 之間 CPU 隔離,實時任務不相互影響,更加穩定;
  • 第三,存儲計算分離。Flink 計算資源和狀態存儲分離,計算資源能夠和其他組件資源進行 混部,提升機器使用率;
  • 第四,彈性擴縮容。大促期間能夠彈性擴縮容,更好的節省人力和物力成本。

圖片

5. 實時集群的部署情況

總體上分為三層。第一層是存儲層;第二層是實時計算資源層;第三層是實時計算引擎層。

  • 存儲層主要分為兩部分:

    • 第一個就是雲盤,它主要存儲 Flink 任務本地的狀態,以及 Flink 任務的日誌;
    • 第二部分是實時計算 HDFS 集群,它主要存儲 Flink 任務的遠端狀態。
  • 第二層是實時計算的資源層,分為兩部分:

    • 一個是 Hadoop Yarn 集群;
    • 另一個是 Flink k8s 集群,再往下細分,會有 Flink k8s 和離線的 HDFS 混部集群的資源,還有 Flink k8s 單獨類型的集群資源。
  • 最上層有一些實時 Flink Jar,spark streaming 任務,以及 Flink SQL 任務。

我們考慮混部的原因是,離線 HDFS 集群白天機器使用率不高。把離線 HDFS 集群計算資源給實時任務,離線使用內部其他組件的彈性計算資源,從而提升機器使用率,更好的達到降本效果。

圖片

6. Flink on k8s 的容器化流程

如下圖所示:

  1. 第一步,實時平臺的 Flink Jar 任務提交,Flink Jar 任務版本管理,Docker Flink 任務鏡像構建,上傳鏡像到 Docker 鏡像倉庫;
  2. 第二步,任務啟動;
  3. 第三步,yaml 文件創建;
  4. 第四步,和 k8s Api Server 之間進行命令交互;
  5. 第五步,從 Docker 鏡像倉庫拉取 Flink 任務鏡像到 Flink k8s 集群;
  6. 最後,任務運行。這邊有幾個 tips:

    • 作業模式為 Flink Standalone Per Job 模式;
    • 每個 Flink Jar 任務一個鏡像,通過任務名稱 + 時間截作為鏡像的版本;
    • JobManager 需要創建為 Deployment 而不是 Job 類型;
    • Dockerfile 指定 HADOOP_USER_NAME,與線上任務保持一致。

圖片

7. 在 Flink on k8s 的一些實踐

  • 第一個實踐是解決資源少配任務無法啟動這個問題。

    先來描述一下問題,Flink on k8s 非雲原生,無法做到實時任務資源按需申請。當用戶在平臺配置的資源少於實時任務真實使用的資源時(比如用戶代碼寫死併發度,但用戶配置的併發度小於該值),會出現實時任務無法啟動的問題。

    針對這個問題,我們內部增加了一種 Flink Jar 任務併發度的自動檢測機制。它的主要流程如下圖所示。首先,用戶會在我們平臺去提交 Flink Jar 作業,當他提交完成之後,在後臺會把 Jar 作業以及運行參數,構建 PackagedProgram。通過 PackagedProgram 獲取到任務的預執行計劃。再通過它獲取到任務真實的併發度。如果用戶在代碼裡配置的併發度小於平臺端配置的資源,我們會使用在平臺端的配置去申請資源,然後進行啟動;反之,我們會使用它真實的任務併發度去申請資源,啟動任務。

圖片

  • 第二個實踐是 Flink on k8s 任務的資源分析工具。

    首先來說一下背景,Flink k8s 任務資源是用戶自行配置,當配置的併發度或者內存過大時,存在計算資源浪費的問題,從而會增加底層機器成本。怎麼樣去解決這個問題,我們做了一個平臺管理員的工具。對於管理員來說,他可以從兩種視角去看這個任務的資源是否進行了一個超配:

    • 第一個是任務內存的視角。我們根據任務的 GC 日誌,通過一個開源工具 GC Viewer,拿到這一個實時任務的內存使用指標;
    • 第二個是消息處理能力的視角。我們在 Flink 源碼層增加了數據源輸入 record/s 和任務消息處理時間 Metric。根據 metric 找到消息處理最慢的 task 或者 operator,從而判斷併發度配置是否合理。

    管理員根據內存分析指標以及併發度合理性,結合優化規則,預設置 Flink 資源。然後我們會和業務方溝通與調整。右圖是兩種分析結果,上面是 Flink on K8S pod 內存分析結果。下面是 Flink K8S 任務處理能力的分析結果。最終,我們根據這些指標就可以對任務進行一個資源的重新調整,降低資源浪費。目前我們打算把它做成一個自動化的分析調整工具。

圖片

  • 接下來是 Flink on K8s 其他的相關實踐。

    • 第一,基於 Ingress Flink Web UI 和 Rest API 的使用。每個任務有一個 Ingress 域名,始終通過域名訪問 Flink Web UI 以及 Resti API 使用;
    • 第二,掛載多個 hostpath volume,解決單塊雲盤 IO 限制。單塊雲盤的寫入帶寬以及 IO 能力有瓶頸,使用多塊雲盤,降低雲盤 Checkpoint 狀態和本地寫入的壓力;
    • 第三,Flink 相關通用配置 ConfigMap 化、Flink 鏡像上傳成功的檢測。為 Filebeat、Flink 作業通用配置,創建 configmap,然後掛載到實時任務中,確保每個 Flink 任務鏡像都成功上傳到鏡像倉庫;
    • 第四,HDFS 磁盤 SSD 以及基於 Filebeat 日誌採集。SSD 磁盤主要是為了降低磁盤的 IO Wait 時 間,調整 dfs.block.invalidate.limit,降低 HDFS Pending delete block 數。任務日誌使用 Filebeat 採集,輸出到 kafka,後面通過自定義 LogServer 和離線公用 LogServer 查看。

圖片

8. Flink on K8s 當前面臨的痛點

  • 第一,JobManager HA 問題。JobManager Pod 如果掛掉,藉助於 k8s Deployment 能力,JobManager 會根據 yaml 文件重啟,狀態可能會丟失。而如果 yaml 配置 Savepoint 恢復,則消息可能大量重複。我們希望後續藉助於 ZK 或者 etcd 支持 Jobmanager HA;
  • 第二,修改代碼,再次上傳時間久。一旦代碼修改邏輯,Flink Jar 任務上傳時間加上打鏡像時間可能是分鐘級別,對實時性要求比較高的業務或許有影響。我們希望後續可以參考社區的實現方式,從 HDFS 上面拉取任務 Jar 運行;
  • 第三,K8S Node Down 機, JobManager 恢復慢。一旦 K8S Node down 機後, Jobmanager Pod 恢復運行需要 8分鐘左右,主要是 k8s 內部異常發現時間以及作業啟動時間,對部分業務有影響,比如CPS實時任務。如何解決,平臺端定時檢測 K8s node 狀態,一旦檢測到 down 機狀態,將 node 上面有 JobManager 所屬的任務停止掉,然後從其之前 checkpoint 恢復;
  • 第四,Flink on k8s 非雲原生。當前通過 Flink Jar 任務併發度自動檢測工具解決資源少配無法啟動問題,但是如果任務的預執行計劃無法獲取,就無法獲取到代碼配置的併發度。我們的思考是: Flink on k8s 雲原生功能以及前面的 1、2 問題,如果社區支持的比較快速的話,後面可能會考慮將 Flink 版本與社區版本對齊。

圖片

9. Flink on K8s的一些方案推薦

  • 第一種方案,是平臺自己去構建和管理任務的鏡像。

    • 優點是:平臺方對於構建鏡像,以及運行實時任務整體流程自我掌控,具體問題能夠及時修正。
    • 缺點是:需要對 Docker 以及 K8S 相關技術要有一定了解,門檻使用比較高,同時需要考慮非雲原生相關問題。它的適用版本為 Flink 1.6 以上。
  • 第二種方案,Flink k8s Operator。

    • 優點是:對用戶整體封裝了很多底層細節,使用門檻相對降低一些。
    • 缺點是:整體使用沒有第一種方案那麼靈活,一旦有問題,由於底層使用的是其封裝的功能,底層不好修改。它的適用版本為Flink 1.7 以上。
  • 最後一種方案是,基於社區 Flink K8s 功能。

    • 優點是:雲原生,對於資源的申請方面更加友好。同時,用戶使用會更加方便,屏蔽很多底層實現。
    • 缺點是:K8s 雲原生功能還是實驗中的功能,相關功能還在開發中,比如 k8s Per job 模式。它的適用版本為Flink 1.10 以上。

圖片

二、Flink SQL 實踐和應用

1. 有贊 Flink SQL 的發展歷程

  • 2019 年 9 月,我們對 Flink 1.9 、1.10 SQL 方面的能力進行研究和嘗試,同時增強了一些 Flink SQL 功能。
  • 2019 年 10 月,我們進行了 SQL 功能驗證,基於埋點實時需求,驗證 Flink SQL Hbase 維表關聯功能,結果符合預期。
  • 2020 年 2 月,我們對 SQL 的功能進行了擴展,以 Flink 1.10 作為 SQL 計算引擎,進行 Flink SQL 功能擴展開發和優化,實時平臺支持全 SQL 化開發。
  • 2020 年 4 月,開始支持實時數倉、有贊教育、美業、零售等相關實時需求。
  • 2020 年 8 月,新版的實時平臺才開始正式上線,目前主推 Flink SQL 開發我們的實時任務。

圖片

2. 在 Flink SQL 方面的一些實踐

主要分為三個方面:

  • 第一,Flink Connector 的實踐包括:Flink SQL 支持 Flink NSQ Connector、Flink SQL 支持 Flink HA Hbase Sink 和維表、Flink SQL 支持無密 Mysql Connector、Flink SQL 支持標準輸出(社區已經支持)、Flink SQL 支持 Clickhouse Sink;
  • 第二,平臺層的實踐包括:Flink SQL 支持 UDF 以及 UDF 管理、支持任務從 Checkpoint 恢復、支持冪等函數、支持 Json 相關函數等、支持 Flink 運行相關參數配置,比如狀態時間設置,聚合優化參數等等、Flink 實時任務血緣數據自動化採集、Flink 語法正確性檢測功能;
  • 第三,Flink Runtime的實踐包括:Flink 源碼增加單個Task 以及 Operator 單條記錄處理時間指標;修復 Flink SQL 可撤回流 TOP N 的BUG。

圖片

3. 業務實踐

  • 第一個實踐是我們內部的客服機器人實時看板。流程分為三層:

    • 第一層是實時數據源,首先是線上的 MySQL 業務表,我們會把它的 Binlog 通過 DTS 服務同步到相應的 Kafka Topic;
    • 實時任務的 ODS 層有三個 Kafka Topic;
    • 在實時 DWD 層,有兩個 Flink SQL 任務。

      • Flink SQL A 消費兩個 topic,然後把這兩個 topic 裡面的數據去通過 Interval Join,根據一些窗口的作用關聯到對應的數據。同時,會對這個實時任務設置狀態的保留時間。Join 之後,會去進行一些 ETL 的加工處理,最終會把它的數據輸入到一個 topic C。
      • 另外一個實時任務 Flink SQL B 消費一個 topic,然後會對 topic 裡面的數據進行清洗,然後到 HBase 裡面去進行一個維表的關聯,去關聯它所需要的一些額外的數據,關聯的數據最終會輸入到 topic D。

    在上游,Druid 會消費這兩個 topic 的數據,去進行一些指標的查詢,最終提供給業務方使用。

圖片

  • 第二個實踐是實時用戶行為中間層。用戶在我們平臺上面會去搜索、瀏覽、加入購物車等等,都會產生相應的事件。原先的方案是基於離線來做的。我們會把數據落庫到 Hive 表,然後算法那邊的同學會結合用戶特徵、機器學習的模型、離線的數據去生成一些用戶評分預估,再把它輸入到 HBase。

    在這樣的背景下面,會有如下訴求:當前的用戶評分主要是基於離線任務,而算法同學希望結合實時的用戶特徵,更加及時、準確的提高推薦精準度。這其實就需要構建一個實時的用戶行為中間層,把用戶產生的事件輸入到 Kafka 裡面,通過 Flink SQL 作業對這些數據進行處理,然後把相應的結果輸出到 HBase 裡面。算法的同學再結合算法模型,實時的更新模型裡面的一些參數,最終實時的進行用戶的評分預估,也會落庫到 HBase,然後到線上使用。

    圖片

    用戶行為中間層的構建流程分為三個步驟:

    • 第一層,我們的數據源在 Kafka 裡面;
    • 第二層是 ODS 層,在 Flink SQL 作業裡面會有一些流表的定義,一些 ETL 邏輯的處理。然後去定義相關的 sink 表、維表等等。這裡面也會有一些聚合的操作,然後輸入到 Kafka;
    • 在 DWS 層,同樣有用戶的 Flink SQL 作業,會涉及到用戶自己的 UDF Jar,多流 Join,UDF 的使用。然後去讀取 ODS 層的一些數據,落庫到 HBase 裡面,最終給算法團隊使用。

    這裡有幾個實踐經驗:

    • 第一,Kafka Topic、Flink 任務名稱,Flink SQL Table 名稱,按照數倉命名規範。
    • 第二,指標聚合類計算,Flink SQL 任務要設置空閒狀態保留時間,防止任務狀態無限增大。
    • 第三,如果存在數據傾斜或者讀狀態壓力較大等情況,需要配置 Flink SQL 優化參數。

圖片

4. 在 HAHBase Connector 的實踐

社區 HBase Connector 數據關聯或者寫入是單 HBase 集群使用,當 HBase 集群不可用時,實時任務數據的寫入或者關聯會受到影響,從而可能會影響到業務使用。至於怎麼樣去解決這個問題。首先,在 HBase 方面有兩個集群,主集群和備集群。它們之間通過 WAL 進行主從的複製。Flink SQL 作業先寫入主集群,當主集群不可用的時候,自動降級到備集群,不會影響到線上業務的使用。

圖片

5. 無密 Mysql Connector 和指標擴展實踐

左圖是 Flink 無密 Mysql Sink 語法,解決的問題包括三點:

  • 第一,Mysql 數據庫用戶名和密碼不以明文方式向外進行暴露和存儲;
  • 第二,支持 Mysql 用戶名和密碼週期性更新;
  • 第三,內部自動根據用戶名鑑定表權限使用。這樣做最主要的目的還是保證實時任務數據庫使用更安全。

然後是左下圖,我們在 Flink 源碼層面增加 Task 和 Operator 單條消息處理時間 Metric。目的是幫助業務方,根據消息處理時間的監控指標,排查和優化 Flink 實時任務。

圖片

6. Flink 任務血緣元數據自動化採集的實踐

Flink 任務血緣元數據採集的流程如下圖所示,平臺啟動實時任務後,根據當前任務是 Flink Jar 任務,還是 Flink SQL 任務,分別走兩條不同的路徑,來獲取任務的血緣數據,再把血緣數據上報元數據系統。這樣做的價值有兩點:

  • 第一,幫助業務方瞭解實時任務加工鏈路。業務方能夠更清晰的認知實時任務之間的關係和影響,當操作任務時,能夠及時通知下游其他業務方;
  • 第二,更好的構建實時數倉。結合實時任務血緣圖,提煉實時數據公共層,提升複用性,更好的構建實時數倉。

圖片

三、未來規劃

最後是未來的規劃,包括四點:

  • 第一,推廣 Flink 實時任務 SQL 化。推廣 Flink SQL 開發實時任務,提升 Flink SQL 任務比例。
  • 第二,Flink 任務計算資源自動優化配置。從內存、任務處理能力、輸入速率等,對任務資源進行分析,對資源配置不合理任務自動化配置,從而降低機器成本。
  • 第三,Flink SQL 任務 k8s 化以及 K8s 雲原生。Flink 底層計算資源統一為 k8s,降低運維成本,Flink k8s 雲原生,更合理使用 K8s 資源。
  • 第四,Flink 與數據湖以及 CDC 功能技術的調研。新技術的調研儲備,為未來其他實時需求奠定技術基礎。

圖片

關鍵詞:Flink SQL,Flink on Yarn,Flink on K8s,實時計算,容器化


更多 Flink 相關技術交流,可掃碼加入社區釘釘大群~

image.png

活動推薦

阿里雲基於 Apache Flink 構建的企業級產品-實時計算 Flink 版現開啟活動:
99元試用實時計算 Flink 全託管版本(包年包月、10CU)即可得定製 Flink 獨家定製T恤;另包3個月及以上還有85折優惠!
瞭解活動詳情:https://www.aliyun.com/product/bigdata/sc

image.png

Leave a Reply

Your email address will not be published. Required fields are marked *