大數據

數據工程師眼中的 Delta lake

一、Delta Lake的誕生

相信作為一個數據工程師,心中都有這麼一個理想的工具:

  • 可以持續不斷地對各種各樣的數據源進行增量處理;
  • 批流合一;
  • 處理速率高效,智能化生成報表;
  • ······

image.png

想要實現上面的工具,一個最簡單的辦法就是先用一個Spark Streaming Job把各種各樣的數據源寫到一個表中,如下圖,然後再根據業務需求選擇是用流作業還是批作業去進行相應的查詢工作。但是,這種方式會存在一些問題,比如因為是流式寫入,會產生大量的小文件,對後續的性能產生很大的影響。

image.png

面對上面遇到的小文件問題,一個改進的方法如下圖所示,是在上述方法中創建的表之後加一個批作業定時的將小文件合併起來,但是這個改進方法仍然有明顯的缺點,那就是存在著小時級別的延遲,這種級別的延遲對於很多業務來講是無法滿足要求的。

image.png

為了解決上述延遲問題,Lambda架構暢行一時。其架構思路如下圖所示,簡單說就是分別用流和批的方式對數據源處理兩次,然後將批和流的視角合起來提供給後續業務。Lambda架構雖然解決了上述的問題,但是也存在自身的缺點:

  • 因為業務邏輯在要用批和流的方式處理兩次,而批和流的處理方式不一致,可能會導致某些問題;
  • 如果處理邏輯中加入了數據校驗的工作,就需要在批和流上分別校驗兩次,一旦需要回滾等操作,數據修正也需要進行兩次,費時費力;
  • 如果涉及到Merge、Update等操作,也需要進行兩次修改,使得整個事務變得複雜;
  • ······

image.png

上面的幾種方案都有自己的缺點,Lambda架構雖然看似有效但是架構過於複雜。那麼,有沒有一種方案可以將Lambda架構進行簡化呢?其實,我們的目標很簡單,就是讓流作業處理我們的源數據,並且後續作業可以批流統一的處理,具體來說有:

  • 保證數據的一致性;
  • 保證每次是增量的讀取;
  • 能夠做回滾;
  • 能夠訪問歷史記錄。

結合以上幾點目標,有了目前的解決方案:Delta Lake + Structured Streaming = The Delta Architecture。這套方案的優點很明顯,首先是批流合一的,其次Delta Lake可以很方便的做時間旅行類似的操作,且Delta Lake是單純的儲存層,與計算層分離,符合當前雲數據計算的大方向,方便用戶靈活的進行擴容。

二、Delta Lake的工作原理

Delta Lake的核心是其事務日誌,它的表跟普通的表沒有大的區別,但是在表下會建立一個隱藏文件,其中的JSON存儲了一些關於事務的記錄,如下圖所示。

image.png

因此,在Delta Lake中,讀取一張表也會重放這張錶的歷史記錄,比如表的重命名、修改Schema等等操作。

更細節地來說,在Delta Lake中的每個JSON文件都是一次commit,這個commit是原子性的,保存了事務相關的詳細記錄。另外,Delta Lake還可以保證多個用戶同時commit而不會產生衝突,它用的是一種樂觀處理的方式,其邏輯如下圖所示。這種解決衝突的方案適用於寫比較少,讀取比較多的場景,大家在使用的時候要注意場景是否適用。

image.png

假設我們要處理一個非常大的表,有百萬級別的文件,那麼如何高效的處理元數據呢?Delta Lake的處理方案如下圖所示,用Spark來讀取事務日誌,然後Delta Lake隔一段時間對commit做一次合併,之後可以從Checkpoint啟動後續的操作。

image.png

總結起來,Delta Lake解決數據一致性、增量讀取、歷史回溯等問題的方案即為下圖所示。

image.png

三、Demo

從以下鏈接大家可以看到詳細的Demo展示,還有詳細的社區版本(免費)Databricks的設置方法:https://github.com/delta-io/delta/tree/master/examples/tutorials/saiseu19

Demo中提供了Python API和Scala API的實現文件,大家可以根據自己的實際情況進行嘗試。上面鏈接的Demo中展示的主要features有:

  • Schema Enforcement:在做Pipeline的時候我們一定要保證數據質量,因此Schema Enforcement可以幫助我們做到這點。
  • Schema Evolution:隨著公司業務的發展,一開始的表結構可能不適用於當前的業務,Schema Evolution可以幫助我們進行表結構的演化。
  • Delete from Delta Lake table:Delete操作可以控制表的無限制增長,並且通過事務日誌來進行操作,實際上數據沒有被刪掉,只是在Log中進行了標記。
  • Audit Delta Lake Table History:通過此功能可以看到對錶的詳細歷史操作。
  • Travel back in time:有了錶的歷史數據,我們便可以訪問表在各個歷史節點的數據。
  • Vacuum old versions of Delta Lake tables:Delta Lake通過標記的方式來實現刪除,隨著時間的增長會佔用大量儲存空間,Vacuum操作將刪除在一定時間內從表中刪除的數據文件,實現物理刪除,默認會保留七天內的數據。
  • Upsert into Delta Lake table using Merge:在一個命令中同時做update和insert操作。
    上述Features的具體代碼可以在Github中查看。

四、Q&A

Q1:上文介紹的Demo是開源的嗎?
A1:是的,大家可以在Github上下載後進行嘗試。

Q2:是否可以純SQL實現?
A2:Delta Lake是一個數據儲存層,如果是與Hive等引擎做整合,沒有提供delete等操作,就只能用Delta Lake自己的API,提供了Scala、Python的API;如果使用的是Spark 3.0的話,像Merge、Delete等都提供了SQL語法,可以直接用SQL開發,但是某些語法沒有對應的Spark API,所以還是要用Delta Lake來操作。


關鍵詞:Databricks、Spark、Delta Lake、Schema Enforcement

Leave a Reply

Your email address will not be published. Required fields are marked *