
Flink State Management: Snapshot Strategies

Snapshot Strategy (SnapshotStrategy)

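Flink's checkpoint mechanism is built on distributed consistent snapshots, which is how it achieves exactly-once processing semantics. Both the keyed state backends (HeapKeyedStateBackend, RocksDBKeyedStateBackend) and the operator state backend (DefaultOperatorStateBackend) receive snapshot requests through their snapshot method, and the actual snapshot work is delegated to a concrete snapshot strategy.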

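Below is the UML of Flink's snapshot strategies. For keyed state, HeapSnapshotStrategy and RocksDBSnapshotStrategyBase are the snapshot strategies of the heap and RocksDB state backends respectively (RocksDB is further split into full and incremental snapshots), while DefaultOperatorStateBackendSnapshotStrategy is the snapshot strategy of the operator state backend.
Besides keyed state and operator state, a savepoint is essentially a special kind of snapshot, so its strategy, SavepointSnapshotStrategy, also implements the SnapshotStrategy interface.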


Below is the SnapshotStrategy interface, which defines the steps needed to take a snapshot:

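  1. A synchronous part, which prepares the resources the snapshot needs, so that the snapshot data can be written in the next step.
  2. An asynchronous part, which writes the snapshot data to the provided CheckpointStreamFactory.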
public interface SnapshotStrategy<S extends StateObject, SR extends SnapshotResources> {
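    //Synchronous part of the snapshot: prepares the resources the snapshot needs.
    SR syncPrepareResources(long checkpointId) throws Exception;
    //Asynchronous part of the snapshot: writes the snapshot data to the CheckpointStreamFactory.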
    SnapshotResultSupplier<S> asyncSnapshot(
            SR syncPartResource,
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions);

    //Supplier that performs the asynchronous part of the snapshot
    @FunctionalInterface
    interface SnapshotResultSupplier<S extends StateObject> {
        //Performs the asynchronous part of a checkpoint and returns the snapshot result.
        SnapshotResult<S> get(CloseableRegistry snapshotCloseableRegistry) throws Exception;
    }
}

Below is the UML diagram of SnapshotResources:

  • Full snapshots: FullSnapshotResources, implemented by the heap snapshot resources HeapSnapshotResources and by the RocksDB full snapshot resources RocksDBFullSnapshotResources
  • RocksDB incremental snapshot resources: IncrementalRocksDBSnapshotResources
  • Operator state snapshot resources: DefaultOperatorStateBackendSnapshotResources


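The SnapshotResources interface is defined below. It declares a single method, release, which cleans up the resources once the asynchronous part of the snapshot has finished.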

@Internal
public interface SnapshotResources {
    /** Cleans up the resources after the asynchronous part is done. */
    void release();
}
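To make the two phases concrete, here is a minimal sketch with no Flink dependency. ToySnapshotStrategy and ToyResources are hypothetical stand-ins for SnapshotStrategy and SnapshotResources, the live state is assumed to be a plain Map<String, Long>, and a byte array takes the place of the stream from CheckpointStreamFactory. The synchronous phase only copies the map; the asynchronous phase serializes the copy on another thread and then calls release().

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for SnapshotResources: holds what the async part needs.
final class ToyResources {
    final Map<String, Long> copy;
    volatile boolean released = false;

    ToyResources(Map<String, Long> copy) {
        this.copy = copy;
    }

    // Plays the role of SnapshotResources.release(): called after the async part.
    void release() {
        released = true;
    }
}

// Hypothetical stand-in for SnapshotStrategy, keeping the same two phases.
final class ToySnapshotStrategy {
    private final Map<String, Long> liveState;

    ToySnapshotStrategy(Map<String, Long> liveState) {
        this.liveState = liveState;
    }

    // Synchronous phase: capture a consistent copy of the state; must be quick.
    ToyResources syncPrepareResources(long checkpointId) {
        return new ToyResources(new TreeMap<>(liveState));
    }

    // Asynchronous phase: serialize the captured copy off the calling thread,
    // then release the resources whether or not writing succeeded.
    CompletableFuture<byte[]> asyncSnapshot(ToyResources resources) {
        return CompletableFuture.supplyAsync(() -> {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeInt(resources.copy.size());
                for (Map.Entry<String, Long> e : resources.copy.entrySet()) {
                    out.writeUTF(e.getKey());
                    out.writeLong(e.getValue());
                }
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            } finally {
                resources.release();
            }
            return bytes.toByteArray();
        });
    }
}
```

In this sketch an update made after syncPrepareResources returns does not appear in the serialized bytes, which is the property the sync/async split is meant to provide.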

We will look at the concrete resource classes together with their snapshot strategies.

Heap Snapshot Strategy (HeapSnapshotStrategy)

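Before looking at the heap snapshot strategy, let's first look at its resource class, HeapSnapshotResources. The UML above shows that both the heap snapshot and the RocksDB full snapshot implement FullSnapshotResources, which also tells us that the heap state backend has no incremental snapshot implementation.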

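FullSnapshotResources describes the resources of a full snapshot independently of the concrete state backend, and it is through this backend-independent view that FullSnapshotAsyncWriter writes the snapshot data.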

The FullSnapshotResources interface is defined as follows; the type parameter K is the type of the stored keys.

public interface FullSnapshotResources<K> extends SnapshotResources {

    //Returns the meta info of the states in this snapshot; each StateMetaInfoSnapshot records a state's snapshot metadata, such as the state name, backend state type and serializers.
    List<StateMetaInfoSnapshot> getMetaInfoSnapshots();
    
    //Creates an iterator over the entries of this snapshot
    KeyValueStateIterator createKVStateIterator() throws IOException;
    
    //The KeyGroupRange covered by this snapshot
    KeyGroupRange getKeyGroupRange();

    /** Returns key {@link TypeSerializer}. */
    TypeSerializer<K> getKeySerializer();

    /** Returns the {@link StreamCompressionDecorator} that should be used for writing. */
    StreamCompressionDecorator getStreamCompressionDecorator();
}

Next, let's look at the two core methods of HeapSnapshotStrategy: syncPrepareResources and asyncSnapshot.

class HeapSnapshotStrategy<K>
        implements SnapshotStrategy<KeyedStateHandle, HeapSnapshotResources<K>> {
    ...
    //Prepares the snapshot resources (HeapSnapshotResources)
    @Override
    public HeapSnapshotResources<K> syncPrepareResources(long checkpointId) {
        return HeapSnapshotResources.create(
                registeredKVStates,
                registeredPQStates,
                keyGroupCompressionDecorator,
                keyGroupRange,
                getKeySerializer(),
                totalKeyGroups);
    }

    @Override
    public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
            HeapSnapshotResources<K> syncPartResource,
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions) {
            ......
        //SupplierWithException is a Supplier-like functional interface whose function may throw; the first type parameter is the result type, the second the exception type
        final SupplierWithException<CheckpointStreamWithResultProvider, Exception>
                checkpointStreamSupplier =
                        localRecoveryConfig.isLocalRecoveryEnabled() //is local recovery enabled?
                                        && !checkpointOptions.getCheckpointType().isSavepoint()
                                ? () ->
                                        createDuplicatingStream( //local recovery and not a savepoint: create a duplicating stream
                                                checkpointId,
                                                CheckpointedStateScope.EXCLUSIVE,
                                                streamFactory,
                                                localRecoveryConfig
                                                        .getLocalStateDirectoryProvider())
                                : () ->
                                        createSimpleStream( //no local recovery, or a savepoint: create a simple stream
                                                CheckpointedStateScope.EXCLUSIVE, streamFactory);

        return (snapshotCloseableRegistry) -> {
            ......
            //output stream
            final CheckpointStreamFactory.CheckpointStateOutputStream localStream =
                    streamWithResultProvider.getCheckpointOutputStream();
            //write the checkpoint data through the KeyedBackendSerializationProxy
            final DataOutputViewStreamWrapper outView =
                    new DataOutputViewStreamWrapper(localStream);
            serializationProxy.write(outView);
           ......
        };
    }
}

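The asyncSnapshot method above creates the snapshot output stream through a CheckpointStreamWithResultProvider. The core job of this class is to encapsulate obtaining the output stream(s). If local recovery is not configured, only one output stream is created, and the snapshot data is written to the checkpoint storage configured for the job. If local recovery is configured, the state data must also be written locally, so two output streams are obtained: one writes to the configured checkpoint storage and the other writes to local disk.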

public interface CheckpointStreamWithResultProvider extends Closeable {
    //Closes the output stream and returns the snapshot result containing the stream handle
    @Nonnull
    SnapshotResult<StreamStateHandle> closeAndFinalizeCheckpointStreamResult() throws IOException;

    //Returns the snapshot output stream
    @Nonnull
    CheckpointStreamFactory.CheckpointStateOutputStream getCheckpointOutputStream();

    @Override
    default void close() throws IOException {
        getCheckpointOutputStream().close();
    }
    ...
}

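The two inner implementations of CheckpointStreamWithResultProvider correspond to these two cases. The simple stream (PrimaryStreamOnly) has a single output stream, which writes to the configured checkpoint storage (for example remote HDFS or the JobManager). The duplicating stream (PrimaryAndSecondaryStream) has two output streams: the first is the same as in PrimaryStreamOnly, and the second writes locally on the TaskManager for local recovery.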


Creating a simple stream: as shown below, only a primary stream is created.

static CheckpointStreamWithResultProvider createSimpleStream(
            @Nonnull CheckpointedStateScope checkpointedStateScope,
            @Nonnull CheckpointStreamFactory primaryStreamFactory)
            throws IOException {
        //create the primary output stream
        CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
                primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
        return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
    }

Creating a duplicating stream: besides the primary stream, a secondary stream that writes to a local file is also created.

@Nonnull
    static CheckpointStreamWithResultProvider createDuplicatingStream(
            @Nonnegative long checkpointId,
            @Nonnull CheckpointedStateScope checkpointedStateScope,
            @Nonnull CheckpointStreamFactory primaryStreamFactory,
            @Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider)
            throws IOException {

        CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
                primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);

        try {
            //local output path for the checkpoint data
            File outFile =
                    new File(
                            secondaryStreamDirProvider.subtaskSpecificCheckpointDirectory(
                                    checkpointId),
                            String.valueOf(UUID.randomUUID()));
            Path outPath = new Path(outFile.toURI());

            //build the output stream that writes to the local file
            CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut =
                    new FileBasedStateOutputStream(outPath.getFileSystem(), outPath);

            return new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(
                    primaryOut, secondaryOut);
        } catch (IOException secondaryEx) {
            LOG.warn(
                    "Exception when opening secondary/local checkpoint output stream. "
                            + "Continue only with the primary stream.",
                    secondaryEx);
        }

        return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
    }
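Conceptually, PrimaryAndSecondaryStream behaves like a tee: each byte goes both to the checkpoint storage stream and to the local file. The sketch below reduces this to plain java.io streams; TeeStream, StreamChooser and openStreams are hypothetical names, not Flink classes. It mirrors the two decisions seen above: duplicate only when local recovery is enabled and the checkpoint is not a savepoint, and fall back to the primary stream alone if the secondary stream cannot be opened.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Callable;

// Hypothetical tee stream: every byte goes to both the primary (checkpoint
// storage) stream and the secondary (local recovery) stream.
final class TeeStream extends OutputStream {
    private final OutputStream primary;
    private final OutputStream secondary;

    TeeStream(OutputStream primary, OutputStream secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    @Override
    public void write(int b) throws IOException {
        primary.write(b);
        secondary.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        primary.write(b, off, len);
        secondary.write(b, off, len);
    }

    @Override
    public void close() throws IOException {
        try {
            primary.close();
        } finally {
            secondary.close();
        }
    }
}

final class StreamChooser {
    // Hypothetical helper combining the choice made in asyncSnapshot with the
    // fallback in createDuplicatingStream.
    static OutputStream openStreams(
            boolean localRecoveryEnabled,
            boolean isSavepoint,
            OutputStream primary,
            Callable<OutputStream> openSecondary) {
        if (!localRecoveryEnabled || isSavepoint) {
            return primary; // like createSimpleStream
        }
        try {
            return new TeeStream(primary, openSecondary.call());
        } catch (Exception e) {
            // like createDuplicatingStream: continue with the primary stream only
            return primary;
        }
    }
}
```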

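The output streams above are created by a CheckpointStreamFactory; these streams write checkpoint data to external storage. For example, FsCheckpointStreamFactory writes checkpoint data to an external file system.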


public interface CheckpointStreamFactory {

    //Creates a new state output stream; CheckpointStateOutputStream is a static abstract class nested in CheckpointStreamFactory
    CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope)
            throws IOException;

    //Base class of CheckpointStateOutputStream; the concrete streams are provided by the CheckpointStreamFactory implementations
    abstract class CheckpointStateOutputStream extends FSDataOutputStream {

        //Closes the stream and returns a handle to the written state
        @Nullable
        public abstract StreamStateHandle closeAndGetHandle() throws IOException;

        //Closes the stream
        @Override
        public abstract void close() throws IOException;
    }
}
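The key contract here is the difference between closeAndGetHandle() and close(): the former finishes the state and returns a handle for reading it back, while a plain close() means the result is not needed. The hypothetical ToyFileStateOutputStream below sketches that contract on a local file, using the java.nio.file.Path itself as the "handle". It is not the FsCheckpointStreamFactory implementation, and deleting the file on a plain close() is an assumption of this sketch.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical file-backed state output stream; Path stands in for StreamStateHandle.
final class ToyFileStateOutputStream extends OutputStream {
    private final Path target;
    private final OutputStream out;
    private boolean closed = false;

    ToyFileStateOutputStream(Path target) throws IOException {
        this.target = target;
        this.out = Files.newOutputStream(target);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
    }

    // Finish the state: close the stream and return a handle for later reads.
    Path closeAndGetHandle() throws IOException {
        if (closed) {
            throw new IOException("Stream already closed");
        }
        closed = true;
        out.close();
        return target;
    }

    // A plain close() without closeAndGetHandle() discards the partial result.
    @Override
    public void close() throws IOException {
        if (!closed) {
            closed = true;
            out.close();
            Files.deleteIfExists(target);
        }
    }
}
```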

RocksDB Snapshot Strategies

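From the UML above, the RocksDB snapshot strategies consist of three core classes: the abstract class RocksDBSnapshotStrategyBase, the full snapshot strategy RocksFullSnapshotStrategy, and the incremental snapshot strategy RocksIncrementalSnapshotStrategy.
RocksDBSnapshotStrategyBase defines member variables related to RocksDB and the state; the actual logic lives in the subclasses.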

Full Snapshots

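The full snapshot strategy RocksFullSnapshotStrategy creates full snapshots of a RocksDBKeyedStateBackend: every checkpoint writes the complete state to the checkpoint storage (for example the JobManager or HDFS).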

Again, let's look at the core methods: syncPrepareResources and asyncSnapshot.

public class RocksFullSnapshotStrategy<K>
        extends RocksDBSnapshotStrategyBase<K, FullSnapshotResources<K>> {
    ......
    
    @Override
    public FullSnapshotResources<K> syncPrepareResources(long checkpointId) throws Exception {
        //Build the RocksDB full snapshot resources. Compared with HeapSnapshotResources, RocksDBFullSnapshotResources
        //also holds the RocksDB instance and a RocksDB snapshot
        return RocksDBFullSnapshotResources.create(
                kvStateInformation,
                registeredPQStates,
                db,
                rocksDBResourceGuard,
                keyGroupRange,
                keySerializer,
                keyGroupPrefixBytes,
                keyGroupCompressionDecorator);
    }

    @Override
    public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
            FullSnapshotResources<K> fullRocksDBSnapshotResources,
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory checkpointStreamFactory,
            @Nonnull CheckpointOptions checkpointOptions) {

        if (fullRocksDBSnapshotResources.getMetaInfoSnapshots().isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.",
                        timestamp);
            }
            return registry -> SnapshotResult.empty();
        }

        //createCheckpointStreamSupplier works as in the heap strategy: it creates a duplicating or a simple stream depending on whether local recovery is enabled
        final SupplierWithException<CheckpointStreamWithResultProvider, Exception>
                checkpointStreamSupplier =
                        createCheckpointStreamSupplier(
                                checkpointId, checkpointStreamFactory, checkpointOptions);

        //Create the asynchronous full snapshot writer
        return new FullSnapshotAsyncWriter<>(
                checkpointOptions.getCheckpointType(),
                checkpointStreamSupplier,
                fullRocksDBSnapshotResources);
    }
......
}

FullSnapshotAsyncWriter is also a supplier (a SnapshotResultSupplier); it asynchronously writes the full snapshot data to the given output stream.

public class FullSnapshotAsyncWriter<K>
        implements SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> {
    @Override
    public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
            throws Exception {
        ......
        //get the output stream
        final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider =
                checkpointStreamSupplier.get();

        snapshotCloseableRegistry.registerCloseable(checkpointStreamWithResultProvider);
        //write the snapshot data to the output stream
        writeSnapshotToOutputStream(checkpointStreamWithResultProvider, keyGroupRangeOffsets);
        ......
    }
    
    private void writeSnapshotToOutputStream(
            @Nonnull CheckpointStreamWithResultProvider checkpointStreamWithResultProvider,
            @Nonnull KeyGroupRangeOffsets keyGroupRangeOffsets)
            throws IOException, InterruptedException {
        //Write the snapshot data to the output stream through an output view; note that checkpointStreamWithResultProvider may write two copies of the data
        final DataOutputView outputView =
                new DataOutputViewStreamWrapper(
                        checkpointStreamWithResultProvider.getCheckpointOutputStream());
        //write the meta data
        writeKVStateMetaData(outputView);
        //write the state data of every state instance
        try (KeyValueStateIterator kvStateIterator = snapshotResources.createKVStateIterator()) {
            writeKVStateData(
                    kvStateIterator, checkpointStreamWithResultProvider, keyGroupRangeOffsets);
        }
    }
}

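Now let's look at the most important method, writeKVStateData, to see how the full data set is actually written out. Setting the many details aside, it simply iterates over a KeyValueStateIterator: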

private void writeKVStateData(
            final KeyValueStateIterator mergeIterator,
            final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider,
            final KeyGroupRangeOffsets keyGroupRangeOffsets)
            throws IOException, InterruptedException {
        ......
        try {
           ......
            //simply iterate over the KeyValueStateIterator
            // main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking
            // key-group offsets.
            while (mergeIterator.isValid()) {
                ......
                writeKeyValuePair(previousKey, previousValue, kgOutView);
                ......
                // request next k/v pair
                previousKey = mergeIterator.key();
                previousValue = mergeIterator.value();
                mergeIterator.next();
            }
            ......
        } finally {
            // this will just close the outer stream
            IOUtils.closeQuietly(kgOutStream);
        }
    }
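The loop comment says pairs are written ordered by (key-group, kv-state) while tracking key-group offsets. Recording where each key group starts lets a restoring task seek straight to the key groups it owns. The sketch below shows only that bookkeeping; KvEntry and KeyGroupWriter are hypothetical, and the record layout (state id as a short, key and value as UTF strings) is made up for illustration rather than being Flink's actual format.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical entry as produced by a merge iterator.
final class KvEntry {
    final int keyGroup;
    final int stateId;
    final String key;
    final String value;

    KvEntry(int keyGroup, int stateId, String key, String value) {
        this.keyGroup = keyGroup;
        this.stateId = stateId;
        this.key = key;
        this.value = value;
    }
}

final class KeyGroupWriter {
    // Writes entries (assumed sorted by key group, then state id) and returns,
    // for each key group present, the byte offset at which it starts.
    static Map<Integer, Long> writeWithOffsets(List<KvEntry> sorted, DataOutputStream out)
            throws IOException {
        Map<Integer, Long> offsets = new TreeMap<>();
        int currentGroup = -1;
        for (KvEntry e : sorted) {
            if (e.keyGroup != currentGroup) {
                currentGroup = e.keyGroup;
                offsets.put(currentGroup, (long) out.size()); // bytes written so far
            }
            out.writeShort(e.stateId);
            out.writeUTF(e.key);
            out.writeUTF(e.value);
        }
        out.flush();
        return offsets;
    }
}
```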

KeyValueStateIterator iterates over all key-value entries of the current snapshot; RocksDB and the heap backend each have their own iterator implementation.

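Let's see how RocksStatesPerKeyGroupMergeIterator is created. When we looked at the FullSnapshotResources interface above, we saw the abstract method createKVStateIterator, which exists precisely to create this iterator. HeapSnapshotResources and RocksDBFullSnapshotResources each implement it to create the heap and RocksDB iterators respectively. Below is the implementation of RocksDBFullSnapshotResources.createKVStateIterator.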

@Override
    public KeyValueStateIterator createKVStateIterator() throws IOException {
        ......
        try {
            //Create RocksDB ReadOptions that read from the RocksDB snapshot taken during the synchronous phase of the checkpoint
            ReadOptions readOptions = new ReadOptions();
            closeableRegistry.registerCloseable(readOptions::close);
            readOptions.setSnapshot(snapshot);

            //RocksIteratorWrapper is a thin wrapper around RocksDB's RocksIterator
            List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators =
                    createKVStateIterators(closeableRegistry, readOptions);
            ......
            //RocksStatesPerKeyGroupMergeIterator combines the iterators of multiple state instances (column families) into a single iterator
            return new RocksStatesPerKeyGroupMergeIterator(
                    closeableRegistry,
                    kvStateIterators,
                    heapPriorityQueueIterators,
                    keyGroupPrefixBytes);
        } catch (Throwable t) {
            IOUtils.closeQuietly(closeableRegistry);
            throw new IOException("Error creating merge iterator", t);
        }
    }

    private List<Tuple2<RocksIteratorWrapper, Integer>> createKVStateIterators(
            CloseableRegistry closeableRegistry, ReadOptions readOptions) throws IOException {
        final List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators =
                new ArrayList<>(metaData.size());
        int kvStateId = 0;
        //one iterator is created per state, i.e. per RocksDB column family
        for (MetaData metaDataEntry : metaData) {
            RocksIteratorWrapper rocksIteratorWrapper =
                    createRocksIteratorWrapper(
                            db,
                            metaDataEntry.rocksDbKvStateInfo.columnFamilyHandle,
                            metaDataEntry.stateSnapshotTransformer,
                            readOptions);
            kvStateIterators.add(Tuple2.of(rocksIteratorWrapper, kvStateId));
            closeableRegistry.registerCloseable(rocksIteratorWrapper);
            ++kvStateId;
        }
        return kvStateIterators;
    }

    private static RocksIteratorWrapper createRocksIteratorWrapper(
            RocksDB db,
            ColumnFamilyHandle columnFamilyHandle,
            StateSnapshotTransformer<byte[]> stateSnapshotTransformer,
            ReadOptions readOptions) {
        //create the RocksDB iterator and wrap it in Flink's RocksIteratorWrapper
        RocksIterator rocksIterator = db.newIterator(columnFamilyHandle, readOptions);
        return stateSnapshotTransformer == null
                ? new RocksIteratorWrapper(rocksIterator)
                : new RocksTransformingIteratorWrapper(rocksIterator, stateSnapshotTransformer);
    }

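The code above shows that the iterator is essentially RocksDB's own iterator (reading from the specified snapshot), which Flink wraps in RocksIteratorWrapper. For why the wrapper is needed, see the error-handling section of the Iterator page in RocksDB's documentation: a RocksDB iterator does not throw on errors, so an invalid iterator must be followed by a status() check. Since there may be multiple state instances, each with its own iterator, Flink finally combines these iterators into a single one, RocksStatesPerKeyGroupMergeIterator.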
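Combining one sorted iterator per column family into a single stream ordered by (key group, state) is a k-way merge, which can be done with a priority queue over the current head of each iterator. The sketch below illustrates that idea with in-memory lists; MergeSketch, Kv and Head are hypothetical, keys are reduced to a key-group number plus a string, and this is not the RocksStatesPerKeyGroupMergeIterator code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

final class MergeSketch {
    // One entry of a per-state iterator: its key group and its key.
    static final class Kv {
        final int keyGroup;
        final String key;

        Kv(int keyGroup, String key) {
            this.keyGroup = keyGroup;
            this.key = key;
        }
    }

    // Current head of one per-state iterator, tagged with that state's id.
    private static final class Head {
        final Kv kv;
        final int stateId;
        final Iterator<Kv> rest;

        Head(Kv kv, int stateId, Iterator<Kv> rest) {
            this.kv = kv;
            this.stateId = stateId;
            this.rest = rest;
        }
    }

    // k-way merge: each input list must already be sorted by key group, the way a
    // column family is sorted by its key-group-prefixed keys. Output is ordered by
    // (key group, state id) and encoded as "keyGroup/stateId/key".
    static List<String> merge(List<List<Kv>> perState) {
        PriorityQueue<Head> heap = new PriorityQueue<>(
                Comparator.<Head>comparingInt(h -> h.kv.keyGroup)
                        .thenComparingInt(h -> h.stateId));
        for (int id = 0; id < perState.size(); id++) {
            Iterator<Kv> it = perState.get(id).iterator();
            if (it.hasNext()) {
                heap.add(new Head(it.next(), id, it));
            }
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head h = heap.poll();
            out.add(h.kv.keyGroup + "/" + h.stateId + "/" + h.kv.key);
            if (h.rest.hasNext()) {
                heap.add(new Head(h.rest.next(), h.stateId, h.rest));
            }
        }
        return out;
    }
}
```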

Incremental Snapshots

RocksIncrementalSnapshotStrategy is the incremental snapshot strategy of RocksDBKeyedStateBackend; it builds incremental snapshots on top of RocksDB's native checkpoints.

Before looking at the syncPrepareResources and asyncSnapshot methods of RocksIncrementalSnapshotStrategy, let's look at some key member variables used by RocksDB incremental snapshots.

//The resources of a RocksDB incremental snapshot are held in the inner class IncrementalRocksDBSnapshotResources
public class RocksIncrementalSnapshotStrategy<K>
        extends RocksDBSnapshotStrategyBase<
                K, RocksIncrementalSnapshotStrategy.IncrementalRocksDBSnapshotResources> {

    //base directory of the RocksDB instance
    @Nonnull private final File instanceBasePath;

    //unique identifier of this state backend
    @Nonnull private final UUID backendUID;
    
    //maps each checkpoint id to the set of sst files of that checkpoint
    @Nonnull private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;

    //ID of the last completed checkpoint
    private long lastCompletedCheckpointId;

    //uploads the snapshot files (the sst files and other files produced by the RocksDB checkpoint)
    private final RocksDBStateUploader stateUploader;
    ...
}

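Next, the synchronous resource-preparation phase, which mainly does two things:

  1. Obtains the sst files of the last completed checkpoint from materializedSstFiles; these are compared against to find the increment.
  2. Creates a RocksDB checkpoint.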
@Override
    public IncrementalRocksDBSnapshotResources syncPrepareResources(long checkpointId)
            throws Exception {

        //Prepare the directory: a permanent directory if local recovery is enabled, otherwise a temporary one
        final SnapshotDirectory snapshotDirectory = prepareLocalSnapshotDirectory(checkpointId);
        LOG.trace("Local RocksDB checkpoint goes to backup path {}.", snapshotDirectory);
        
        final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
                new ArrayList<>(kvStateInformation.size());
        //sst files of the last completed checkpoint, used to compute the increment
        final Set<StateHandleID> baseSstFiles =
                snapshotMetaData(checkpointId, stateMetaInfoSnapshots);
        //create the RocksDB checkpoint
        takeDBNativeCheckpoint(snapshotDirectory);

        return new IncrementalRocksDBSnapshotResources(
                snapshotDirectory, baseSstFiles, stateMetaInfoSnapshots);
    }
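The comment above says baseSstFiles are the sst files of the most recently completed checkpoint. Assuming this is a lookup of lastCompletedCheckpointId in materializedSstFiles, the bookkeeping can be sketched as follows. SstHistory, record and complete are hypothetical names, and complete stands in for whatever marks a checkpoint as completed. Diffing against a completed checkpoint rather than the latest started one presumably avoids relying on files from a checkpoint that may still fail.

```java
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

final class SstHistory {
    // checkpoint id -> sst files referenced by that checkpoint
    private final SortedMap<Long, Set<String>> materializedSstFiles = new TreeMap<>();
    private long lastCompletedCheckpointId = -1L;

    // Async phase: remember which sst files checkpoint `id` references.
    synchronized void record(long id, Set<String> sstFiles) {
        materializedSstFiles.put(id, sstFiles);
    }

    // Hypothetical hook: called once checkpoint `id` is confirmed complete.
    synchronized void complete(long id) {
        lastCompletedCheckpointId = id;
    }

    // Sync phase: the base set to diff against (null if nothing has completed yet).
    synchronized Set<String> baseSstFiles() {
        return materializedSstFiles.get(lastCompletedCheckpointId);
    }
}
```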

takeDBNativeCheckpoint synchronously creates a RocksDB checkpoint, which writes the checkpoint data (sst files and misc files) into the specified directory.

private void takeDBNativeCheckpoint(@Nonnull SnapshotDirectory outputDirectory)
            throws Exception {
        try (ResourceGuard.Lease ignored = rocksDBResourceGuard.acquireResource();
                Checkpoint checkpoint = Checkpoint.create(db)) {
            checkpoint.createCheckpoint(outputDirectory.getDirectory().toString());
        } catch (Exception ex) {
            ......
        }
    }

asyncSnapshot itself is simple: it mainly creates a RocksDBIncrementalSnapshotOperation supplier, which produces the incremental snapshot.

@Override
    public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
            IncrementalRocksDBSnapshotResources snapshotResources,
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory checkpointStreamFactory,
            @Nonnull CheckpointOptions checkpointOptions) {
        ...
        return new RocksDBIncrementalSnapshotOperation(
                checkpointId,
                checkpointStreamFactory,
                snapshotResources.snapshotDirectory, //directory of the RocksDB checkpoint
                snapshotResources.baseSstFiles, //sst files of the last completed checkpoint
                snapshotResources.stateMetaInfoSnapshots);
    }

Below is the core of the incremental snapshot implementation, RocksDBIncrementalSnapshotOperation:

private final class RocksDBIncrementalSnapshotOperation
            implements SnapshotResultSupplier<KeyedStateHandle> {
    ...
    
    @Override
    public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry)
            throws Exception {
        ...
        // sst files generated by the current RocksDB checkpoint
        final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
        // misc files (metadata files) of the current RocksDB checkpoint
        final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
        ......
        //Upload the new sst files and the misc files; uploadSstFiles walks the RocksDB checkpoint directory and compares it with the previous sst files
        uploadSstFiles(sstFiles, miscFiles, snapshotCloseableRegistry);
        //record the sst files of the current checkpoint
        synchronized (materializedSstFiles) {
            materializedSstFiles.put(checkpointId, sstFiles.keySet());
        }
        ......
    }
}

Now let's look at the implementation of uploadSstFiles:

 private void uploadSstFiles(
                @Nonnull Map<StateHandleID, StreamStateHandle> sstFiles,
                @Nonnull Map<StateHandleID, StreamStateHandle> miscFiles,
                @Nonnull CloseableRegistry snapshotCloseableRegistry)
                throws Exception {
            //local paths of the new sst files
            Map<StateHandleID, Path> sstFilePaths = new HashMap<>();
            //paths of the misc files
            Map<StateHandleID, Path> miscFilePaths = new HashMap<>();
            //files in the current RocksDB checkpoint directory
            Path[] files = localBackupDirectory.listDirectory();
            if (files != null) {
                //find the new files
                createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
                //upload the new sst files with the stateUploader
                sstFiles.putAll(
                        stateUploader.uploadFilesToCheckpointFs(
                                sstFilePaths, checkpointStreamFactory, snapshotCloseableRegistry));
                //upload the misc files
                miscFiles.putAll(
                        stateUploader.uploadFilesToCheckpointFs(
                                miscFilePaths, checkpointStreamFactory, snapshotCloseableRegistry));
            }
        }

The createUploadFilePaths method above compares files to find the new sst files and determines which sst files and misc files must be uploaded.

private void createUploadFilePaths(
                Path[] files,
                Map<StateHandleID, StreamStateHandle> sstFiles,
                Map<StateHandleID, Path> sstFilePaths,
                Map<StateHandleID, Path> miscFilePaths) {
            for (Path filePath : files) {
                final String fileName = filePath.getFileName().toString();
                //state handle ID derived from the file name
                final StateHandleID stateHandleID = new StateHandleID(fileName);
                //compare the sst file with the sst files of the last checkpoint to find the increment
                if (fileName.endsWith(SST_FILE_SUFFIX)) {
                    final boolean existsAlready =
                            baseSstFiles != null && baseSstFiles.contains(stateHandleID);

                    if (existsAlready) {
                        //an sst file uploaded before: record only a placeholder, since the file already exists in the shared directory
                        sstFiles.put(stateHandleID, new PlaceholderStreamStateHandle());
                    } else {
                        //a new file, to be uploaded
                        sstFilePaths.put(stateHandleID, filePath);
                    }
                } else {
                    //misc files are always uploaded
                    miscFilePaths.put(stateHandleID, filePath);
                }
            }
        }

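The logic of the incremental snapshot is therefore:

  1. Generate the sst files of the current snapshot through a RocksDB checkpoint (because of the LSM design, sst files are immutable).
  2. For every checkpoint, Flink records the mapping from the checkpoint id to its sst files.
  3. Upload the sst files and misc files of the current checkpoint.
  4. If a later checkpoint still contains sst files from an earlier one, those files do not need to be uploaded to HDFS again.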
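Steps 2 to 4 come down to classifying the files in the new RocksDB checkpoint directory against the sst files of the base checkpoint, as createUploadFilePaths does. The hypothetical UploadPlan below reproduces that classification on plain file names: known .sst files become placeholders, new .sst files are uploaded, and every non-sst (misc) file is uploaded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class UploadPlan {
    final List<String> placeholders = new ArrayList<>(); // already in shared storage
    final List<String> sstToUpload = new ArrayList<>();  // new, immutable sst files
    final List<String> miscToUpload = new ArrayList<>(); // e.g. MANIFEST, CURRENT, OPTIONS

    // baseSstFiles may be null (no base checkpoint), as in the original null check.
    static UploadPlan classify(List<String> checkpointDirFiles, Set<String> baseSstFiles) {
        UploadPlan plan = new UploadPlan();
        for (String name : checkpointDirFiles) {
            if (name.endsWith(".sst")) {
                if (baseSstFiles != null && baseSstFiles.contains(name)) {
                    plan.placeholders.add(name);
                } else {
                    plan.sstToUpload.add(name);
                }
            } else {
                plan.miscToUpload.add(name);
            }
        }
        return plan;
    }
}
```

With no base checkpoint every sst file is uploaded, so the first incremental checkpoint costs as much as a full one; later checkpoints only pay for newly created sst files.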

Flink's incremental checkpoints thus cleverly exploit the fact that in an LSM tree, sst files are only ever added and never modified.

Operator State Snapshot Strategy

There is only one snapshot strategy for operator state, DefaultOperatorStateBackendSnapshotStrategy, which writes the snapshot data of the operator state's ListState and BroadcastState to the snapshot storage.

class DefaultOperatorStateBackendSnapshotStrategy
        implements SnapshotStrategy<
                OperatorStateHandle,
                DefaultOperatorStateBackendSnapshotStrategy
                        .DefaultOperatorStateBackendSnapshotResources> {
    private final ClassLoader userClassLoader;
    //Operator state has only two kinds of state: ListState and BroadcastState
    private final Map<String, PartitionableListState<?>> registeredOperatorStates;
    private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;

    protected DefaultOperatorStateBackendSnapshotStrategy(
            ClassLoader userClassLoader,
            Map<String, PartitionableListState<?>> registeredOperatorStates,
            Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates) {
        this.userClassLoader = userClassLoader;
        this.registeredOperatorStates = registeredOperatorStates;
        this.registeredBroadcastStates = registeredBroadcastStates;
    }
    ......
}

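In the synchronous resource-preparation phase, DefaultOperatorStateBackendSnapshotStrategy does only one thing: it deep-copies the ListState and BroadcastState. The deep copy captures the state as of this moment, so that records processed after the synchronous phase cannot change what the asynchronous part writes; this is what keeps the snapshot consistent for exactly-once.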

@Override
    public DefaultOperatorStateBackendSnapshotResources syncPrepareResources(long checkpointId) {
        
        //holds the copied operator state
        final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
                new HashMap<>(registeredOperatorStates.size());
        final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
                new HashMap<>(registeredBroadcastStates.size());

        ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(userClassLoader);
        try {
            //deep-copy the ListState and BroadcastState for later use
            if (!registeredOperatorStates.isEmpty()) {
                for (Map.Entry<String, PartitionableListState<?>> entry :
                        registeredOperatorStates.entrySet()) {
                    PartitionableListState<?> listState = entry.getValue();
                    if (null != listState) {
                        listState = listState.deepCopy();
                    }
                    registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
                }
            }
            //copy the broadcast state
            if (!registeredBroadcastStates.isEmpty()) {
                for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                        registeredBroadcastStates.entrySet()) {
                    BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
                    if (null != broadcastState) {
                        broadcastState = broadcastState.deepCopy();
                    }
                    registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
                }
            }
        } finally {
            Thread.currentThread().setContextClassLoader(snapshotClassLoader);
        }

        return new DefaultOperatorStateBackendSnapshotResources(
                registeredOperatorStatesDeepCopies, registeredBroadcastStatesDeepCopies);
    }
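The effect of the synchronous deep copy can be seen with a toy example in which a List<String> stands in for PartitionableListState and OperatorStateCopy.deepCopy (a hypothetical helper) copies each registered list, keeping null entries as null like the loop above. Because String elements are immutable, copying the list is enough here; Flink's deepCopy has to handle arbitrary element types, which this sketch does not.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class OperatorStateCopy {
    // Copies every registered list so that later mutations of the live state
    // cannot leak into the snapshot; null entries stay null.
    static Map<String, List<String>> deepCopy(Map<String, List<String>> registered) {
        Map<String, List<String>> copies = new HashMap<>(registered.size());
        for (Map.Entry<String, List<String>> e : registered.entrySet()) {
            List<String> list = e.getValue();
            copies.put(e.getKey(), list == null ? null : new ArrayList<>(list));
        }
        return copies;
    }
}
```

Appending to the live list after the copy leaves the copied list unchanged, so the asynchronous write sees the state as it was when the snapshot was triggered.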

After the operator state has been deep-copied, asyncSnapshot asynchronously writes the snapshot data to a stream created by the CheckpointStreamFactory.

@Override
    public SnapshotResultSupplier<OperatorStateHandle> asyncSnapshot(
            DefaultOperatorStateBackendSnapshotResources syncPartResource,
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions) {
        ......
        return (snapshotCloseableRegistry) -> {
            //create the output stream
            CheckpointStreamFactory.CheckpointStateOutputStream localOut =
                    streamFactory.createCheckpointStateOutputStream(
                            CheckpointedStateScope.EXCLUSIVE);
            snapshotCloseableRegistry.registerCloseable(localOut);
            ......

            //write the snapshot data to the output stream through OperatorBackendSerializationProxy
            DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
            OperatorBackendSerializationProxy backendSerializationProxy =
                    new OperatorBackendSerializationProxy(
                            operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
            backendSerializationProxy.write(dov);

            ......
                return SnapshotResult.of(retValue);
            } else {
                throw new IOException("Stream was already unregistered.");
            }
        };
    }
