該功能與我們之前平臺化 Delta Lake 平臺化實踐(離線篇) 的很多工作都較為相似,比如與 metastore 的集成,直接通過 manifest 讀取 delta 存活文件等。
Delta Lake 在 0.5 之前只支持通過 Spark 讀取數據,在新版本中增加了其他處理引擎通過 manifest 文件訪問 Delta Lake 的能力。下文以Presto 為例說明如何通過 manifest 文件訪問數據,manifest 文件的生成及其一些限制。
01 使用
Presto 使用 manifest 文件從 hive 外部表中讀取數據,manifest文件是一個文本文件,包含該表/分區所有存活數據的路徑列表。
當使用 manifest 文件在 Hive metastore 中定義外部表時,Presto 將會先讀取 manifest 中的文件路徑列表再去訪問想要的文件,而不是直接通過目錄列表來查找文件。
1.1 通過 Spark 生成 manifest 文件
支持 sql / scala / java / python 四種 api,以 sql 和 scala 為例。
SQL
GENERATE symlink_format_manifest FOR TABLE delta.`pathToDeltaTable`
Scala
val deltaTable = DeltaTable.forPath(pathToDeltaTable)
deltaTable.generate("symlink_format_manifest")
使用 GENERATE 命令會在/path/to/deltaTable/_symlink_format_manifest/ 目錄下生成一個 manifest 文件,其中包含了所有存活的文件路徑。
查看清單文件
cat /path/to/deltaTable/_symlink_format_manifest/manifest
hdfs://tdhdfs-cs-hz/user/hive/warehouse/bigdata.db/delta_lsw_test/part-00000-0a69ce8d-0d9e-47e2-95b2-001bd196441d-c000.snappy.parquet
hdfs://tdhdfs-cs-hz/user/hive/warehouse/bigdata.db/delta_lsw_test/part-00000-ba1767cb-ff0e-4e65-8e83-7a0cdce6a2f4-c000.snappy.parquet
如果是分區表,例如以 ds 作為分區字段,生成的結構如果下,每個分區下都有一個 manifest 文件包含了該分區的存活文件路徑。
/path/to/table/_delta_log
/path/to/table/ds=20190101
/path/to/table/ds=20190102
/path/to/table/_symlink_format_manifest
---- /path/to/table/_symlink_format_manifest/ds=20190101/manifest
---- /path/to/table/_symlink_format_manifest/ds=20190102/manifest
存活文件定義:add file - remove file
1.2 定義 Hive Metastore 外部表讀取相應文件
CREATE EXTERNAL TABLE mytable ( ... ) -- 與 Delta table 一致的 schema 信息
PARTITIONED BY ( ... ) -- 分區參數可選,需要與 Delta table 一致
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '<pathToDeltaTable>/_symlink_format_manifest/' -- 指定 manifest 地址
通過 SymlinkTextInputFormat ,Presto 可以直接從 manifest 中讀取需要的文件而不需要直接定位到數據目錄。
如果是分區表的話,在運行 generate 後,需要運行 MSCK REPAIR TABLE 使 Hive Metastore 能發現最新的分區。使用 repair 有兩種場景:
- 每次清單文件生成後調用:每次 generate 都調用 repair,這種方式在分區多的情況下性能表現會非常糟糕,我們的做法是在數據寫入時從 spark 獲取到相應的變更分區然後依次執行 ADD PARTITION操作。
- 在需要新分區的時候調用:如果是按天粒度的分區表,可以選擇在午夜12點創建新分區同時執行 generate 後運行一次 repair 。
important: 如果使用了 kerberos 認證,必須要在 presto 目錄的etc/catalog/hive.properties 中配置 yarn-site.xml,否則在查詢數據時會提示錯誤
com.facebook.presto.spi.PrestoException: Can't get Master Kerberos principal for use as renewer
at com.facebook.presto.hive.BackgroundHiveSplitLoader$HiveSplitLoaderTask.process(BackgroundHiveSplitLoader.java:191)
at com.facebook.presto.hive.util.ResumableTasks.safeProcessTask(ResumableTasks.java:47)
at com.facebook.presto.hive.util.ResumableTasks.access$000(ResumableTasks.java:20)
at com.facebook.presto.hive.util.ResumableTasks$1.run(ResumableTasks.java:35)
at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Can't get Master Kerberos principal for use as renewer
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:116)
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:206)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
at com.facebook.presto.hive.BackgroundHiveSplitLoader.loadPartition(BackgroundHiveSplitLoader.java:304)
at com.facebook.presto.hive.BackgroundHiveSplitLoader.loadSplits(BackgroundHiveSplitLoader.java:258)
at com.facebook.presto.hive.BackgroundHiveSplitLoader.access$300(BackgroundHiveSplitLoader.java:93)
at com.facebook.presto.hive.BackgroundHiveSplitLoader$HiveSplitLoaderTask.process(BackgroundHiveSplitLoader.java:187)
... 7 more
02 Generate 過程
Generate 命令生成 manifest 的邏輯並不複雜,有興趣的同學可以看下,方法入口:
DeltaGenerateCommand ->
GenerateSymlinkManifest.generateFullManifest(spark: SparkSession,deltaLog: DeltaLog)
- 在分區表每個分區或者非分區表中原子性的更新 manifest 文件
def writeSingleManifestFile(
manifestDirAbsPath: String,
dataFileRelativePaths: Iterator[String]): Unit = {
val manifestFilePath = new Path(manifestDirAbsPath, "manifest")
val fs = manifestFilePath.getFileSystem(hadoopConf.value)
fs.mkdirs(manifestFilePath.getParent())
val manifestContent = dataFileRelativePaths.map { relativePath =>
DeltaFileOperations.absolutePath(tableAbsPathForManifest, relativePath).toString
}
val logStore = LogStore(SparkEnv.get.conf, hadoopConf.value)
logStore.write(manifestFilePath, manifestContent, overwrite = true)
}
// 我修復了 Delta 0.5 刪除非分區表失效的 BUG,已將 PR 提交社區
val newManifestPartitionRelativePaths =
if (fileNamesForManifest.isEmpty && partitionCols.isEmpty) {
writeSingleManifestFile(manifestRootDirPath, Iterator())
Set.empty[String]
} else {
withRelativePartitionDir(spark, partitionCols, fileNamesForManifest)
.select("relativePartitionDir", "path").as[(String, String)]
.groupByKey(_._1).mapGroups {
(relativePartitionDir: String, relativeDataFilePath: Iterator[(String, String)]) =>
val manifestPartitionDirAbsPath = {
if (relativePartitionDir == null || relativePartitionDir.isEmpty) manifestRootDirPath
else new Path(manifestRootDirPath, relativePartitionDir).toString
}
writeSingleManifestFile(manifestPartitionDirAbsPath, relativeDataFilePath.map(_._2))
relativePartitionDir
}.collect().toSet
}
- 刪除分區表中失效分區的 manifest 文件
val existingManifestPartitionRelativePaths = {
val manifestRootDirAbsPath = fs.makeQualified(new Path(manifestRootDirPath))
if (fs.exists(manifestRootDirAbsPath)) {
val index = new InMemoryFileIndex(spark, Seq(manifestRootDirAbsPath), Map.empty, None)
val prefixToStrip = manifestRootDirAbsPath.toUri.getPath
index.inputFiles.map { p =>
val relativeManifestFilePath =
new Path(p).toUri.getPath.stripPrefix(prefixToStrip).stripPrefix(Path.SEPARATOR)
new Path(relativeManifestFilePath).getParent.toString
}.filterNot(_.trim.isEmpty).toSet
} else Set.empty[String]
}
val manifestFilePartitionsToDelete =
existingManifestPartitionRelativePaths.diff(newManifestPartitionRelativePaths)
deleteManifestFiles(manifestRootDirPath, manifestFilePartitionsToDelete, hadoopConf)
03 一些限制
3.1 數據一致性
在 Delta Lake 更新 manifest 時,它會原子的自動覆蓋現有的 manifest 文件。因此,Presto 將始終看到一致的數據文件視圖,然而,保證一致性的粒度取決於表是否分區。
非分區表
所有的文件路徑都寫在一個會原子更新的 manifest 文件中(參考上文結構),這種情況下 Presto 能看到一致性快照。
分區表
manifest 文件將以 hive 分區的目錄結構 (參考上文結構),這意味著每個分區都是原子更新,所以 Presto 能看到一個分區內的一致性視圖而不是跨分區的一致性視圖。此外,由於所有的分區並不是同時更新,所以讀取時可能會在不同分區中讀到不同 manifest 版本。
簡單的說,當在更新清單文件時,Presto 發起讀請求,由於 manifest 所有分區並不是一次原子更新操作,所以有可能得到的結果並不是最新的數據。
3.2 性能
大量的文件數量會造成 Presto 性能下降,官方的建議是在執行 generate 生成 manifest 前先對文件進行 compact 操作。分區表的單個分區或是非分區表的文件數量不超過1000。
3.3 Schema 推斷
原生的 Delta Lake 支持 schema evolution,意味著無論 hive metastore 定義的 schema 如何,都會基於文件使用最新的 Schema。由於 Presto 直接使用了定義在 hive metastore 中的 schema ,所以如果要修改 schema 信息,必須要對錶進行相應更新 。
04 後記
一些BUG
測試過程中還發現了一個 BUG,如果將非分區表的數據全部刪除,則 generate 後 manifest 不會更新。
已將 PR 提交社區 https://github.com/delta-io/delta/issues/275
實踐經驗
首先,由於需要額外的調用 generate 命令生成/更新 manifest 文件,使用體驗肯定不如直接通過 Spark 讀取數據。
其次,在 generate 過程中進行數據讀取有可能會遇到跨分區查詢版本不一致的情況,但是瑕不掩瑜,通過 manifest,與大數據生態其他處理引擎的道路被打開了。
就像在 Delta Lake 平臺化實踐(離線篇) 這篇文章中提到的,我們的大數據平臺有一個功能是表數據/分區數據預覽,通過 spark 去查用戶體驗會相當差(耗時長),我們之前的做法是自定義了一個工具類在查詢時從 _delta_log中生成 manifest,再通過 manifest 獲取到的文件路徑直接從文件系統中讀取 Parquet 實現,有了 generate 功能,就可以直接讀取 manifest 文件,外部系統擴展工作量極大的簡化。
在我們的生產環境中,presto 和 spark 使用的同一套 hive metastore ,但是 spark 直接讀取上述創建的外部表會報錯(就算能讀也會有一致性風險),解決辦法是在平臺攔截了 sql 方法,通過判斷 table properties 識別 delta 表,然後將其直接轉化為 delta api 對數據進行操作,Presto 則是直接訪問外表,解決了衝突的問題。
----
阿里巴巴開源大數據技術團隊成立Apache Spark中國技術社區,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學在線提問答疑,只為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!
對開源大數據和感興趣的同學可以加小編微信(下圖二維碼,備註“進群”)進入技術交流微信群。
Apache Spark技術交流社區公眾號,微信掃一掃關注