大數據

實戰 | 利用Delta Lake使Spark SQL支持跨表CRUD操作

本文轉載自公眾號: eBay技術薈
作者 | 金瀾濤
原文鏈接:https://mp.weixin.qq.com/s/L64xhtKztwWhlBQrreiDfQ


摘要

大數據處理技術朝傳統數據庫領域靠攏已經成為行業趨勢,目前開源的大數據處理引擎,如Apache Spark、Apache Hadoop、Apache Flink等等都已經支持SQL接口,且SQL的使用往往佔據主導地位。各個公司使用以上開源軟件構建自己的ETL框架和OLAP技術,但在OLTP技術上,仍然是傳統數據庫的強項。其中的一個主要原因是傳統數據庫對ACID的支持。具有ACID能力的傳統商用數據庫基本都實現了完整的CRUD操作。而在大數據技術領域,由於缺少ACID的支持,基本只實現了C/R操作,對U/D操作很少涉及。

eBay數據倉庫的部分基礎設施是構建在商用數據產品Teradata之上的,近年來,隨著公司整體朝開源技術遷移,數據倉庫的基礎設施已基本遷移到Apache Hadoop、Apache Spark平臺。但要完全從Teradata上遷移下來,必須構建具有相同能力的SQL處理引擎。在Teradata上的分析型SQL,有超過5%的查詢使用Update/Delete操作,目前Apache Spark並不具備這個能力。

本文介紹eBay Carmel團隊利用Delta Lake,使Spark SQL支持Teradata的Update/Delete語法。對比標準SQL的Update/Delete語法,以及目前尚未正式發佈的Apache Spark 3.0 提供的語法(不含實現),我們還實現了Teradata的擴展語法,可以進行跨表更新和刪除的SQL操作。

1.簡介

Carmel Spark是Carmel團隊基於Apache Spark進行魔改的SQL-on-Hadoop引擎。主要改善了交互式分析的使用體驗,提供即席查詢(ad-hoc)服務。Carmel Spark是“Teradata退出”項目的重要組成部分,在功能性和性能上,都做了大量開發和優化。例如全新的CBO、併發調度、物化視圖、索引、臨時表、Extended Adaptive Execution、Range Partition、列級訪問權限控制,以及各類監控和管理功能等,目前已經在線上使用且滿足業務需求。

但由於Apache Spark缺少ACID事務能力,並沒有提供Update/Delete語法。去年年初,Databricks開源了存儲層Delta Lake,為Apache Spark 提供可伸縮的 ACID 事務,提供事務管理、統一流批、元數據管理、版本回溯等數據庫領域常見功能。一年過去了,Delta Lake的版本也更新到了0.5.0,但開源版本始終沒有提供Update/Delete的SQL實現。目前只提供Dataframe API,用戶需通過編寫代碼來對數據進行更新和刪除等操作。此外,根據Apache Spark 3.0分支上提供的SQL語法接口,也只支持基本的單表Update/Delete操作,對於複雜的帶有join語義的跨表操作,則完全不支持。而Teradata用戶已經在廣泛使用擴展的SQL語法對數據進行更新和刪除操作。

基於Delta Lake存儲層提供的ACID事務能力,Carmel Spark實現了Update/Delete的SQL語法,且該語法完全兼容Teradata的擴展語義,即能進行跨表的更新和刪除。同時,我們拓展了delta表的數據分佈,支持bucket delta表,並對其進行了bucket join等優化。此外,由於Carmel Spark集群部署是多租戶的,所以同一套代碼會長期運行在YARN的不同隊列中。雖然Delta Lake存儲層提供了良好的事務隔離性,但仍會出現重複操作的風險(非同一事務)。因此,我們使用delta表本身來治理delta表,即將所有delta表的元信息存儲在一張delta表中,通過對該元數據表的增刪改查操作,來對用戶使用的所有delta表進行管理。

本文的組織結構如下:第二節介紹相關技術和產品;第三節闡述項目的整體架構和實現;第四節詳細介紹如何利用Delta Lake使SparkSQL支持CRUD操作;第五節介紹delta表的bucket優化;第六節介紹delta表的自治和管理;最後兩節分別談一下未來的工作和對本文的總結。

2.相關工作

2.1 Spark SQL

Apache Spark[1]是一款開源的分佈式計算框架,誕生於2009年加州大學伯克利分校AMPLab的一個研究項目,於2013年捐贈給了Apache軟件基金會。在處理結構化數據上,Spark提供了DataFrame API和Spark SQL模塊。DataFrame API允許用戶通過表、行和列的概念對數據進行操作。

同樣,用戶可以使用SQL來操作它們。Spark SQL模塊將SQL查詢轉成一棵查詢計劃樹(query plan tree)。給定一個原始SQL查詢,該查詢首先經詞法分析和解析,轉換為邏輯查詢計劃(logical plan)。該邏輯查詢計劃經過查詢優化器,產生優化的查詢計劃(optimized plan)。最終,優化的查詢計劃被轉換為物理計劃(physical plan),物理計劃會被轉成job和task最終提交到集群上執行。

Apache Spark 3.0開始,SQL模塊提供了Update/Delete的語法定義,定義在Antlr4的語法文件裡,但並沒有具體實現,而是交由第三方實現。如圖1所示:
image.png

Teradata

Teradata[2]是Teradata Corp.開發的可橫向擴展的關係型數據庫管理系統,設計用於分析型查詢,主要用於數據倉庫領域,採用大規模並行處理(MPP)架構。Teradata對Update/Delete等語法支持非常完備,除了ANSI SQL: 2011定義的標準Update/Delete語法,Teradata還做了大量擴展,如跨表更新和刪除。其所提供的豐富的語法也給我們遷移到Spark帶來了挑戰。圖2所示為Teradata支持的更新和刪除語法:
image.png

2.3 Delta Lake

2018年初,Databricks開源了存儲層Delta Lake[3],為Apache Spark 提供可伸縮的 ACID 事務,提供事務管理、統一流批、元數據管理、版本回溯等數據庫領域常見功能。Delta Lake將其數據存儲在Parquet文件中,並提供ACID事務。它使用名為Delta Log的事務日誌來跟蹤對錶所做的所有更改。

與開源的Delta Lake相比,Databricks內部版本可以通過SQL來進行Update/Delete操作,而目前開源版本只支持DataFrame的API,只能通過Parquet[4]文件推斷表的Schema信息,對Hive Metastore[5]的支持較弱,且不支持bucket表等等。Apache Iceberg[6]和Apache Hudi[7]雖然實現形式與Delta Lake不同,但在Update/Delete的SQL語法支持上,目前都不完善。表1給出了這三個系統的對比(截止2019年11月)。

image.png

3.項目概述

有了Delta Lake在存儲層提供ACID事務保障,我們的主要工作就是利用Delta Lake,在我們的Spark版本上實現和Teradata相同的Update/Delete功能。要達到這個目標,有以下任務有待完成:

  1. Delta Lake目前只支持Apache Spark 2.4+版本,而Carmel團隊使用的Spark版本是基於2.3版本的,所以我們改了Delta Lake的部分實現併為我們的Spark版本打了一些補丁。
  1. Spark 3.0中雖然沒有Update/Delete語法的具體實現,但仍然在Catalyst[8]中加入了相關的邏輯計劃節點。不過這些新增的接口都是基於DataSourceV2的,我們需要將這部分代碼在DataSourceV1上進行重寫:

image.png

  1. Teradata支持跨表的Update/Delete語法,目前Delta Lake和Spark都不支持,我們需要自己實現帶join的跨表連接更新和刪除操作。
  1. Delta Lake目前對Catalog[9]的訪問還不成熟,delta表的schema是通過Parquet文件推斷出來的,通過Catalog訪問Hive Metastore是使用SQL訪問delta表的重要一環。
  2. 由於上述原因,delta表無法識別bucket信息,更沒有考慮讀寫bucket表時的分佈(distribution)。
  3. 在以上3,4,5步驟完成之後,還要對跨表操作進行優化,這裡將主要介紹bucket join的優化。
  4. 開源版本的Delta Lake缺少一定的管理機制,需要實現一些自動化管理功能,如自動清理和合並文件等。

4.CRUD的實現

4.1 前置工作

首先,要在我們的Spark 2.3內部版本中使用Delta Lake,就需要從社區打一些補丁。這裡重點說一下SPARK-28303。

SPARK-28303引入了基於DataSource V2的DELETE / UPDATE / MERGE語法。由於Spark 2.3不支持DataSource V2,因此我們需要將此功能移植到V1版本,在ddl.scala中增加了UpdateTableStatement和DeleteFrom Statement。Antlr4[10]的語法結構如下所示:
image.png

4.2 實現單表更新

Delta Lake目前不支持Update/Delete SQL的解析,我們增加了兩個類:DeltaSqlResolution和PreprocessTableUpdateDelete,通過SparkSessionExtensions注入到Analyzer:

image.png

DeltaSqlResolution主要是用於解析condition和assignments表達式:

image.png

再由PreprocessTableUpdateDelete生成RunnableCommand。如果是delta表的話,這裡可以從LogicalRelation中拿出delta表的TahoeFileIndex(在DataSource.scala的resolveRelation中添加的),如果是非delta表,則會拋出AnalysisException。

image.png

UpdateCommand是Delta Lake自帶的類,我們對其改動不多,主要改了如下幾個地方:

一個是鑑於目前Update操作不會更新表的統計信息(Statistics),造成delta表在進行join等操作時無法正確判斷是走SortMergeJoin還是BroadcastJoin,我們增加了catalog的訪問使delta表的CRUD操作都能更新表的統計信息。

第二個改動是增加了update/delete的row級別metrics信息。Delta Lake已經發布的0.5.0版本update和delete缺少row級別的metrics。社區最新的代碼已經做了添加,但當更新或刪除單個partition或全表時仍舊是缺少的,而我們的實現在無論何種情況下都做了收集。

4.3實現跨表更新

目前Spark3.0定義的Update/Delete語法不支持跨表操作,而跨表更新和刪除操作卻十分普遍,比如更新目標表中具有(在inner join情況下)或可能沒有(在left outer join情況下)另一個表匹配行的行。

許多數據庫都提供跨表更新和刪除的語法。下面給出了幾種常用數據庫的跨表更新的例子。

MYSQL[11]跨表更新:
image.png

Teradata的跨表更新:
image.png

PostgreSQL[12]的跨表更新:
image.png

Teradata的語法和PostgreSQL的基本一致,只是FROM子句和SET子句順序調換了一下,而MYSQL支持在一條SQL裡同時更新多張表。Carmel Spark目前參考的是Teradata的語法,同時在DeltaSqlResolution中增加了帶join的解析:

image.png

和單表Update一樣,首先對condition和SET子句進行解析。不同的是,除了被更新的target是一個LogicalRelation以外,這裡的source可以是一個LogicalRelation,也可以是多張表連接在一起的join plan。

我們從WHERE條件的condition中分離出哪些是target和source之間的join criteria,哪些是source中自身的join criteria(source可以是多表join的plan),以及哪些是分別作用在target或source上的普通Filter。同樣地,再由PreprocessTableUpdateDelete生成Runnable Command:

image.png

上述代碼中,跨表更新和單表更新的區別是多構建了一個DeltaMergeAction。可見跨表更新的實現參考了MergeInto。

UpdateWithJoinCommand是跨表更新的主要執行類,一共分為三步:

  1. 通過將需要被更新的target表和source(可以是一個帶join的plan)進行內連接(inner join)找出所有會被更新的行所涉及的文件,標記為removeFiles。這一步還能簡化後續的步驟,例如不涉及任何文件或者只涉及partition目錄時,不用全表執行第2步。
  1. 將target和source使用左外連接(left outer join),對於join條件匹配的行,使用build side iterator的數據(右表),不匹配的行使用stream side iterator的數據(左表)。將數據寫出到target表,寫出的數據文件標記為addedFiles。
  2. 將1中removeFiles和2中的addedFiles寫入transaction log中,即delta log。
    刪除操作和更新操作基本類似,可以視為更新操作的簡化版,這裡就不展開了。

4.4 實現SELECT/INSERT

對delta表的讀操作(SELECT)實際上是對delta表的解析。Delta表是DataSource表的一種。在FindDataSourceTable這條rule中,通過resolveRelation方法對delta表進行特殊處理:

image.png

這裡我們把catalogTable對象傳入到DeltaDataSource的createRelation方法裡。補充一點,之所以這個case可以匹配到DeltaDataSource,是因為我們在ConvertToDelta Command裡,通過alterTable,把provider從parquet改成了delta:

image.png

回到createRelation。通過傳入的catalogTable對象,我們在DeltaLog.scala裡將表的信息填到HadoopFsRelation裡面:

image.png

Delta表的INSERT操作也很簡單。在DataSourceStrategy中添加InsertIntoData SourceCommand:

image.png

普通delta表的insert我們沒有進行修改,這裡就不展開了,下一節講bucket表的insert時再詳細闡述這部分的改動。

4.5 創建Delta表

創建delta表(CREATE操作)目前完全複用了普通Parquet表的CREATE,只是需要在建完表後執行CONVERT TO DELTA命令。我們簡單做了一些修改,使其可以CONVERT一張空的Parquet表,目前社區版是不支持的。其他的修改主要是針對管理上的,在第六節會詳細介紹。

到此,CRUD功能的SQL實現已經基本完成。在這一節裡,我們引入了跨表更新操作,但是跨表更新涉及到join算子,這在大表之間進行更新操作時會有性能問題。在下一節中會介紹如何針對bucket表進行優化。

5.Bucket優化

跨表更新操作中,會有多次連接算子,當進行連接操作的表是上TB數據量的大表時,整個更新操作就會變得非常慢。甚至,大量數據的SortMergeJoin可能拋出OutOfMemory。事實上,在我們實際的業務場景中,就存在著大量的大表更新。例如被更新的表往往是一張幾個TB的大表,然後和另一張或幾張中型表進行連接操作。為了優化這類SQL,最容易想到的方法是通過bucket join來避免大表數據的shuffle。現實中,我們用戶的許多大表也的確做了分桶(bucket)。

然而目前delta表並不支持分桶表,相關代碼的BucketSpec都被默認填了None,對更新和刪除的操作也沒有考慮數據的分佈(Distribution)。那麼該如何實現bucket表的數據分佈呢?

5.1 創建delta bucket表和讀取

首先和Parquet表一樣,我們需要在建表時指定分桶字段。形如:CLUSTERED BY (col) [SORTED BY (col) ] INTO number BUCKETS。

在4.3小節中我們提到了在ResolveRelation時將CatalogTable對象傳入了HadoopFsRelation。有了這個CatalogTable對象,就可以幫我們在後續的各類操作中識別bucket表了。

5.2 插入數據到delta bucket表

上一步只是告訴Spark,這是一張bucket表,真正寫入數據的時候發現數據並沒有分桶分佈。這是因為Insert操作在delta表上是走InsertIntoDataSource -> InsertIntoDataSourceCommand的,而不是通過DataWritingCommand,所以也就走不到ensureDistributionAndOrdering的邏輯。以下代碼是社區版InsertIntoDataSourceCommand的實現:

image.png

如上代碼所示,它的實現非常簡單,將需要insert的邏輯計劃“query”封裝成一個data frame,然後傳入到實現類的insert方法裡。在Delta Lake中這個data frame會被傳入到TransactionalWrite的writeFiles方法中。最終從這個data frame中取出physical plan並傳入DataFormatWriter的write方法。之後就是真正的生成job並分發執行了。

從整個流程可以看出,從一開始的邏輯計劃對象“query”到最後的物理計劃,並沒有機會進行數據分佈的實現。所以不管在建表時是否指定分桶,插入數據時都不會滿足數據分佈。

鑑於目前DataSource並沒有考慮數據分佈的問題,我們在resolution階段就需要進行處理。大體就是在Catalyst裡增加一個InsertIntoDataSource的邏輯計劃節點和一個InsertIntoDataSourceExec的物理計劃節點。在InsertIntoDataSourceExec這個物理計劃中實現了requiredChildDistribution和requiredChildOrdering方法(代碼可以參考InsertIntoHadoopFsRelationCommand的requiredDistribution和requiredOrdering方法)。

這裡說一下整體流程。首先,DataSourceStrategy原本是匹配到了InsertIntoTable就會將邏輯計劃“query”原封不動地傳入InsertIntoDataSource Command。我們現在做出如下改變:增加一個新的邏輯計劃節點InsertIntoDataSource,為其添加partition,bucket等信息,並將“query”作為該新節點的child:

image.png

然後在SparkStrategy.scala的BasicOperators裡將InsertIntoDataSource節點轉成物理計劃節點InsertIntoDataSourceExec,通過planLater(i.query)得到物理計劃作為該物理節點的child。這樣InsertIntoDataSourceExec的requiredChildDistribution和requiredChildOrdering方法就可以對數據進行分佈了:

image.png

5.3 在跨表更新或刪除操作中利用bucket join

到目前為止,對delta表的改造已經使其具有了bucketSpec字段和數據分佈的特性。在跨表更新或刪除時,無論是inner join還是left outer join,只要target和source都是bucket表且滿足bucket join條件,就能走bucket join而不是SortMergeJoin。這就解決了大表之間join產生大量shuffle帶來的性能問題。

下面這個例子是跨表更新一張3.9TB的表,source則是一張5.2TB的表。圖3所示是left outer join階段,右表雖然有一個Filter,但是仍然不滿足broadcast join閾值,這個更新操作在非bucket join的情況下,會造成大量Executor OOM,最終導致job失敗。通過引入bucket join,該job在2分鐘左右就能順利完成。從圖3可以看到在SortMergeJoin的前後,已經沒有ShuffleExchange了。

圖3 跨表更新中利用bucket join避免shuffle
image.png

但是,這裡仍然可能存在問題,因為被更新的表仍然是一張bucket表,而圖3的輸出沒有考慮數據的分佈。對於bucket表尚不滿足數據分佈的情況,我們需要在SortMergeJoin之後增加一輪HashRepartition,以保證最終的結果輸出符合被更新表的數據分佈特性:

image.png

6.Delta的自治和管理

介紹完CRUD的功能和相關優化,這一節講一下我們是如何管理delta表的,主要包括:如何統計delta的使用情況,如何自動進行文件清理,如何管理TimeTraval[13]等。

在這之前我們需要簡單介紹一下eBay Carmel Spark的基本架構。eBay的Carmel Spark平臺是計算存儲分離的。數據存儲有一個專門的Hadoop集群(Apollo),Carmel Spark集群(Hermes)主要是由大內存加SSD的計算節點組成,通過YARN[14]進行調度。除了本地SSD以外,也有一部分存儲容量搭建了一個小容量的HDFS,主要是拿來做Relation Cache和物化視圖,這部分以後有機會另起一篇文章進行介紹。

我們使用Spark Thriftserver來提供JDBC和ODBC服務,但所有的Thriftserver並不是固定在某個機器上的,而是通過YARN進行調度,通過cluster mode將Spark Thriftserver提交到集群內部。同時,根據Budget Group對YARN集群分queue,不同的Budget Group有一個YARN的queue,例如廣告部門有一個queue,數據部門有一個queue,每個queue可以有多個Spark Thriftserver。Carmel Spark對scheduler模塊做過大量併發優化,經過壓測,一個Driver調度起來的任務能把200臺物理機的所有CPU壓滿。所以Driver調度並不是瓶頸,目前最大的一個queue僅使用一個Thriftserver就可以調度近7000個executors。

image.png

目前有多少個queue,就有多少個Thriftserver,也就有多少個Application。但不同的Thriftserver仍然共享了一些組件,例如HDFS,Hive Metastore等。這就要求我們對所有的queue做一些管理。例如在物化視圖功能中,當對一張基礎表構建物化視圖後,所有的queue都需要在內存裡構建一些邏輯計劃樹。delta表的管理也類似,不過相比物化視圖簡單的多。例如我們要對所有的delta表進行自動化的文件清理工作,一種方式是起一個後臺線程遍歷Hive Metastore的所有表,對provider是delta的表進行處理。這樣的好處是不需要跨Thriftserver進行任何消息的同步,壞處自然是不斷遍歷Hive Metastore帶來的壓力(多集群公用的Hive Metastore壓力已經比較大了)。所以我們使用了一種更加直觀的方式進行管理,即用delta表來管理delta表。

我們創建了一張名為carmel_system.carmel_ delta_meta的表,記錄瞭如表名、owner、deltalog路徑、是否自動清理、清理週期等元信息,並將其CONVERT成一張delta表。所以carmel_delta_meta表的第一條記錄就是自己的信息。然後我們提供了一套操作這張表的API,以調用靜態方法的方式放在DeltaTableMetadata類的半生對象中:
image.png

如下所示,當用戶對一張表執行CONVERT TO DELTA命令時,會生成一個事件,通過DeltaTableListener捕獲後將該delta表的元信息寫入carmel_delta_meta,當用戶刪除delta表時,DropTableEvent同樣可以觸發上圖的刪除操作API,從carmel_delta_meta刪除這條記錄:

image.png

另外在YARN的保留隊列(reserved queue只允許管理員權限連接)裡啟動一個DeltaValidate線程,通過讀取carmel_delta_meta中的數據進行驗證,觸發如刪除記錄等操作。同時,如果用戶在CONVERT TO DELTA時指定了Vacuum保留時間:
image.png
或是一開始沒有指定保留時間,後續通過命令VACUUM AUTO RUN進行修改:
image.png
DeltaValidate線程會自動生成Vacuum任務,並丟到Vacuum線程池調度執行。這裡就不貼代碼了。整個架構如圖5所示:
image.png

此外,我們還增加了TimeTravel的SQL語義,用戶可以通過在SELECT命令裡增加AT關鍵字,單次讀取delta表某個version的快照。也可以通過ROLLBACK命令永久回到某個版本:

image.png

通過carmel_delta_meta中記錄的一些表的血緣信息,可以實現delta表的及聯回滾。在某個delta表rollback後,觸發器根據carmel_delta_meta的血緣信息,自動回滾其他相關表(這需要事先定義在carmel_delta_meta的rollback依賴樹和觸發器條件,該功能目前還未上線)。

上面介紹了通過delta表來管理delta表的方式,這一方法能很好地幫我們解耦隊列同步和外部系統依賴的問題,既方便靈活,又快捷安全。

7.未來的工作

7.1 持續的性能優化

Carmel Spark項目經過兩年的技術迭代,已經具備非常多的功能和優化,例如Range Partation、Optimized Bucket Join、Broadcast/Local Cache、Extended Adaptive Execution、Parquet File Index、Materialized View、ACL、Volcano CBO、Adaptive Runtime Filter、Mutiple Files Scan等,如何讓新的功能如CRUD複用以上優化和特性,也變得越來越富有挑戰了。例如我在測試時發現Broadcast Cache和Mutiple Files Scan兩個功能在和CRUD功能集成時存在bug,又或者目前的Volcano CBO和Parquet File Index還不能應用在delta表上等。

此外,在跨表更新操作上,大表連接的優化目前只針對bucket表,但是當兩張非bucket表進行連接時,性能仍然不夠好。這裡就有多個優化點,比如Adaptive Runtime Filter,就是Join Pushdown,可以將join表的min/max或者join key的bloomfilter推到兩邊進行過濾,以減少參與連接的記錄數,目前只完成了在inner join下的部分功能。

7.2 更完備的語義

除了性能的優化,Carmel Spark作為Teradata戰略代替品,需要儘可能兼容Teradata的語義,後續如果有用戶需要MERGE INTO或者UPSERT操作,這部分還要繼續擴展。此外,目前UPDATE和DELETE的WHERE條件還不支持子查詢,CONVERT TO DELTA不支持Parquet Format的Hive表,這些都將是後續的工作。

7.3 高度自治的管理

第6節最後提到過的及聯回滾功能,以及對delta表的審計和監控都屬於平臺管理的範疇。這些有的已經具備成熟的解決方案,如我們已經有完全和Teradata對標的列級訪問權限控制和審計功能。有的還在不斷完善,如用於File Index和Materialized View的Hive Metastore同步機制還沒有上線,目前用的還是過渡方案。這部分不止針對delta表,有些還可以應用於整個Carmel Spark。

8.實施和總結

8.1 技術之外

最後簡單說一下項目的情況。這個項目找到我的時候是在2019年的10月底,我剛上線完Spark臨時表功能,物化視圖項目也還陸陸續續有一些bug fix的工作要做,所以真正開始投入去做應該是在11月中旬。
CRUD功能目標上線時間是在2020年的2月份,不像物化視圖這類優化型項目,功能型項目承諾上線時間的要求往往更高一些。加之期間還有春節假期,oncall和各種bug fix的工作,對於該項目來說排期還是比較緊的。

此外,我們對Delta Lake的成熟度和性能也比較擔憂(現實也驗證了Delta Lake的開源版本在SQL成熟度上的確不足)。實踐中發現除了ACID這個核心功能不用操心以外,基本上都要二次開發。最後和我們使用的基於社區2.3版本進行魔改的Carmel Spark的集成相比,也存在許多挑戰。

再說一下為什麼選擇Delta Lake。目前來看,除了Delta Lake之外,Apache Hudi和Apache Iceberg也能完成ACID的功能。當時選擇Delta Lake一是因為它是Databricks的產品,在Databricks內部版本比較成熟,長期來看其開源版本也會和Apache Spark更加緊密。二是當時公司內部還有一個準實時數倉的項目,立項也是使用Delta Lake。考慮到儘可能保持技術棧一致,我們選擇了Delta Lake,而且單從這個項目上Apache Hudi和Apache Iceberg並沒有特別的優勢。

最後說一下用戶支持,其實做一個項目最複雜也是最耗時的並不是編碼階段,而是上線後接受用戶的考驗。該功能的第一批用戶是來自eBay瑞士的財務部門分析師團隊,因為不在同一個時區,春節假期裡幾乎每晚都會通過Zoom和我溝通。這種在用戶和開發者之間的持續交流,使得一些隱藏的問題即時浮現出來,用戶也得到了較好的使用體驗。我們的Carmel Spark每週都會有半個小時的例行發佈窗口,用戶遇到的bug幾乎都在下次發佈窗口時得到了修復。在這一週中,我們也會找出workaround方式,幫助用戶進度的推進。目前該功能已經在所有隊列上啟用,越來越多的用戶開始參與試用。

8.2 總結

本文從源碼角度講解如何利用Delta Lake使老版本的Spark SQL支持跨表的CRUD操作,以及我們所做的優化和管理工作。最後,簡單介紹了未來的工作方向以及項目實施上的一些感悟,希望能對閱讀者有所幫助。

參考文獻

[1] https://spark.apache.org/
[2] https://www.teradata.com/
[3] https://delta.io/
[4] https://parquet.apache.org/
[5] https://hive.apache.org/
[6] https://iceberg.apache.org/
[7] https://hudi.apache.org/
[8]https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html
[9]http://blog.madhukaraphatak.com/introduction-to-spark-two-part-4/
[10] https://www.antlr.org/
[11] https://www.mysql.com/
[12] https://www.postgresql.org/
[13]https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html
[14]https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html


阿里巴巴開源大數據技術團隊成立Apache Spark中國技術社區,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學在線提問答疑,只為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!
image.png

對開源大數據和感興趣的同學可以加小編微信(下圖二維碼,備註“進群”)進入技術交流微信群。Apache
image.png

Spark技術交流社區公眾號,微信掃一掃關注
image.png

Leave a Reply

Your email address will not be published. Required fields are marked *