大數據

Flink SQL CDC 上線!我們總結了 13 條生產實踐經驗

作者:曾慶東

摘要:7月,Flink 1.11 新版發佈,在生態及易用性上有大幅提升,其中 Table & SQL 開始支持 Change Data Capture(CDC)。CDC 被廣泛使用在複製數據、更新緩存、微服務間同步數據、審計日誌等場景,本文由社區由曾慶東同學分享,主要介紹 Flink SQL CDC 在生產環境的落地實踐以及總結的實戰經驗,文章分為以下幾部分:

  1. 項目背景
  2. 解決方案
  3. 項目運行環境與現狀
  4. 具體實現
  5. 踩過的坑和學到的經驗
  6. 總結

Tips:點擊下方鏈接可查看社區直播的 Flink SQL CDC 相關視頻~
https://flink-learning.org.cn/developers/flink-training-course3/

01 項目背景

本人目前參與的項目屬於公司裡面數據密集、計算密集的一個重要項目,需要提供高效且準確的 OLAP 服務,提供靈活且實時的報表。業務數據存儲在 MySQL 中,通過主從複製同步到報表庫。作為集團級公司,數據增長多而且快,出現了多個千萬級、億級的大表。為了實現各個維度的各種複雜的報表業務,有些千萬級大表仍然需要進行 Join,計算規模非常驚人,經常不能及時響應請求。

隨著數據量的日益增長和實時分析的需求越來越大,急需對系統進行流式計算、實時化改造。正是在這個背景下,開始了我們與 Flink SQL CDC 的故事。

02 解決方案

針對平臺現在存在的問題,我們提出了把報表的數據實時化的方案。該方案主要通過 Flink SQL CDC + Elasticsearch 實現。Flink SQL 支持 CDC 模式的數據同步,將 MySQL 中的全增量數據實時地採集、預計算、並同步到 Elasticsearch 中,Elasticsearch 作為我們的實時報表和即席分析引擎。項目整體架構圖如下所示:

1.jpg

實時報表實現具體思路是,使用 Flink CDC 讀取全量數據,全量數據同步完成後,Flink CDC 會無縫切換至 MySQL 的 binlog 位點繼續消費增量的變更數據,且保證不會多消費一條也不會少消費一條。讀取到的賬單和訂單的全增量數據會與產品表做關聯補全信息,並做一些預聚合,然後將聚合結果輸出到 Elasticsearch,前端頁面只需要到 Elasticsearch 通過精準匹配(terms)查找數據,或者再使用 agg 做高維聚合統計得到多個服務中心的報表數據。

從整體架構中,可以看到,Flink SQL 及其 CDC 功能在我們的架構中扮演著核心角色。我們採用 Flink SQL CDC,而不是 Canal + Kafka 的傳統架構,主要原因還是因為其依賴組件少,維護成本低,開箱即用,上手容易。具體來說 Flink SQL CDC 是一個集採集、計算、傳輸於一體的工具,其吸引我們的優點有:

① 減少維護的組件、簡化實現鏈路;
② 減少端到端延遲;
③ 減輕維護成本和開發成本;
④ 支持 Exactly Once 的讀取和計算(由於我們是賬務系統,所以數據一致性非常重要);
⑤ 數據不落地,減少存儲成本;
⑥ 支持全量和增量流式讀取;

有關 Flink SQL CDC 的介紹和教程,可以觀看 Apache Flink 社區發佈的相關視頻:

https://www.bilibili.com/video/BV1zt4y1D7kt/

項目使用的是 flink-cdc-connectors 中提供的 mysql-cdc 組件。這是一個 Flink 數據源,支持對 MySQL 數據庫的全量和增量讀取。它在掃描全表前會先加一個全局讀鎖,然後獲取此時的 binlog position,緊接著釋放全局讀鎖。隨後開始掃描全表,當全錶快照讀取完後,會從之前獲取的 binlog position 獲取增量的變更記錄。因此這個讀鎖是非常輕量的,持鎖時間非常短,不會對線上業務造成太大影響。更多信息可以參考 flink-cdc-connectors 項目官網:https://github.com/ververica/flink-cdc-connectors

03 項目運行環境與現狀

我們在生產環境搭建了 Hadoop + Flink + Elasticsearch 分佈式環境,採用的 Flink on YARN 的 per-job 模式運行,使用 RocksDB 作為 state backend,HDFS 作為 checkpoint 持久化地址,並且做好了 HDFS 的容錯,保證 checkpoint 數據不丟失。我們使用 SQL Client 提交作業,所有作業統一使用純 SQL,沒有寫一行 Java 代碼。

目前已上線了 3 個基於 Flink CDC 的作業,已穩定在線上運行了兩個星期,並且業務產生的訂單實收和賬單實收數據能實時聚合輸出到 Elasticsearch,輸出的數據準確無誤。現在也正在對其他報表採用 Flink SQL CDC 進行實時化改造,替換舊的業務系統,讓系統數據更實時。

04 具體實現

① 進入 Flink/bin,使用 ./sql-client.sh embedded 啟動 SQL CLI 客戶端。

② 使用 DDL 創建 Flink Source 和 Sink 表。這裡創建的表字段個數不一定要與 MySQL 的字段個數和順序一致,只需要挑選 MySQL 表中業務需要的字段即可,並且字段類型保持一致。

-- 在Flink創建賬單實收source表
CREATE TABLE bill_info (
  billCode STRING,
  serviceCode STRING,
  accountPeriod STRING,
  subjectName STRING ,
  subjectCode STRING,
  occurDate TIMESTAMP,
  amt  DECIMAL(11,2),
  status STRING,
  proc_time AS PROCTIME() -–使用維表時需要指定該字段
) WITH (
  'connector' = 'mysql-cdc', -- 連接器
  'hostname' = '******',   --mysql地址
  'port' = '3307',  -- mysql端口
  'username' = '******',  --mysql用戶名
  'password' = '******',  -- mysql密碼
  'database-name' = 'cdc', --  數據庫名稱
  'table-name' = '***'
);

-- 在Flink創建訂單實收source表
CREATE TABLE order_info (
  orderCode STRING,
  serviceCode STRING,
  accountPeriod STRING,
  subjectName STRING ,
  subjectCode STRING,
  occurDate TIMESTAMP,
  amt  DECIMAL(11, 2),
  status STRING,
  proc_time AS PROCTIME()  -–使用維表時需要指定該字段
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '******',
  'port' = '3307',
  'username' = '******',
  'password' = '******',
  'database-name' = 'cdc',
  'table-name' = '***',
);

-- 創建科目維表
CREATE TABLE subject_info (
  code VARCHAR(32) NOT NULL,
  name VARCHAR(64) NOT NULL,
  PRIMARY KEY (code) NOT ENFORCED  --指定主鍵
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xxxx:xxxx/spd?useSSL=false&autoReconnect=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'table-name' = '***',
  'username' = '******',
  'password' = '******',
  'lookup.cache.max-rows' = '3000',
  'lookup.cache.ttl' = '10s',
  'lookup.max-retries' = '3'
);

-- 創建實收分佈結果表,把結果寫到 Elasticsearch
CREATE TABLE income_distribution (
  serviceCode STRING,
  accountPeriod STRING,
  subjectCode STRING,
  subjectName STRING,
  amt  DECIMAL(13,2),
  PRIMARY KEY (serviceCode, accountPeriod, subjectCode) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://xxxx:9200',
  'index' = 'income_distribution',
  'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL'
);

以上的建表 DDL 分別創建了訂單實收 source 表、賬單實收 source 表、產品科目維表和 Elasticsearch 結果表。建表完成後,Flink 是不會馬上去同步 MySQL 的數據,而是等到用戶提交了一個 insert 作業後才會執行同步數據,並且 Flink 不會存儲數據。我們的第一個作業是計算收入分佈,數據來源於 bill_info 和 order_info 兩張 MySQL 表,並且賬單實收表和訂單實收表都需要關聯維表數據獲取應收科目的最新中文名稱,按照服務中心、賬期、科目代碼和科目名稱進行分組計算實收金額的 sum 值,實收分佈具體 DML 如下:

INSERT INTO income_distribution
SELECT t1.serviceCode, t1.accountPeriod, t1.subjectCode, t1.subjectName, SUM(amt) AS amt 
FROM (
  SELECT b.serviceCode, b.accountPeriod, b.subjectCode, s.name AS subjectName, SUM(amt) AS amt 
  FROM bill_info AS b
  JOIN subject_info FOR SYSTEM_TIME AS OF b.proc_time s ON b.subjectCode = s.code 
  GROUP BY b.serviceCode, b.accountPeriod, b.subjectCode, s.name
UNION ALL
  SELECT b.serviceCode, b.accountPeriod, b.subjectCode, s.name AS subjectName, SUM(amt) AS amt
  FROM order_info AS b
  JOIN subject_info FOR SYSTEM_TIME AS OF b.proc_time s ON b.subjectCode = s.code 
  GROUP BY b.serviceCode, b.accountPeriod, b.subjectCode, s.name
) AS t1
GROUP BY t1.serviceCode, t1.accountPeriod, t1.subjectCode, t1.subjectName;

Flink SQL 的維表 JOIN 和雙流 JOIN 寫法上不太一樣,對於維表,還需要在 Flink source table 上添加一個 proctime 字段 proc_time AS PROCTIME(),關聯的時候使用 FOR SYSTEM_TIME AS OF 的 SQL 語法查詢時態表,意思是關聯查詢最新版本的維表數據。關於維表 JOIN 的使用可參閱:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/joins.html

③ 在 SQL Client 執行以上作業後,YARN 會創建一個 Flink 集群運行作業,並且用戶可以在 Hadoop 上查看到執行作業的所有信息,並且能進入 Flink 的 Web UI 頁面查看 Flink 作業詳情,以下是 Hadoop 所有作業情況。

3.jpg

④ 作業提交後,Flink SQL CDC 會掃描指定的 MySQL 表,在這期間 Flink 也會進行 checkpoint,所以需要按照上文所述的配置 checkpoint 的重試策略和重試次數。當數據被讀取進 Flink 後,Flink 會流式地進行作業邏輯的計算,實時統計出聚合結果輸出到 Elasticsearch(sink 端)。相當於我們使用 Flink 在 MySQL 的表上維護了一個實時的物化視圖,並將這個實時物化視圖的結果存在了 Elasticsearch 中。在 Elasticsearch 中使用 GET /income_distribution/_search{ "query": {"match_all": {}}} 命令查看輸出的實收分佈結果,如下圖:

4.jpg

通過圖中的結果可以看出聚合結果被實時的計算出來,並寫到了 Elasticsearch 中了。

05 踩過的坑和學到的經驗

1. Flink 作業原來運行在 standalone session 模式下,提交多個 Flink 作業會導致作業失敗報錯。

  • 原因:因為 standalone session 模式下啟動多個作業會導致多個作業的 Task 共享一個 JVM,可能會導致一些不穩定的問題。並且排查問題時,多個作業的日誌混在一個 TaskManager 中,增加了排查的難度。
  • 解決方法:採用 YARN 的 per-job 模式啟動多個作業,能有更好的隔離性。

2. SELECT elasticsearch table 報以下錯誤:

5.jpg

  • 原因:Elasticsearch connector 目前只支持了 sink,不支持 source 。所以不能 SELECT elasticsearch table。

3. 在 flink-conf.yaml 裡修改默認並行度,但是在 Web UI 看到作業的並行度還是 1,並行度修改不生效。

  • 解決辦法:在使用 SQL Client 時 sql-client-defaults.yaml 中的並行度配置的優先級更高。在 sql-client-defaults.yaml 中修改並行度,或者刪除 sql-client-defaults.yaml 中的並行度配置。更建議採用後者。

4. Flink 作業在掃描 MySQL 全量數據時,checkpoint 超時,出現作業 failover,如下圖:

6.jpg

  • 原因:Flink CDC 在 scan 全表數據(我們的實收表有千萬級數據)需要小時級的時間(受下游聚合反壓影響),而在 scan 全表過程中是沒有 offset 可以記錄的(意味著沒法做 checkpoint),但是 Flink 框架任何時候都會按照固定間隔時間做 checkpoint,所以此處 mysql-cdc source 做了比較取巧的方式,即在 scan 全表的過程中,會讓執行中的 checkpoint 一直等待甚至超時。超時的 checkpoint 會被仍未認為是 failed checkpoint,默認配置下,這會觸發 Flink 的 failover 機制,而默認的 failover 機制是不重啟。所以會造成上面的現象。
  • 解決辦法:在 flink-conf.yaml 配置 failed checkpoint 容忍次數,以及失敗重啟策略,如下:
execution.checkpointing.interval: 10min   # checkpoint間隔時間
execution.checkpointing.tolerable-failed-checkpoints: 100  # checkpoint 失敗容忍次數
restart-strategy: fixed-delay  # 重試策略
restart-strategy.fixed-delay.attempts: 2147483647   # 重試次數

目前 Flink 社區也有一個 issue(FLINK-18578)來支持 source 主動拒絕 checkpoint 的機制,將來基於該機制,能比較優雅地解決這個問題。

5. Flink 怎麼樣開啟 YARN 的 per-job 模式?

  • 解決方法:在 flink-conf.yaml 中配置 execution.target: yarn-per-job。

6. 進入 SQL Client 創建 table 後,在另外一個節點進入 SQL Client 查詢不到 table。

  • 原因:因為 SQL Client 默認的 Catalog 是在 in-memory 的,不是持久化 Catalog,所以這屬於正常現象,每次啟動 Catalog 裡面都是空的。

7. 作業在運行時 Elasticsearch 報如下錯誤:

Caused by: org.apache.Flink.elasticsearch7.shaded.org.elasticsearch.ElasticsearchException: Elasticsearch exception [type=illegal_argument_exception, reason=mapper [amt] cannot be changed from type [long] to [float]]

  • 原因:數據庫表的字段 amt 的類型是 decimal,DDL 創建輸出到 es 的 amt 字段的類型也是 decimal,因為輸出到 es 的第一條數據的amt如果是整數,比如是 10,輸出到 es 的類型是 long 類型的,es client 會自動創建 es 的索引並且設置 amt 字段為 long 類型的格式,那麼如果下一次輸出到 es 的 amt 是非整數 10.1,那麼輸出到 es 的時候就會出現類型不匹配的錯誤。
  • 解決方法:手動生成 es 索引和 mapping 的信息,指定好 decimal 類型的數據格式是 saclefloat,但是在 DDL 處仍然可以保留該字段類型是 decimal。

8. 作業在運行時 mysql cdc source 報如下錯誤:

7.jpg

  • 原因:因為數據庫中別的表做了字段修改,CDC source 同步到了 ALTER DDL 語句,但是解析失敗拋出的異常。
  • 解決方法:在 flink-cdc-connectors 最新版本中已經修復該問題(跳過了無法解析的 DDL)。升級 connector jar 包到最新版本 1.1.0:flink-sql-connector-mysql-cdc-1.1.0.jar,替換 flink/lib 下的舊包。

9. 掃描全表階段慢,在 Web UI 出現如下現象:

8.jpg

  • 原因:掃描全表階段慢不一定是 cdc source 的問題,可能是下游節點處理太慢反壓了。
  • 解決方法:通過 Web UI 的反壓工具排查發現,瓶頸主要在聚合節點上。通過在 sql-client-defaults.yaml 文件配上 MiniBatch 相關參數和開啟 distinct 優化(我們的聚合中有 count distinct),作業的 scan 效率得到了很大的提升,從原先的 10 小時,提升到了 1 小時。關於性能調優的參數可以參閱:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/tuning/streaming_aggregation_optimization.html
configuration:
  table.exec.mini-batch.enabled: true
  table.exec.mini-batch.allow-latency: 2s
  table.exec.mini-batch.size: 5000
  table.optimizer.distinct-agg.split.enabled: true

10. CDC source 掃描 MySQL 表期間,發現無法往該表 insert 數據。

  • 原因:由於使用的 MySQL 用戶未授權 RELOAD 權限,導致無法獲取全局讀鎖(FLUSH TABLES WITH READ LOCK), CDC source 就會退化成表級讀鎖,而使用表級讀鎖需要等到全表 scan 完,才能釋放鎖,所以會發現持鎖時間過長的現象,影響其他業務寫入數據。
  • 解決方法:給使用的 MySQL 用戶授予 RELOAD 權限即可。所需的權限列表詳見文檔:https://github.com/ververica/flink-cdc-connectors/wiki/mysql-cdc-connector#setup-mysql-server。如果出於某些原因無法授予 RELOAD 權限,也可以顯式配上 'debezium.snapshot.locking.mode' = 'none'來避免所有鎖的獲取,但要注意只有當快照期間表的 schema 不會變更才安全。

11. 多個作業共用同一張 source table 時,沒有修改 server id 導致讀取出來的數據有丟失。

  • 原因:MySQL binlog 數據同步的原理是,CDC source 會偽裝成 MySQL 集群的一個 slave(使用指定的 server id 作為唯一 id),然後從 MySQL 拉取 binlog 數據。如果一個 MySQL 集群中有多個 slave 有同樣的 id,就會導致拉取數據錯亂的問題。
  • 解決方法:默認會隨機生成一個 server id,容易有碰撞的風險。所以建議使用動態參數(table hint)在 query 中覆蓋 server id。如下所示:
SELECT *
FROM bill_info /*+ OPTIONS('server-id'='123456') */ ;

12. 在啟動作業時,YARN 接收了任務,但作業一直未啟動:

9.jpg

  • 原因:Queue Resource Limit for AM 超過了限制資源限制。默認的最大內存是 30G (集群內存) * 0.1 = 3G,而每個 JM 申請 2G 內存,當提交第二個任務時,資源就不夠了。
  • 解決方法:調大 AM 的 resource limit,在 capacity-scheduler.xml 配置 yarn.scheduler.capacity.maximum-am-resource-percent,代表AM的佔總資源的百分比,默認為0.1,改成0.3(根據服務器的性能靈活配置)。

13. AM 進程起不來,一直被 kill 掉。

10.jpg

  • 原因:386.9 MB of 1 GB physical memory used; 2.1 GB of 2.1 GB virtual memory use。默認物理內存是 1GB,動態申請到了 1GB,其中使用了386.9 MB。物理內存 x 2.1=虛擬內存,1GBx2.1≈2.1GB ,2.1GB 虛擬內存已經耗盡,當虛擬內存不夠時候,AM 的 container 就會自殺。
  • 解決方法:兩個解決方案,或調整 yarn.nodemanager.vmem-pmem-ratio 值大點,或 yarn.nodemanager.vmem-check-enabled=false,關閉虛擬內存檢查。參考:https://blog.csdn.net/lzxlfly/article/details/89175452

06 總結

為了提升了實時報表服務的可用性和實時性,一開始我們採用了 Canal+Kafka+Flink 的方案,可是發現需要寫比較多的 Java 代碼,而且還需要處理好 DataStream 和 Table 的轉換以及 binlong 位置的獲取,開發難度相對較大。另外,需要維護 Kafka 和 Canal 這兩個組件的穩定運行,對於我們小團隊來說成本也不小。由於我們公司已經有基於 Flink 的任務在線上運行,因此採用 Flink SQL CDC 就成了順理成章的事情。基於 Flink SQL CDC 的方案只需要編寫 SQL ,不用寫一行 Java 代碼就能完成實時鏈路的打通和實時報表的計算,對於我們來說非常的簡單易用,而且在線上運行的穩定性和性能表現也讓我們滿意。

我們正在公司內大力推廣 Flink SQL CDC 的使用,也正在著手改造其他幾個實時鏈路的任務。非常感謝開源社區能為我們提供如此強大的工具,也希望 Flink CDC 越來越強大,支持更多的數據庫和功能。也再次感謝雲邪老師對於我們項目上線的大力支持!

作者介紹:

曾慶東,金地物業中級開發工程師,負責聚合營業平臺實時計算開發及運維工作,從事過大數據開發,目前專注於 Apache Flink 實時計算,喜歡開源技術,喜歡分享。

更多 Flink 技術交流可加入 Apache Flink 社區釘釘交流群:

最新釘群二維碼.jpeg

Leave a Reply

Your email address will not be published. Required fields are marked *