大數據

Apache Spark™ 3.0中全新的Structured Streaming UI

作者:Genmao Yu
原文鏈接:https://databricks.com/blog/2020/07/29/a-look-at-the-new-structured-streaming-ui-in-apache-spark-3-0.html

編譯:邵嘉陽,計算機科學與技術大三在讀,Apache Spark 中文社區志願者


在Apache Spark 2.0中,我們迎來了Structured Streaming——構建分佈式流處理應用的最佳平臺。統一的API(SQL,Dataset和DataFrame)以及Spark內置的大量函數為開發者實現複雜的需求提供了便利,比如流的聚合,流-流連接和窗口支持。開發者們普遍喜歡通過Spark Streaming中的DStream的方式來管理他們的流,那麼類似的功能什麼時候能在Structured Streaming中得到實現呢?這不,在Apache Spark 3.0中,全新的Structured Streaming可視化UI和開發者們見面了。

新的Structured Streaming UI會提供一些有用的信息和統計數據,以此來監視所有流作業,便於在開發調試過程中排除故障。同時,開發者還能夠獲得實時的監測數據,這能使生產流程更直觀。在這個新的UI中,我們會看到兩組統計數據:1)流查詢作業的聚合信息;2)流查詢的具體統計信息,包括輸入速率(Input Rate)、處理速率(Process Rate)、輸入行數(Input Rows)、批處理持續時間(Batch Duration)和操作持續時間(Operation Duration)等。

流查詢作業的聚合信息

開發者提交的流SQL查詢會被列在Structured Streaming一欄中,包括正在運行的流查詢(active)和已完成的流查詢(completed)。結果表則會顯示流查詢的一些基本信息,包括查詢名稱、狀態、ID、運行ID、提交時間、查詢持續時間、最後一批的ID以及一些聚合信息,如平均輸入速率和平均處理速率。流查詢有三種狀態:運行(RUNNING)、結束(FINISHED)、失敗(FAILED)。所有結束(FINISHED)和失敗(FAILED)的查詢都在已完成的流式查詢表中列出。Error列顯示有關失敗查詢的詳細信息。

1.png

我們可以通過單擊Run ID鏈接查看流查詢的詳細信息。

詳細的統計信息

Statistics頁面顯示了包括輸入速率、處理速率、延遲和詳細的操作持續時間在內的一系列指標。通過圖表,開發者能全面瞭解已提交的流查詢的狀態,並且輕鬆地調試查詢處理中的異常情況。
2.png
image.png

它包含以下指標:

  • Input Rate:數據到達的聚合速率(跨所有源)。
  • Process Rate: Spark處理數據的聚合速率(跨所有源)。
  • Batch Duration: 每一批的處理時間。
  • Operation Duration: 執行各種操作所花費的時間(以毫秒為單位)。
    被追蹤的操作羅列如下:
  • addBatch:從源讀取微批的輸入數據、對其進行處理並將批的輸出寫入接收器所花費的時間。這應該會佔用微批處理的大部分時間。
  • getBatch:準備邏輯查詢以從源讀取當前微批的輸入所花費的時間。
  • getOffset:查詢源是否有新的輸入數據所花費的時間。
  • walCommit:將偏移量寫入元數據日誌。
  • queryPlanning:生成執行計劃。

需要注意的是,由於數據源的類型不同,一個查詢可能不會包含以上列出的所有操作。

使用UI解決流的性能故障

在這一部分中,我們會看到新的UI是怎樣實時、直觀地顯示查詢執行過程中的異常情況的。我們會在每個例子中預先假設一些條件,樣例查詢看起來是這樣的:

import java.util.UUID

val bootstrapServers = ...
val topics = ...
val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString

val lines = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("subscribe", topics)
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as[String]

val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

val query = wordCounts.writeStream
    .outputMode("complete")
    .format("console")
    .option("checkpointLocation", checkpointLocation)
    .start()

由於處理能力不足而增加延遲

在第一種情況下,我們希望儘快處理Apache Kafka數據。在每一批中,流作業將處理Kafka中所有可用的數據。如果處理能力不足以處理批數據,那麼延遲將迅速增加。最直觀的現象是Input Rows和Batch Duration會呈線性上升。Process Rate提示流作業每秒最多隻能處理大約8000條記錄,但是當前的輸入速率是每秒大約20000條記錄。產生問題的原因一目瞭然,那麼我們可以為流作業提供更多的執行資源,或者添加足夠的分區來處理與生產者匹配所需的所有消費者。
5.png

穩定但高延遲

第二種情況下,延遲並沒有持續增加,而是保持穩定,如下截圖所示:
image.png

我們發現在相同的Input Rate下,Process Rate可以保持穩定。這意味著作業的處理能力足以處理輸入數據。然而,每批的延遲仍然高達20秒。這裡,高延遲的主要原因是每個批中有太多數據,那麼我們可以通過增加這個作業的並行度來減少延遲。在為Spark任務添加了10個Kafka分區和10個內核之後,我們發現延遲大約為5秒——比20秒要好得多。

image.png

使用操作持續時間圖進行故障排除

操作持續時間圖(Operation Duration Chart)顯示了執行各種操作所花費的時間(以毫秒為單位)。這對於瞭解每個批處理的時間分佈和故障排除非常有用。讓我們以Apache Spark社區中的性能改進“Spark-30915:在查找最新批處理ID時避免讀取元數據日誌文件“為例。
在某次查詢中我們發現,當壓縮後的元數據日誌很大時,下一批要花費比其他批更多的時間來處理。

image.png

在進行代碼審查之後,我們發現這是由對壓縮日誌文件的不必要讀取造成的並進行了修復。新的操作持續時間圖確認了我們想法:

image.png

未來的開發方向

如上所示,新的Structured Streaming UI將通過提供更有用的流查詢信息幫助開發者更好地監視他們的流作業。作為早期發佈版本,新的UI仍在開發中,並將在未來的發佈中得到改進。有幾個未來可以實現的功能,包括但不限於:

  • 更多的流查詢執行細節:延遲數據,水印,狀態數據指標等等。
  • 在Spark歷史服務器中支持Structured Streaming UI。
  • 對於不尋常的情況有更明顯的提示:發生延遲等。

近期活動:

8月24日開始 Spark 實戰訓練營正式開課
免費報名鏈接:https://developer.aliyun.com/learning/trainingcamp/spark/2


入群照片.png

Leave a Reply

Your email address will not be published. Required fields are marked *