大數據

Delta Lake 如何幫助雲用戶解決數據實時入庫問題

嘉賓簡介:辛現銀,花名辛庸,阿里巴巴計算平臺事業部 EMR 技術專家,Apache Hadoop,Apache Spark contributor,對 Hadoop、Spark、Hive、Druid 等大數據組件有深入研究。目前從事大數據雲化相關工作,專注於計算引擎、存儲結構、數據庫事務等內容,今天為大家介紹Delta Lake 如何幫助雲用戶解決數據實時入庫的問題。

直播回放:https://developer.aliyun.com/live/2894

以下是視頻內容精華整理。


一、CDC簡介

CDC是Change Data Capture的縮寫,也就是改變數據捕獲。比如在最開始的時候我們用工具將業務數據導入數據倉庫、數據湖當中,之後導入數據的時候我們希望反映數據的動態變化,進行增量導入,並且能夠儘快的捕獲這些變化數據,以便更快地進行後續的分析,而CDC技術能夠幫助我們捕獲這些變化的數據。
截屏2020-07-03 下午3.42.12.png

大數據場景下我們常用的工具是Sqoop,它是一個批處理模式的工具,我們可以用它把業務庫中的數據導入到數據倉庫。需要注意的時候我們在導入之前要在業務庫中的數據中選出能反映時間變化的字段,然後依據時間戳將發生變化的數據導入數據倉庫中,這是使用它的一個限制。另外,這個工具還有如下幾個缺點:

  • 對源庫產生壓力;
  • 延遲大,依賴於調用它的頻次;
  • 無法處理delete事件,源庫中被delete的數據無法同步在數倉中被delete;
  • 無法應對schema變動,一旦源庫中的scheme發生變化,就在對數倉中的表模型重新建模和導入。

除了使用 sqoop,還有一種方式是使用binlog 的方式進行數據同步。源庫在進行插入、更新、刪除等操作的時候會產生binlog,我們只需要將binlog打入KafKa,從 Kafka 中讀取 binlog,逐條解析後執行對應的操作即可。但是這種方式要求下游能夠支持比較頻繁的update/delete操作,以應對上游頻繁的 update/delete 情形。這裡可以選擇KUDU或者HBASE 作為目標存儲。但是,由於KUDU和HBASE不是數倉,無法存儲全量的數據,所以需要定期把其中的數據導入到Hive中,如下圖所示。需要注意的是,這種方式存在多個組件運維壓力大、Merge邏輯複雜等缺點。

截屏2020-07-03 下午3.46.27.png

二、基於Spark Streaming SQL & Delta 的CDC方案

(一)Spark Streaming SQL

Spark Streaming SQL是阿里巴巴計算平臺事業部EMR團隊基於Spark Streaming開發的SQL支持,社區版本是沒有的。Spark Streaming SQL在這套CDC方案中不是必須的,但是它對於用戶更加的友好,尤其是對習慣於使用SQL的用戶來說,因此 EMR 團隊開發了 Spark Streaming SQL 的支持。如下圖所示,EMR 的 Spark Streaming SQL在諸多方面實現了對SQL語法的支持,比如DDL、DML、SELECT等等,下面撿幾個分別予以介紹。

截屏2020-07-03 下午3.47.25.png

(1)CREATE SCAN & CREATE STREAM

下面所示的是一個例子,我們的目標是從KafKa中的一張表中select一些數據,設計目標是儘可能的支持批和流兩種方式。在普通的SQL中,實際上select就產生了讀操作,但是這裡為了區分batch和Streaming,我們需要顯式的create scan,因為我們無法從data source上區分是batch讀還是Streaming讀,如果是batch,我們就使用 USING batch,如果是Streaming,我們就使用USING stream。
截屏2020-07-03 下午3.48.46.png

對於 batch scan,在create scan之後就可以直接從scan中select,把scan當作一張表;然而對於Streaming,如果要讀這個scan,就需要設計很多參數,因為要發起一個job,於是有了如下圖所示的create stream語法,其本質是對select語法的封裝。

截屏2020-07-03 下午3.49.25.png

(2)MERGE INTO

另外一個比較核心的語法是MERGE INTO,其在Delta Lake的CDC方案中有著非常重要的地位。MERGE INTO的語法是比較複雜的,具體如下圖所示。需要注意的是MERGE INTO中的mergeCondition必須在源表和target表中產生一一對應的關係,不然如果一條 source record 對應多條 target records,系統就不知道應該對哪條進行操作了。所以這裡實際上要求 mergeCondition 是一個主鍵連接,或者等同於主鍵連接的效果。

截屏2020-07-03 下午3.50.10.png

除了上面介紹的幾個語法,為了大家更加方便地使用Spark Streaming SQL,我們還實現了一些其他的UDF,比如DELAY、TUMBLING等。

(二)Delta Lake

數據湖是近些年比較火熱的一個技術。早先大家用的都是一些比較成熟的數據倉庫系統,數據通過 ETL 導入到數倉。數倉的典型用途是用於 BI 報表之類的分析場景,場景比較有限。在移動互聯網時代,數據來源更加豐富多樣,數據結構也不僅僅是結構化數據,數據用途也不僅限於分析,於是出現了數據湖。數據先不做,或者僅做簡單的處理導入到數據湖,然後再進行篩選、過濾、轉換等 transform 操作,於是數倉時代的 ETL 變成了數據湖時代的 ELT。

數據湖的典型架構是上層一個/或者多個分析引擎/或者其他計算框架,下層架設一個分佈式存儲系統,如下圖左邊所示。但是這種原始的數據湖用法是缺少管理的,比如缺少事務的支持,缺少數據質量的校驗等等,一切數據管理完全靠人工手動保證。

截屏2020-07-03 下午3.51.27.png

Delta Lake 就是在統一的存儲層上面架上一層管理層,以解決人們手動管理數據湖數據的痛點。加上了一層管理層,首先我們就可以引入meta data管理,有了meta data管理,如果數據有schema,我們就可以管理schema,在數據入庫的過程中對數據質量進行校驗,並將不符合的數據剔除。另外,管理了meta data,還可以實現ACID Transactions,也就是事務的特性。在沒有管理層的時候如果進行併發的操作,多個操作之間可能互相影響,比如一個用戶在查詢的時候另外一個用戶進行了刪除操作,有了事務的支持,就可以避免這種情況,在事務的支持下,每個操作都會生成一個快照,所有操作會生成一個快照序列,方便進行時間上的回溯,也就是時間旅行。

Data Warehouse、Data Lake和Delta Lake三者的主要特性對比如下圖所示。可以看出,Delta Lake相當於結合了Data Warehouse和Data Lake的優點,引入一個管理層,解決了大部分兩者的缺點。

截屏2020-07-03 下午3.52.03.png

(三)基於Spark Streaming SQL & Delta 的CDC方案

那麼,我們現在回到我們的主題,即,如何實現基於Spark Streaming SQL & Delta 的CDC方案呢?如下圖所示,還是先從binlog到KafKa,與之前的方式不同的是無需將KafKa中的binlog回放到HBASE或者KUDU,而是直接放入Delta Lake即可。這種方案使用方便,無需額外運維,Merge邏輯容易實現,且幾乎是一個實時的數據流。

截屏2020-07-03 下午3.53.31.png

上述方案的具體操作步驟如下圖所示。其本質就是不斷的將每一個mini batch給Merge INTO到目標表中。由於 Spark Streaming 的 mini batch 調度建個可以設置在秒級,因此該方案實現了近實時的數據同步。
截屏2020-07-03 下午3.54.01.png

在該方案的實際執行的過程中我們也遇到了一些問題,最主要的就是小文件問題,比如每五秒執行一次batch,那麼一天就會有非常多的batch,可能產生海量的小文件,嚴重影響表的查詢性能。對於小文件問題,其解決思路有以下幾個:

  • 增大調度批次間隔:如果對實時性要求不是很高,可以增大調度批次間隔,減少小文件產生的頻率;
  • 小文件合併:進行小文件的合併,減小小文件的數量,其語法如下:
    OPTIMIZE WHERE where_clause]
  • 自適應執行:自適應執行可以合併一些小的reduce task,從而減少小文件數量。

對於小文件合併的optimize觸發我們做了兩種方式。第一種是自動化的optimize,就是在每一個mini batch執行完之後都進行檢測是否需要進行合併,如果不需要就跳到下一個mini batch,判斷的規則有很多,比如小文件達到一定數量、總得文件體積達到一定大小就進行合併,當然在合併的時候也進行了一些優化,比如過濾掉本身已經比較大的文件。自動化的optimize方式每過一定數量的batch就要進行一次merge操作,可能對數據數據攝入造成一定影響,因此還有第二種方式,就是定期執行optimize的方式,這種方式對於數據實時攝入沒有影響。但是,定期執行optimize的方式會存在事務衝突的問題,也就是optimize與流衝突,對於這種情況我們優化了Delta內部的事務提交機制,讓insert流不必失敗,如果在optimize之前進行了update/delete,而optimize成功了,那麼在成功之後要加一個重試的過程,以免流斷掉。

OPTIMIZE的實現也是比較複雜的,我們開發了bin-packing機制和自適應機制,達成的效果就是在OPTIMIZE後所有文件(除了最後一個)都達到目標大小(比如128M),而不論是否做了 re-partition。

截屏2020-07-03 下午3.55.34.png

三、未來工作

未來,以下幾方面將會是我們的工作目標:

(1)自動Schema檢測

使用Delta Lake的用戶接觸的可能不只是業務數據,還可能有機器數據。在很多場景下,機器數據的字段可能會發生變化。對於這種場景的用戶來說,迫切需要一種自動Schema檢測的機制。下一階段我們的目標就是在binlog解析的時候能夠自動檢測新增字段、變化字段等,並且反映在Delta表中。

(2)流式Merge性能(Merge on Read)

上面提到了Spark Streaming SQL & Delta 的CDC方案本質上是發起了一個流處理,然後按照mini batch將數據merge到目標表中,merge的實現實際上是一個join,當表越來越大的時候merge性能會越來越差,嚴重影響性能。解決這個問題的方式是採用merge on read的方式,就是類似於HIVE的方式,是我們下一步的目標。

(3)更易用的體驗

可以看到,上文提到的CDC方案還是需要用戶有一定的專業知識,並且需要手動做一些工作,下一步我們希望能夠提供更易用的體驗,進一步降低用戶的使用負擔。


關鍵詞:Delta Lake、CDC、實時數倉、OPTIMIZE、Spark Streaming SQL


EMR釘釘產品交流群
產品群.JPG

對開源大數據和感興趣的同學可以加小編微信(下圖二維碼,備註“進群”)進入技術交流微信群。

image.png

Apache Spark技術交流社區公眾號,微信掃一掃關注

image.png

Leave a Reply

Your email address will not be published. Required fields are marked *