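Author: LittleMagic
When I introduced the new Hive Streaming features in Flink 1.11, I mentioned that the FileSystem Connector in Flink SQL received many improvements to fit into the broader Flink-Hive integration, and that the most visible of them is the partition commit mechanism.
This article first walks through the source code of the two elements of partition commit, the trigger and the policy, and then shows how to write a custom partition commit policy, using small-file merging as the example.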
PartitionCommitTrigger
In the latest Flink SQL, the FileSystem Connector supports partitioned data natively and writes it in the standard Hive partition layout, as shown below.
path
└── datetime=2019-08-25
    └── hour=11
        ├── part-0.parquet
        ├── part-1.parquet
    └── hour=12
        ├── part-0.parquet
└── datetime=2019-08-26
    └── hour=6
        ├── part-0.parquet
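So when does partition data that has already been written become visible to downstream consumers? That depends on how a partition commit is triggered. According to the official documentation, two options control the trigger:
- sink.partition-commit.trigger: either process-time (trigger based on processing time) or partition-time (trigger based on the partition time extracted from the partition values, which reflects event time).
- sink.partition-commit.delay: the commit delay. With the process-time trigger, the partition is committed once the system time passes the partition's creation time plus this delay. With the partition-time trigger, it is committed once the watermark passes the time extracted from the partition plus this delay.
As you can see, the process-time trigger cannot cope with jitter during processing: once data arrives late or the job fails and restarts, data can no longer be assigned to the correct partition according to event time. In practice we therefore almost always choose the partition-time trigger and generate watermarks ourselves. We also need to specify how the partition time is extracted (the PartitionTimeExtractor) through the partition.time-extractor.* family of options; the official documentation covers this clearly, so it is not repeated here.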
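To make the idea of extracting a partition time more concrete, here is a minimal sketch that uses only the JDK. It assumes a Hive-style partition path and a timestamp pattern in which `$key` placeholders are replaced by partition values. The class `PartitionTimeSketch` and its methods are hypothetical names for illustration and are not Flink's PartitionTimeExtractor.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a partition-time extractor; not Flink's own class. */
final class PartitionTimeSketch {
    // "H" accepts both one- and two-digit hours, such as hour=6 and hour=11.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd H:mm:ss");

    /** Splits "datetime=2019-08-25/hour=11" into ordered key/value pairs. */
    static Map<String, String> partitionValues(String partitionPath) {
        Map<String, String> values = new LinkedHashMap<>();
        for (String segment : partitionPath.split("/")) {
            int eq = segment.indexOf('=');
            if (eq > 0) {
                values.put(segment.substring(0, eq), segment.substring(eq + 1));
            }
        }
        return values;
    }

    /** Replaces each $key in the pattern with its partition value, then parses the result. */
    static LocalDateTime extract(String pattern, String partitionPath) {
        String timestamp = pattern;
        for (Map.Entry<String, String> e : partitionValues(partitionPath).entrySet()) {
            timestamp = timestamp.replace("$" + e.getKey(), e.getValue());
        }
        return LocalDateTime.parse(timestamp, FORMAT);
    }

    public static void main(String[] args) {
        System.out.println(extract("$datetime $hour:00:00", "datetime=2019-08-25/hour=11"));
    }
}
```

The partition-time trigger then compares the watermark against this extracted time plus the configured delay, rather than against the wall-clock time at which the partition directory was created.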
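In the source code, PartitionCommitTrigger has the following class diagram.
[Figure: class diagram of PartitionCommitTrigger]
Taking the partition-time trigger, PartitionTimeCommitTigger (spelled this way in the source shown here), as the example, let's look briefly at how it works. Here is the complete code of the class.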
public class PartitionTimeCommitTigger implements PartitionCommitTrigger {

    private static final ListStateDescriptor<List<String>> PENDING_PARTITIONS_STATE_DESC =
            new ListStateDescriptor<>(
                    "pending-partitions",
                    new ListSerializer<>(StringSerializer.INSTANCE));

    private static final ListStateDescriptor<Map<Long, Long>> WATERMARKS_STATE_DESC =
            new ListStateDescriptor<>(
                    "checkpoint-id-to-watermark",
                    new MapSerializer<>(LongSerializer.INSTANCE, LongSerializer.INSTANCE));

    private final ListState<List<String>> pendingPartitionsState;
    private final Set<String> pendingPartitions;

    private final ListState<Map<Long, Long>> watermarksState;
    private final TreeMap<Long, Long> watermarks;
    private final PartitionTimeExtractor extractor;
    private final long commitDelay;
    private final List<String> partitionKeys;

    public PartitionTimeCommitTigger(
            boolean isRestored,
            OperatorStateStore stateStore,
            Configuration conf,
            ClassLoader cl,
            List<String> partitionKeys) throws Exception {
        this.pendingPartitionsState = stateStore.getListState(PENDING_PARTITIONS_STATE_DESC);
        this.pendingPartitions = new HashSet<>();
        if (isRestored) {
            pendingPartitions.addAll(pendingPartitionsState.get().iterator().next());
        }

        this.partitionKeys = partitionKeys;
        this.commitDelay = conf.get(SINK_PARTITION_COMMIT_DELAY).toMillis();
        this.extractor = PartitionTimeExtractor.create(
                cl,
                conf.get(PARTITION_TIME_EXTRACTOR_KIND),
                conf.get(PARTITION_TIME_EXTRACTOR_CLASS),
                conf.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN));

        this.watermarksState = stateStore.getListState(WATERMARKS_STATE_DESC);
        this.watermarks = new TreeMap<>();
        if (isRestored) {
            watermarks.putAll(watermarksState.get().iterator().next());
        }
    }

    @Override
    public void addPartition(String partition) {
        if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
            this.pendingPartitions.add(partition);
        }
    }

    @Override
    public List<String> committablePartitions(long checkpointId) {
        if (!watermarks.containsKey(checkpointId)) {
            throw new IllegalArgumentException(String.format(
                    "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                    checkpointId, watermarks));
        }

        long watermark = watermarks.get(checkpointId);
        watermarks.headMap(checkpointId, true).clear();

        List<String> needCommit = new ArrayList<>();
        Iterator<String> iter = pendingPartitions.iterator();
        while (iter.hasNext()) {
            String partition = iter.next();
            LocalDateTime partTime = extractor.extract(
                    partitionKeys, extractPartitionValues(new Path(partition)));
            if (watermark > toMills(partTime) + commitDelay) {
                needCommit.add(partition);
                iter.remove();
            }
        }
        return needCommit;
    }

    @Override
    public void snapshotState(long checkpointId, long watermark) throws Exception {
        pendingPartitionsState.clear();
        pendingPartitionsState.add(new ArrayList<>(pendingPartitions));

        watermarks.put(checkpointId, watermark);
        watermarksState.clear();
        watermarksState.add(new HashMap<>(watermarks));
    }

    @Override
    public List<String> endInput() {
        ArrayList<String> partitions = new ArrayList<>(pendingPartitions);
        pendingPartitions.clear();
        return partitions;
    }
}
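Note that the class maintains two pairs of essential information:
- pendingPartitions/pendingPartitionsState: the partitions waiting to be committed, and the corresponding state;
- watermarks/watermarksState: the mapping <checkpoint ID, watermark timestamp> (kept in a TreeMap so that it stays ordered), and the corresponding state.
This also shows that enabling checkpointing is a prerequisite for the partition commit mechanism. The snapshotState() method saves this information to state, so that partition data stays complete and correct even when the job fails over.
So how does PartitionTimeCommitTigger know which partitions to commit? Look at the committablePartitions() method:
- Check that the checkpoint ID is valid, that is, that a watermark was recorded for it;
- Fetch the watermark for the current checkpoint ID, then call TreeMap's headMap() and clear() to drop the watermark entries up to and including the current checkpoint ID, which are no longer needed;
- Iterate over the pending partitions and call the configured PartitionTimeExtractor (for example with the pattern ${year}-${month}-${day} ${hour}:00:00) to extract each partition's time. If the watermark has passed the partition time plus the sink.partition-commit.delay described above, the partition can be committed: it is removed from the pending set and returned.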
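The three steps above can be sketched without any Flink dependency. The following is a simplified model under stated assumptions: partition times are passed in as epoch milliseconds instead of being extracted from a path, and plain collections replace Flink's ListState. `CommitTriggerSketch` and its members are hypothetical names for illustration only.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified, hypothetical model of a partition-time commit trigger (no Flink state). */
final class CommitTriggerSketch {
    private final long commitDelayMillis;
    // partition name -> partition time in epoch millis (assumed to be already extracted)
    private final Map<String, Long> pendingPartitions = new LinkedHashMap<>();
    // checkpoint ID -> watermark recorded when that checkpoint was taken
    private final TreeMap<Long, Long> watermarks = new TreeMap<>();

    CommitTriggerSketch(long commitDelayMillis) {
        this.commitDelayMillis = commitDelayMillis;
    }

    void addPartition(String partition, long partitionTimeMillis) {
        pendingPartitions.put(partition, partitionTimeMillis);
    }

    void snapshotState(long checkpointId, long watermark) {
        watermarks.put(checkpointId, watermark);
    }

    List<String> committablePartitions(long checkpointId) {
        // Step 1: the checkpoint must have a recorded watermark.
        Long watermark = watermarks.get(checkpointId);
        if (watermark == null) {
            throw new IllegalArgumentException(
                    "Checkpoint " + checkpointId + " has not been snapshotted");
        }
        // Step 2: entries up to and including this checkpoint are no longer needed.
        watermarks.headMap(checkpointId, true).clear();

        // Step 3: commit every partition whose time plus delay the watermark has passed.
        List<String> needCommit = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> iter = pendingPartitions.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, Long> entry = iter.next();
            if (watermark > entry.getValue() + commitDelayMillis) {
                needCommit.add(entry.getKey());
                iter.remove();
            }
        }
        return needCommit;
    }

    int trackedCheckpoints() {
        return watermarks.size();
    }

    public static void main(String[] args) {
        CommitTriggerSketch trigger = new CommitTriggerSketch(500L);
        trigger.addPartition("hour=11", 1_000L);
        trigger.addPartition("hour=12", 5_000L);
        trigger.snapshotState(1L, 2_000L);
        System.out.println(trigger.committablePartitions(1L));
    }
}
```

A partition that is not yet committable simply stays in the pending map and is checked again at the next completed checkpoint, which is why the mechanism cannot make progress without checkpointing.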
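The PartitionCommitTrigger logic is used by StreamingFileCommitter, the component that actually commits partitions (note that the parallelism of StreamingFileCommitter is fixed at 1; this has been asked about before). The details of StreamingFileCommitter and StreamingFileWriter (the SQL counterpart of StreamingFileSink) are more involved and are left for a later article.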
PartitionCommitPolicy
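PartitionCommitTrigger answers when a partition becomes visible downstream; PartitionCommitPolicy answers how that visibility is signaled. According to the official documentation, it is configured through sink.partition-commit.policy.kind, and there are three commit policies, which can be combined:
- metastore: add or update the partition in the Hive Metastore (only effective when a HiveCatalog is used);
- success-file: write a file that marks success into the partition directory; the file name can be customized with sink.partition-commit.success-file.name and defaults to _SUCCESS;
- custom: a user-defined commit policy, whose class name is given by sink.partition-commit.policy.class.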
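As a rough illustration of how several policies can be combined, the sketch below parses a comma-separated kind string and runs the resulting policies one after another on a local directory, assuming they run in the order listed. It uses only java.nio; the `CommitPolicy` interface, `PolicyChainSketch`, and the omission of the metastore kind are simplifications made for this example and do not reproduce Flink's API.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified commit-policy chain; the interface is not Flink's. */
final class PolicyChainSketch {

    interface CommitPolicy {
        void commit(Path partitionDir) throws IOException;
    }

    /** Marks a partition as complete by creating (or overwriting) an empty marker file. */
    static final class SuccessFilePolicy implements CommitPolicy {
        private final String fileName;

        SuccessFilePolicy(String fileName) {
            this.fileName = fileName;
        }

        @Override
        public void commit(Path partitionDir) throws IOException {
            Files.write(partitionDir.resolve(fileName), new byte[0]);
        }
    }

    /** Builds policies in the order given by a comma-separated kind string. */
    static List<CommitPolicy> create(String kinds, CommitPolicy custom) {
        List<CommitPolicy> chain = new ArrayList<>();
        for (String kind : kinds.split(",")) {
            switch (kind.trim()) {
                case "success-file":
                    chain.add(new SuccessFilePolicy("_SUCCESS"));
                    break;
                case "custom":
                    chain.add(custom);
                    break;
                default:
                    // The metastore kind is left out because it needs a real catalog.
                    throw new IllegalArgumentException("Unsupported policy kind: " + kind);
            }
        }
        return chain;
    }

    /** Runs every policy in the chain against one partition directory. */
    static void commitAll(List<CommitPolicy> chain, Path partitionDir) throws IOException {
        for (CommitPolicy policy : chain) {
            policy.commit(partitionDir);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("partition");
        commitAll(create("success-file,custom", p -> System.out.println("custom commit: " + p)), dir);
        System.out.println(Files.exists(dir.resolve("_SUCCESS")));
    }
}
```

Keeping each policy behind a single commit() method is what makes a custom policy cheap to add: it only has to know the partition it is being asked to commit.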
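The internals of PartitionCommitPolicy are much simpler; its class diagram is as follows. The actual logic of a policy is implemented by overriding the commit() method.
[Figure: class diagram of PartitionCommitPolicy]
The two built-in implementations, MetastoreCommitPolicy and SuccessFileCommitPolicy, are shown below, and both are easy to follow.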
public class MetastoreCommitPolicy implements PartitionCommitPolicy {

    private static final Logger LOG = LoggerFactory.getLogger(MetastoreCommitPolicy.class);

    private TableMetaStore metaStore;

    public void setMetastore(TableMetaStore metaStore) {
        this.metaStore = metaStore;
    }

    @Override
    public void commit(Context context) throws Exception {
        LinkedHashMap<String, String> partitionSpec = context.partitionSpec();
        metaStore.createOrAlterPartition(partitionSpec, context.partitionPath());
        LOG.info("Committed partition {} to metastore", partitionSpec);
    }
}

public class SuccessFileCommitPolicy implements PartitionCommitPolicy {

    private static final Logger LOG = LoggerFactory.getLogger(SuccessFileCommitPolicy.class);

    private final String fileName;
    private final FileSystem fileSystem;

    public SuccessFileCommitPolicy(String fileName, FileSystem fileSystem) {
        this.fileName = fileName;
        this.fileSystem = fileSystem;
    }

    @Override
    public void commit(Context context) throws Exception {
        fileSystem.create(
                new Path(context.partitionPath(), fileName),
                FileSystem.WriteMode.OVERWRITE).close();
        LOG.info("Committed partition {} with success file", context.partitionSpec());
    }
}
Customize PartitionCommitPolicy
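Remember the Hive Streaming experiment from the earlier post?
[Figure: partition directories from that experiment, each containing many small files]
As the figure shows, when writes are frequent or the parallelism is high, many tiny files pile up in every partition, which is not what we want. Below we try a custom PartitionCommitPolicy that merges them as part of the partition commit (the storage format is Parquet).
Unlike row-oriented formats such as TextFile, Parquet is a self-describing columnar format (each file carries its own schema and metadata). Its data model follows the record structure of Google's Dremel, and its schema notation resembles Protobuf. So we should first read the schema of the written files, then read each file according to that schema and concatenate the records.
Below is the complete policy, ParquetFileMergingCommitPolicy, which merges all small files in a partition. To avoid dependency conflicts, all Parquet classes come from the versions shaded by Flink. I think the code is tidy and easy to follow on its own.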
package me.lmagics.flinkexp.hiveintegration.util;

import org.apache.flink.hive.shaded.parquet.example.data.Group;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetFileReader;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetReader;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetWriter;
import org.apache.flink.hive.shaded.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.flink.hive.shaded.parquet.hadoop.example.GroupReadSupport;
import org.apache.flink.hive.shaded.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.flink.hive.shaded.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.flink.hive.shaded.parquet.hadoop.util.HadoopInputFile;
import org.apache.flink.hive.shaded.parquet.schema.MessageType;
import org.apache.flink.table.filesystem.PartitionCommitPolicy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ParquetFileMergingCommitPolicy implements PartitionCommitPolicy {
    private static final Logger LOGGER = LoggerFactory.getLogger(ParquetFileMergingCommitPolicy.class);

    @Override
    public void commit(Context context) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        String partitionPath = context.partitionPath().getPath();

        // Only files whose names start with "part-" are merged.
        List<Path> files = listAllFiles(fs, new Path(partitionPath), "part-");
        LOGGER.info("{} files in path {}", files.size(), partitionPath);

        // The schema is read from the first file and assumed to apply to every file in the partition.
        MessageType schema = getParquetSchema(files, conf);
        if (schema == null) {
            return;
        }
        LOGGER.info("Fetched parquet schema: {}", schema.toString());

        Path result = merge(partitionPath, schema, files, fs);
        LOGGER.info("Files merged into {}", result.toString());
    }

    private List<Path> listAllFiles(FileSystem fs, Path dir, String prefix) throws IOException {
        List<Path> result = new ArrayList<>();

        // Non-recursive listing of the partition directory.
        RemoteIterator<LocatedFileStatus> dirIterator = fs.listFiles(dir, false);
        while (dirIterator.hasNext()) {
            LocatedFileStatus fileStatus = dirIterator.next();
            Path filePath = fileStatus.getPath();
            if (fileStatus.isFile() && filePath.getName().startsWith(prefix)) {
                result.add(filePath);
            }
        }

        return result;
    }

    private MessageType getParquetSchema(List<Path> files, Configuration conf) throws IOException {
        if (files.isEmpty()) {
            return null;
        }

        HadoopInputFile inputFile = HadoopInputFile.fromPath(files.get(0), conf);
        // try-with-resources closes the reader even if reading the footer fails.
        try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
            ParquetMetadata metadata = reader.getFooter();
            return metadata.getFileMetaData().getSchema();
        }
    }

    private Path merge(String partitionPath, MessageType schema, List<Path> files, FileSystem fs) throws IOException {
        Path mergeDest = new Path(partitionPath + "/result-" + System.currentTimeMillis() + ".parquet");

        // The writer and each reader are closed automatically, even when a read or write throws.
        try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(mergeDest)
                .withType(schema)
                .withConf(fs.getConf())
                .withWriteMode(Mode.CREATE)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build()) {
            for (Path file : files) {
                try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
                        .withConf(fs.getConf())
                        .build()) {
                    Group data;
                    while ((data = reader.read()) != null) {
                        writer.write(data);
                    }
                }
            }
        }

        // Delete the small files only after the merged file has been written and closed.
        for (Path file : files) {
            fs.delete(file, false);
        }

        return mergeDest;
    }
}
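The list, merge, then delete flow of the policy can be tried out locally without Hadoop or Parquet. The sketch below is only an analogue under that assumption: it concatenates plain text files with java.nio instead of Parquet records, and `MergeFlowSketch` with its parameters is a hypothetical helper, not part of the policy above.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical local-filesystem analogue of the merge flow, using text files. */
final class MergeFlowSketch {

    /** Merges all regular files named prefix* into resultName; returns null if there are none. */
    static Path merge(Path partitionDir, String prefix, String resultName) throws IOException {
        // 1. List the small files by name prefix (non-recursive).
        List<Path> parts = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(partitionDir, prefix + "*")) {
            for (Path p : stream) {
                if (Files.isRegularFile(p)) {
                    parts.add(p);
                }
            }
        }
        if (parts.isEmpty()) {
            return null;
        }
        Collections.sort(parts); // deterministic order for the example

        // 2. Copy every record (here: every line) into a single destination file.
        Path dest = partitionDir.resolve(resultName);
        try (BufferedWriter writer = Files.newBufferedWriter(dest, StandardCharsets.UTF_8)) {
            for (Path part : parts) {
                for (String line : Files.readAllLines(part, StandardCharsets.UTF_8)) {
                    writer.write(line);
                    writer.newLine();
                }
            }
        }

        // 3. Delete the inputs only after the merged file has been fully written and closed.
        for (Path part : parts) {
            Files.delete(part);
        }
        return dest;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("ts_minute=13");
        Files.write(dir.resolve("part-0"), Collections.singletonList("a"), StandardCharsets.UTF_8);
        Files.write(dir.resolve("part-1"), Collections.singletonList("b"), StandardCharsets.UTF_8);
        System.out.println(Files.readAllLines(merge(dir, "part-", "result-1.txt")));
    }
}
```

Because the result file name does not start with the listed prefix, running the merge a second time on the same directory finds nothing to do, which mirrors why the policy filters on "part-".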
Don't forget to update the options related to the partition commit policy:
'sink.partition-commit.policy.kind' = 'metastore,success-file,custom',
'sink.partition-commit.policy.class' = 'me.lmagics.flinkexp.hiveintegration.util.ParquetFileMergingCommitPolicy'
Rerun the earlier Hive Streaming job and watch the log output:
20-08-04 22:15:00 INFO me.lmagics.flinkexp.hiveintegration.util.ParquetFileMergingCommitPolicy - 14 files in path /user/hive/warehouse/hive_tmp.db/analytics_access_log_hive/ts_date=2020-08-04/ts_hour=22/ts_minute=13
// Readers familiar with Protobuf will notice that the schema notation here follows exactly the same style
20-08-04 22:15:00 INFO me.lmagics.flinkexp.hiveintegration.util.ParquetFileMergingCommitPolicy - Fetched parquet schema:
message hive_schema {
  optional int64 ts;
  optional int64 user_id;
  optional binary event_type (UTF8);
  optional binary from_type (UTF8);
  optional binary column_type (UTF8);
  optional int64 site_id;
  optional int64 groupon_id;
  optional int64 partner_id;
  optional int64 merchandise_id;
}
20-08-04 22:15:04 INFO me.lmagics.flinkexp.hiveintegration.util.ParquetFileMergingCommitPolicy - Files merged into /user/hive/warehouse/hive_tmp.db/analytics_access_log_hive/ts_date=2020-08-04/ts_hour=22/ts_minute=13/result-1596550500950.parquet
Finally, a quick check confirms that the merge succeeded.
That's all. If you are interested, try it out yourself.
Original link:
https://www.jianshu.com/p/fb7d29abfa14