作者:fanrui
現如今想閱讀 HashMap 源碼實際上比較簡單,因為網上一大堆博客去分析 HashMap 和 ConcurrentHashMap。而本文是全網首篇詳細分析 CopyOnWriteStateTable 源碼的博客,閱讀複雜集合類源碼的過程是相當有挑戰的,筆者在剛開始閱讀也遇到很多疑問,最後一一解決了。本文有一萬兩千多字加不少的配圖,實屬不易。
詳細閱讀完本文,無論是針對面試還是開闊視野一定會對大家有幫助的。
聲明:筆者的源碼分析都是基於 flink-1.9.0 release 分支,其實閱讀源碼不用非常在意版本的問題,各版本的主要流程基本都是類似的。如果熟悉了某個版本的源碼,之後新版本有變化,我們重點看一下變化之處即可。
本文主要講述 Flink 中 CopyOnWriteStateTable 相關的知識,當使用 MemoryStateBackend 和 FsStateBackend 時,默認情況下會將狀態數據保存到 CopyOnWriteStateTable 中。CopyOnWriteStateTable 中保存多個 KeyGroup 的狀態,每個 KeyGroup 對應一個 CopyOnWriteStateMap。
CopyOnWriteStateMap 是一個類似於 HashMap 的結構,但支持了兩個非常有意思的功能:
- hash 結構為了保證讀寫數據的高性能,都需要有擴容策略,CopyOnWriteStateMap 的擴容策略是一個漸進式 rehash 的策略,即:不是一下子將數據全遷移的新的 hash 表,而是慢慢去遷移數據到新的 hash 表中。
- Checkpoint 時 CopyOnWriteStateMap 支持異步快照,即:Checkpoint 時可以在做快照的同時,仍然對 CopyOnWriteStateMap 中數據進行修改。問題來了:數據修改了,怎麼保證快照數據的準確性呢?
瞭解 Redis 的同學應該知道 Redis 也是一個大的 hash 結構,擴容策略也是漸進式 rehash。Redis 的 RDB 在持久化數據的過程中同時也是對外服務的,對外服務意味著數據可能被修改,那麼 RDB 如何保證持久化好的數據一定是正確的呢?
舉個例子:17 點00分00秒 RDB 開始持久化數據,過了 1 秒 Redis 中某條數據被修改了,過了一分鐘 RDB 才持久化結束。RDB 預期的持久化結果應該是 17 點00分00秒那一刻 Redis 的完整快照,請問持久化過程中那些修改操作是否會影響 Redis 的快照。答:當然可以做到不影響。
Flink 在 Checkpoint 時的快照與 Redis 類似,都是想在快照時依然對外提供服務,減少服務停頓時間。Flink 具體如何實現上述功能的呢?帶著問題詳細閱讀下文。
1.StateTable 簡介
MemoryStateBackend 和 FsStateBackend 的 KeyedStateBackend 都使用 HeapKeyedStateBackend 存儲數據,HeapKeyedStateBackend 持有 Map> registeredKVStates 來存儲 StateName 與具體 State 的映射關係。registeredKVStates 的 key 就是 StateName,value 為具體的 State 數據。具體 State 的數據存儲在 StateTable 中。
StateTable 有兩個實現:CopyOnWriteStateTable 和 NestedMapsStateTable。
- CopyOnWriteStateTable 屬於 Flink 自己定製化的數據結構,Checkpoint 時支持異步 Snapshot。
- NestedMapsStateTable 直接嵌套 Java 的兩層 HashMap 來存儲數據,Checkpoint 時需要同步快照。
下面詳細介紹 CopyOnWriteStateTable。
2.CopyOnWriteStateTable
StateTable 中持有 StateMap[] keyGroupedStateMaps 真正的存儲數據。StateTable 會為每個 KeyGroup 的數據初始化一個 StateMap 來對 KeyGroup 做數據隔離。對狀態進行操作時,StateTable 會先根據 key 計算對應的 KeyGroup,拿到相應的 StateMap,才能對狀態進行操作。
CopyOnWriteStateTable 中使用 CopyOnWriteStateMap 存儲數據,這裡主要介紹 CopyOnWriteStateMap 的實現。CopyOnWriteStateMap 中就是一個數組 + 鏈表構成的 hash 表。
CopyOnWriteStateMap 中元素類型都是是:StateMapEntry。hash 表的第一層先是一個 StateMapEntry 類型的數組,即:StateMapEntry[]。在 StateMapEntry 類中有個 StateMapEntry next 指針構成鏈表。
CopyOnWriteStateMap 相比普通的 hash 表,有以下幾點需要重點關注:
- CopyOnWriteStateMap 的擴容策略是漸進式 rehash,而不是一下子擴容完
- 為了支持異步的 Snapshot,需要將 Snapshot 時 StateMap 的快照保存下來,具體的保存策略怎麼實現的?
- 為了支持 CopyOnWrite 功能,所以在修改數據時,要進行一系列 copy 的操作,不能修改原始數據,否則會影響 Snapshot。
- Snapshot 異步快照流程及 Snapshot 完成時,如何 release 掉舊版本數據?
3.CopyOnWriteStateMap 的漸進式 rehash 策略
漸進式 rehash 策略表示 CopyOnWriteStateMap 中當前有一個 hash 表對外服務,但是當前 hash 表中元素太多需要擴容了,需要將數據遷移到一個容量更大的 hash 表中。
Java 的 HashMap 在擴容時會一下子將舊 hash 表中所有數據都移動到大 hash 表中,這樣的策略存在的問題是如果 HashMap 當前存儲了 1 G 的數據,那麼瞬間需要將 1 G 的數據遷移完,可能會比較耗時。而 CopyOnWriteStateMap 在擴容時,不會一下子將數據全部遷移完,而是在每次操作 CopyOnWriteStateMap 時,慢慢去遷移數據到大的 hash 表中。
例如:可以在每次 get、put 操作時,遷移 4 條數據到大 hash 表中,這樣經過一段時間的 get 和 put 操作,所有的數據就能遷移完成。所以漸進式 rehash 策略,會分很多次將所有的數據遷移到新的 hash 表中。
3.1 擴容簡述
在內存中有兩個 hash 表,一個是 primaryTable 作為主桶,一個是 rehashTable 作為擴容期間用的桶。初始階段只有 primaryTable,當 primaryTable 中元素個數大於設定的閾值時,就要開始擴容。
擴容過程:申請一個相比 primaryTable 容量大一倍的 hash 表保存到 rehashTable 中,慢慢地將 primaryTable 中的元素遷移到 rehashTable 中。對應到源碼中:putEntry 方法中判斷 size() > threshold 時,會調用 doubleCapacity 方法申請新的 hash 表賦值給 rehashTable。
如下圖所示 primaryTable 中桶的個數為 4,rehashTable 中桶的個數為 8。
擴容時 primaryTable 中 0 位置上的元素會遷移到 rehashTable 的 0 和 4 位置上,同理 primaryTable 中 1 位置上的元素會遷移到 rehashTable 的 1 和 5 位置上。
3.2 選擇 Table 的策略
假設 primaryTable 中 0 桶的數據已經遷移到 rehashTable 桶了,那麼之後無論是 put 還是 get 操作 0 桶的數據,那麼都會去操作 rehashTable。而 1、2、3 桶還未遷移,所以 1、2、3 桶還需要操作 primaryTable 桶。對應到源碼中會有一個選桶的操作,選擇到底使用 primaryTable 還是 rehashTable。
源碼實現如下所示:
// 選擇當前元素到底使用 primaryTable 還是 incrementalRehashTable
private StateMapEntry<K, N, S>[] selectActiveTable(int hashCode) {
// 計算 hashCode 應該被分到 primaryTable 的哪個桶中
int curIndex = hashCode & (primaryTable.length - 1);
// 大於等於 rehashIndex 的桶還未遷移,應該去 primaryTable 中去查找。
// 小於 rehashIndex 的桶已經遷移完成,應該去 incrementalRehashTable 中去查找。
return curIndex >= rehashIndex ? primaryTable : incrementalRehashTable;
}
首先通過 int curIndex = hashCode & (primaryTable.length - 1); 計算當前 hashCode 應該分到 primaryTable 的哪個桶中。
rehashIndex 用來標記當前 rehash 遷移的進度,即:rehashIndex 之前的數據已經從 primaryTable 遷移到 rehashTable 桶中。假設 rehashIndex = 1,表示 primaryTable 1 桶之前的數據全部遷移完成了,即:0 桶數據全部遷移完了。
策略:大於等於 rehashIndex 的桶還未遷移,應該去 primaryTable 中去查找。小於 rehashIndex 的桶已經遷移完成,應該去 incrementalRehashTable 中去查找。
3.3 遷移過程
每次有 get、put、containsKey、remove 操作時,都會調用 computeHashForOperationAndDoIncrementalRehash 方法觸發遷移操作。
computeHashForOperationAndDoIncrementalRehash 方法作用:
- 檢測是否處於 rehash 中,如果正在 rehash 就會調用 incrementalRehash 遷移一波數據
- 計算 key 和 namespace 對應的 hashCode
重點關注 incrementalRehash 方法實現:
private void incrementalRehash() {
StateMapEntry<K, N, S>[] oldMap = primaryTable;
StateMapEntry<K, N, S>[] newMap = incrementalRehashTable;
int oldCapacity = oldMap.length;
int newMask = newMap.length - 1;
int requiredVersion = highestRequiredSnapshotVersion;
int rhIdx = rehashIndex;
// 記錄本次遷移了幾個元素
int transferred = 0;
// 每次至少遷移 MIN_TRANSFERRED_PER_INCREMENTAL_REHASH 個元素到新桶、
// MIN_TRANSFERRED_PER_INCREMENTAL_REHASH 默認為 4
while (transferred < MIN_TRANSFERRED_PER_INCREMENTAL_REHASH) {
// 遍歷 oldMap 的第 rhIdx 個桶
StateMapEntry<K, N, S> e = oldMap[rhIdx];
// 每次 e 都指向 e.next,e 不為空,表示當前桶中還有元素未遍歷,需要繼續遍歷
// 每次遷移必須保證,整個桶被遷移完,不能是某個桶遷移到一半
while (e != null) {
// 遇到版本比 highestRequiredSnapshotVersion 小的元素,則 copy 一份
if (e.entryVersion < requiredVersion) {
e = new StateMapEntry<>(e, stateMapVersion);
}
// 保存下一個要遷移的節點節點到 n
StateMapEntry<K, N, S> n = e.next;
// 遷移當前元素 e 到新的 table 中,插入到鏈表頭部
int pos = e.hash & newMask;
e.next = newMap[pos];
newMap[pos] = e;
// e 指向下一個要遷移的節點
e = n;
// 遷移元素數 +1
++transferred;
}
oldMap[rhIdx] = null;
// rhIdx 之前的桶已經遷移完,rhIdx == oldCapacity 就表示遷移完成了
// 做一些初始化操作
if (++rhIdx == oldCapacity) {
XXX
return;
}
}
// primaryTableSize 中減去 transferred,增加 transferred
primaryTableSize -= transferred;
incrementalRehashTableSize += transferred;
rehashIndex = rhIdx;
}
incrementalRehash 方法中第一層 while 循環用於控制每次遷移的最小元素個數。然後遍歷 oldMap 的第 rhIdx 個桶,e 指向當前遍歷的元素,每次 e 都指向 e.next,e 不為空,表示當前桶中還有元素未遍歷,需要繼續遍歷。每次遷移必須保證,整個桶被遷移完,不能是某個桶遷移到一半。
遷移過程中,將當前元素 e 重新計算 hash 值,插入到 newMap 相應桶的頭部(頭插法)。其中 e.entryVersion < requiredVersion 時,需要創建一個新的 Entry,這裡是為了支持 CopyOnWrite 功能,下面會介紹。
4.StateMap 的 Snapshot 策略
StateMap 的 Snapshot 策略是指:為了支持異步的 Snapshot,需要將 Snapshot 時 StateMap 的快照保存下來。
傳統的方法就是將 StateMap 的全量數據在內存中深拷貝一份,然後拷貝的這一份數據去慢慢做快照,原始的數據可以對外服務。但是深拷貝需要拷貝所有的真實數據,所以效率會非常低。為了提高效率,Flink 只是對數據進行了淺拷貝。
4.1 淺拷貝原理分析
淺拷貝就是隻拷貝引用,不拷貝數據。
假如 StateMap 沒有處於擴容中,Snapshot 流程相對比較簡單,創建一個新的 snapshotData,直接將 primaryTable 的數據拷貝到 snapshotData 中即可。
如圖所示,對於淺拷貝可以理解為兩個 Table 的 0 號桶中都引用的同一個鏈表,也就是將 snapshotData 指向圖中的 Entry a 即可。其他桶的淺拷貝也是類似,就不一一畫圖了。
假如 StateMap 當前處於擴容中,Snapshot 流程相對比較繁瑣,創建一個新的 snapshotData,需要將 primaryTable 和 rehashTable 的數據都拷貝到 snapshotData 中。
如圖所示,將原始兩個 Table 數據拷貝到 snapshotData 中,但是 snapshotData 數組的長度並不是 primaryTable 的長度 + rehashTable 的長度。而是分別計算 primaryTable 和 rehashTable 中有幾個桶中有數據。例如上圖案例所示,primaryTable 中有 3 個桶中有元素,rehashTable 中有 2 個桶中有元素,所以snapshotData 的桶數量為 5 即可,沒必要 4 + 8 = 12 個桶。
上圖中也是省略了 Entry,Entry 引用的淺拷貝與之前沒有擴容的情況類似。
4.2 淺拷貝源碼詳解
首先調用 CopyOnWriteStateTable 的 stateSnapshot 方法對整個 StateTable 進行快照。stateSnapshot 方法會創建 CopyOnWriteStateTableSnapshot,CopyOnWriteStateTableSnapshot 的構造器中會調用 CopyOnWriteStateTable 的 getStateMapSnapshotList 方法。
getStateMapSnapshotList 方法源碼如下所示:
List<CopyOnWriteStateMapSnapshot<K, N, S>> getStateMapSnapshotList() {
List<CopyOnWriteStateMapSnapshot<K, N, S>> snapshotList =
new ArrayList<>(keyGroupedStateMaps.length);
// 調用所有 CopyOnWriteStateMap 的 stateSnapshot 方法
// 生成 CopyOnWriteStateMapSnapshot 保存到 list 中
for (int i = 0; i < keyGroupedStateMaps.length; i++) {
CopyOnWriteStateMap<K, N, S> stateMap =
(CopyOnWriteStateMap<K, N, S>) keyGroupedStateMaps[i];
snapshotList.add(stateMap.stateSnapshot());
}
return snapshotList;
}
CopyOnWriteStateTable 中為每個 KeyGroup 維護了一個 StateMap 到 keyGroupedStateMaps 中,getStateMapSnapshotList 方法會調用所有 CopyOnWriteStateMap 的 stateSnapshot 方法。
CopyOnWriteStateMap 的 stateSnapshot 方法相關源碼如下所示:
public CopyOnWriteStateMapSnapshot<K, N, S> stateSnapshot() {
return new CopyOnWriteStateMapSnapshot<>(this);
}
CopyOnWriteStateMapSnapshot(CopyOnWriteStateMap<K, N, S> owningStateMap) {
super(owningStateMap);
// 對 StateMap 的數據進行淺拷貝,生成 snapshotData
this.snapshotData = owningStateMap.snapshotMapArrays();
// 記錄當前的 StateMap 版本到 snapshotVersion 中
this.snapshotVersion = owningStateMap.getStateMapVersion();
this.numberOfEntriesInSnapshotData = owningStateMap.size();
}
CopyOnWriteStateMap 的 stateSnapshot 方法會創建 CopyOnWriteStateMapSnapshot,CopyOnWriteStateMapSnapshot 的構造器中會調用 StateMap 的 snapshotMapArrays 方法對 StateMap 的數據進行淺拷貝生成 snapshotData。且將當前的 StateMap 版本到 snapshotVersion 中。
StateMap 的 snapshotMapArrays 方法對淺拷貝原理進行了代碼實現,代碼如下所示:
public class CopyOnWriteStateMap<K, N, S> extends StateMap<K, N, S> {
// 當前 StateMap 的 version
private int stateMapVersion;
// 所有 正在進行中的 snapshot 的 version
private final TreeSet<Integer> snapshotVersions;
// 正在進行中的那些 snapshot 的最大版本號
private int highestRequiredSnapshotVersion;
StateMapEntry<K, N, S>[] snapshotMapArrays() {
// 1、stateMapVersion 版本 + 1,賦值給 highestRequiredSnapshotVersion,
// 並加入snapshotVersions
synchronized (snapshotVersions) {
++stateMapVersion;
highestRequiredSnapshotVersion = stateMapVersion;
snapshotVersions.add(highestRequiredSnapshotVersion);
}
// 2、 將現在 primary 和 Increment 的元素淺拷貝一份到 copy 中
// copy 策略:copy 數組長度為 primary 中剩餘的桶數 + Increment 中有數據的桶數
// primary 中剩餘的數據放在 copy 數組的前面,Increment 中低位數據隨後,
// Increment 中高位數據放到 copy 數組的最後
StateMapEntry<K, N, S>[] table = primaryTable;
final int totalMapIndexSize = rehashIndex + table.length;
final int copiedArraySize = Math.max(totalMapIndexSize, size());
final StateMapEntry<K, N, S>[] copy = new StateMapEntry[copiedArraySize];
if (isRehashing()) {
final int localRehashIndex = rehashIndex;
final int localCopyLength = table.length - localRehashIndex;
// for the primary table, take every index >= rhIdx.
System.arraycopy(table, localRehashIndex, copy, 0, localCopyLength);
table = incrementalRehashTable;
System.arraycopy(table, 0, copy, localCopyLength, localRehashIndex);
System.arraycopy(table, table.length >>> 1, copy,
localCopyLength + localRehashIndex, localRehashIndex);
} else {
System.arraycopy(table, 0, copy, 0, table.length);
}
return copy;
}
}
CopyOnWriteStateMap 中三個比較重要的屬性:
- stateMapVersion:表示當前 StateMap 的版本,每次 Snapshot 時版本號加一
- snapshotVersions:存放所有正在進行中的 snapshot 的版本號(因為可能存在多個同時進行的 Snapshot)
- highestRequiredSnapshotVersion:表示正在進行中的那些 snapshot 的最大版本號,如果當前沒有正在進行中的 Snapshot,那麼賦值為 0
snapshotMapArrays 方法第一步按照上述規則更新這三個屬性,第二步將現在 primaryTable 和 rehashTable 的元素淺拷貝一份到 copy 數組中。
注:copy 數組的長度與上述原理分析不完全一致,原理分析時應該是 copiedArraySize = totalMapIndexSize;實際上 copiedArraySize = Math.max(totalMapIndexSize, size())。
源碼註釋寫到:理論上 totalMapIndexSize 就夠了,這裡考慮 size 主要是為了兼容 StateMap 的 TransformedSnapshotIterator 功能。
5.CopyOnWrite 實現原理
上一部分得出結論,每次 Snapshot 時僅僅是淺拷貝一份,所以 Snapshot 和 StateMap 共同引用真實的數據。假如 Snapshot 還沒將數據 flush 到磁盤,但是 StateMap 中對數據進行了修改,那麼 Snapshot 最後 flush 的數據就是錯誤的。Snapshot 的目標是:將 Snapshot 快照中原始的數據刷到磁盤,既然叫快照,所以不允許被修改。
5.1 CopyOnWrite 原理簡述
那 StateMap 如何來保證修改數據的時候,不會修改 Snapshot 的數據呢?其實原理很簡單:StateMap 和 Snapshot 共享了一大堆數據,既然 Snapshot 要求數據不能修改,那麼 StateMap 在修改某條數據時可以將這條數據複製一份產生一個副本,所以 Snapshot 和 StateMap 就會各自擁有自己的副本,所以 StateMap 對數據的修改就不會影響 Snapshot 的快照。
當然為了節省內存和提高效率,StateMap 只會拷貝那些要改變的數據,儘量多的實現共享,不能實現共享的數據只能 Copy 一份再修改了,這就是類名用 CopyOnWrite 修飾的原因。
5.2 CopyOnWrite 原理詳解
上一部分 Snapshot 時,僅僅對 Table 做了一份淺拷貝,而且可以看到拷貝前後,桶內的數據不變,且桶跟桶之間是沒有交集的,所以這裡的原理詳解主要就分析一個桶中的鏈表如何實現 CopyOnWrite。
■ 5.2.1 修改鏈表頭部節點的場景
如上圖所示,primaryTable 和 snapshotTable 的 0 號桶都指向 Entry a,假設現在應用層要修改 Entry a 的數據,整體流程:
- 深拷貝一個 Entry a 對象為 Entry a copy
- 將 Entry a copy 放到 primaryTable 的鏈表中,且 next 指向 Entry b
- 應用層修改 Entry a copy 的 data,將 data1 修改為設定的 data2
這裡 Entry b 和 c 沒有修改,所以不用拷貝,屬於 primaryTable 和 snapshotTable 共享的。
這裡就引出了 CopyOnWriteStateMap 的設計目標(自己的理解,並不是官方觀點):在保證 Snapshot 數據正確性的前提下,儘量的少拷貝數據提高性能。
■ 5.2.2 修改鏈表中間節點的場景
如上圖所示,primaryTable 和 snapshotTable 的 0 號桶都指向 Entry a,假設現在應用層要修改 Entry b 的數據,整體流程:
- 深拷貝一個 Entry b 對象為 Entry b copy
- 將 Entry b copy 串在 primaryTable 的鏈表中,且 next 指向 Entry c
- 應用層修改 Entry b copy 的 data,將 data 修改為設定的 data2
但是上述流程成立嗎?如上圖所示 Entry a 和 c 是 primaryTable 和 snapshotTable 共享的。每個 Entry 只有一個 next 指針,所以 Entry a 可以同時指向 Entry b 和 b copy 嗎?肯定是不可以的,所以 Entry a 不可以共享。下圖是正確流程。
如下圖所示,在修改 Entry b 時,不僅僅要將 Entry b 拷貝一份,而且還要將鏈表中 Entry b 之前的 Entry 必須全部 copy 一份,這樣才能保證在滿足正確性的前提下修改 Entry b,畢竟正確性是第一位。
正確整體流程:
- 深拷貝 Entry a 和 b 對象為 Entry a copy 和 b copy
- 將 Entry a copy 和 b copy 串在 primaryTable 的鏈表中,且 Entry b 的 next 指向 Entry c
- 應用層修改 Entry b copy 的 data,將 data 修改為設定的 data2
總結:假設要修改 Entry b,那麼要將 Entry b 以及鏈表中 Entry b 之前的 Entry 必須全部 copy 一份,Entry b 之後的 Entry 可以共享。
■ 5.2.3 插入新數據的場景
如上圖所示是插入新數據的場景,會使用頭插法插入 Entry d,頭插法不需要拷貝原始鏈表的任何數據,只需要插入最新的數據到鏈表頭部即可。這樣 primaryTable 可以訪問到插入的數據,且不影響 SnapshotData 訪問原始快照的數據。
注:這裡必須是插入新數據的場景,對於 Map 類型,插入舊數據對應的可能是修改操作
■ 5.2.4 鏈表頭部有新節點再修改鏈表中間節點的場景
如上圖所示是鏈表頭部有新節點 Entry d 再修改 Entry b 的場景,此時正確的流程是:
- 深拷貝 Entry a 和 b 對象為 Entry a copy 和 b copy
- 將 Entry a copy 和 b copy 串在 Entry d 的鏈表中,且 Entry b 的 next 指向 Entry c
- 應用層修改 Entry b copy 的 data,將 data 修改為設定的 data2
之前說過要修改 Entry b 需要將 Entry b 之前的 Entry 全部 copy 一份,但是此時並不需要對 Entry d 進行 copy。之前 copy 是因為 Entry b 之前的元素有被 snapshotData 引用,但是這裡 Entry d 並不被 snapshotData 引用,只有 primaryTable 只有 Entry d,所以不需要 copy。
修改 Entry b 時,Entry b 之前的 Entry 哪些需要 copy,哪些不需要 copy,具體如何區分會在後續的源碼環節詳細介紹。
■ 5.2.5 get 鏈表中間節點的場景
理論來講,訪問中間節點的場景數據數據是非常安全的。
如下圖所示 Flink 應用層通過 primaryTable 訪問 Entry b,理論來講只是讀取的場景就不需要 copy 副本了。因為之前 copy 副本都是因為應用層修改了數據,為了保證 Snapshot 數據的不可變特性,所以專門 copy 一個副本讓 primaryTable 去修改。但神奇的是 CopyOnWriteStateMap 在 get 操作時,也需要將 Entry b 以及 Entry b 之前的所有 Entry 拷貝一個副本。
為什麼呢?雖然是 get 訪問操作,但是應用層拿到了 Entry b 中的 data 對象,萬一應用層修改了 data 對象裡的屬性怎麼辦呢?例如 Entry 中的 data 是 Person 對象,Person 對象可能有一些 setter 方法,可以修改其 name 和 age。如果應用層修改了 name 或 age,那麼在 Snapshot 的過程中,還是出現了數據修改的情況。
所以 CopyOnWriteStateMap 把 get 操作跟 put 操作同等對待,無論是 get 還是 put 都需要將 Entry 及其之前的 Entry copy 一份。
■ 5.2.6 remove 數據的場景
需要區分兩種 case:remove 的 Entry 是鏈表頭節點;remove 的 Entry 不是鏈表頭節點。
Case1:remove 的 Entry 是鏈表頭節點的場景比較簡單,將桶直接指向 Entry a 的 next Entry b 即可。
Case 2:remove 的 Entry 不是鏈表頭節點,需要將 Entry b 之前的所有 Entry 拷貝一份(新插入的 Entry 不需要拷貝),且 Entry b 前一個節點的副本直接指向 Entry b 的下一個節點。具體為什麼 Entry a 需要拷貝一份與 put 和 get 操作類似,因為 Entry a 的 next 指針沒辦法指向兩個節點,所以 primaryTable 和 snapshotTable 要有各自的頭結點。
■ 5.2.7 COW 原理小結
上述 case 基本覆蓋到了各種場景,這裡做一個總結:
- 插入新的 Entry 使用頭插法插入到鏈表中
- 假設要修改 Entry b,那麼要將 Entry b 以及鏈表中 Entry b 之前的 Entry 必須全部 copy 一份(新插入的數據不需要拷貝),Entry b 之後的 Entry 可以共享
- 訪問 Entry b 的場景與修改 Entry b 的場景類似
- 假如修改或訪問的數據是 copy 後的數據,那麼實際上不需要再 copy 了,因為 copy 後的數據已經保證是 primaryTable 獨佔的數據,不與 Snapshot 共享
- remove 數據的場景,分為兩種 case:
- 如果 remove 的 Entry 是鏈表頭節點,將桶直接指向頭結點的 next 節點即可。
- 如果 remove 的 Entry 不是鏈表頭節點,需要將目標 Entry 之前的所有 Entry 拷貝一份,且目標 Entry 前一個節點的副本直接指向目標 Entry 的下一個節點。當然如果前繼節點已經是新版本了,則不需要拷貝,直接修改前繼 Entry 的 next 指針即可。
5.3 CopyOnWriteStateMap 各種操作源碼詳解
■ 5.3.1 CopyOnWriteStateMap 介紹
CopyOnWriteStateMap 類用於存儲數據,支持了 CopyOnWrite 的功能,先介紹 CopyOnWriteStateMap 中一些相對重要的字段,相關源碼如下所示(重點看一下每個字段的註釋):
public class CopyOnWriteStateMap<K, N, S> extends StateMap<K, N, S> {
// 默認容量 128,即:hash 表中桶的個數默認 128
public static final int DEFAULT_CAPACITY = 128;
// hash 擴容遷移數據時,每次最少要遷移 4 條數據
private static final int MIN_TRANSFERRED_PER_INCREMENTAL_REHASH = 4;
// State 的序列化器
protected final TypeSerializer<S> stateSerializer;
// 空表:提前創建好
private static final StateMapEntry<?, ?, ?>[] EMPTY_TABLE =
new StateMapEntry[MINIMUM_CAPACITY >>> 1];
// 當前 StateMap 的 version,每次創建一個 Snapshot 時,StateMap 的版本號加一
private int stateMapVersion;
// 所有 正在進行中的 snapshot 的 version
// 每次創建出一個 Snapshot 時,都需要將 Snapshot 的 version 保存到該 Set 中
private final TreeSet<Integer> snapshotVersions;
// 正在進行中的那些 snapshot 的最大版本號
// 這裡保存的就是 TreeSet<Integer> snapshotVersions 中最大的版本號
private int highestRequiredSnapshotVersion;
// 主表:用於存儲數據的 table
private StateMapEntry<K, N, S>[] primaryTable;
// 擴容時的新表,擴容期間數組長度為 primaryTable 的 2 倍。
// 非擴容期間為 空表
private StateMapEntry<K, N, S>[] incrementalRehashTable;
// primaryTable 中元素個數
private int primaryTableSize;
// incrementalRehashTable 中元素個數
private int incrementalRehashTableSize;
// primary table 中增量 rehash 要遷移的下一個 index
// 即:primaryTable 中 rehashIndex 之前的數據全部搬移完成
private int rehashIndex;
// 擴容閾值,與 HashMap 類似,當元素個數大於 threshold 時,就會開始擴容。
// 默認 threshold 為 StateMap 容量 * 0.75
private int threshold;
// 用於記錄元素修改的次數,遍歷迭代過程中,發現 modCount 修改了,則拋異常
private int modCount;
}
其中 primaryTable 字段是真正存儲數據的 hash 表,primaryTable 是 StateMapEntry 類型的數據,StateMapEntry 用於存儲 StateMap 中的一條數據,下面介紹 StateMapEntry。
■ 5.3.2 StateMapEntry
StateMapEntry 是 CopyOnWriteStateMap 中真正存儲數據的實體。在 Java 的 HashMap 中也是將數據封裝在 Entry 中,HashMap 的 Entry 源碼如下所示:
static class Node<K,V> implements Map.Entry<K,V> {
// 當前 key 對應的 hash 值
final int hash;
final K key;
V value;
// next 指向當前桶中下一個 Node
Node<K,V> next;
}
HashMap 中的靜態內部類 Node 實現 Map.Entry,類中有四個字段:hash、key、value、next。key 和 value 不同解釋,hash 表示當前 key 對應的 hash 值,next 指向當前桶中下一個 Node。
HashMap 在 get(key) 查找數據流程:
- 根據 key 計算 hash 值,定位到具體的桶
- 遍歷當前桶的一個個 Entry,先比較 hash 值是否相同,在比較 key 是否相同(使用 equals 判斷 key 是否相同)
- 如果 hash 值和 key 的 equals 方法都能匹配,表示找到了對應的 Entry,返回 Entry 中的 value 即可
StateMapEntry 源碼如下所示:
protected static class StateMapEntry<K, N, S> implements StateEntry<K, N, S> {
final K key;
final N namespace;
S state;
final int hash;
StateMapEntry<K, N, S> next;
// new entry 時的版本號
int entryVersion;
// state (數據)更新時的 版本號
int stateVersion;
}
StateMapEntry 與 HashMap 的 Entry 相似度較高,其他 key、hash、next 這三個屬性完全相同,StateMapEntry 中的 state 表示 HashMap 中的 value,即:具體存儲的數據。
StateMapEntry 相比 HashMap 的 Entry,多了三個字段:
- namespace:namespace 是 Flink 中的概念,用於區分不同的 Window,在 StateMapEntry 中 key 和 namespace 組合起來作為共同的主鍵,state 作為 value
- entryVersion:表示創建 entry 時的版本號
- stateVersion:表示當前 StateMapEntry 中 state (數據)更新時的版本號
由於 key 和 namespace 共同作為主鍵,因此在 CopyOnWriteStateMap 的 get 或 put 操作中,判斷是否找到了匹配的 Entry,不僅要判斷 hash 值,還要通過 equals 方法對 key 和 namespace 進行判斷。三個參數都校驗通過才能表示找到了相應的 Entry。這一點是與 HashMap 區別較大的,要注意理解。
■ 5.3.3 插入新數據源碼流程
CopyOnWriteStateMap 類的 put 方法如下所示:
public void put(K key, N namespace, S value) {
// putEntry 用於找到對應的 Entry,
// 包括了修改數據或插入新數據的場景
final StateMapEntry<K, N, S> e = putEntry(key, namespace);
// 將 value set 到 Entry 中
e.state = value;
// state 更新了,所以要更新 stateVersion
e.stateVersion = stateMapVersion;
}
put 方法直接調用 putEntry 方法,putEntry 用於找到對應的 Entry,putEntry 包括了修改數據或插入新數據的場景。找到 Entry 後,將 value set 到 Entry 中。
putEntry 方法源碼如下所示:
private StateMapEntry<K, N, S> putEntry(K key, N namespace) {
// 計算當前對應的 hash 值,選擇 primaryTable 或 incrementalRehashTable
final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);
int index = hash & (tab.length - 1);
// 遍歷當前桶中鏈表的一個個 Entry
for (StateMapEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
// 如果根據 key 和 namespace 找到了對應的 Entry,則認為是修改數據
// 普通的 HashMap 結構有一個 Key ,而這裡 key 和 namespace 的組合當做 key
if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
// 修改數據邏輯(暫時忽略)
if (e.entryVersion < highestRequiredSnapshotVersion) {
e = handleChainedEntryCopyOnWrite(tab, index, e);
}
// 修改數據,直接返回對應的 Entry
return e;
}
}
// 代碼走到這裡,說明原始的鏈表中沒找到對應 Entry,即:插入新數據的邏輯
++modCount;
if (size() > threshold) {
doubleCapacity();
}
// 鏈中沒有找到 key 和 namespace 的數據
return addNewStateMapEntry(tab, key, namespace, hash);
}
putEntry 方法首先會計算當前 key 和 namespace 對應的 hash 值,使用 selectActiveTable 選擇使用 primaryTable 或 incrementalRehashTable,然後計算當前元素對應桶的 index。
這裡注意,普通的 HashMap 結構有一個 Key 一個 value。而這裡 key 和 namespace 的組合當做 Map 的 key,value 仍然是原來的 value。
遍歷當前桶中鏈表的一個個 Entry,如果通過 hash 值、 key 和 namespace 的 equals 方法進行匹配,如果匹配成功,表示找到了對應的 Entry,則認為是修改數據。
如果遍歷完當前桶中鏈表的所有元素還沒找到匹配的 Entry,說明是插入一條新數據,則執行 addNewStateMapEntry 方法往鏈表頭部插入一個新的 Entry 返回(頭插法)。
■ 5.3.4 修改數據源碼流程
在 putEntry 中,修改數據場景的源碼如下所示:
// 如果根據 key 和 namespace 找到了對應的 Entry,則認為是修改數據
// 普通的 HashMap 結構有一個 Key ,而這裡 key 和 namespace 的組合當做 key
if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
// entryVersion 表示 entry 創建時的版本號
// highestRequiredSnapshotVersion 表示 正在進行中的那些 snapshot 的最大版本號
// entryVersion 小於 highestRequiredSnapshotVersion,說明 Entry 的版本小於當前某些 Snapshot 的版本號,
// 即:當前 Entry 是舊版本的數據,當前 Entry 被其他 snapshot 持有。
// 為了保證 Snapshot 的數據正確性,這裡必須為 e 創建新的副本,且 e 之前的某些元素也需要 copy 副本
// handleChainedEntryCopyOnWrite 方法將會進行相應的 copy 操作,並返回 e 的新副本
// 然後將返回 handleChainedEntryCopyOnWrite 方法返回的 e 的副本返回給上層,進行數據的修改操作。
if (e.entryVersion < highestRequiredSnapshotVersion) {
e = handleChainedEntryCopyOnWrite(tab, index, e);
}
// 反之,entryVersion >= highestRequiredSnapshotVersion
// 說明當前 Entry 創建時的版本比所有 Snapshot 的版本高
// 即:當前 Entry 是新版本的數據,不被任何 Snapshot 持有
// 注:Snapshot 不可能引用高版本的數據
// 此時,e 是新的 Entry,不存在共享問題,所以直接修改當前 Entry 即可,所以返回當前 e
return e;
}
這裡是上一部分插入新數據的部分源碼,現在重點講述修改數據的過程。如果根據 key 和 namespace 找到了相應的 Entry,則認為是對老數據的修改,走相應的修改邏輯。然後判斷當前 Entry 的 entryVersion 是否小於 highestRequiredSnapshotVersion。
entryVersion 表示 entry 創建時的版本號,highestRequiredSnapshotVersion 表示正在進行中的那些 snapshot 的最大版本號。
- entryVersion 小於 highestRequiredSnapshotVersion,說明 Entry 創建時的版本小於當前某些 Snapshot 的版本號,即:當前 Entry 是舊版本的數據,當前 Entry 被其他 Snapshot 持有。為了保證 Snapshot 的數據正確性,這裡必須為 e 創建新的副本,且 e 之前的某些元素也需要 copy 副本,handleChainedEntryCopyOnWrite 方法將會進行相應的 copy 操作,並返回 e 的新副本。最後將 e 的副本返回給上層,進行數據的修改操作。
- 反之,entryVersion >= highestRequiredSnapshotVersion,說明當前 Entry 創建時的版本比所有 Snapshot 的版本高。Snapshot 不可能引用高版本的數據,所以當前 Entry 是新版本的數據不被任何 Snapshot 持有。此時 e 是新的 Entry,不存在共享問題,所以直接修改當前 Entry 即可,所以返回當前 e。
handleChainedEntryCopyOnWrite 方法的作用:為 Entry e 創建新的副本,且鏈表中 Entry e 之前某些元素也需要 copy 副本,最後返回 e 的副本。
那哪些元素應該拷貝,哪些元素不應該拷貝呢?Snapshot 之後新創建的 Entry 就不需要再拷貝了,Snapshot 之前創建的 Entry 會被 Snapshot 引用所以需要再拷貝。
handleChainedEntryCopyOnWrite 的源碼如下所示:
private StateMapEntry<K, N, S> handleChainedEntryCopyOnWrite(
StateMapEntry<K, N, S>[] tab,
int mapIdx,
StateMapEntry<K, N, S> untilEntry) {
// current 指向當前桶的頭結點
StateMapEntry<K, N, S> current = tab[mapIdx];
StateMapEntry<K, N, S> copy;
// 判斷頭結點創建時的版本是否低於 highestRequiredSnapshotVersion
// 如果低於,則 current 節點被 Snapshot 引用,所以需要 new 一個新的 Entry
if (current.entryVersion < highestRequiredSnapshotVersion) {
copy = new StateMapEntry<>(current, stateMapVersion);
tab[mapIdx] = copy;
} else {
copy = current;
}
// 依次遍歷當前桶的元素,直到遍歷到 untilEntry 節點,也就是我們要修改的 Entry 節點
while (current != untilEntry) {
current = current.next;
// current 版本小於 highestRequiredSnapshotVersion,則需要拷貝,
// 否則不用拷貝
if (current.entryVersion < highestRequiredSnapshotVersion) {
// entryVersion 表示創建 Entry 時的 version,
// 所以新創建的 Entry 對應的 entryVersion 要更新為當前 StateMap 的 version
copy.next = new StateMapEntry<>(current, stateMapVersion);
copy = copy.next;
} else {
copy = current;
}
}
return copy;
}
從源碼可以看到,,從頭結點到要修改的 Entry 節點依次遍歷桶中元素,都是使用 current.entryVersion < highestRequiredSnapshotVersion 來判斷當前節點的創建創建時的版本是否低於 highestRequiredSnapshotVersion。
- 如果低於則 current 節點被 Snapshot 引用,所以需要 new 一個新的 Entry,也就是所謂的拷貝一個副本。
- 否則不用拷貝。
在新創建 Entry 時,新 Entry 的 entryVersion 要更新為當前 StateMap 的 version,表示這是一個新版本的 Entry,並沒有被 Snapshot 引用。這樣之後再要修改該 Entry 時直接修改該 Entry 即可,不需要再拷貝一份副本了。
■ 5.3.5 訪問數據源碼流程
CopyOnWriteStateMap 類的 get 方法與 putEntry 類似,都是依次遍歷相應桶的元素,直到根據 key 和 namespace 找到了相應的 Entry,則返回相應的 Entry。如果遍歷完相應桶的所有 Entry,都沒有與 key 和 namespace 相匹配的 Entry,則表示 StateMap 中沒有指定的元素則返回 null。
如果找到了相應 Entry,為了保證 Snapshot 引用的數據不被修改,所以也要進行拷貝操作。除了拷貝其他源碼比較簡單與 putEntry 完成類似,所以重點分析找到 Entry 後的相關源碼。相關源碼如下所示:
if ((e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace))) {
// 一旦 get 當前數據,為了防止應用層修改數據內部的屬性值,
// 所以必須保證這是一個最新的 Entry,並更新其 stateVersion
// 首先檢查當前的 State,也就是 value 值是否是舊版本數據,
// 如果 value 是舊版本,則必須深拷貝一個 value
// 否則 value 是新版本,直接返回給應用層
if (e.stateVersion < requiredVersion) {
// 此時還有兩種情況,
// 1、如果當前 Entry 是舊版本的,則 Entry 也需要拷貝一份,
// 按照之前分析過的 handleChainedEntryCopyOnWrite 策略拷貝即可
// 2、當前 Entry 是新版本數據,則不需要拷貝,直接修改其 State 即可
if (e.entryVersion < requiredVersion) {
e = handleChainedEntryCopyOnWrite(tab, hash & (tab.length - 1), e);
}
// 更新其 stateVersion
e.stateVersion = stateMapVersion;
// 通過序列化器,深拷貝一個數據
e.state = getStateSerializer().copy(e.state);
}
return e.state;
}
一旦 get 當前數據,為了防止應用層修改數據內部的屬性值,所以必須保證這是一個最新的 Entry,並更新其 stateVersion。首先檢查當前的 State,也就是 value 值是否是舊版本數據:
- 如果 value 是舊版本,則必須深拷貝一個 value
- 否則 value 是新版本,直接返回給應用層
如果 value 值是還區分兩種情況:
- 如果當前 Entry 是舊版本的,則 Entry 也需要拷貝一份,按照之前分析過的 handleChainedEntryCopyOnWrite 策略拷貝即可
- 當前 Entry 是新版本數據,則不需要拷貝,直接修改其 State 即可
case 1 容易理解,如下圖所示訪問 Entry b 就是 case 1 的場景,需要使用 handleChainedEntryCopyOnWrite 方法對 Entry b 和 a 進行拷貝操作,然後再對 Entry b 的 value 對象進行一次深拷貝,所以 Entry b 和 b copy 不會共享 data 對象。
雖然 Entry a 也拷貝了一份生成 Entry a copy,但是 Entry a 中的 value 對象並沒有深拷貝一份,而是共享 data1 對象。get Entry b 後 Entry a 和 a copy 引用 data 1 的圖示用下圖會更形象一些,即:Entry a 和 a copy 的 state 會共同引用 data1 對象。對於修改 Entry a 如果下次再有 get 操作,就會對應上述的 case 2 場景:stateVersion 是老版本,但是 Entry a copy 屬於新版本。此時不需要再對 Entry 進行復制操作,只需要對 State 進行一次深拷貝,保證不會將 Entry a 的 State 返回給應用層。
■ 5.3.6 remove 數據源碼流程
removeEntry 源碼如下所示:
private StateMapEntry<K, N, S> removeEntry(K key, N namespace) {
final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);
int index = hash & (tab.length - 1);
for (StateMapEntry<K, N, S> e = tab[index], prev = null;
e != null; prev = e, e = e.next) {
if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
// 如果要刪除的 Entry 不存在前繼節點,說明要刪除的 Entry 是頭結點,
// 直接將桶直接指向頭結點的 next 節點即可。
if (prev == null) {
tab[index] = e.next;
} else {
// 如果 remove 的 Entry 不是鏈表頭節點,需要將目標 Entry 之前的所有 Entry 拷貝一份,
// 且目標 Entry 前一個節點的副本直接指向目標 Entry 的下一個節點。
// 當然如果前繼節點已經是新版本了,則不需要拷貝,直接修改前繼 Entry 的 next 指針即可。
// copy-on-write check for entry
if (prev.entryVersion < highestRequiredSnapshotVersion) {
prev = handleChainedEntryCopyOnWrite(tab, index, prev);
}
prev.next = e.next;
}
// 修改一些計數器
++modCount;
if (tab == primaryTable) {
--primaryTableSize;
} else {
--incrementalRehashTableSize;
}
return e;
}
}
return null;
}
remove 數據的場景,分為兩種 case:
- 如果 remove 的 Entry 是鏈表頭節點,將桶直接指向頭結點的 next 節點即可。
- 如果 remove 的 Entry 不是鏈表頭節點,需要將目標 Entry 之前的所有 Entry 拷貝一份,且目標 Entry 前一個節點的副本直接指向目標 Entry 的下一個節點。當然如果前繼節點已經是新版本了,則不需要拷貝,直接修改前繼 Entry 的 next 指針即可。
源碼比較清晰加上已經詳細分析了 put 和 get 源碼,所以 remove 源碼直接結合原理看註釋即可。
6.Snapshot 流程及完成後的 release 操作
前面已經分析了 CopyOnWriteStateMap 的擴容 rehash 原理和源碼、Snapshot 時淺拷貝原理和源碼以及CopyOnWrite 實現的原理和源碼。
CopyOnWrite 的實現主要為了減少 Checkpoint 同步階段的停頓時間,將數據的快照過程儘量放到異步流程。下面分析 Snapshot 異步快照流程及 Snapshot 完成後 release 相關操作。
HeapSnapshotStrategy 類的 AsyncSnapshotCallable 匿名內部類的 callInternal 方法中會調用 AbstractStateTableSnapshot 的 writeStateInKeyGroup 方法,並依次將每個 KeyGroupId 當做參數傳入。
writeStateInKeyGroup 方法源碼如下所示:
public void writeStateInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) {
// 獲取 KeyGroupId 對應的 CopyOnWriteStateMapSnapshot
StateMapSnapshot<K, N, S, ? extends StateMap<K, N, S>> stateMapSnapshot =
getStateMapSnapshotForKeyGroup(keyGroupId);
// 將 stateMapSnapshot 中的 State 數據進行序列化輸出
stateMapSnapshot.writeState(localKeySerializer, localNamespaceSerializer,
localStateSerializer, dov, stateSnapshotTransformer);
// stateMapSnapshot 對應的數據已經遍歷完了,所以可以釋放該快照
stateMapSnapshot.release();
}
writeStateInKeyGroup 方法拿到 KeyGroupId 對應的 CopyOnWriteStateMapSnapshot,然後將 stateMapSnapshot 中的 State 數據進行序列化輸出,這一步就會依次遍歷 stateMapSnapshot 所有引用的數據序列化輸出到外部存儲中。序列化完成就可以釋放該快照了。
release 最後會調用 CopyOnWriteStateMap 的 releaseSnapshot 方法,releaseSnapshot 方法源碼如下所示:
void releaseSnapshot(int snapshotVersion) {
synchronized (snapshotVersions) {
// 將 相應的 snapshotVersion 從 snapshotVersions 中 remove
snapshotVersions.remove(snapshotVersion);
// 將 snapshotVersions 的最大值更新到 highestRequiredSnapshotVersion,
// 如果snapshotVersions 為空,則 highestRequiredSnapshotVersion 更新為 0
highestRequiredSnapshotVersion = snapshotVersions.isEmpty() ?
0 : snapshotVersions.last();
}
}
releaseSnapshot 方法將相應的 snapshotVersion 從 snapshotVersions 中 remove,並將 snapshotVersions 的最大值更新到 highestRequiredSnapshotVersion,如果snapshotVersions 為空,則 highestRequiredSnapshotVersion 更新為 0。
有個小疑問:根據之前的流程分析,Snapshot 過程中如果 Flink 應用層發生了大量 get 和 put 操作,那麼很多 Entry 和 State 都會出現多個副本。Snapshot 結束後,就應該把那些舊版本的數據清理掉。可是沒有看到對舊版本數據進行清理操作呢?
如上圖所示,Entry b 和 a 都存在副本,當 Snapshot 結束後,因為新數據在 Entry a copy 和 b copy 中,所以 Entry a 和 b 都應該被清理掉,留著 Entry a copy 和 b copy 即可。但是代碼中沒有看到去清理 Entry a 和 b。那麼會不會出現內存洩漏的問題呢?
其實並不會,Snapshot 結束後 snapshotData 對應的 hash 表不會再被異步快照的線程引用,所以 Entry a 和 b 就會變成不可達對象,會被 JVM 的 GC 回收掉。
7.總結
本文詳細介紹了 CopyOnWriteStateTable 的設計原理及相關源碼,主要從 rehash 和 CopyOnWrite 兩個點進行深入剖析,希望對大家能有所幫助。
本文涉及的 github 倉庫,都在 feature/source-code-read-1-9-0 分支,之後也會持續更新:
https://github.com/1996fanrui/flink/tree/feature/source-code-read-1-9-0註釋
▼ 更多 Flink 技術交流 ▼