01 覆蓋寫
INSERT OVERWRITE TABLE 分區表時分三種場景:
- 動態分區 - 寫入前會刪除所有分區,並根據數據中的分區字段寫入相應新分區
- 靜態分區 - 只會對指定的分區進行覆蓋寫操作
- 混合分區(動態+靜態分區) - 上述兩種情況的結合
如果想通過 SQL 轉化為上述 API ,首先需要在 sql parser 的時候獲取到 insertMode 和 partitions 信息,並將 partitions 信息存在一個有序的結構中,例如 LinkedHashMap。然後利用這些信息,就可以拼裝進行拼裝實現上述三種場景。
1.1 動態分區
對所有 ds 分區進行覆蓋寫操作,將會清空所有 ds 分區
sql
INSERT OVERWRITE TABLE db.tableA partition(ds)
select name,ds from db.tableB
Delta Lake API
df.write.format("delta").mode("overwrite").partitionBy(ds)
1.2 靜態分區
對 ds=20200101 的分區進行覆蓋寫操作,Delta 不能直接將數據寫入分區目錄
sql
INSERT OVERWRITE TABLE db.tableA partition(ds=20200101)
select name from db.tableB
Delta Lake API
df.write.format("delta").mode("overwrite")
.option("replaceWhere", "ds = 20200101").partitionBy(ds)
1.3 混合分區
對 ds=20200101 中的所有 event 的分區進行覆蓋寫操作,將會清空所有 event 分區
sql
INSERT OVERWRITE TABLE db.tableA partition(ds=20200101,event)
select name,event from db.tableB
Delta Lake API
df.write.format("delta").mode("overwrite")
.option("replaceWhere", "ds = 2020010
02 後記
- 分區操作,一定要保證 partition 信息的有序
- 新表需要從 hive metastore 中獲取 partition 信息,Delta Table 在第一次寫入數據前,是不會生成 _DELTA_LOG 目錄的,此時可以從 hive metastore 中獲取建表時的分區名和其對應的類型,例如:
//ddl: `ds` INT COMMENT 'ds'
val ddl = spark.sharedState.externalCatalog.getTable(dbName, tableName).partitionSchema.toDDL
val partitionNameAndType = new mutable.HashMap[String, String]()
ddl.split(",").foreach { r =>
val x = r.split(" ")
partitionNameAndType.put(x(0).replace("`", ""), x(1))
}
-語義不同
Hive Table 直接使用 insert overwrite 動態分區只會覆蓋數據涉及到的分區,而 Spark 和 Delta Lake 的 API 則會將所有所有分區進行覆蓋。Spark 2.3 以後也可以通過下述API實現 Hive insert overwrite 語義
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")
-動態分區覆蓋寫是高危操作
該操作很有可能會刪除一些你不期望的數據,所以 Delta Lake 目前的 API 提供了 replaceWhere option 進行約束
阿里巴巴開源大數據技術團隊成立Apache Spark中國技術社區,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學在線提問答疑,只為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!
對開源大數據和感興趣的同學可以加小編微信(下圖二維碼,備註“進群”)進入技術交流微信群。
Spark技術交流社區公眾號,微信掃一掃關注