原文鏈接:https://blog.csdn.net/lsshlsw/article/details/103553289
博客主:breeze_lsw
01
SQL 支持
1.1 DML
背景
delta lake 0.4 只支持以 api 的方式使用 Delete/Update/Merge Into 等 DML,對習慣了使用 sql 的終端用戶會增加其學習使用成本。
解決方式
下文通過 spark sql extension 以插件化的方式擴展 sql parser ,增加 DML 語法的支持。在 spark 推出 sql extension 功能前,也可以用通過 aspectj 通過攔截 sql 的方式實現增加自定義語法的功能。
1.在自定義擴展 g4 文件中相應的 antlr4 DML 語法,部分參考了 databricks 商業版的語法
statement
: DELETE FROM table=qualifiedName tableAlias
(WHERE where=booleanExpression)? #deleteFromTable
| UPDATE table=qualifiedName tableAlias upset=setClause
(WHERE where=booleanExpression)? #updateTable
| MERGE INTO target=qualifiedName targetAlias=tableAlias
USING (source=qualifiedName |
'(' sourceQuery=query')') sourceAlias=tableAlias
ON mergeCondition=booleanExpression
matchedClause*
notMatchedClause* #mergeIntoTable
2.實現對應的 visit,將 sql 翻譯為 delta api,以最簡單的 delete 為例
override def visitDeleteFromTable(ctx: DeleteFromTableContext): AnyRef = withOrigin(ctx) {
DeleteTableCommand(
visitTableIdentifier(ctx.table),
Option(getText(ctx.where)))
}
case class DeleteTableCommand(table: TableIdentifier,
where: Option[String]) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
DeltaUtils.deltaTableCheck(sparkSession, table, "DELETE")
val deltaTable = DeltaUtils.getDeltaTable(sparkSession, table)
if (where.isEmpty) {
deltaTable.delete()
} else {
deltaTable.delete(where.get)
}
Seq.empty[Row]
}
}
3.啟動 Spark 時加載打包的 extension jar ,初始化 SparkSession 時指定 Extension 類。
val spark = SparkSession.builder
.enableHiveSupport()
.config("spark.sql.extensions", "cn.tongdun.spark.sql.TDExtensions")
tip
spark 3 之前不支持配置多個 extension ,如果遇到使用多個 extension 的情況,可以將多個 extension 在一個 extension 代碼中進行注入。
以同時增加 tispark extension 和 自定義 extension 為例
override def apply(extensions: SparkSessionExtensions): Unit = {
extensions.injectParser(TiParser(getOrCreateTiContext))
extensions.injectResolutionRule(TiDDLRule(getOrCreateTiContext))
extensions.injectResolutionRule(TiResolutionRule(getOrCreateTiContext))
extensions.injectPlannerStrategy(TiStrategy(getOrCreateTiContext))
extensions.injectParser { (session, parser) => new TDSparkSqlParser(session, parser)}
}
1.2 Query
識別 delta table 有三種實現方式
- 使用相應表名前綴/後綴作為標識
- 在 table properties 中增加相應的參數進行識別
- 判斷表目錄下是否存在_delta_log
我們一開始是使用 delta_ 的前綴作為 delta 表名標識,這樣實現最為簡單,但是如果用戶將 hive(parquet) 錶轉為 hive(delta) ,要是表名發生變化則需要修改相關代碼,所以後面改為在table propertie 中增加相應的參數進行識別。
也可以通過判斷是否存在 _delta_log 文件識別,該方式需要在建表時寫入帶有 schema 信息的空數據。
Query 通過對sql執行進行攔截,判斷 Statement 為 SELECT 類型,然後將 delta 表的查詢翻譯成對應的 api 進行查詢。
if (statementType == SELECT) {
TableData tableData = (TableData) statementData.getStatement();
sql = DatasourceAdapter.selectAdapter(tableData, sparkSession, sql);
}
1.3 Insert
Insert 需要考慮 INSERT_VALUES/INSERT_SELECT ,還有分區表/非分區表以及寫入方式的一些情況。
sql 類型判斷
if (INSERT_SELECT == statementType) {
isDeltaTable = DatasourceAdapter.deltaInsertSelectAdapter(sparkSession, statementData);
} else if (INSERT_VALUES == statementType) {
isDeltaTable = DatasourceAdapter.deltaInsertValuesAdapter(sparkSession, statementData);
}
INSERT_INTO 需要從 catalog 中獲取對應的 schema 信息,並將 values 轉化為 dataFrame
val rows = statementData.getValues.asScala.map(_.asScala.toSeq).map { x => Row(x: _*) }
import spark.implicits._
val schemaStr = spark.catalog.listColumns(dbName, tableName)
.map(col => col.name + " " + col.dataType)
.collect().mkString(",")
val schema = StructType.fromDDL(schemaStr)
val df = spark.createDataFrame(spark.sparkContext.makeRDD[Row](rows), schema)
INSERT_SELECT 則直接訪問被解析過的 Delta Query 子句。
partition
由於 delta api 的限制,不支持靜態分區,可以從 tableMeta 中解析到對應的動態分區名,使用 partitionBy 寫入即可。
至此,已經實現使用 apache spark 2.4 使用 sql 直接操作 delta table 表。
02
平臺化工作
與 hive metastore 的集成,表數據管理 等平臺化的一些工作。
2.1 瀏覽 delta 數據
用戶在平臺上點擊瀏覽數據,如果通過 delta api ,啟動 spark job 的方式從 HDFS 讀取數據,依賴重,延時高,用戶體驗差。
基於之前在 parquet 格式上的一些工作,瀏覽操作可以簡化為找出 delta 事務日誌中還存活 (add - remove) 的 parquet 文件進行讀取,這樣就避免了啟動 spark 的過程,大多數情況能做到毫秒級返回數據。
需要注意的是,_delta_log 文件只存在父目錄,瀏覽某個分區的數據同樣需要瀏覽父目錄獲取相應分區內的存活文件。
// DeltaHelper.load 方法會從 _delta_log 目錄中找到存活 parquet 文件,然後使用 ParquetFileReader 讀取
List<Path> inputFiles;
if (DeltaHelper.isDeltaTable(dir, conf)) {
inputFiles = DeltaHelper.load(dir, conf);
} else {
inputFiles = getInputFilesFromDirectory(projectCode, dir);
}
從 delta 0.5 開始,瀏覽數據的功能可以通過 manifest 文件進行更簡單的實現,具體內容可以參考下一篇文章。
2.2 瀏覽 delta 數據
將原生 delta lake 基於 path 的工作方式與 hive metastore 進行兼容。
數據寫入/刪除
數據動態分區插入 - 統計寫入的分區信息(我們是通過修改了 spark write 部分的代碼得到的寫入分區信息),如果分區不存在則自動增加分區 add partition if ...。還有一種更簡單的做法是直接使用 msck repair table ,但是這種方式在分區多的情況下,性能會非常糟糕。
刪除分區 - 在界面上操作對某個分區進行刪除時,後臺調用 delta 刪除api,並更新相關 partition 信息。
元數據信息更新
元數據中表/分區記錄數,大小等元數據的更新支持。
2.3 碎片文件整理
- 非 delta lake 表小文件整理方式可以參考我之前在 csdn 上的文章。這種方式採用的是在數據生成後校驗,如果有碎片文件則進行同步合併,Spark 小文件合併優化實踐:
- 非 delta lake 表的小文件整理使用的是同步模式,可能會影響到下有任務的啟動時間。
基於 delta lake 的小文件整理要分為兩塊,存活數據和標記刪除的數據
- 標記刪除的數據
被 delta 刪除的數據,底層 parquet 文件依舊存在,只是在 delta_log 中做了標記,讀取時跳過了該文件。
可以使用 delta 自帶的 vacuum 功能刪除一定時間之前標記刪除的數據。
- 存活數據
可以實現一個 compaction 功能,在後臺定時做異步合併,由於 delta 支持事務管理的特性,該過程對用戶透明,合併過程中保證了數據一致性且不會中斷任務。
03
結語
3.1 一些限制
由於 delta api 的限制,目前 delta delete / update 不支持子句,可以使用 merge into 語法實現相同功能。
由於 delta api 的限制,只支持動態分區插入。
3.2 merge 使用場景
upsert
有 a1,a2 兩張表,如果 a.1eventId = a2.eventId ,則 a2.data 會覆蓋 a1.data,否則將 a2 表中相應的數據插入到 a1 表
MERGE INTO bigdata.table1 a1
USING bigdata.table2 a2
ON a1.eventId = a2.eventId
WHEN MATCHED THEN
UPDATE SET a1.data = a2.data
WHEN NOT MATCHED
THEN INSERT (date, eventId, data) VALUES (a2.date, a2.eventId, a2.data)
ETL 避免數據重複場景
如果 uniqueid 只存在於 a2 表,則插入 a2 表中的相應記錄
MERGE INTO logs a1
USING updates a2
ON a1.uniqueId = a2.uniqueId
WHEN NOT MATCHED
THEN INSERT *
維度表更新場景
- 如果 a1 和 a2 表的合作方相同,且 a2 中的 deleted 為 true ,則刪除 a1 表相應記錄
- 如果 a1 和 a2 表的合作方相同,且 a2 中的 deleted 為 false ,則將 a2 表相應記錄的 value 更新到 a1 表中
- 如果沒有匹配到相應合作方,且 a2 中 deleted 為 fasle ,則將 a2 表相應記錄插入到 a1 表
MERGE INTO logs a1
USING updates a2
ON a1.partnerCode = a2.partnerCode
WHEN MATCHED AND a2.deleted = true THEN DELETE
WHEN MATCHED THEN UPDATE SET a1.value = a2.newValue
WHEN NOT MATCHED AND a2.deleted = false THEN INSERT (partnerCode, value) VALUES (partnerCode, newValue)
歷史數據清理場景
如果 a1 和 a2 表的合作方相同,則刪除 a1 表中 ds < 20190101 的所有數據
MERGE INTO logs a1
USING updates a2
ON a1.partnerCode = a2.partnerCode
WHEN MATCHED AND a1.ds < '20190101' THEN
DELETE
阿里巴巴開源大數據技術團隊成立Apache Spark中國技術社區,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學在線提問答疑,只為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!
對開源大數據和感興趣的同學可以加小編微信(下圖二維碼,備註“進群”)進入技術交流微信群。
Apache Spark技術交流社區公眾號,微信掃一掃關注