開發與維運

Flink 必知必會經典課程8:Flink Connector 詳解

作者|任慶盛

關於Flink Connector的詳解,本文將通過四部分展開介紹:

  1. 連接器
  2. Source API
  3. Sink API
  4. Collector的未來發展

一. 連接器Connecter的概述-Flink與外部系統的橋樑

1. 連接器 Connector

image.png

Flink的數據重要的來源和去向

連接器是Flink與外部系統間溝通的橋樑。如:我們需要從Kafka裡讀取數據,在Flink裡把數據處理之後再重新寫回到HIVE、elastic search這樣的外部系統裡去。

處理流程中的事件控制:事件處理水印(watermark),檢查點對齊記錄
負載均衡:根據不同併發的負載對數據分區進行合理的分配
數據解析與序列化:我們的數據在外部系統裡可能是以二進制的形式存儲的,在數據庫裡可能是以各種列的形式來存儲的。我們再把它讀到Flink裡後,需要對他進行一個解析,之後才能夠進行後面的數據處理。所以我們同樣在寫回外部系統的時候也需要對數據進行一個序列化的操作-把它轉換成外部系統裡對應的存儲格式來進行存儲。

image.png

上圖顯示的是一項十分典型的例子。
我們首先從kafka裡通過Source讀取其中的部分記錄。然後把這些記錄送到Flink當中的一些算子進行對應的運算,再通過Sink寫出到elastic search當中去,所以Source和Sink在這個Flink作業的兩端起到了一個接口的作用。

二. Source API- Flink數據的入口

1. Source 接口演進

image.png

Source在Flink 1.10版本之前是左側的這兩個接口:SourceFunction API(用來處理流式數據),InputFormat API( 用來處理批式數據)。在Flink 1.10之後,社區引入了一個新的Source API,對整個的Source進行了重構。那麼為什麼我們社區要做這樣的一個工作呢?

批流實現不一致:生態不斷壯大的過程中,舊的API暴露出來一些問題。其中最直觀的問題就是批流實現的不一致。
接口簡單但實現複雜:之前的API可能接口實現比較簡單,但實際上對於開發者來講,在實現這個接口的時候,所有的邏輯、所有的操作實現起來是非常複雜的,對於開發者來講也不夠友好。
因此,基於這些問題,在FLIP-27中提出了一個新的Source API的設計。其特點有二:
批流統一:流式數據處理和批式數據處理不需要再維護兩套代碼,用一套代碼就夠了。
實現簡單:Source API定義了很多概念上的抽象,雖然說這些抽象看起來會比較複雜,但是實際上是簡化了開發者操作的開發者開發工作。

2. 核心抽象

image.png

1) 記錄分片(Split)

有編號的記錄集合

以Kafka來舉例子。Kafka的分片既可以定義成一整個分區,也可以定義成一個分區裡的某一部分。比如說我從 offset 為 100的數據開始消費,到200號之間我們定義為一個分片;201~300定義成另外一個分片,這樣也是可以的。只要他是一個記錄的集合、我們給他一個唯一的編號,我們就可以定義這樣的一個記錄分片。

進度可追蹤

我們需要在這個分片當中記錄現在處理到了哪一個位置,我們在記錄檢查點的時候需要知道當前處理了哪些東西,便於一旦出現了故障,可以直接從故障中恢復起來。

記錄分片的所有信息

以Kafka舉例來講,一個分區的起始和終止位點等信息是都要包含在整個記錄分片裡的。因為我們在做Checkpoint的時候也是以記錄分片為單位的,所以說記錄分辨裡的信息也應該是自洽的。

2) 記錄分片枚舉器(Split Enumerator)

發現記錄分片:檢測外部系統中所存在的分片
分配記錄分片:Enumerator是處於一個協調者的角色存在的。它需要給我們的Source讀取器分配任務。
協調Source讀取器:例如某些讀取器的進度可能太快了,此時便要告訴他稍微慢一點兒來保證watermark大致是一致的。

3) Source讀取器(Source Reader)
從記錄分片讀取數據:根據枚舉器分配的記錄分片來讀取數據
事件時間水印處理:需要從我們從外部系統中讀下來的數據裡提取事件時間,然後做出對應的水印發送的操作。
數據解析:對從外部系統中讀取到的數據進行反序列化,發送至下游算子

3. 枚舉器-讀取器架構

image.png

分片枚舉器是運行在Job Master上面的,Source讀取器是運行在Task Executor上面的。因此,枚舉器是領導者、協調者的角色,讀取器是執行者的角色。
他們的檢查點存儲也是各自分開的,但之間會存在一些通信。比如說枚舉器是需要給讀取器來分配任務,也要通知讀取器後續沒有更多的分片需要處理。由於一個運行環境不一樣,他們兩個之間也不可避免地會存在一些網絡通信。便有了如下通訊棧的定義。

image.png

這個通訊棧上面確定了一些event來提供給開發者進行自己的實現。

首先,最上面這層是Source Event,留給開發者自己去定義一些客戶化的操作。比如假使現在設計的一個Source,可能reader在某些條件下可能要暫停讀取,那麼SplitEnumerator可以通過這種Source event的方式發送給Source Reader。

其次,再下面一層分別是叫Operator Coordinator,算子的協調者。它和真正去執行任務的算子通過Operator Event算子事件進行溝通的。我們已經事先定義好了一些算子事件,如添加分片、通知我們的leader沒有新的分片了等。這些對於所有的Source都通用的事件,是在Operator Event這一層來進行抽象的。

Address Lookup是用來定位消息應該發送給哪一個Operator的。因為Flink整個作業執行起來後會有一個加一個有向無環圖的。不同的算子可能運行在不同的Task Manager上面,那麼怎麼去找到對應的task、對應的算子便是這一層的任務。

由於網絡通信的存在,Job Master和Task Executor之間有一個RPC Gateway。所有的Event最終都會通過RPC Gateway、通過RPC調用的方式來進行網絡傳輸。

4. Source讀取器設計

為了簡化Source讀取器實踐步驟,減少開發者工作,社區已經為大家提供了SourceReaderBase。用戶在開發的時候可以直接繼承SourceReaderBase類,從而大大簡化開發者的一些開發工作。那麼我們接下來對 SourceReaderBase進行分析。看上去好像這張圖裡有非常多的組件,但實際上我們可以把它拆成兩部分來理解。

image.png

以中間elementQueue隊列作為界限,隊列左側用藍色標出來的部分是需要和外部系統打交道的組件,在elementQueue的右側用橙色標出來的部分是和Flink的引擎側打交道的部分。

首先,左側是由一個或者是多個分片的讀取器構成的,每一個reader通過一個Fetcher來驅動,多個Fetcher會統一由一個Fetcher Manager來管理。這裡的實現也有非常多種,比如說可以只開一個線程、只開這一個SplitReader,通過這一個讀取器來消費多個分區。此外,我們也可以根據需求,開多個線程-一個線程運行一個feature,進行一個reader,每個reader負責一個分區來並行的去消費數據。這些完全取決於用戶的實現、選擇。
出於性能考慮,每次SplitReader會從外部系統中取一批數據,把它們放到elementQueue裡。如圖所示,在這個藍色框子裡的是每次取下來的一批數據,而後橙色框是這一批數據下面的每條數據。

其次,elementQueue的右側是由RecordEmitter和SourceOutput組成的。RecordEmitter把每條記錄發送給下游的另外一個SourceOutput會把記錄輸出出去。每次RecordEmitter會從中間elementQueue裡拿一批數據下來,把它們一條一條發送到下游。由於RecordEmitter是由主線程來驅動的,該主線程現在的設計裡是用了一個無鎖的mailbox模型,它會把需要執行工作分成一個一個mail,每次工作線程從mailbox裡取出來一個mail然後來進行工作,所以我們應該注意,這裡的實現一定要是無阻塞的。

RecordEmitter每次往下游發送數據的同時會向下遊彙報-後面會不會還有後續的數據需要處理。與此同時呢,我們也會把當前這個分片的處理進度記錄在SplitStates當中,記錄它當前的狀態、處理到了什麼位置。

當SplitEnumerator在外部系統當中發現了新的分片,它需要通過RPC調用addSplits方法將新的分片添加讀取器。在SplitFetchermanager這一側會根據之前用戶已經選定的線程模型把新分片分配出去(如只有一個線程,那便會給這個線程分配一個新任務,再讓reader去讀取這個新的分片。如果整體是多線程的實現的,那便新建一個線程,新建一個reader來單獨去處理分片。同樣我們也要在SplitStates中記錄當前處理的這個進度是怎麼樣的。

5. 創建檢查點

image.png

接下來我們來看一下在新的Source API當中是怎麼處理檢查點的。
首先,左側我們的協調者,分片枚舉器。圖中所示,它目前手中還有一個分片(Split_5)沒有分配出去。中間箭頭部分是正在傳輸路上的一些分片。虛線是這個檢查點的邊界。我們可以看到二號分片已經在檢查點前面了,四號分片在檢查點後面,最下方的reader正在向SplitEnumerator請求一個新的分片。再看reader,三個reader分別已經分配到了某一些Split、也進行了一些處理,已經有Position了。那我們分別來看一下枚舉器和讀取器需要在檢查點的時候存儲哪些東西
· 枚舉器:未分配記錄分片(Split_5),已分配未存入檢查點記錄分片(Split_4)
· 讀取器:已被分配記錄分片(Split_0,1,3),記錄分配狀態(Split_2)

6. 三步簡單實現Source

1) Split/SplitState

  • Split:外部系統分片
  • SplitSerializer:序列化/反序列化Split傳遞給SourceReader
  • SplitState:Split狀態,用於Checkpoint與恢復

2) SplitEnumerator

  • 發現與訂閱Split
  • EnumState:Enumerator的狀態,用於Checkpoint與恢復
  • EnumStateSerializer:序列化/反序列化EnumState

3) SourceReader

  • SplitReader:與外部系統進行數據交互的接口
  • FetcherManager:選擇線程模型(目前已有)
  • RecordEmitter:轉換消息類型與處理事件時間

如果我們仔細去想一下就會發現,其實這些東西絕大多數都是和外部系統打交道的,也就是說和Flink引擎本身打交道的部分很少,用戶不再需要去擔心 checkpoint 鎖的問題,多線程的問題等等,能夠把更多的開發精力來集中在開發和外部系統交互的部分上。所以說,新的Source API是通過這些抽象來大大的簡化了開發者的開發。

三. Sink API- Flink數據的出口

如果對Flink有一定的瞭解的話會發現它可以做到精確一次的語義,數據既不重複也不丟失。那麼為了實現這個“精確一次”Flink也做了很多的工作,其中非常重要的一點就是在Sink端實現了二階段提交。

image.png

1. 預提交階段

在預提交階段裡,由於我們的這個分佈式系統一般是存在這種“協調者1+執行者n”的模式,那麼在預提交的預提交階段裡,首先我們的協調者是需要請求提交的,也就是說他需要給所有的執行者來發送請求提交的消息,從而來開始整個的二階段提交。
當執行者收到了請求提交的消息,他會做一些提交的準備工作。在所有的準備工作都做完之後,他所有的執行者會向這個協調者回復說明現在已經準備好進行下一步的提交工作了。當協調者收到了所有執行者的“可繼續”請求後,預提交階段結束,進入我們提交第二階段-提交執行階段。

2. 提交執行階段

image.png

提交者會向執行者發送決定提交的消息,執行者會把剛剛準備好的提交相關的東西來進行一個處理,來真正的去執行一個提交的動作。在完成之後會向協調者彙報一個回覆的結果,反饋提交是否正常執行。

一旦協調者決定進入第二個提交執行階段,所有的執行者必須要不打折扣地把命令執行下去。也就是說如果某個協調者在這一階段出了問題的話,他在恢復起來之後還是要把這個決定執行下去的。也就是說一旦決定提交,執行者便必須要把提交這一個動作貫徹下去。

image.png

如果在預提交階段某一個執行者準備提交的時候可能出現了一些故障等、沒有做正確的提交動作,那麼他可能向協調者會迴應了一個錯誤,比如網絡斷了,也可能經過一段時間超時之後協調者沒有收到這個三號執行者的迴應請求,那麼協調者就會觸發第二階段的回滾動作。也就是會告訴所有的執行者“這次提交嘗試失敗了,需要大家回滾到之前的狀態”。而後我們的執行者便會出現一個回滾動作,撤銷上一步操作。

3. 二階段提交在Flink中的做法

1)預提交階段

image.png

以這個文件系統的Sink來舉個例子。
文件系統的Sink在接收到了檢查點邊界之後做預提交動作(把當前的數據落盤寫到硬盤上的某一個臨時文件裡),當預提交階段完成之後,所有的operator會向我們的協調者回復 “已經準備好進行提交”的信息。

2) 提交執行階段
image.png

第二個階段,提交執行階段開啟。JobManager會向所有的算子發送提交執行的指令,Sink在接收到這個指令之後,便會真正的去做最後的提交動作。
我們還是以文件系統來來舉例子,那麼剛剛我們已經說過了,在預提交的階段數據被寫到了一個臨時文件裡,那麼在真正的進行提交的時候,臨時文件會被按照我們事先定義好的這個名字規範重命名,相當於實現了提交。
這裡要注意,臨時文件這一設置並非無用,它對後續可能發生的回滾等狀況具有鋪墊性的作用。我們是巧妙利用了二階段提交的機制來保障精確一次的語義。

4. Sink模型

image.png

1) Writer:負責在寫入或預提交的階段,把上游源源不斷的數據寫到中間的某一個狀態裡去。
2) Committable:上述所說的“中間的狀態”,是可以進行這個提交操作的元件。
3) Committer:把Committable真正的去提交上去
4) Global Commiter:全局提交器。這個組件是可選的、取決於你的外部系統。例:Iceberg。

四. 未來發展

image.png

  • 完善新Source

因為Source和Sink剛剛推出不久,所以說相對來講還是存在一些問題的。有些開發者可能會有一些新的需求、需要新的更新與提升。目前已經算一個相對穩定的狀態,但還是需要去不斷地完善。

  • 遷移現有連接器至新API

隨著流批一體連接器的不斷推進,所有的連接器會遷移到新的API上。

  • 連接器測試框架

連接器測試框架嘗試去給所有的connector提供一個相對來講比較一致、統一的測試標準。測試開發者不再需要去自己寫一些case、考慮各種各樣的測試環境、測試場景等等。讓我們的開發者能夠像搭積木一樣快速的用不同的場景,不同的用例來測試自己的代碼,從而把更多的開發精力集中在開發這個本身的邏輯上面,大大減少開發者的測試負擔。這也是Source API,Sink API和後續的framework研發的一致目標。是為了讓連接器開發更加簡單、門檻更低,從而吸引更多的開發者為Flink生態做貢獻。

活動推薦:

僅需99元即可體驗阿里雲基於 Apache Flink 構建的企業級產品-實時計算 Flink 版!點擊下方鏈接瞭解活動詳情:https://www.aliyun.com/product/bigdata/sc?utm_content=g_1000250506

image.png

Leave a Reply

Your email address will not be published. Required fields are marked *