大數據

Flink CDC 2.0 正式發佈,詳解核心改進

本文由社區志願者陳政羽整理,內容來源自阿里巴巴高級開發工程師徐榜江 (雪盡) 7 月 10 日在北京站 Flink Meetup 分享的《詳解 Flink-CDC》。深入講解了最新發布的 Flink CDC 2.0.0 版本帶來的核心特性,包括:全量數據的併發讀取、checkpoint、無鎖讀取等重大改進。

GitHub 地址:
https://github.com/ververica/flink-cdc-connectors

一、CDC 概述

CDC 的全稱是 Change Data Capture ,在廣義的概念上,只要是能捕獲數據變更的技術,我們都可以稱之為 CDC 。目前通常描述的 CDC 技術主要面向數據庫的變更,是一種用於捕獲數據庫中數據變更的技術。CDC 技術的應用場景非常廣泛:

  • 數據同步:用於備份,容災;
  • 數據分發:一個數據源分發給多個下游系統;
  • 數據採集:面向數據倉庫 / 數據湖的 ETL 數據集成,是非常重要的數據源。

CDC 的技術方案非常多,目前業界主流的實現機制可以分為兩種:

  • 基於查詢的 CDC:

    • 離線調度查詢作業,批處理。把一張表同步到其他系統,每次通過查詢去獲取表中最新的數據;
    • 無法保障數據一致性,查的過程中有可能數據已經發生了多次變更;
    • 不保障實時性,基於離線調度存在天然的延遲。
  • 基於日誌的 CDC:

    • 實時消費日誌,流處理,例如 MySQL 的 binlog 日誌完整記錄了數據庫中的變更,可以把 binlog 文件當作流的數據源;
    • 保障數據一致性,因為 binlog 文件包含了所有歷史變更明細;
    • 保障實時性,因為類似 binlog 的日誌文件是可以流式消費的,提供的是實時數據。

對比常見的開源 CDC 方案,我們可以發現:

img

  • 對比增量同步能力,

    • 基於日誌的方式,可以很好的做到增量同步;
    • 而基於查詢的方式是很難做到增量同步的。
  • 對比全量同步能力,基於查詢或者日誌的 CDC 方案基本都支持,除了 Canal。
  • 而對比全量 + 增量同步的能力,只有 Flink CDC、Debezium、Oracle Goldengate 支持較好。
  • 從架構角度去看,該表將架構分為單機和分佈式,這裡的分佈式架構不單純體現在數據讀取能力的水平擴展上,更重要的是在大數據場景下分佈式系統接入能力。例如 Flink CDC 的數據入湖或者入倉的時候,下游通常是分佈式的系統,如 Hive、HDFS、Iceberg、Hudi 等,那麼從對接入分佈式系統能力上看,Flink CDC 的架構能夠很好地接入此類系統。
  • 在數據轉換 / 數據清洗能力上,當數據進入到 CDC 工具的時候是否能較方便的對數據做一些過濾或者清洗,甚至聚合?

    • 在 Flink CDC 上操作相當簡單,可以通過 Flink SQL 去操作這些數據;
    • 但是像 DataX、Debezium 等則需要通過腳本或者模板去做,所以用戶的使用門檻會比較高。
  • 另外,在生態方面,這裡指的是下游的一些數據庫或者數據源的支持。Flink CDC 下游有豐富的 Connector,例如寫入到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常見的一些系統,也支持各種自定義 connector。

二、Flink CDC 項目

講到這裡,先帶大家回顧下開發 Flink CDC 項目的動機。

1. Dynamic Table & ChangeLog Stream

大家都知道 Flink 有兩個基礎概念:Dynamic Table 和 Changelog Stream。

fcs_1

  • Dynamic Table 就是 Flink SQL 定義的動態表,動態表和流的概念是對等的。參照上圖,流可以轉換成動態表,動態表也可以轉換成流。
  • 在 Flink SQL中,數據在從一個算子流向另外一個算子時都是以 Changelog Stream 的形式,任意時刻的 Changelog Stream 可以翻譯為一個表,也可以翻譯為一個流。

聯想下 MySQL 中的表和 binlog 日誌,就會發現:MySQL 數據庫的一張表所有的變更都記錄在 binlog 日誌中,如果一直對錶進行更新,binlog 日誌流也一直會追加,數據庫中的表就相當於 binlog 日誌流在某個時刻點物化的結果;日誌流就是將表的變更數據持續捕獲的結果。這說明 Flink SQL 的 Dynamic Table 是可以非常自然地表示一張不斷變化的 MySQL 數據庫表。

Debezium_to_cdc

在此基礎上,我們調研了一些 CDC 技術,最終選擇了 Debezium 作為 Flink CDC 的底層採集工具。Debezium 支持全量同步,也支持增量同步,也支持全量 + 增量的同步,非常靈活,同時基於日誌的 CDC 技術使得提供 Exactly-Once 成為可能。

將 Flink SQL 的內部數據結構 RowData 和 Debezium 的數據結構進行對比,可以發現兩者是非常相似的。

  • 每條 RowData 都有一個元數據 RowKind,包括 4 種類型, 分別是插入 (INSERT)、更新前鏡像 (UPDATE_BEFORE)、更新後鏡像 (UPDATE_AFTER)、刪除 (DELETE),這四種類型和數據庫裡面的 binlog 概念保持一致。
  • 而 Debezium 的數據結構,也有一個類似的元數據 op 字段, op 字段的取值也有四種,分別是 c、u、d、r,各自對應 create、update、delete、read。對於代表更新操作的 u,其數據部分同時包含了前鏡像 (before) 和後鏡像 (after)。

通過分析兩種數據結構,Flink 和 Debezium 兩者的底層數據是可以非常方便地對接起來的,大家可以發現 Flink 做 CDC 從技術上是非常合適的。

2. 傳統 CDC ETL 分析

我們來看下傳統 CDC 的 ETL 分析鏈路,如下圖所示:

img

傳統的基於 CDC 的 ETL 分析中,數據採集工具是必須的,國外用戶常用 Debezium,國內用戶常用阿里開源的 Canal,採集工具負責採集數據庫的增量數據,一些採集工具也支持同步全量數據。採集到的數據一般輸出到消息中間件如 Kafka,然後 Flink 計算引擎再去消費這一部分數據寫入到目的端,目的端可以是各種 DB,數據湖,實時數倉和離線數倉。

注意,Flink 提供了 changelog-json format,可以將 changelog 數據寫入離線數倉如 Hive / HDFS;對於實時數倉,Flink 支持將 changelog 通過 upsert-kafka connector 直接寫入 Kafka。

cdc_4_1

我們一直在思考是否可以使用 Flink CDC 去替換上圖中虛線框內的採集組件和消息隊列,從而簡化分析鏈路,降低維護成本。同時更少的組件也意味著數據時效性能夠進一步提高。答案是可以的,於是就有了我們基於 Flink CDC 的 ETL 分析流程。

3. 基於 Flink CDC 的 ETL 分析

在使用了 Flink CDC 之後,除了組件更少,維護更方便外,另一個優勢是通過 Flink SQL 極大地降低了用戶使用門檻,可以看下面的例子:

cdc_etl_sql

該例子是通過 Flink CDC 去同步數據庫數據並寫入到 TiDB,用戶直接使用 Flink SQL 創建了產品和訂單的 MySQL-CDC 表,然後對數據流進行 JOIN 加工,加工後直接寫入到下游數據庫。通過一個 Flink SQL 作業就完成了 CDC 的數據分析,加工和同步。

大家會發現這是一個純 SQL 作業,這意味著只要會 SQL 的 BI,業務線同學都可以完成此類工作。與此同時,用戶也可以利用 Flink SQL 提供的豐富語法進行數據清洗、分析、聚合。

cdc_5_1

而這些能力,對於現有的 CDC 方案來說,進行數據的清洗,分析和聚合是非常困難的。

此外,利用 Flink SQL 雙流 JOIN、維表 JOIN、UDTF 語法可以非常容易地完成數據打寬,以及各種業務邏輯加工。

cdc_5_2

4. Flink CDC 項目發展

  • 2020 年 7 月由雲邪提交了第一個 commit,這是基於個人興趣孵化的項目;
  • 2020 年 7 中旬支持了 MySQL-CDC;
  • 2020 年 7 月末支持了 Postgres-CDC;
  • 一年的時間,該項目在 GitHub 上的 star 數已經超過 800。

cdc_6

三、Flink CDC 2.0 詳解

1. Flink CDC 痛點

MySQL CDC 是 Flink CDC 中使用最多也是最重要的 Connector,本文下述章節描述 Flink CDC Connector 均為 MySQL CDC Connector。

隨著 Flink CDC 項目的發展,得到了很多用戶在社區的反饋,主要歸納為三個:

img

  • 全量 + 增量讀取的過程需要保證所有數據的一致性,因此需要通過加鎖保證,但是加鎖在數據庫層面上是一個十分高危的操作。底層 Debezium 在保證數據一致性時,需要對讀取的庫或表加鎖,全局鎖可能導致數據庫鎖住,表級鎖會鎖住表的讀,DBA 一般不給鎖權限。
  • 不支持水平擴展,因為 Flink CDC 底層是基於 Debezium,起架構是單節點,所以Flink CDC 只支持單併發。在全量階段讀取階段,如果表非常大 (億級別),讀取時間在小時甚至天級別,用戶不能通過增加資源去提升作業速度。
  • 全量讀取階段不支持 checkpoint:CDC 讀取分為兩個階段,全量讀取和增量讀取,目前全量讀取階段是不支持 checkpoint 的,因此會存在一個問題:當我們同步全量數據時,假設需要 5 個小時,當我們同步了 4 小時的時候作業失敗,這時候就需要重新開始,再讀取 5 個小時。

2. Debezium 鎖分析

Flink CDC 底層封裝了 Debezium, Debezium 同步一張表分為兩個階段:

  • 全量階段:查詢當前表中所有記錄;
  • 增量階段:從 binlog 消費變更數據。

大部分用戶使用的場景都是全量 + 增量同步,加鎖是發生在全量階段,目的是為了確定全量階段的初始位點,保證增量 + 全量實現一條不多,一條不少,從而保證數據一致性。從下圖中我們可以分析全局鎖和表鎖的一些加鎖流程,左邊紅色線條是鎖的生命週期,右邊是 MySQL 開啟可重複讀事務的生命週期。

Debezium_lock

以全局鎖為例,首先是獲取一個鎖,然後再去開啟可重複讀的事務。這裡鎖住操作是讀取 binlog 的起始位置和當前表的 schema。這樣做的目的是保證 binlog 的起始位置和讀取到的當前 schema 是可以對應上的,因為表的 schema 是會改變的,比如如刪除列或者增加列。在讀取這兩個信息後,SnapshotReader 會在可重複讀事務裡讀取全量數據,在全量數據讀取完成後,會啟動 BinlogReader 從讀取的 binlog 起始位置開始增量讀取,從而保證全量數據 + 增量數據的無縫銜接。

表鎖是全局鎖的退化版,因為全局鎖的權限會比較高,因此在某些場景,用戶只有表鎖。表鎖鎖的時間會更長,因為表鎖有個特徵:鎖提前釋放了可重複讀的事務默認會提交,所以鎖需要等到全量數據讀完後才能釋放。

經過上面分析,接下來看看這些鎖到底會造成怎樣嚴重的後果:

img

Flink CDC 默認使用無鎖模式,能夠滿足大部分場景,但犧牲了一定的數據準確性。Flink CDC 也支持配置鎖模式,雖然能保證數據一致性,但存在上述 hang 住數據的風險。

3. Flink CDC 2.0 設計 ( 以 MySQL 為例)

通過上面的分析,可以知道 2.0 的設計方案,核心要解決上述的三個問題,即支持無鎖、水平擴展、checkpoint。

img

DBlog 這篇論文裡描述的無鎖算法如下圖所示:

unlock_1

左邊是 Chunk 的切分算法描述,Chunk 的切分算法其實和很多數據庫的分庫分表原理類似,通過表的主鍵對錶中的數據進行分片。假設每個 Chunk 的步長為 10,按照這個規則進行切分,只需要把這些 Chunk 的區間做成左開右閉或者左閉右開的區間,保證銜接後的區間能夠等於表的主鍵區間即可。

右邊是每個 Chunk 的無鎖讀算法描述,該算法的核心思想是在劃分了 Chunk 後,對於每個 Chunk 的全量讀取和增量讀取,在不用鎖的條件下完成一致性的合併。Chunk 的切分如下圖所示:

Chunk_cut

因為每個 chunk 只負責自己主鍵範圍內的數據,不難推導,只要能夠保證每個 Chunk 讀取的一致性,就能保證整張表讀取的一致性,這便是無鎖算法的基本原理。

Netflix 的 DBLog 論文中 Chunk 讀取算法是通過在 DB 維護一張信號表,再通過信號表在 binlog 文件中打點,記錄每個 chunk 讀取前的 Low Position (低位點) 和讀取結束之後 High Position (高位點) ,在低位點和高位點之間去查詢該 Chunk 的全量數據。在讀取出這一部分 Chunk 的數據之後,再將這 2 個位點之間的 binlog 增量數據合併到 chunk 所屬的全量數據,從而得到高位點時刻,該 chunk 對應的全量數據。

Flink CDC 結合自身的情況,在 Chunk 讀取算法上做了去信號表的改進,不需要額外維護信號表,通過直接讀取 binlog 位點替代在 binog 中做標記的功能,整體的 chunk 讀算法描述如下圖所示:

Chunk_read

比如正在讀取 Chunk-1,Chunk 的區間是 [K1, K10],首先直接將該區間內的數據 select 出來並把它存在 buffer 中,在 select 之前記錄 binlog 的一個位點 (低位點),select 完成後記錄 binlog 的一個位點 (高位點)。然後開始增量部分,消費從低位點到高位點的 binlog。

  • 圖中的 - ( k2,100 ) + ( k2,108 ) 記錄表示這條數據的值從 100 更新到 108;
  • 第二條記錄是刪除 k3;
  • 第三條記錄是更新 k2 為 119;
  • 第四條記錄是 k5 的數據由原來的 77 變更為 100。

觀察圖片中右下角最終的輸出,會發現在消費該 chunk 的 binlog 時,出現的 key 是k2、k3、k5,我們前往 buffer 將這些 key 做標記。

  • 對於 k1、k4、k6、k7 來說,在高位點讀取完畢之後,這些記錄沒有變化過,所以這些數據是可以直接輸出的;
  • 對於改變過的數據,則需要將增量的數據合併到全量的數據中,只保留合併後的最終數據。例如,k2 最終的結果是 119 ,那麼只需要輸出 +(k2,119),而不需要中間發生過改變的數據。

通過這種方式,Chunk 最終的輸出就是在高位點是 chunk 中最新的數據。

上圖描述的是單個 Chunk 的一致性讀,但是如果有多個表分了很多不同的 Chunk,且這些 Chunk 分發到了不同的 task 中,那麼如何分發 Chunk 並保證全局一致性讀呢?

這個就是基於 FLIP-27 來優雅地實現的,通過下圖可以看到有 SourceEnumerator 的組件,這個組件主要用於 Chunk 的劃分,劃分好的 Chunk 會提供給下游的 SourceReader 去讀取,通過把 chunk 分發給不同的 SourceReader 便實現了併發讀取 Snapshot Chunk 的過程,同時基於 FLIP-27 我們能較為方便地做到 chunk 粒度的 checkpoint。

snapshot-Chunk

當 Snapshot Chunk 讀取完成之後,需要有一個彙報的流程,如下圖中橘色的彙報信息,將 Snapshot Chunk 完成信息彙報給 SourceEnumerator。

Chunk_report

彙報的主要目的是為了後續分發 binlog chunk (如下圖)。因為 Flink CDC 支持全量 + 增量同步,所以當所有 Snapshot Chunk 讀取完成之後,還需要消費增量的 binlog,這是通過下發一個 binlog chunk 給任意一個 Source Reader 進行單併發讀取實現的。

binlog-Chunk

對於大部分用戶來講,其實無需過於關注如何無鎖算法和分片的細節,瞭解整體的流程就好。

整體流程可以概括為,首先通過主鍵對錶進行 Snapshot Chunk 劃分,再將 Snapshot Chunk 分發給多個 SourceReader,每個 Snapshot Chunk 讀取時通過算法實現無鎖條件下的一致性讀,SourceReader 讀取時支持 chunk 粒度的 checkpoint,在所有 Snapshot Chunk 讀取完成後,下發一個 binlog chunk 進行增量部分的 binlog 讀取,這便是 Flink CDC 2.0 的整體流程,如下圖所示:

Chunk_all

Flink CDC 是一個完全開源的項目,項目所有設計和源碼目前都已貢獻到開源社區,Flink CDC 2.0 也已經正式發佈,此次的核心改進和提升包括:

  • 提供 MySQL CDC 2.0,核心feature 包括

    • 併發讀取,全量數據的讀取性能可以水平擴展;
    • 全程無鎖,不對線上業務產生鎖的風險;
    • 斷點續傳,支持全量階段的 checkpoint。
  • 搭建文檔網站,提供多版本文檔支持,文檔支持關鍵詞搜索

筆者用 TPC-DS 數據集中的 customer 表進行了測試,Flink 版本是 1.13.1,customer 表的數據量是 6500 萬條,Source 併發為 8,全量讀取階段:

  • MySQL CDC 2.0 用時 13 分鐘;
  • MySQL CDC 1.4 用時 89 分鐘;
  • 讀取性能提升 6.8 倍。

為了提供更好的文檔支持,Flink CDC 社區搭建了文檔網站,網站支持對文檔的版本管理:

img

文檔網站支持關鍵字搜索功能,非常實用:

img

四、未來規劃

img

關於 CDC 項目的未來規劃,我們希望圍繞穩定性,進階 feature 和生態集成三個方面展開。

  • 穩定性

    • 通過社區的方式吸引更多的開發者,公司的開源力量提升 Flink CDC 的成熟度;
    • 支持 Lazy Assigning。Lazy Assigning 的思路是將 chunk 先劃分一批,而不是一次性進行全部劃分。當前 Source Reader 對數據讀取進行分片是一次性全部劃分好所有 chunk,例如有 1 萬個 chunk,可以先劃分 1 千個 chunk,而不是一次性全部劃分,在 SourceReader 讀取完 1 千 chunk 後再繼續劃分,節約劃分 chunk 的時間。
  • 進階 Feature

    • 支持 Schema Evolution。這個場景是:當同步數據庫的過程中,突然在表中添加了一個字段,並且希望後續同步下游系統的時候能夠自動加入這個字段;
    • 支持 Watermark Pushdown 通過 CDC 的 binlog 獲取到一些心跳信息,這些心跳的信息可以作為一個 Watermark,通過這個心跳信息可以知道到這個流當前消費的一些進度;
    • 支持 META 數據,分庫分表的場景下,有可能需要元數據知道這條數據來源哪個庫哪個表,在下游系統入湖入倉可以有更多的靈活操作;
    • 整庫同步:用戶要同步整個數據庫只需一行 SQL 語法即可完成,而不用每張表定義一個 DDL 和 query。
  • 生態集成

    • 集成更多上游數據庫,如 Oracle,MS SqlServer。Cloudera 目前正在積極貢獻 oracle-cdc connector;
    • 在入湖層面,Hudi 和 Iceberg 寫入上有一定的優化空間,例如在高 QPS 入湖的時候,數據分佈有比較大的性能影響,這一點可以通過與生態打通和集成繼續優化。

最後,歡迎大家加入 Flink CDC 用戶群一起交流。

img

附錄

[1] Flink-CDC 項目地址

[2] Flink-CDC 文檔網站

[3] Percona - MySQL 全局鎖時間分析

[4] DBLog - 無鎖算法論文

[5] Flink FLIP-27 設計文檔


實時數倉 Meetup 議題徵集

8 月 29 日左右 (時間暫定),Flink 社區計劃舉辦 Meetup 實時數倉專場,現徵集議題中!
關於實時數倉,大家的關注度一直很高,目前業界也有許多落地的公司。在 Meetup 實時數倉專場, 我們將更加註重 “交流”,希望將大家聚集在一起相互探討關於實時數倉的話題,重點在踩過的坑、碰到的痛點都是怎樣解決的~
現徵集實時數倉 Meetup 的議題,圍繞 “實時數倉踩坑痛點和避坑經驗”,歡迎各位老師和同學帶上貴公司的介紹,以及議題的初步大綱來找小松鼠。
公司不議大小,經驗才論足缺。我們會選取其中最具代表性的議題,邀請您參加實時數倉 Meetup 專場~ 你們的經驗對於其他技術開發者和 Flink 社區都很重要!

▼ 掃碼添加小松鼠微信 ▼
img


更多 Flink 相關技術問題,可掃碼加入社區釘釘交流群
第一時間獲取最新技術文章和社區動態,請關注公眾號~

image.png

活動推薦

阿里雲基於 Apache Flink 構建的企業級產品-實時計算Flink版現開啟活動:
99 元試用 實時計算Flink版(包年包月、10CU)即有機會獲得 Flink 獨家定製T恤;另包 3 個月及以上還有 85 折優惠!
瞭解活動詳情:https://www.aliyun.com/product/bigdata/sc

image.png

Leave a Reply

Your email address will not be published. Required fields are marked *