一 背景
隨著數據時效性對企業的精細化運營越來越重要,“實時即未來”、“實時數倉”、“數據湖” 成為了近幾年炙手可熱的詞。流計算領域的格局也在這幾年發生了巨大的變化,Apache Flink 在流批一體的方向上不斷深耕,Apache Spark 的近實時處理有著一定的受眾,Apache Kafka 也有了 ksqlDB 高調地進軍流計算,而 Apache Storm 卻開始逐漸地退出歷史的舞臺。
每一種引擎有其優勢的地方,如何選擇適合自己業務的流計算引擎成了一個由來已久的話題。除了比較各個引擎提供的不同的功能矩陣之外,性能是一個無法繞開的評估因素。基準測試(benchmark)就是用來評估系統性能的一個重要和常見的過程。
二 現有流計算基準測試的問題
目前在流計算領域中,還沒有一個行業標準的基準測試。目前業界較為人知的流計算 benchmark 是五年前雅虎 Storm 團隊發佈的 Yahoo Streaming Benchmarks[4]。雅虎的原意是因為業界缺少反映真實場景的 benchmark,模擬了一個簡單的廣告場景來比較各個流計算框架,後來被廣泛引用。具體場景是從 Kafka 消費的廣告的點擊流,關聯 Redis 中的廣告所屬的 campaign 信息,然後做時間窗口聚合計數。
然而,正是因為雅虎團隊太過於追求還原真實的生產環境,導致這些外部系統服務(Kafka, Redis)成為了作業的瓶頸。Ververica 曾在這篇文章[5]中做過一個擴展實驗,將數據源從 Kafka 替換成了一個內置的 datagen source,性能提升了 37 倍!由此可見,引入的 Kafka 組件導致了無法準確反映引擎真實的性能。更重要的一個問題是,Yahoo Benchmark 只包含一個非常簡單的,類似 “Word Count” 的作業,它無法全面地反映當今複雜的流計算系統和業務。試想,誰會用一個簡單的 “Word Count” 去衡量比較各個數據庫之間的性能差異呢?正是這些原因使得 Yahoo Benchmark 無法成為一個行業標準的基準測試。這也正是我們想要解決的問題。
因此,我們認為一個行業標準的基準測試應該具備以下幾個特點:
可復現性
可復現性是使得 benchmark 被信任的一個重要條件。許多 benchmark 的結果是難以重現的。有的是因為只擺了個 benchmark 結果圖,用於生成這些結果的代碼並沒有公開。有的是因為用於 benchmark 的硬件不容易被別人獲取到。有的是因為 benchmark 依賴的服務太多,致使測試結果不穩定。
能代表和覆蓋行業真實的業務場景( query 量)
例如數據庫領域非常著名的 TPC-H、TPC-DS 涵蓋了大量的 query 集合,來捕獲查詢引擎之間細微的差別。而且這些 query 集合都立於真實業務場景之上(商品零售行業),數據規模大,因此也很受一些大數據系統的青睞。
能調整作業的負載(數據量、數據分佈)
在大數據領域,不同的數據規模對於引擎來說可能會是完全不同的事情。例如 Yahoo Benchmark 中使用的 campaign id 只有 100 個,使得狀態非常小,內存都可以裝的下。這樣使得同步 IO 和 checkpoint 等的影響可以忽略不計。而真實的場景往往要面對大狀態,面臨的挑戰要複雜困難的多。像 TPC-DS 的數據生成工具會提供 scalar factor 的參數來控制數據量。其次在數據分佈上最好也能貼近真實世界的數據,如有數據傾斜,及調整傾斜比例。從而能全面、綜合地反映業務場景和引擎之間地差異。
有統一的性能衡量指標和採集彙總工具
基準測試的性能指標的定義需要清晰、一致,且能適用於各種計算引擎。然而流計算的性能指標要比傳統批處理的更難定義、更難採集。是流計算 benchmark 最具挑戰性的一個問題,這也會在下文展開描述。
我們也研究了很多其他的流計算相關的基準測試,包括:StreamBench、HiBench、BigDataBench,但是它們都在上述幾個基本面有所欠缺。基準測試的行業標杆無疑是 TPC 發佈的一系列 benchmark,如 TPC-H,TPC-DS。然而這些 benchmark 是面向傳統數據庫、傳統數倉而設計的,並不適用於今天的流計算系統。例如 benchmark 中沒有考慮事件時間、數據的亂序、窗口等流計算中常見的場景。因此我們不得不考慮重新設計並開源一個流計算基準測試框架——Nexmark。
地址:https://github.com/nexmark/nexmark。
三 Nexmark 基準測試框架的設計
為了提供一個滿足以上幾個基本面的流計算基準測試,我們設計和開發了 Nexmark 基準測試框架,並努力讓其成為流計算領域的標準 benchmark 。
Nexmark 基準測試框架來源於 NEXMark 研究論文[1],以及 Apache Beam Nexmark Suite[6],並在其之上進行了擴展和完善。Nexmark 基準測試框架不依賴任何第三方服務,只需要部署好引擎和 Nexmark,通過腳本 nexmark/bin/run_query.sh all 即可等待並獲得所有 query 下的 benchmark 結果。下面我們將探討 Nexmark 基準測試在設計上的一些決策。
1 移除外部 source、sink 依賴
如上所述,Yahoo Benchmark 使用了 Kafka 數據源,卻使得最終結果無法準確反映引擎的真實性能。此外,我們還發現,在 benchmark 快慢流雙流 JOIN 的場景時,如果使用了 Kafka 數據源,慢流會超前消費(快流易被反壓),導致 JOIN 節點的狀態會緩存大量超前的數據。這其實不能反映真實的場景,因為在真實的場景下,慢流是無法被超前消費的(數據還未產生)。所以我們在 Nexmark 中使用了 datagen source,數據直接在內存中生成,數據不落地,直接向下遊節點發送。多個事件流都由單一的數據生成器生成,所以當快流被反壓時,也能抑制慢流的生成,較好地反映了真實場景。
與之類似的,我們也移除了外部 sink 的依賴,不再輸出到 Kafka/Redis,而是輸出到一個空 sink 中,即 sink 會丟棄收到的所有數據。
通過這種方式,我們保證了瓶頸只會在引擎自身,從而能精確地測量出引擎之間細微的差異。
2 Metrics
批處理系統 benchmark 的 metric 通常採用總體耗時來衡量。然而流計算系統處理的數據是源源不斷的,無法統計 query 耗時。因此,我們提出三個主要的 metric:吞吐、延遲、CPU。Nexmark 測試框架會自動幫我們採集 metric,並做彙總,不需要部署任何第三方的 metric 服務。
吞吐
吞吐(throughput)也常被稱作 TPS,描述流計算系統每秒能處理多少條數據。由於我們有多個事件流,所有事件流都由一個數據生成器生成,為了統一觀測角度,我們採用數據生成器的 TPS,而非單一事件流的 TPS。我們將一個 query 能達到的最大吞吐,作為其吞吐指標。例如,針對 Flink 引擎,我們通過 Flink REST API 暴露的 .numRecordsOutPerSecond metric 來獲取當前吞吐量。
延遲
延遲(Latency)描述了從數據進入流計算系統,到它的結果被輸出的時間間隔。對於窗口聚合,Yahoo Benchmark 中使用 output_system_time - window_end 作為延遲指標,這其實並沒有考慮數據在窗口輸出前的等待時間,這種計算結果也會極大地受到反壓的影響,所以其計算結果是不準確的。一種更準確的計算方式應為 output_system_time - max(ingest_time)。然而在非窗口聚合,或雙流 JOIN 中,延遲又會有不同的計算方式。
所以延遲的定義和採集在流計算系統中有很多現實存在的問題,需要根據具體 query 具體分析,這在參考文獻[2]中有詳細的討論,這也是我們目前還未在 Nexmark 中實現延遲 metric 的原因。
CPU
資源使用率是很多流計算 benchmark 中忽視的一個指標。由於在真實生產環境,我們並不會限制流計算引擎所能使用的核數,從而給系統更大的彈性。所以我們引入了 CPU 使用率,作為輔助指標,即作業一共消耗了多少核。通過吞吐/cores,可以計算出平均每個核對於吞吐的貢獻。對於進程的 CPU 使用率的採集,我們沒有使用 JVM CPU load,而是借鑑了 YARN 中的實現,通過採樣 /proc//stat 並計算獲得,該方式可以獲得較為真實的進程 CPU 使用率。因此我們的 Nexmark 測試框架需要在測試開始前,先在每臺機器上部署 CPU 採集進程。
3 Query 與 Schema
Nexmark 的業務模型基於一個真實的在線拍賣系統。所有的 query 都基於相同的三個數據流,三個數據流會有一個數據生成器生成,來控制他們之間的比例、數據偏斜、關聯關係等等。這三個數據流分別是:
- 用戶(Person):代表一個提交拍賣,或參與競標的用戶。
- 拍賣(Auction):代表一個拍賣品。
- 競標(Bid):代表一個對拍賣品的出價。
我們一共定義了 16 個 query,所有的 query 都使用 ANSI SQL 標準語法。基於 SQL ,我們可以更容易地擴展 query 測試集,支持更多的引擎。然而,由於 Spark 在流計算功能上的限制,大部分的 query 都無法通過 Structured Streaming 來實現。因此我們目前只支持測試 Flink SQL 引擎。
4 作業負載的配置化
我們也支持配置調整作業的負載,包括數據生成器的吞吐量以及吞吐曲線、各個數據流之間的數據量比例、每個數據流的數據平均大小以及數據傾斜比例等等。具體的可以參考 Source DDL 參數。
四 實驗結果
我們在阿里雲的三臺機器上進行了 Nexmark 針對 Flink 的基準測試。每臺機器均為 ecs.i2g.2xlarge 規格,配有 Xeon 2.5 GHz CPU (8 vCores) 以及 32 GB 內存,800 GB SSD 本地磁盤。機器之間的帶寬為 2 Gbps。
測試了 flink-1.11 版本,我們在這 3 臺機器上部署了 Flink standalone 集群,由 1 個 JobManager,8 個 TaskManager (每個只有 1 slot)組成,都是 4 GB內存。集群默認並行度為 8。開啟 checkpoint 以及 exactly once 模式,checkpoint 間隔 3 分鐘。使用 RocksDB 狀態後端。測試發現,對於有狀態的 query,每次 checkpoint 的大小在 GB 級以上,所以有效地測試的大狀態的場景。
Datagen source 保持 1000 萬每秒的速率生成數據,三個數據流的數據比例分別是 Bid: 92%,Auction: 6%,Person: 2%。每個 query 都先運行 3 分鐘熱身,之後 3 分鐘採集性能指標。
運行 nexmark/bin/run_query.sh all 後,打印測試結果如下:
五 總結
我們開發和設計 Nexmark 的初衷是為了推出一套標準的流計算 benchmark 測試集,以及測試流程。雖然目前僅支持了 Flink 引擎,但在當前也具有一定的意義,例如:
推動流計算 benchmark 的發展和標準化。
作為 Flink 引擎版本迭代之間的性能測試工具,甚至是日常回歸工具,及時發現性能回退的問題。
在開發 Flink 性能優化的功能時,可以用來驗證性能優化的效果。
部分公司可能會有 Flink 的內部版本,可以用作內部版本與開源版本之間的性能對比工具。
當然,我們也計劃持續改進和完善 Nexmark 測試框架,例如支持 Latency metric,支持更多的引擎,如 Spark Structured Streaming, Spark Streaming, ksqlDB, Flink DataStream 等等。也歡迎有志之士一起加入貢獻和擴展。
參考及引用
[1]Pete Tucker and Kristin Tufte. "NEXMark – A Benchmark for Queries over Data Streams". June 2010.
[2]Jeyhun Karimov and Tilmann Rabl. "Benchmarking Distributed Stream Data Processing Systems". arXiv:1802.08496v2 [cs.DB] Jun 2019
[3]Yangjun Wang. "Stream Processing Systems Benchmark: StreamBench". May 2016.
[4]https://github.com/yahoo/streaming-benchmarks
[5]https://www.ververica.com/blog/extending-the-yahoo-streaming-benchmark
[6]https://beam.apache.org/documentation/sdks/java/testing/nexmark/