作者|崔星燦
本篇內容包含三部分展開介紹Stream Processing with Apache Flink:
- 並行處理和編程範式
- DataStream API概覽及簡單應用
- Flink 中的狀態和時間
一、並行處理和編程範式
眾所周知,對於計算密集型或數據密集型這樣需要計算量比較大的工作,並行計算或分而治之是解決這一類問題非常有效的手段。在這個手段中比較關鍵的部分是,如何對一個已有任務的劃分,或者說如何對計算資源進行合理分配。
舉例說明,上學期間老師有時會找同學來協助批閱考試試卷。假如卷子裡面一共有ABC三個題,那麼同學可能會有如下分工協作方式。
- 方式一:將所有試卷的三個題分別交給不同的人來批閱。這種方式,每個批閱的同學批自己負責的題目後就可以把試卷傳給下一個批閱同學,從而形成一種流水線的工作效果。因為總共只有三道題目,這種流水線的協作方式會隨著同學數量的增加而難以繼續擴展。
- 方式二:分工方式一的擴展,同一題目允許多個同學來共同批閱,比如A題目由兩個同學共同批閱,B題目由三個同學批閱,C題目只由一個同學批閱。這時候我們就需要考慮怎樣進一步的對計算任務做劃分。比如,可以把全部同學分成三組,第一組負責A題目,第二個組負責B題目第三個組負責C。第一個組的同學可以再次再組內進行分工,比如A組裡第一個同學批一半的卷子,第二個同學批另一半卷子。他們分別批完了之後,再將自己手裡的試卷傳遞給下一個組。
像上述按照試卷內題目進行劃分,以及講試卷本身進行劃分,就是所謂的計算的並行性和數據並行性。
我們可以用上面有向無環圖來表示這種並行性。
在圖中,批閱A題目的同學,假設還承擔了一些額外任務,比如把試卷從老師的辦公室拿到批閱試卷的地點;負責C題的同學也額外任務,就是等所有同學把試卷批完後,進行總分的統計和記錄上交的工作。據此,可以把圖中所有的節點劃分為三個類別。第一個類別是Source,它們負責獲取數據(拿試卷);第二類是數據處理節點,它們大多時候不需要和外部系統打交道;最後一個類別負責將整個計算邏輯寫到某個外部系統(統分並上交記錄)。這三類節點分別就是Source節點、Transformation節點和Sink節點。DAG圖中,節點表示計算,節點之間的連線代表計算之間的依賴。
關於編程的一些內容
假設有一個數據集,其中包含1~10十個數字,如果把每一個數字都乘以2並做累計求和操作(如上圖所示)怎麼操作呢?辦法有很多。
如果用編程來解決有兩個角度:第一種是採取命令式編程方式,一步一步的相當於告訴機器應該怎樣生成一些數據結構,怎樣的用這些數據結構去存儲一些臨時的中間結果,怎樣把這些中間結果再轉換成為最終的結果,相當於一步一步告訴機器如何去做;第二種是聲明的方式,聲明式編程裡通常只需要告訴機器去完成怎樣的任務,而不需要像命令式那樣詳細傳遞。例如我們可以把原有的數據集轉化成一個Stream,然後再把 Stream轉化成一個Int類型的Stream,在此過程中,把每一個數字都乘2,最後再調用 Sum方法,就可以獲得所有數字的和。
聲明式編程語言的代碼更簡潔,而簡潔的開發方式,正是計算引擎追求的效果。所以在 Flink 裡所有與任務編寫相關的API,都是偏向聲明式的。
二、DataStream API概覽及簡單應用
在詳細介紹DataStream API之前,我們先來看一下 Flink API的邏輯層次。
在舊版本的 Flink 裡,它的API層次遵循上圖左側這樣四層的關係。最上層表示我們可以用比較高級的API,或者說聲明程度更高的Table API以及SQL的方式來編寫邏輯。所有SQL和Table API編寫的內容都會被Flink內部翻譯和優化成一個用DataStream API實現的程序。再往下一層,DataStream API的程序會被表示成為一系列Transformation,最終 Transformation會被翻譯成JobGraph(即上文介紹的DAG)。
而在較新版本的 Flink 裡發生了一些改變,主要的改變體現在 Table API 和 SQL 這一層上。它不再會被翻譯成 DataStream API 的程序,而是直接到達底層 Transformation 一層。換句話說,DataStream API 和 Table API 這兩者的關係,從一個下層和上層的關係變為了一個平級的關係,這樣流程的簡化,會相應地帶來一些查詢優化方面的好處。
接下來我們用一個簡單的 DataStream API 程序作為示例來介紹,還是上文乘2再求和的需求。
如果用 Flink 表示,它的基本代碼如上圖所示。看上去比單機的示例要稍微的複雜一點,我們一步一步來分解看。
- 首先,用 Flink 實現任何功能,一定要獲取一個相應的運行環境,也就是 Sream Execution Environment;
- 其次,在獲取環境後,可以調用環境的 add Source 方法,來為邏輯添加一個最初始數據源的輸入;設置完數據源後可以拿到數據源的引用,也就是 Data Source 對象;
- 最後,可以調用一系列的轉換方法來對 Data Source 中的數據進行轉化。
這種轉化如圖所示,就是把每個數字都×2,隨後為了求和我們必須利用 keyBy 對數據進行分組。傳入的常數表示把所有的數據都分到一組裡邊,最後再對這個組裡邊的所有的數據,按照第一個字段進行累加,最終得到結果。在得到結果後,不能簡單的像單機程序那樣把它輸出,而是需要在整個邏輯裡面加一個的 Sink 節點,把所有的數據寫到目標位置。上述工作完成後,要去調用 Environment 裡面 Execute 方法,把所有上面編寫的邏輯統一提交到遠程或者本地的一個集群上執行。
Flink DataStream API 編寫程序和單機程序最大的不同就在於,它前幾步的過程都不會觸發數據的計算,而像在繪製一個 DAG 圖。等整個邏輯的 DAG 圖繪製完畢之後,就可以通過 Execute 方法,把整個的圖作為一個整體,提交到集群上去執行。
介紹到這裡,就把Flink DataStream API和DAG圖聯繫在一起了。 事實上,Flink 任務具體的產生過程比上面描述的要複雜得多,它要經過一步步轉化和優化等,下圖展示了Flink 作業的具體生成過程。
DataStream API裡提供的轉換操作
就像上文在示例代碼中展示的,每一個 DataStream對象,在被調用相應方法的時候,都會產生一個新的轉換。相應的,底層會生成一個新的算子,這個算子會被添加到現有邏輯的DAG圖中。相當於添加一條連線來指向現有DAG圖的最後一個節點。所有的這些API在調動它的時候都會產生一個新的對象,然後可以在新的對象上去繼續調用它的轉換方法。就是像這種鏈式的方式,一步一步把這個DAG圖給畫出來。
上述解釋涉及到了一些高階函數思想。每去調用 DataStream上的一個轉換時,都需要給它傳遞的一個參數。換句話說,轉換決定了你想對這個數據進行怎樣的操作,而實際傳遞的包在算子裡面的函數決定了轉換操作具體要怎樣完成。
上圖中,除了左邊列出來的 API, Flink DataStream API 裡面還有兩個非常重要的功能,它們是 ProcessFunction以及 CoProcessFunction。這兩個函數是作為最底層的處理邏輯提供給用戶使用的。上圖所有左側藍色涉及的轉換,理論上來講都可以用底層的ProcessFunction和CoProcessFunction去完成。
關於數據分區
數據分區是指在傳統的批處理中對數據Shuffle的操作。如果把撲克牌想成數據,傳統批處理裡的Shuffle操作就相當於理牌的過程。一般情況下在抓牌過程中,我們都會把牌理順排列好,相同的數字還要放在一起。這樣做最大的好處是,出牌時可以一下子找到想出的牌。Shuffle是傳統的批處理的方式。因為流處理所有的數據都是動態來的,所以理牌的過程或者說處理數據,進行分組或分區的過程,也是在線來完成的。
例如上圖右側所示,上游有兩個算子A的處理實例,下游是三個算子B處理實例。這裡展示的流處理等價於Shuffle的操作被稱為數據分區或數據路由。它用來表示A處理完數據後,要把結果發到下游B的哪個處理實例上。
Flink 裡提供的分區策略
圖X是 Flink 提供的分區策略。需要注意的是, DataStream調用keyBy方法後,可以把整個數據按照一個Key值進行分區。但要嚴格來講,其實keyBy並不算是底層物理分區策略,而是一種轉換操作,因為從API角度來看,它會把DataStream轉化成 KeyedDataStream的類型,而這兩者所支持的操作也有所不同。
所有這些分區策略裡,稍微難理解的可能是Rescale。Rescale涉及到上下游數據本地性的問題,它和傳統的Rebalance,即Round-Pobin,輪流分配類似。區別在於Rescale是它會盡量避免數據跨網絡的傳輸。
如果所有上述的分區策略都不適用的話,我們還可以自己調用 PartitionCustom去自定義一個數據的分區。值得注意的是,它只是自定義的單播,即對每一個數據只能指定它一個下游所要發送的實例,而沒有辦法把它複製成多份發送到下游的多個實例中。
Flink支持的連接器
上文介紹過,圖X裡有兩個關鍵的節點:A節點,需要去連接外部系統,從外部系統把數據讀取到 Flink的處理集群裡;C節點,即Sink節點,它需要彙總處理完的結果,然後把這個結果寫入到某個外部系統裡。這裡的外部系統可以是一個文件系統,也可以是一個數據庫等。
Flink 裡的計算邏輯可以沒有數據輸出,也就是說可以不把最終的數據寫出到外部系統,因為Flink裡面還有一個State的狀態的概念。在中間計算的結果實際上是可以通過 State暴露給外部系統,所以允許沒有專門的Sink。但每一個 Flink 應用都肯定有Source,也就是說必須從某個地方把數據讀進來,才能進行後續的處理。
關於 Source和Sink兩類連接器需要關注的點如下:
- 對於Sourse而言,我們往往比較關心是否支持續監測並接入數據更新,然後把相應的更新數據再給傳輸到這個系統當中來。舉例來說,Flink對於文件有相應的FileSystem連接器,例如CSV文件。CSV文件連接器在定義時,可以通過參數指定是否持續監測某個目錄的文件變化,並接入更新後的文件。
- 對於Sink來講,我們往往關心要寫出的外部系統是否支持更新已經寫出的結果。比如要把數據寫到Kafka裡,通常情況下數據寫入是一種Append-Only,即不能修改已經寫入系統裡的記錄(社區正在利用Kafka Compaction實現Upsert Sink);如果是寫入數據庫,那麼通常可以支持利用主鍵對現有數據進行更新。
以上兩個特性,決定了Flink 裡連接器是面向靜態數據還是面向動態的數據的關鍵點。
提醒,上面截圖是 Flink 1.11版本之後的文檔,連接器在 Flink 1.11 版本里有較大重構。另外,關於Table、SQL、API這個層面的連接器,比起DataStream層面的連接器,會承擔更多的任務。比如是否支持一些謂詞或投影操作的下推等等。這些功能可以幫助提高數據處理的整體性能。
三、Flink 中的狀態和時間
如果想要深入地瞭解DataStream API,狀態和時間是必須掌握的要點。
所有的計算都可以簡單地分為無狀態計算和有狀態計算。無狀態計算相對而言比較容易。假設這裡有個加法算子,每進來一組數據,都把它們全部加起來,然後把結果輸出去,有點純函數的味道。純函數指的是每一次計算結果只和輸入數據有關,之前的計算或者外部狀態對它不會產生任何影響。
這裡我們主要講一下Flink裡邊的有狀態計算。用撿樹枝的小遊戲來舉例。這個遊戲在我看來做的非常好的一點是它自己記錄了非常多的狀態,比如幾天沒上線,然後再去和裡邊的 NPC對話的時候,它就會告訴你已經有好久沒有上線了。換句話說,它會把之前上線的時間作為一種狀態給記錄下來,在生成它NPC對話的時候,是會受到這個狀態的影響。
實現這種有狀態的計算,要做的一點就是把之前的狀態記錄下來,然後再這個狀態注入到新的一次計算中,具體實現方式也有下面兩種:
- 第一種,把狀態數據進入算子之前就給提取出來,然後把這個狀態數據和輸入數據合併在一起,再把它們同時輸入到算子中,得到一個輸出。這種方式是被用在 Spark的StructureStreaming裡邊。其好處是是可以重用已有的無狀態算子。
- 第二種,是 Flink 現在的方法,就是算子本身是有狀態的,算子在每一次到新數據之後做計算的時候,同時考慮新輸數據和已有的狀態對計算過程的影響,最終把結果輸出出去。
計算引擎也應該像上面提到的遊戲一樣變得越來越智能,可以自動學習數據中潛在的規律,然後來自適應地優化計算邏輯,保持較高的處理性能。
Flink 的狀態原語
Flink的狀態原語涉及如何通過代碼使用 Flink的狀態。其基本思想是在編程的時候拋棄原生語言(例如Java或Scala)提供的數據容器,把它們更換為 Flink 裡面的狀態原語。
作為對狀態支持比較好的系統, Flink 內部提供了可以使用的很多種可選的狀態原語。從大的角度看,所有狀態原語可以分為Keyed State和Operator State兩類。Operator State應用相對比較少,我們在這裡不展開介紹。下面重點看一下Keyed State。
Keyed State,即分區狀態。分區狀態的好處是可以把已有狀態按邏輯提供的分區分成不同的塊。塊內的計算和狀態都是綁定在一起的,而不同的Key值之間的計算和狀態的讀寫都是隔離的。對於每個Key值,只需要管理好自己的計算邏輯和狀態就可以了,不需要去考慮其它Key值所對應的邏輯和狀態。
Keyed State可以進一步劃分為下面的5類,它們分別是:
- 比較常用的:ValueState、ListState、MapState
- 不太常用的:ReducingState和AggregationState
Keyed State只能在RichFuction中使用,RichFuction與普通、傳統的Function相比,最大的不同就是它有自己的生命週期。Key State的使用方法分為以下四個步驟:
- 第一步,將 State聲明為RichFunction裡的實例的變量
- 第二步,在RichFunction對應的 open方法中,為 State進行一個初始化的賦值操作。賦值操作要有兩步:先創建一個StateDescriptor,在創建中需要給State指定一個名稱;然後再去調用RichFuntion中的getRuntimeContext().getState(…),把剛剛定義的StateDescriptor傳進去,就可以獲取State。
提醒:如果此流式應用是第一次運行,那麼獲得的State會是空內容的;如果State是從某個中間段重啟的,它會根據配置和之前保存的數據的基礎上進行恢復。
- 第三步,得到State對象後,就可以在RichFunction裡,對對應的State進行讀寫。如果是ValueState,可以調用它的Value方法來獲取對應值。Flink 框架會控制好所有狀態的併發訪問,並進行限制,所以用戶不需要考慮併發的問題。
Flink 的時間
時間也是 Flink非常重要的一點,它和State是相輔相成的。總體來看 Flink引擎裡邊提供的時間有兩類:第一類是Processing Time;第二類是Event Time。Processing Time表示的是真實世界的時間,Event Time是數據當中包含的時間。數據在生成的過程當中會攜帶時間戳之類的字段,因為很多時候需要將數據裡攜帶的時間戳作為參考,然後對數據進行分時間的處理。
Processing Time處理起來相對簡單,因為它不需要考慮亂序等問題;而Event Time處理起來相對複雜。而由於Processing Time在使用時是直接調取系統的時間,考慮到多線程或分佈式系統的不確定性,所以它每次運行的結果可能是不確定的;相反,因為Event Time時間戳是被寫入每一條數據裡的,所以在重放某個數據進行多次處理的時候,攜帶的這些時間戳不會改變,如果處理邏輯沒有改變的話,最後的結果也是比較確定的。
Processing Time和Event Time的區別。
以上圖的數據為例,按照1~7的時間來排列的。對於機器時間而言,每個機器的時間會單調增加。在這種情況下,用Processing Time獲得的時間是完美的按照時間從小到大排序的數據。對於Event Time而言,由於延遲或分佈式的一些原因,數據到來的順序可能和它們真實產生的順序有一定的出入,數據可能存在著一定程度的亂序。這時就要充分利用數據裡邊攜帶的時間戳,對數據進行一個粗粒度的劃分。例如可以把數據分為三組,第一組裡最小的時間是1,第二組最小的時間是4,第三組最小的時間是7。這樣劃分之後,數據在組和組之間就是按從小到大的順序排列好的。
怎樣充分的把一定程度的亂序化解掉,讓整個的系統看上去數據進來基本上是有順序的?一種解決方案是在數據中間插入被稱為Watermark的meta數據。在上圖的例子中,前三個數據到來之後,假設再沒有小於等於3的數據進來了,這時就可以插入一條Watermark 3到整個數據裡,系統在看到Watermark 3時就知道,以後都不會有小於或等於3的數據過來了,這時它就可以放心大膽地進行自己的一些處理邏輯。
總結一下,Processing Time在使用時,是一個嚴格遞增的;而Event Time會存在一定的亂序,需要通過Watermark這種辦法對亂序進行一定緩解。
從API的角度來看,怎樣去分配Timestamp或生成Watermark也比較容易,有兩種方式:
第一種,在SourceFunction當中調用內部提供的 collectWithTimestamp方法,把包含時間戳的數據提取出來;還可以在SourceFunction中使用 emitWatermark方法去產生一個Watermark,然後插入到數據流中。
第二種,如果不在SourceFunction中可以調用DateStream.assignTimestampsAndWatermarks這個方法,同時傳入兩類Watermark生成器:
第一類是定期生成,相當在環境裡通過配置一個值,比如每隔多長時間(指真實時間)系統會自動調用Watermar生成策略。
第二類是根據特殊記錄生成,如果遇到一些特殊數據,可以採取AssignWithPunctuatedWatermarks這個方法來進行時間戳和Watermark的分配。
提醒:Flink 裡內置了一些常用的Assigner,即WatermarkAssigner。比如針對一個固定數據,它會把這個數據對應的時間戳減去固定的時間作為一個Watermark。關於Timestamp分配和Watermark生成接口,在後續的版本可能會有一定的改動。 注意,新版本的Flink裡面已經統一了上述兩類生成器。
時間相關API
Flink 在編寫邏輯時會用到的與時間相關的 API,下圖總結了Event Time和Processing Time相對應的API。
在應用邏輯裡通過接口支持可以完成三件事:
- 第一,獲取記錄的時間。Event Time可以調context.getTimestamp,或在SQL算子內從數據字段中把對應的時間給提取出來。Processing Time可以直接調currentProcessingTime完成調取,它的內部是直接調用了獲取系統時間的靜態方法來返回的值。
- 第二,獲取Watermark。其實只有在Event Time裡才有Watermark的概念,而Processing Time裡是沒有的。但在Processing Time中非要把某個東西當成Watermark,其實就是數據時間本身。也就是說第一次調用timerService.currentProcessingTime方法之後獲取的值。這個值既是當前記錄的這個時間,也是當前的Watermark值,因為時間總是往前流動的,第一次調用了這個值後,第二次調用時這個值肯定不會再比第一次值還小。
- 第三,註冊定時器。定時器的作用是清理。比如需要對一個cache在未來某個時間進行清理工作。既然清理工作應該發生在未來的某個時間點,那麼可以調用 timerServicerEventTimeTimer或ProcessingTimeTimer方法註冊定時器,再在整個方法裡添加一個對定時器回調的處理邏輯。當對應的Event Time或者Processing Time的時間超過了定時器設置時間,它就會調用方法自己編寫定時器的毀掉邏輯。
以上就是關於StreamProcess with Apache Flink的介紹,下一篇內容將著重介紹Flink Runtime Architecture。
活動推薦:
僅需99元即可體驗阿里雲基於 Apache Flink 構建的企業級產品-實時計算 Flink 版!點擊下方鏈接瞭解活動詳情:https://www.aliyun.com/product/bigdata/sc?utm_content=g_1000250506