雲計算

聚焦 | 數據湖分析如何面向對象存儲OSS進行優化?

背景

數據湖當前在國內外是比較熱的方案,MarketsandMarkets https://www.marketsandmarkets.com/Market-Reports/data-lakes-market-213787749.html市場調研顯示預計數據湖市場規模在2024年會從2019年的79億美金增長到201億美金。一些企業已經構建了自己的雲原生數據湖方案,有效解決了業務痛點;還有很多企業在構建或者計劃構建自己的數據湖。Gartner 2020年發佈的報告顯示https://www.gartner.com/smarterwithgartner/the-best-ways-to-organize-your-data-structures/目前已經有39%的用戶在使用數據湖,34%的用戶考慮在1年內使用數據湖。隨著對象存儲等雲原生存儲技術的成熟,一開始大家會先把結構化、半結構化、圖片、視頻等數據存儲在對象存儲中。當需要對這些數據進行分析時,會選擇比如Hadoop或者阿里雲的雲原生數據湖分析服務DLA進行數據處理。對象存儲相比部署HDFS在分析性能上面有一定的劣勢,目前業界做了廣泛的探索和落地。

一、基於對象存儲分析面臨的挑戰

1、什麼是數據湖

Wikipedia上說數據湖是一類存儲數據自然/原始格式的系統或存儲,通常是對象塊或者文件,包括原始系統所產生的原始數據拷貝以及為了各類任務而產生的轉換數據,包括來自於關係型數據庫中的結構化數據(行和列)、半結構化數據(如CSV、日誌、XML、JSON)、非結構化數據(如email、文檔、PDF、圖像、音頻、視頻)。

從上面可以總結出數據湖具有以下特性:

  • 數據來源:原始數據、轉換數據
  • 數據類型:結構化數據、半結構化數據、非結構化數據、二進制
  • 數據湖存儲:可擴展的海量數據存儲服務

2、數據湖分析方案架構
image.png

主要包括五個模塊:

  • 數據源:原始數據存儲模塊,包括結構化數據(Database等)、半結構化(File、日誌等)、非結構化(音視頻等);
  • 數據集成:為了將數據統一到數據湖存儲及管理,目前數據集成主要分為三種形態外表關聯、ETL、異步元數據構建;
  • 數據湖存儲:目前業界數據湖存儲包括對象存儲以及自建HDFS。隨著雲原生的演進,對象存儲在擴展性、成本、免運維有大量的優化,目前客戶更多的選擇雲原生對象存儲作為數據湖存儲底座,而不是自建HDFS。
  • 元數據管理:元數據管理,作為連接數據集成、存儲和分析引擎的總線;
  • 數據分析引擎:目前有豐富的分析引擎,比如Spark、Hadoop、Presto等。

3、面向對象存儲分析面臨的挑戰

對象存儲相比HDFS為了保證高擴展性,在元數據管理方面選擇的是扁平的方式;元數據管理沒有維護目錄結構,因此可以做到元數據服務的水平擴展,而不像HDFS的NameNode會有單點瓶頸。同時對象存儲相比HDFS可以做到免運維,按需進行存儲和讀取,構建完全的存儲計算分離架構。但是面向分析與計算也帶來了一些問題:

  • List慢:對象存儲按照目錄/進行list相比HDFS怎麼慢這麼多?
  • 請求次數過多:分析計算的時候怎麼對象存儲的請求次數費用比計算費用還要高?
  • Rename慢:Spark、Hadoop分析寫入數據怎麼一直卡在commit階段?
  • 讀取慢:1TB數據的分析,相比自建的HDFS集群居然要慢這麼多!
  • ......

4、業界面向對象存儲分析優化現狀

上面這些是大家基於對象存儲構建數據湖分析方案遇到的典型問題。解決這些問題需要理解對象存儲相比傳統HDFS的架構區別進行針對性的優化。目前業界做了大量的探索和實踐:

  • JuiceFS:維護獨立的元數據服務,使用對象存儲作為存儲介質。通過獨立元數據服務來提供高效的文件管理語義,比如list、rename等。但是需要部署額外服務,所有的分析讀取對象存儲依賴該服務;
  • Hadoop:由於Hadoop、Spark寫入數據使用的是基於OutputCommitter兩階段提交協議,在OutputCommitter V1版本在commitTask以及commitJob會進行兩次rename。在對象存儲上面進行rename會進行對象的拷貝,成本很高。因此提出了OutputCommitter V2,該算法只用做一次rename,但是在commitjob過程中斷會產生髒數據;
  • Alluxio:通過部署獨立的Cache服務,將遠程的對象存儲文件Cache到本地,分析計算本地讀取數據加速;
  • HUDI:當前出現的HUDI、Delta Lake、Iceberg通過metadata的方式將dataset的文件元信息獨立存儲來規避list操作 ,同時提供和傳統數據庫類似的ACID以及讀寫隔離性;
  • 阿里云云原生數據湖分析服務DLA:DLA服務在讀寫對象存儲OSS上面做了大量的優化,包括Rename優化、InputStream優化、Data Cache等。

二、DLA面向對象存儲OSS的架構優化

由於對象存儲面向分析場景具有上面的問題,DLA構建了統一的DLA FS層來解決對象存儲元信息訪問、Rename、讀取慢等問題。DLA FS同時支持DLA的Serverless Spark進行ETL讀寫、DLA Serverless Presto數據交互式查詢、Lakehouse入湖建倉數據的高效讀取等。面向對象存儲OSS的架構優化整體分為四層:

  • 數據湖存儲OSS:存儲結構化、半結構化、非結構化,以及通過DLA Lakehouse入湖建倉的HUDI格式;
  • DLA FS:統一解決面向對象存儲OSS的分析優化問題,包括Rename優化、Read Buffer、Data Cache、File List優化等;
  • 分析負載:DLA Serverless Spark主要讀取OSS中的數據ETL後再寫回OSS,Serverless Presto主要對OSS上面建倉的數據進行交互式查詢;
  • 業務場景:基於DLA的雙引擎Spark和Presto可以支持多種模式的業務場景。

image.png

三、DLA FS面向對象存儲OSS優化技術解析

下面主要介紹DLA FS面向對象存儲OSS的優化技術:

1、Rename優化

在Hadoop生態中使用OutputCommitter接口來保證寫入過程的數據一致性,它的原理類似於二階段提交協議。

開源Hadoop提供了Hadoop FileSystem的實現來讀寫OSS文件,它默認使用的OutputCommitter的實現是FileOutputCommitter。為了數據一致性,不讓用戶看到中間結果,在執行task時先把結果輸出到一個臨時工作目錄,所有task都確認輸出完成時,再由driver統一將臨時工作目錄rename到生產數據路徑中去。如下圖:

image.png

由於OSS相比HDFS它的Rename操作十分昂貴,是copy&delete操作,而HDFS則是NameNode上的一個元數據操作。在DLA的分析引擎繼續使用開源Hadoop的FileOutputCommitter性能很差,為了解決這個問題,我們決定在DLA FS中引入OSS Multipart Upload特性來優化寫入性能。

3.1 DLA FS支持Multipart Upload模式寫入OSS對象

阿里雲OSS支持Multipart Upload功能,原理是把一個文件分割成多個數據片併發上傳,上傳完成後,讓用戶自己選擇一個時機調用Multipart Upload的完成接口,將這些數據片合併成原來的文件,以此來提高文件寫入OSS的吞吐。由於Multipart Upload可以控制文件對用戶可見的時機,所以我們可以利用它代替rename操作來優化DLA FS在OutputCommitter場景寫OSS時的性能。

基於Multipart Upload實現的OutputCommitter,整個算法流程如下圖:

image.png

利用OSS Multipart Upload,有以下幾個好處:

  • 寫入文件不需要多次拷貝。可以看到,本來昂貴的rename操作已經不需要了,寫入文件不需要copy&delete。另外相比於rename,OSS 的completeMultipartUpload接口是一個非常輕量的操作。
  • 出現數據不一致的機率更小。雖然如果一次要寫入多個文件,此時進行completeMultipartUpload仍然不是原子性操作,但是相比於原先的rename會copy數據,他的時間窗口會縮短很多,出現數據不一致的機率會小很多,可以滿足絕大部分場景。
  • rename中的文件元信息相關操作不再需要。經過我們的統計,算法1中一個文件的元數據操作可以從13次下降到6次,算法2則可以從8次下降到4次。

OSS Multipart Upload中控制用戶可見性的接口是CompleteMultipartUpload和abortMultipartUpload,這種接口的語義類似於commit/abort。Hadoop FileSystem標準接口沒有提供commit/abort這樣的語義。

為了解決這個問題,我們在DLA FS中引入Semi-Transaction層。

3.2 DLA FS引入Semi-Transaction層

前面有提到過,OutputCommitter類似於一個二階段提交協議,因此我們可以把這個過程抽象為一個分佈式事務。可以理解為Driver開啟一個全局事務,各個Executor開啟各自的本地事務,當Driver收到所有本地事務完成的信息之後,會提交這個全局事務。

基於這個抽象,我們引入了一個Semi-Transaction層(我們沒有實現所有的事務語義),其中定義了Transaction等接口。在這個抽象之下,我們把適配OSS Multipart Upload特性的一致性保證機制封裝進去。另外我們還實現了OSSTransactionalOutputCommitter,它實現了OutputCommitter接口,上層的計算引擎比如Spark通過它和我們DLA FS的Semi-Transaction層交互,結構如下圖:

image.png

下面以DLA Serverless Spark的使用來說明DLA FS的OSSTransactionalOutputCommitter的大體流程:

  1. setupJob。Driver開啟一個GlobalTransaction,GlobalTransaction在初始化的時候會在OSS上新建一個隱藏的屬於這個GlobalTransaction的工作目錄,用來存放本job的文件元數據。
  2. setupTask。Executor使用Driver序列化過來的GlobalTransaction生成LocalTransaction。並監聽文件的寫入完成狀態。
  3. Executor寫文件。文件的元數據信息會被LocalTransaction監聽到,並儲存到本地的RocksDB裡面,OSS遠程調用比較耗時,我們把元數據存儲到本地RocksDB上等到後續一次提交能夠減少遠程調用耗時。
  4. commitTask。當Executor調用LocalTransaction commit操作時,LocalTransaction會上傳這個Task它所相關的元數據到OSS對應的工作目錄中去,不再監聽文件完成狀態。
  5. commitJob。Driver會調用GlobalTransaction的commit操作,全局事務會讀取工作目錄中的所有元數據中的待提交文件列表,調用OSS completeMultipartUpload接口,讓所有文件對用戶可見。

引入DLA FS的Semi-Transaction,有兩個好處:

  • 它不對任何計算引擎的接口有依賴,因此後面可以比較方便的移植到另外的一個計算引擎,通過適配可以將它所提供的實現給Presto或者其它計算引擎使用。
  • 可以在Transaction的語義下添加更多的實現。例如對於分區合併的這種場景,可以加入MVCC的特性,在合併數據的同時不影響線上對數據的使用。

2、InputStream優化

用戶反饋OSS請求費用高,甚至超過了DLA費用(OSS請求費用=請求次數×每萬次請求的單價÷10000)。調查發現,是因為開源的OSSFileSystem在讀取數據的過程中,會按照512KB為一個單位進行預讀操作。例如,用戶如果順序讀一個1MB的文件,會產生兩個對OSS的調用:第一個請求讀前512KB,第二個請求讀後面的512KB。這樣的實現就會造成讀大文件時請求次數比較多,另外由於預讀的數據是緩存在內存裡面的,如果同時讀取的文件比較多,也會給內存造成一些壓力。

image.png

因此,在DLA FS的實現中,我們去掉了預讀的操作,用戶調用hadoop的read時,底層會向OSS請求讀取從當前位置到文件結尾整個範圍的數據,然後從OSS返回的流中讀取用戶需要的數據並返回。這樣如果用戶是順序讀取,下一個read調用就自然從同一個流中讀取數據,不需要發起新的調用,即使順序讀一個很大的文件也只需要一次對OSS的調用就可以完成。

另外,對於小的跳轉(seek)操作,DLA FS的實現是從流中讀取出要跳過的數據並丟棄,這樣也不需要產生新的調用,只有大的跳轉才會關閉當前的流並且產生一個新的調用(這是因為大的跳轉讀取-丟棄會導致seek的延時變大)。這樣的實現保證了DLA FS的優化在ORC/Parquet等文件格式上面也會有減少調用次數的效果。

image.png

3、Data Cache加速

基於對象存儲OSS的存儲計算分離的架構,通過網絡從遠端存儲讀取數據仍然是一個代價較大的操作,往往會帶來性能的損耗。雲原生數據湖分析DLA FS中引入了本地緩存機制,將熱數據緩存在本地磁盤,拉近數據和計算的距離,減少從遠端讀取數據帶來的延時和IO限制,實現更小的查詢延時和更高的吞吐。

3.1 Local Cache架構

我們把緩存的處理邏輯封裝在DLA FS中。如果要讀取的數據存在於緩存中,會直接從本地緩存返回,不需要從OSS拉取數據。如果數據不在緩存中,會直接從OSS讀取同時異步緩存到本地磁盤。

image.png

3.2 Data Cache命中率提高策略

這裡以DLA Serverless Presto來說明如何提高DLA FS的local Cache的命中率提高。Presto默認的split提交策略是NO_PREFERENCE,在這種策略下面,主要考慮的因素是worker的負載,因此某個split會被分到哪個worker上面很大程度上是隨機的。在DLA Presto中,我們使用SOFT_AFFINITY提交策略。在提交Hive的split時,會通過計算split的hash值,儘可能將同一個split提交到同一個worker上面,從而提高Cache的命中率。

image.png

使用_SOFT_AFFINITY_策略時,split的提交策略是這樣的:

  1. 通過split的hash值確定split的首選worker和備選worker。
  2. 如果首選worker空閒,則提交到首選worker。
  3. 如果首選worker繁忙,則提交到備選worker。
  4. 如果備選worker也繁忙,則提交到最不繁忙的worker。

四、DLA FS帶來的價值

1、Rename優化在ETL寫入場景的效果

客戶在使用DLA過程中,通常使用DLA Serverless Spark做大規模數據的ETL。我們用TPC-H 100G數據集中的orders表進行寫入測試,新建一個以o_ordermonth字段為分區的orders_test表。在Spark中執行sql:"insert overwrite table `tpc_h_test`.`orders_test` select * from `tpc_h_test`.`orders`"。使用同樣的資源配置,使用的Spark版本一個為開源Spark,另外一個為DLA Serverless Spark,將它們的結果進行對比。

image.png

從圖中可以得出:

  • 這次優化對算法1和算法2都有比較大的提升。
  • 算法1和算法2開啟這個特性以後都會得到優化,但是算法1更明顯。這是由於算法1需要進行兩次rename,並且有一次rename是在driver上單點進行的;算法2是各executor進行分佈式rename操作且只要進行一次。
  • 在當前這個數據量下,算法1和算法2在開啟這個特性之後,差距沒有那麼明顯。兩種方式都不需要進行rename操作,只不過是completeMultipart是否是在driver上單點執行(算法2我們的改造是completeMultipartUpload在commitTask的時候執行),大數據量可能仍會有比較大的影響。

2、InputStream優化在交互式場景的效果

DLA客戶會使用DLA的Serverless Presto對多種格式進行分析,比如Text、ORC、Parquet等。下面對比基於DLA FS以及社區OSSFS在1GB Text及ORC格式的訪問請求次數。

image.png

1GB Text文件分析的請求次數對比

image.png

  • Text類調用次數降低為開源實現的1/10左右;
  • ORC格式調用次數降低為開源實現1/3左右;
  • 平均來看可以節省OSS調用費用60%到90%;

3、Data Cache在交互式場景的效果

我們針對社區版本prestodb和DLA做了性能對比測試。社區版本我們選擇了prestodb 0.228版本,並通過複製jar包以及修改配置的方式增加對oss數據源的支持。我們對DLA Presto CU版512核2048GB通社區版本集群進行了對比。

測試的查詢我們選擇TPC-H 1TB數據測試集。由於TPC-H的大部分查詢並不是IO密集型的,所以我們只從中挑選出符合如下兩個標準的查詢來做比較:

  1. 查詢中包含了對最大的表lineitem的掃描,這樣掃描的數據量足夠大,IO有可能成為瓶頸。
  2. 查詢中不涉及多個表的join操作,這樣就不會有大數據量參與計算,因而計算不會先於IO而成為瓶頸。

按照這兩個標準,我們選擇了對lineitem單個表進行查詢的Q1和Q6,以及lineitem和另一個表進行join操作的Q4、Q12、Q14、Q15、Q17、Q19和Q20。

可以看到Cache加速能管理在這幾個query都有明顯的效果。

image.png

五、雲原生數據湖最佳實踐

最佳實踐,以DLA為例子。DLA致力於幫助客戶構建低成本、簡單易用、彈性的數據平臺,比傳統Hadoop至少節約50%的成本。其中DLA Meta支持雲上15+種數據數據源(OSS、HDFS、DB、DW)的統一視圖,引入多租戶、元數據發現,追求邊際成本為0,免費提供使用。DLA Lakehouse基於Apache Hudi實現,主要目標是提供高效的湖倉,支持CDC及消息的增量寫入,目前這塊在加緊產品化中。DLA Serverless Presto是基於Apache PrestoDB研發的,主要是做聯邦交互式查詢與輕量級ETL。DLA支持Spark主要是為在湖上做大規模的ETL,並支持流計算、機器學習;比傳統自建Spark有著300%的性價比提升,從ECS自建Spark或者Hive批處理遷移到DLA Spark可以節約50%的成本。基於DLA的一體化數據處理方案,可以支持BI報表、數據大屏、數據挖掘、機器學習、IOT分析、數據科學等多種業務場景。

image.png

歡迎大家進入我們的釘釘群一起交流討論~

image.png

Leave a Reply

Your email address will not be published. Required fields are marked *