
Flink SQL FileSystem Connector: Partition Commit and a Custom Small-File Merging Policy

Author: LittleMagic

In an earlier post introducing the new Hive Streaming features of Flink 1.11, I mentioned that Flink SQL's FileSystem Connector received many improvements so that it fits into the broader Flink-Hive integration, and the most visible of them is the partition commit mechanism.

This article first walks through the source code of the two components of the partition commit mechanism, the trigger and the policy, and then uses small-file merging as an example to show how to implement a custom partition commit policy.

PartitionCommitTrigger

In the latest Flink SQL (1.11 at the time of writing), the FileSystem Connector natively supports partitioned data and writes it out in the standard Hive partition layout, as shown below.

path
├── datetime=2019-08-25
│   ├── hour=11
│   │   ├── part-0.parquet
│   │   └── part-1.parquet
│   └── hour=12
│       └── part-0.parquet
└── datetime=2019-08-26
    └── hour=6
        └── part-0.parquet

So when does data that has already been written to a partition become visible to downstream consumers? This is the question of when a partition commit is triggered. According to the official documentation, two parameters control the trigger:

  • sink.partition-commit.trigger: either process-time (trigger based on processing time) or partition-time (trigger based on the partition time extracted from event time).
  • sink.partition-commit.delay: the delay before a partition is committed. With the process-time trigger, the partition is committed once this delay has elapsed since the system timestamp at which the partition was created; with the partition-time trigger, it is committed once the watermark has passed the partition's own event-time timestamp plus this delay.

Clearly, the process-time trigger cannot cope with disturbances during processing: once data arrives late or the job fails and restarts, records can no longer be assigned to their correct partitions by event time. In practice we therefore almost always choose the partition-time trigger and generate the watermarks ourselves. We also need to specify the rule for extracting the partition time (the PartitionTimeExtractor) via the partition.time-extractor.* family of parameters; the official documentation explains them well, so I will not repeat it here. A minimal configuration sketch follows.
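
For example, on a table partitioned by (dt, hr) (the partition column names here are illustrative, not the ones from the original experiment), a partition-time trigger with a one-hour delay could be configured in the table's WITH clause roughly as follows:

'sink.partition-commit.trigger' = 'partition-time',
'sink.partition-commit.delay' = '1 h',
'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00'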

The class diagram of PartitionCommitTrigger in the source code is shown below.

[Figure 1: PartitionCommitTrigger class diagram]

Let's take PartitionTimeCommitTigger, which triggers on partition time, as an example and briefly look at how it works (the misspelled "Tigger" comes straight from the Flink source). Here is the complete code of the class.

public class PartitionTimeCommitTigger implements PartitionCommitTrigger {
    private static final ListStateDescriptor<List<String>> PENDING_PARTITIONS_STATE_DESC =
            new ListStateDescriptor<>(
                    "pending-partitions",
                    new ListSerializer<>(StringSerializer.INSTANCE));

    private static final ListStateDescriptor<Map<Long, Long>> WATERMARKS_STATE_DESC =
            new ListStateDescriptor<>(
                    "checkpoint-id-to-watermark",
                    new MapSerializer<>(LongSerializer.INSTANCE, LongSerializer.INSTANCE));

    private final ListState<List<String>> pendingPartitionsState;
    private final Set<String> pendingPartitions;

    private final ListState<Map<Long, Long>> watermarksState;
    private final TreeMap<Long, Long> watermarks;
    private final PartitionTimeExtractor extractor;
    private final long commitDelay;
    private final List<String> partitionKeys;

    public PartitionTimeCommitTigger(
            boolean isRestored,
            OperatorStateStore stateStore,
            Configuration conf,
            ClassLoader cl,
            List<String> partitionKeys) throws Exception {
        this.pendingPartitionsState = stateStore.getListState(PENDING_PARTITIONS_STATE_DESC);
        this.pendingPartitions = new HashSet<>();
        if (isRestored) {
            pendingPartitions.addAll(pendingPartitionsState.get().iterator().next());
        }

        this.partitionKeys = partitionKeys;
        this.commitDelay = conf.get(SINK_PARTITION_COMMIT_DELAY).toMillis();
        this.extractor = PartitionTimeExtractor.create(
                cl,
                conf.get(PARTITION_TIME_EXTRACTOR_KIND),
                conf.get(PARTITION_TIME_EXTRACTOR_CLASS),
                conf.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN));

        this.watermarksState = stateStore.getListState(WATERMARKS_STATE_DESC);
        this.watermarks = new TreeMap<>();
        if (isRestored) {
            watermarks.putAll(watermarksState.get().iterator().next());
        }
    }

    @Override
    public void addPartition(String partition) {
        if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
            this.pendingPartitions.add(partition);
        }
    }

    @Override
    public List<String> committablePartitions(long checkpointId) {
        if (!watermarks.containsKey(checkpointId)) {
            throw new IllegalArgumentException(String.format(
                    "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                    checkpointId, watermarks));
        }

        long watermark = watermarks.get(checkpointId);
        watermarks.headMap(checkpointId, true).clear();

        List<String> needCommit = new ArrayList<>();
        Iterator<String> iter = pendingPartitions.iterator();
        while (iter.hasNext()) {
            String partition = iter.next();
            LocalDateTime partTime = extractor.extract(
                    partitionKeys, extractPartitionValues(new Path(partition)));
            if (watermark > toMills(partTime) + commitDelay) {
                needCommit.add(partition);
                iter.remove();
            }
        }
        return needCommit;
    }

    @Override
    public void snapshotState(long checkpointId, long watermark) throws Exception {
        pendingPartitionsState.clear();
        pendingPartitionsState.add(new ArrayList<>(pendingPartitions));

        watermarks.put(checkpointId, watermark);
        watermarksState.clear();
        watermarksState.add(new HashMap<>(watermarks));
    }

    @Override
    public List<String> endInput() {
        ArrayList<String> partitions = new ArrayList<>(pendingPartitions);
        pendingPartitions.clear();
        return partitions;
    }
}

Note that this class maintains two pairs of related pieces of information:

  • pendingPartitions / pendingPartitionsState: the partitions waiting to be committed, together with the corresponding state;
  • watermarks / watermarksState: the mapping from checkpoint ID to watermark timestamp (stored in a TreeMap to keep it ordered), together with the corresponding state.

This also shows that enabling checkpointing is a prerequisite for the partition commit mechanism. The snapshotState() method saves this information into state, so that when the job fails over, partition data remains complete and correct. A minimal sketch of enabling checkpointing is shown below.
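
As a minimal, illustrative sketch (the 60-second interval and the blink-planner setup are assumptions, not taken from the original job), checkpointing for a streaming SQL job could be enabled like this before submitting the INSERT statement:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class EnableCheckpointingSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Partitions are only committed when a checkpoint completes, so checkpointing must be enabled.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        // Register catalogs / tables and submit the streaming INSERT from here.
    }
}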

So how does PartitionTimeCommitTigger decide which partitions to commit? Look at the committablePartitions() method:

  1. Check that the checkpoint ID is valid;
  2. Look up the watermark for the current checkpoint ID, and use the TreeMap's headMap() and clear() methods to delete the watermark entries for the current and earlier checkpoint IDs, which are no longer needed;
  3. Iterate over the partitions waiting to be committed and extract each partition's time with the previously configured PartitionTimeExtractor (for example ${year}-${month}-${day} ${hour}:00:00). If the watermark has passed the partition time plus the sink.partition-commit.delay described above, the partition can be committed; all such partitions are returned. A worked example of this condition follows the list.

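As a worked example of the condition in committablePartitions() (all values are made up for illustration, and the time-zone handling of toMills() is simplified to UTC+8 here): partition dt=2020-08-04/hr=22 extracts a partition time of 2020-08-04 22:00:00; with a one-hour delay it becomes committable once the watermark passes 2020-08-04 23:00:00.

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class CommitConditionExample {
    public static void main(String[] args) {
        // Partition time extracted for dt=2020-08-04/hr=22 (UTC+8 assumed for illustration).
        long partTime = LocalDateTime.of(2020, 8, 4, 22, 0)
                .toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
        // sink.partition-commit.delay = '1 h'
        long commitDelay = Duration.ofHours(1).toMillis();
        // Current watermark, one second past 2020-08-04 23:00:00.
        long watermark = LocalDateTime.of(2020, 8, 4, 23, 0, 1)
                .toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
        // Same check as in committablePartitions(): prints true, so the partition is committable.
        System.out.println(watermark > partTime + commitDelay);
    }
}
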
The logic of PartitionCommitTrigger is used by the StreamingFileCommitter component, which performs the actual partition commits (note that StreamingFileCommitter has a fixed parallelism of 1; someone asked about this earlier). The details of StreamingFileCommitter and StreamingFileWriter (the SQL counterpart of StreamingFileSink) are relatively involved and are not covered here; a detailed write-up will follow later.

PartitionCommitPolicy

PartitionCommitTrigger answers the question of when a partition becomes visible to downstream consumers, while PartitionCommitPolicy answers how that visibility is signaled. According to the official documentation, this is configured via the sink.partition-commit.policy.kind parameter, which supports three commit policies (they can be combined):

  • metastore: update the partition information in the Hive Metastore (only effective when a HiveCatalog is used);
  • success-file: write a success marker file into the partition directory; the file name can be customized via the sink.partition-commit.success-file.name parameter and defaults to _SUCCESS;
  • custom: a user-defined commit policy, whose class name must be specified via the sink.partition-commit.policy.class parameter.

The internals of PartitionCommitPolicy are much simpler; its class diagram is shown below. A policy's concrete logic is implemented by overriding the commit() method.

[Figure 2: PartitionCommitPolicy class diagram]

The two built-in implementations, MetastoreCommitPolicy and SuccessFileCommitPolicy, are shown below; both are very easy to follow.

public class MetastoreCommitPolicy implements PartitionCommitPolicy {
    private static final Logger LOG = LoggerFactory.getLogger(MetastoreCommitPolicy.class);

    private TableMetaStore metaStore;

    public void setMetastore(TableMetaStore metaStore) {
        this.metaStore = metaStore;
    }

    @Override
    public void commit(Context context) throws Exception {
        LinkedHashMap<String, String> partitionSpec = context.partitionSpec();
        metaStore.createOrAlterPartition(partitionSpec, context.partitionPath());
        LOG.info("Committed partition {} to metastore", partitionSpec);
    }
}

public class SuccessFileCommitPolicy implements PartitionCommitPolicy {
    private static final Logger LOG = LoggerFactory.getLogger(SuccessFileCommitPolicy.class);

    private final String fileName;
    private final FileSystem fileSystem;

    public SuccessFileCommitPolicy(String fileName, FileSystem fileSystem) {
        this.fileName = fileName;
        this.fileSystem = fileSystem;
    }

    @Override
    public void commit(Context context) throws Exception {
        fileSystem.create(
                new Path(context.partitionPath(), fileName),
                FileSystem.WriteMode.OVERWRITE).close();
        LOG.info("Committed partition {} with success file", context.partitionSpec());
    }
}

Customize PartitionCommitPolicy

Remember the Hive Streaming experiment from the earlier post?

[Figure 3: partition directories from the earlier Hive Streaming experiment, containing many small files]

As the figure above shows, when writes are frequent or the parallelism is high, each partition ends up with lots of tiny, fragmented files, which is not what we want. Below we implement a custom PartitionCommitPolicy that merges them as part of the partition commit (the storage format is Parquet).


Unlike row-oriented formats such as plain TextFile, Parquet is a self-describing (it carries its own schema and metadata) columnar format whose data structures are organized according to the Google Dremel model, the same style used by Protobuf. So we should first detect the schema of the files that were written, then read them according to that schema and stitch them together.

Below is the complete policy, ParquetFileMergingCommitPolicy, which merges all the small files within a partition. To avoid dependency conflicts, all Parquet-related components use the versions shaded by Flink. I like to think the code is reasonably tidy and easy to follow, so I have lazily skipped the comments.

package me.lmagics.flinkexp.hiveintegration.util;

import org.apache.flink.hive.shaded.parquet.example.data.Group;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetFileReader;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetReader;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetWriter;
import org.apache.flink.hive.shaded.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.flink.hive.shaded.parquet.hadoop.example.GroupReadSupport;
import org.apache.flink.hive.shaded.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.flink.hive.shaded.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.flink.hive.shaded.parquet.hadoop.util.HadoopInputFile;
import org.apache.flink.hive.shaded.parquet.schema.MessageType;
import org.apache.flink.table.filesystem.PartitionCommitPolicy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ParquetFileMergingCommitPolicy implements PartitionCommitPolicy {
  private static final Logger LOGGER = LoggerFactory.getLogger(ParquetFileMergingCommitPolicy.class);

  @Override
  public void commit(Context context) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    String partitionPath = context.partitionPath().getPath();

    List<Path> files = listAllFiles(fs, new Path(partitionPath), "part-");
    LOGGER.info("{} files in path {}", files.size(), partitionPath);

    MessageType schema = getParquetSchema(files, conf);
    if (schema == null) {
      return;
    }
    LOGGER.info("Fetched parquet schema: {}", schema.toString());

    Path result = merge(partitionPath, schema, files, fs);
    LOGGER.info("Files merged into {}", result.toString());
  }

  private List<Path> listAllFiles(FileSystem fs, Path dir, String prefix) throws IOException {
    List<Path> result = new ArrayList<>();

    RemoteIterator<LocatedFileStatus> dirIterator = fs.listFiles(dir, false);
    while (dirIterator.hasNext()) {
      LocatedFileStatus fileStatus = dirIterator.next();
      Path filePath = fileStatus.getPath();
      if (fileStatus.isFile() && filePath.getName().startsWith(prefix)) {
        result.add(filePath);
      }
    }

    return result;
  }

  private MessageType getParquetSchema(List<Path> files, Configuration conf) throws IOException {
    if (files.size() == 0) {
      return null;
    }

    HadoopInputFile inputFile = HadoopInputFile.fromPath(files.get(0), conf);
    ParquetFileReader reader = ParquetFileReader.open(inputFile);
    ParquetMetadata metadata = reader.getFooter();
    MessageType schema = metadata.getFileMetaData().getSchema();

    reader.close();
    return schema;
  }

  private Path merge(String partitionPath, MessageType schema, List<Path> files, FileSystem fs) throws IOException {
    Path mergeDest = new Path(partitionPath + "/result-" + System.currentTimeMillis() + ".parquet");
    ParquetWriter<Group> writer = ExampleParquetWriter.builder(mergeDest)
      .withType(schema)
      .withConf(fs.getConf())
      .withWriteMode(Mode.CREATE)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build();

    for (Path file : files) {
      ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
        .withConf(fs.getConf())
        .build();
      Group data;
      while((data = reader.read()) != null) {
        writer.write(data);
      }
      reader.close();
    }
    writer.close();

    for (Path file : files) {
      fs.delete(file, false);
    }

    return mergeDest;
  }
}

Don't forget to update the parameters related to the partition commit policy:

'sink.partition-commit.policy.kind' = 'metastore,success-file,custom', 
'sink.partition-commit.policy.class' = 'me.lmagics.flinkexp.hiveintegration.util.ParquetFileMergingCommitPolicy'

Re-run the Hive Streaming job from before and watch the log output:

20-08-04 22:15:00 INFO  me.lmagics.flinkexp.hiveintegration.util.ParquetFileMergingCommitPolicy       - 14 files in path /user/hive/warehouse/hive_tmp.db/analytics_access_log_hive/ts_date=2020-08-04/ts_hour=22/ts_minute=13

// If you are familiar with Protobuf, you will notice that the schema style here is exactly the same
20-08-04 22:15:00 INFO  me.lmagics.flinkexp.hiveintegration.util.ParquetFileMergingCommitPolicy       - Fetched parquet schema: 
message hive_schema {
  optional int64 ts;
  optional int64 user_id;
  optional binary event_type (UTF8);
  optional binary from_type (UTF8);
  optional binary column_type (UTF8);
  optional int64 site_id;
  optional int64 groupon_id;
  optional int64 partner_id;
  optional int64 merchandise_id;
}

20-08-04 22:15:04 INFO  me.lmagics.flinkexp.hiveintegration.util.ParquetFileMergingCommitPolicy       - Files merged into /user/hive/warehouse/hive_tmp.db/analytics_access_log_hive/ts_date=2020-08-04/ts_hour=22/ts_minute=13/result-1596550500950.parquet

Finally, let's verify the result: the merge succeeded.

[Figure 5: verification that the small files were merged into a single result file]

That's all. Interested readers are encouraged to try it out for themselves.

Original article:
https://www.jianshu.com/p/fb7d29abfa14

