開發與維運

通過Job Committer保證Mapreduce/Spark任務數據一致性

併發地向目標存儲系統寫數據是分佈式任務的一個天然特性,通過在節點/進程/線程等級別的併發寫數據,充分利用集群的磁盤和網絡帶寬,實現高容量吞吐。併發寫數據的一個主要需要解決的問題就是如何保證數據一致性的問題,具體來說,需要解決下面列出的各個問題:

  1. 在分佈式任務寫數據的過程中,如何保證中間數據對外不可見。
  2. 在分佈式任務正常完成後,保證所有的結果數據同時對外可見。
  3. 在分佈式任務失敗時,所有結果數據對外不可見且能正確清理。
  4. 開啟預測執行時,保證多個執行相同任務的task只有一份結果數據在最終結果中。

此外,還要一些作業的異常情況需要處理,例如task失敗重試,作業重啟等等。Job Committer是MapReduce用來實現分佈式寫入一致性的保證,通過Job Committer的各種實現,保證MapReduce任務在各種異常場景中數據寫出的一致性。Spark支持MapReduce的JobCommitter,同樣也是通過JobCommitter實現Spark作業寫出數據的一致性。

JobCommitter接口

MapReduce有V1和V2兩套API接口,在包名中以mapredmapreduce區分,v1和v2版本的JobCommitter抽象接口基本一致,下面以org.apache.hadoop.mapreduce.OutputCommitter為例介紹主要的接口定義:

Modifier and Type Method and Description
abstract void setupJob(JobContext jobContext)For the framework to setup the job output during initialization.
void commitJob(JobContext jobContext)For committing job's output after successful job completion.
void abortJob(JobContext jobContext, org.apache.hadoop.mapreduce.JobStatus.State state)For aborting an unsuccessful job's output.
boolean isCommitJobRepeatable(JobContext jobContext)Returns true if an in-progress job commit can be retried.
abstract void setupTask(TaskAttemptContext taskContext)Sets up output for the task.
abstract void commitTask(TaskAttemptContext taskContext)To promote the task's temporary output to final output location.
abstract void abortTask(TaskAttemptContext taskContext)Discard the task output.
abstract boolean needsTaskCommit(TaskAttemptContext taskContext)Check whether task needs a commit.
boolean isRecoverySupported(JobContext jobContext)Is task output recovery supported for restarting jobs? If task output recovery is supported, job restart can be done more efficiently.
void recoverTask(TaskAttemptContext taskContext)Recover the task output.

根據接口的調用時機和順序,我們可以大致梳理出MapReduce任務是如何通過JobCommitter的工作機制。

  1. 在job初始化時,調用setupJob,進行一些作業級別的初始化工作,例如設置job的工作目錄等等。
  2. 如果已有相同作業正在執行,調用isCommitJobRepeatable判斷是否繼續。
  3. 在task初始化時,調用setupTask,進行一些作業級別的初始化工作,例如設置task工作目錄,task輸出目錄等。
  4. 如果task輸出已存在,通過isRecorverySupport判斷是否支持recovery,是的話,調用recoverTask,避免task的計算。
  5. 如果task執行失敗,調用abortTask,清理task輸出。
  6. 如果task執行成功,調用commitTask。
  7. 如果所有task都全部完成,調用commitJob。
  8. 如果job失敗,調用abortJob。

可以看到,JobCommitter的基本機制是基於一種類似於分佈式數據庫中的兩階段提交協議的方式,task首先commit,主要的工作在task中完成,在appmaster收到所有task成功提交的信息後,進行job commit完成最後的提交工作。通過兩階段提交協議實現數據一致性有兩個主要的需求需要滿足:

  1. 在commit job以前,數據對外不可見,且可回退。
  2. commit job過程要儘量短,最好是原子操作,較長的commit job過程,中間發生失敗的風險較大,一旦失敗,會導致數據處於某種中間狀態,無法滿足數據一致性的要求。

在MapReduce中,FileOutputCommitter是最常使用的一個Job Commiter實現,在寫入數據到HDFS上時,完全滿足兩階段提交協議的兩個要求。

FileOutputCommitter

下面簡單介紹FileOutputCommitter主要接口的一些具體實現細節。FileOutputCommitter主要涉及到四個目錄:

  • 最終目錄:$dest/
  • Job臨時目錄:$dest/_temporary/$appAttemptId/
  • Task臨時目錄:$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID/
  • Task輸出目錄:$dest/_temporary/$appAttemptId/$taskAttemptID/

整個JobCommitter執行過程如圖所示:

img

  1. setupJob:設置Job臨時目錄。
  2. setupTask:確定Task臨時目錄和輸出目錄。
  3. commitTask:將Task臨時目錄rename到輸出目錄。
  4. abortTask:清理Task臨時目錄。
  5. commitJob:將Job臨時目錄中的數據(包含所有Task輸出目錄中的文件)合併到Job最終目錄。
  6. abortJob:清理Job臨時目錄。

根據以上FileOutputCommitter的實現,在可以看到,在commitJob之前,所有mapreduce任務寫的數據都在臨時目錄中,讀取Job最終目錄不會讀到臨時數據,在Job執行的任意過程失敗,清理臨時目錄文件即可。FileOutputCommitter在Job執行的過程中,每一個產生的文件需要進行兩次Rename操作,第一次是commitTask,在Task中執行,多個節點中執行的task可以併發地進行Rename。第二次是commitJob,MapReduce或者Spark的Job Driver端執行的,是個單點操作。在commitJob時,由於需要將Job臨時目錄中的文件移動到最終目錄,會有一個時間窗口,在過程中失敗的話,會導致部分數據對外可見,這個時間窗口隨著文件數量的增加也會隨之增加。對於HDFS這類分佈式文件系統來說,rename是一個十分高效的操作,只涉及到NameNode上相關元數據的修改,所以這個時間窗口非常小,可以滿足絕大部分場景的需求。

在對於S3,OSS等公有云上的對象存儲系統來說,並不直接支持Rename操作,文件系統級別的Rename操作一般會轉換成Copy+Delete操作,這個代價相對於HDFS會大大增加。commitJob是在MapReduce或者Spark的Job Driver端執行的,是個單點操作,雖然有實現線程級別的併發優化,但是在寫入S3/OSS的場景中,commitJob的時間窗口會非常長,文件數量較大時,可能達到分鐘,甚至小時級別,這對於Job的性能會產生嚴重的影響,為了解決寫S3/OSS等對象存儲系統的性能問題,Hadoop社區引入了FileOutputCommitter V2版本。

FileOutputCommitter V2

FileOutputCommitter V2版本整個job commit的過程如下:

img

  1. setupJob:設置Job臨時目錄。
  2. setupTask:確定Task臨時目錄。
  3. commitTask:將Task臨時目錄文件rename到Job最終目錄。
  4. abortTask:清理Task臨時目錄。
  5. commitJob:無需Rename操作。
  6. abortJob:清理Job臨時目錄。

可以看到在V2版本中,最大的區別是去掉了Task輸出目錄,在commitTask的時候將文件直接rename到Job最終目錄,整個Job Commit過程,對於所有的文件只需進行一次Rename操作,而且Rename操作是在集群節點的所有task上併發執行的,消除了Job Driver單點執行rename的瓶頸。

FileOutputCommitter V2在寫入數據到S3/OSS等場景中大大提高了性能,但是由於byPass了Task輸出目錄,無法保證數據的一致性,在Job執行過程中,部分文件就移動到了Job最終目錄。當部分task成功,部分task失敗時,也會在最終目錄中殘留中間文件。

針對寫入S3/OSS等的場景,Hadoop社區和各個工業界也都提出了非常多的解決方案,基本的目標是保證數據一致性的前提下,完全避免Rename操作。下面主要介紹S3ACommitter和JindoOssCommitter,分別是hadoop社區和阿里雲EMR團隊針對S3和OSS實現的Job Committer,主要是基於S3/OSS的Multipart Upload特性實現,基本思想一致,在這裡一併介紹。此外,還有Databricks基於DBIO的方案,Netflix的Staging committer方案等等,篇幅有限,這裡就不過多介紹了。

對象存儲系統的Multipart Upload

除了通過PUT Object接口上傳文件到S3/OSS以外,S3/OSS還提供了另外一種上傳模式——Multipart Upload。主要應用在文件較大,需要斷點上傳或者網絡不好等場景中,以OSS為例,Multipart Upload上傳的流程如下:

  1. InitiateMultipartUpload:使用Multipart Upload模式傳輸數據前,必須先調用該接口來通知OSS初始化一個Multipart Upload事件。指定目標文件地址作為參數,獲取一個uploadId用作後續upload使用。
  2. UploadPart:初始化一個MultipartUpload之後,可以根據指定的Object名和Upload ID來分塊(Part)上傳數據。可重複調用uploadPart接口上傳不同的分塊數據,而且可以併發調用。
  3. CompleteMultipartUpload:在將所有數據Part都上傳完成後,必須調用CompleteMultipartUpload接口來完成整個文件的MultipartUpload。完成completeMultipartUpload後,文件在oss上對外可見,在completeMultipartUpload返回之前,該文件對外不可見。
  4. AbortMultipartUpload:AbortMultipartUpload接口用於終止MultipartUpload事件,在CompleteMultipartUpload之前可隨時中止MultipartUpload。
  5. ListMultipartUploads:ListMultipartUploads用來列舉所有執行中的Multipart Upload事件,即已經初始化但還未Complete或者Abort的Multipart Upload事件。

基於Multipart Upload的No-Rename Committer實現

通過Multipart Upload功能提供的支持,結合S3/Oss文件系統層面的定製支持,可以實現在保證數據一致性前提下無需Rename操作的Job Committer實現,具體的Job Commit流程如下:

img

  1. setupJob:設置Job臨時目錄。
  2. setupTask:設置Task臨時目錄,Task執行過程中寫文件使用MultiUpload接口直接寫到Job最終目錄,在close文件時,不調用CompleteMultipartUpload接口,將所有Upload分塊信息記錄在Task臨時目錄的文件中。
  3. commitTask:將Task臨時目錄文件中的多個文件Upload分塊信息合併成一個文件,寫到Job臨時目錄。
  4. abortTask:清理Task臨時目錄,使用AbortMultipartUpload接口,abort所有該task寫的文件。
  5. commitJob:訪問Job臨時目錄中所有的Upload分塊信息,調用CompleteMultipartUpload接口,完成所有文件的MultipartUpload。
  6. abortJob:調用ListMultipartUploads,abort所有該task寫的文件分塊,清理Job臨時目錄。

在Task執行過程中,由於通過Multipart Upload相關接口初始化upload和上傳分塊數據,但是知道commitJob時,才會調用CompleteMultipartUpload。根據Multipart Upload特性,在調用CompleteMultipartUpload前文件是不可見的,從而保證了數據一致性。同FileOutputCommitter類似,由於有多個文件需要CompleteMultipartUpload,在commitJob時也會有一個可能導致數據不一致的時間窗口。文件的上傳過程都已經在task中分佈式的完成了,在Job Driver中commitJob時CompleteMultipartUpload是一個非常輕量級的請求,所以這個時間窗口會非常短,失敗的可能較低,可以滿足絕大部分業務場景的需求。對比FileOutputCommitter V1,在jobCommit時,CompleteMultipartUpload相對於Rename代價小很多,可能導致數據不一致的時間窗口也會少很多。對比FileOutputCommitter V2,V2並不保證數據一致性,JindoOssCommitter可以適用於更多對數據一致性有要求的場景。

性能方面,這種方式分佈式的在task中併發寫數據到OSS中,並且不需要Rename操作,對比FileOutputCommitter V1/V2分別需要的兩次和一次Rename操作,也有大幅的性能提升。

總結

通過對象存儲系統普遍提供的Multipart Upload功能,實現的No-Rename Committer在數據一致性和性能方面相對於FileOutputCommitter V1/V2版本均有較大提升,在使用MapRedcue和Spark寫入數據到S3/Oss的場景中更加推薦使用。S3ACommitter在Hadoop社區版本的3.1.2中已經可以使用,JindoOssCommitter也在阿里雲的EMR環境2.5.0以上版本中默認開啟。

Leave a Reply

Your email address will not be published. Required fields are marked *