作者 | 汪磊(網易雲音樂 / 數據平臺開發專家)
整理 | 楊濤(Flink 社區志願者)
如何基於 Flink 的新 API 升級實時數倉架構?
背景介紹
網易雲音樂從 2018 年開始搭建實時計算平臺,到目前為止已經發展至如下規模:
- 機器數量:130+
- 單 Kafka 峰值 QPS:400W+
- 在線運行任務數:500+
- 開發者:160+
- 業務覆蓋:在線業務支持,實時報表統計,實時特徵處理,實時索引支持
- 2020 年 Q1 任務數增長 100%,處於高速發展中
這是網易雲音樂實時數倉 18 年的版本,基於 Flink 1.7 版本開發,當時 Flink SQL 的整體架構也還不是很完善。我們使用了 Antlr (通用的編程語言解析器,它只需編寫名為 G4 的語法文件,即可自動生成解析的代碼,並且以統一的格式輸出,處理起來非常簡單。由於 G4 文件是通過開發者自行定製的,因此由 Antlr 生成的代碼也更加簡潔和個性化)自定義了一些 DDL 完善了維表 Join 的語法。通過 Antlr 完成語法樹的解析以後,再通過 CodeGen(根據接口文檔生成代碼)技術去將整個 SQL 代碼生成一個 Jar 包,然後部署到 Flink 集群上去。
此時還沒有統一的元數據管理系統。在 JAR 包任務的開發上, 我們也沒有任何框架的約束,平臺也很難知道 JAR 的任務上下游以及相關業務的重要性和優先級。這套架構我們跑了將近一年的時間,隨著任務越來越多,我們發現了以下幾個問題:
重複的數據理解
由於沒有進行統一的元數據管理,每個任務的代碼裡面都需要預先定義 DDL 語句,然後再進行 Select 等業務邏輯的開發;消息的元數據不能複用,每個開發都需要進行重複的數據理解,需要了解數據從哪裡來、數據如何解析、數據的業務含義是什麼;整個過程需要多方溝通,整體還存在理解錯誤的風險;也缺乏統一的管理系統去查找自己想要的數據。
和官方版本越走越遠
由於早期版本很多 SQL 的語法都是我們自己自定義的,隨著 Flink 本身版本的完善,語法和官方版本差別越來越大,功能完善性上也漸漸跟不上官方的版本,易用性自然也越來越差。如果你本身就是一名熟知 Flink SQL 的開發人員,可能還需要重新學習我們平臺自己的語法,整體不是很統一,有些問題也很難在互聯網上找到相關的資料,只能靠運維來解決。
任務運維問題
SQL 任務沒有統一的元數據管理、上下游的數據源沒有統一的登記、JAR 包任務沒有統一的框架約束、平臺方很難跟蹤整個平臺數據流的走向,我們不知道平臺上運行的幾百個任務分別是幹什麼的,哪些任務讀了哪個數據源?輸出了什麼數據源?任務的種類是什麼?是線上的,測試的,重要的還是不重要的。沒有這些數據的支撐,導致整個運維工作非常侷限。
網易雲音樂的業務發展非常快,數據量越來越大,線上庫和一些其它的庫變更十分頻繁,相關的實時任務也要跟著業務架構的調整,變更相關數據源的地址。此時我們就需要知道哪些任務用到了相關的數據源,如果平臺沒有能力很快篩選出相關任務,整個流程處理起來就十分繁雜了。
首先,需要聯繫平臺所有的開發者確認是否有相關任務的數據源,整個流程非常浪費時間,而且還有可能產生疏漏;其次,假設出現平臺流量激增,做運維工作時,如果我們不知道任務在幹什麼,自然也不能知道的任務的重要性,不知道哪些任務可以限流,哪些任務可以做暫時性的停止,哪些任務要重點保障。
實時數倉建設
帶著這些問題,我們開始進行新版本的構建工作。
- 在 Flink 1.9 版本以後,Flink 有了重大變化,重構了 Catalog 的 API,這和之前我們做的離線方向的工作有一定的契合。在離線的生態上,網易雲音樂有著一套非常完整的服務體系,打通元數據中心和 Spark SQL,可以通過 Spark SQL 連接元數據中心的元數據,進行異構數據源的聯邦查詢以及數據傳輸工作;
- 同樣基於 Flink 1.10,我們利用新的 Catalog 的 API 實現了一個元數據中心的 Catalog。將元數據中心作為 Flink SQL 的底層元數據組件,實現了 Kafka 到元數據中心任一數據源的實時的數據傳輸,以及 Redis、HBase、Kudu 等數據源的維表 JOIN 的實現;
- 除了純 SQL 的開發方式外,我們還提供了一套 SDK,讓用戶可以通過 SQL 加代碼混合使用的方式來實現自己的業務邏輯,提升整個 Flink API 的易用性,大大降低用戶的開發門檻,提升了平臺對任務的管控能力;
- 有了統一的元數據的管理以及 SDK 的開發方式,血緣收集也變得水到渠成,有了上下游數據的走向信息,平臺也很容易通過數據源的業務屬性來判斷任務的重要性。
元數據中心
不知道大家有沒有用過 Apache Atlas、Netflix 的 Metacat 等工具,網易雲音樂的元數據中心顧名思義就是一個元數據管理的程序,用於管理網易雲音樂所有數據源的元數據。你有可能在實際的開發中用到 Oracle、Kudu、Hive 等工具,也有可能是自研的分佈式數據庫。如果沒有統一的元數據管理,我們很難知道我們有哪些數據,數據是如何流轉的,也很難快速找到自己想要的數據。
將它們統一管理的好處是,可以通過元數據中心快速找到自己想要的數據,瞭解數據表的連接信息、schema 信息,字段的業務含義,以及所有表的數據來源和走向。
我們的元數據中心繫統有以下幾個特點:
1. 擴展性強:元數據中心系統理論上是可以管理所有的數據存儲中間件的,每個存儲中間件都可以通過插件的方式熱部署擴展上去,目前我們已經支持了雲音樂內部幾乎所有的存儲中間件;
2. 下推查詢:對於自身有元數據系統的存儲中間件,如剛剛提到的 Oracle、Kudu 、Hive 等,我們採用的是下推查詢的方式,直接去查詢它們的元數據的數據庫,獲取到相應的元數據信息,這樣就不會存在元數據不一致的問題;
3. Nest 元數據登記:對於像 Kafka、RocketMQ 這種自身並不存在元數據體系的,元數據中心內部有一個內嵌的元數據模塊 Nest,Nest 參考了 Hive 元數據的實現,用戶可以手動登記相關數據的 Schema 信息;
4. 統一的類型系統:為了更好的管理不同類型的的數據源,方便外部查詢引擎對接,元數據中心有一套完善的類型系統,用戶在實現不同數據源的插件時需要實現自身類型體系到元數據類型的映射關係;
5. 元數據檢索:我們會定期用全量數和增量的方式將元數據同步到 ES 當中,方便用戶快速查找自己想要的數據;
6. 完善的血緣功能:只要將任務的上下游按照指定的格式上報到元數據中心,就可以通過它提供的血緣接口去拿到整個數據流的血緣鏈路。
建設流程
需要進行的工作包括:
- 使用元數據中心的 API 實現 Flink Catalog API。
- 元數據中心到 Flink 系統的數據類型轉換,因為元數據中有一套統一的類型系統,只需要處理 Flink 的類型系統到元數據類型系統的映射即可,不需要關心具體數據源的類型的轉換。
- 數據源屬性和表屬性的轉換,Flink 中表的屬性決定了它的源頭、序列化方式等,但是元數據中心也有自己的一套屬性,所以需要手動轉換一些屬性信息,主要是一些屬性 key 的對齊問題。
- 血緣解析上報。
- 序列化格式完善。
- Table Connector 的完善,完善常用的存儲中間件的 Table Connector,如 Kudu、網易內部的 DDB 以及雲音樂自研的 Nydus 等。
- 提供 SDK 的開發方式:SDK 開發類似於 Spark SQL 的開發方式,通過 SQL 讀取數據,做一些簡單的邏輯處理,然後轉換成 DataStream,利用底層 API 實現一些複雜的數據轉換邏輯,最後再通過 SQL 的方式 sink 出去。簡單來說就是,SQL 加代碼混編的方式,提升開發效率,讓開發專注於業務邏輯實現,同時保證血緣的完整性和便利性,且充分利用了元數據。
完成以上工作後,整體基本就能實現我們的預期。
在一個 Flink 任務的開發中,涉及的數據源主要有三類:
- 流式數據:來自 Kafka 或者 Nydus,可以作為源端和目標端;
- 維表 JOIN 數據:來自 HBase 、Redis、JDBC 等,這個取決於我們自己實現了哪些;
- 落地數據源:一般為 MySQL、HBase、Kudu、JDBC 等,在流處理模式下通常作為目標端。
對於流式數據,我們使用元數據中心自帶的元數據系統 Nest 登記管理(參考右上角的圖);對於維表以及落地數據源等,可以直接通過元數據中心獲取庫表 Schema 信息,無需額外的 Schema 登記,只需要一次性登記下數據源連接信息即可(參考右下角的圖)。整體對應我們系統中數倉模塊的元數據管理、數據源登記兩個頁面。
完成登記工作以後,我們可以通過catalog.[table]等方式訪問任一元數據中心中登記的表,進行 SQL 開發工作。其中 Catalog 是在數據源登記時登記的名字;db 和 table 是相應數據源自身的 DB 和 Table,如果是 MySQL 就是 MySQL 自身元數據中的 DB 和 Table。
最終效果可以參考左下角讀取實時表數據寫入 Kudu 的的例子,其中紅框部分是一個 Kudu 數據表,在使用前只需要登記相關連接信息即可,無需登記表信息,平臺會從元數據中心獲取。
ABTest 項目實踐
項目說明
ABTest 是目前各大互聯網公司用來評估前端改動或模型上線效果的一種有效手段,它主要涉及了兩類數據:第一個是用戶分流數據,一個 AB 實驗中用戶會被分成很多組;然後就是相關指標統計數據,我們通過統計不同分組的用戶在相應場景下指標的好壞,來判斷相關策略的好壞。這兩類數據被分為兩張表:
- 用戶分流表:dt 表示時間,os 表示操作系統。ab_id 是某個 ABTest 的 id 號,group_id 就是分組 id ,group_type 分為兩種,對照組指的是 ABTest 裡面的基準,而實驗組即是這次 ABTest 需要去評估的這批數據。userId 就是用戶 id 了。
- 指標統計表:根據 dt、os 等不同維度來統計每個用戶的有效播放,曝光,點擊率等指標,metric、metric_ext 組合成一個具體含義。
在早期版本中,我們使用 Spark 按照小時粒度完成從 ODS 到 DWD 層數據清洗工作,生成用戶分流表和指標統計表。然後再使用 Spark 關聯這兩張表的數據將結果寫入到 Kudu 當中,再使用 Impala 系統對接,供用戶進行查詢。
這套方案的最大的問題是延遲太高,往往需要延遲一到兩個小時,有些甚至到第二天才能看到結果。對於延遲歸檔的數據也不能及時對結果進行修正。
這個方案對我們的業務方比如算法來說,上線一個模型需要等到兩個小時甚至第二天才能看到線上的效果,試錯成本太高,所以後來使用新版的實時倉開發了一套實時版本。
如上圖所示,是我們實時版本 ABTest 的數據走向,我們整體採用了 Lambda 架構:
- 第一步:使用 Flink 訂閱 ODS 原始的數據日誌,處理成 DWD 層的數據分流表和指標統計表,同時也將實時的 DWD 層數據同步到相同結構的 Hive 表當中。DWD 層處理的目的是將業務數據清洗處理成業務能看懂的數據,沒有聚合操作,實現比較簡單。但是流數據歸檔到 Hive 的過程中需要注意小文件問題,文件落地的頻率越高,延遲越低,同時落地的小文件也會越多,所以需要在技術和需求上權衡這個問題。同時在下方,我們也會有一條離線的數據流來處理同樣的過程,這個離線不是必須的,如果業務方對數據的準確性要求非常高,我們需要用離線處理做一次修正,解決數據重複問題。這一步還涉及到一個埋點的複雜問題,如果一個指標的埋點非常複雜,比如需要依賴時間順序路徑的歸因,而且本身客戶端日誌的延遲程度也非常不可靠的話,離線的修復策略就更加有必要了。
- 第二步:DWS 層處理,讀取第一步生成的 DWD 的流表數據使用 Flink 按照天和小時的維度做全局聚合,這一步利用了 Flink 狀態計算的特點將中間結果維護在 RocksDB 的狀態當中。然後使用 RetractionSink 將結果數據不斷寫入到 Kudu ,生成一個不斷修正的 DWS 層聚合數據。同樣我們也會使用 Spark 做一套同樣邏輯的計算曆史數據來做數據的修正。
-
這個步驟涉及到幾個問題:
- Flink 大狀態的運維和性能問題:為了解決了這個問題,我們使用 SSD 的機器專門用來運行這種大狀態的任務,保障 RocksDB 狀態的吞吐性能;
- Kudu 的 Update 性能問題:這裡做了一些 minibatch 的的優化降低 Kudu 寫入的壓力;
- Lambda 架構的運維成本:實時離線兩套代碼運維成本比較高。
- 第三步:結果數據對接
對於實時的結果數據我們使用 Impala 直接關聯用戶分流表和指標數據表,實時計算出結果反饋給用戶;
對於 T+1 的歷史數據,因為數據已經落地,並且不會再變了,所以為了降低 Impala 的壓力,我們使用 Spark 將結果提前計算好存在 Kudu 的結果表中,然後使用 Impala 直接查詢出計算好的結果數據。
批流一體
前面介紹的 ABTest 實時化整個實現過程就是一套完整的批流一體 Lambda 架構的實現。ODS 和 DWD 層既可以訂閱訪問,也可以批量讀取。DWD 層落地在支持更新操作的 Kudu 當中,和上層 OLAP 引擎對接,為用戶提供實時的結果。目前實現上還有一些不足,但是未來批流一體的努力方向應該能看得比較清楚了。
我們認為批流一體主要分以下三個方面:
■ 1. 結果的批流一體
使用數據的人不需要關心數據是批處理還是流處理,在提交查詢的那一刻,拿到的結果就應該是截止到目前這一刻最新的統計結果,對於最上層用戶來說沒有批和流的概念。
■ 2. 存儲的批流一體
上面的 ABTest 例子中我們已經看到 DWD、DWS 層數據的存儲上還有很多不足,業界也有一些相應解決方案等待去嘗試,我們希望的批流一體存儲需要以下幾個特性:
- 同時提供增量訂閱讀取以及批量讀取的能力,如 Apache Pulsar,我們可以批量讀取它裡面的歸檔數據,也可以通過 Flink 訂閱它的流式數據,解決 DWD 層兩套存儲的問題。
- 高性能的實時 / 批量 append 和 update 能力,讀寫互不影響,提供類似於 MVCC 的機制,類似於 Kudu 這種,但是性能需要更加強悍來解決 DWS 層存儲的問題。
- 和 OLAP 引擎的對接能力,比如 Impala、Presto 等,並且如果想要提升查詢效率可能還要考慮到列式存儲,具備較強的 scan 或者 filter 能力,來滿足上層用戶對業務結果數據查詢效率的訴求。
■ 3. 計算引擎的批流一體
做到一套代碼解決批流統一場景,降低開發運維成本,這個也是 Flink 正在努力的方向,未來我們也會在上面做一些嘗試。
一個小調研
為了更好的瞭解大家實時計算的需求,完善 Flink 的功能及使用體驗,請大家幫忙填寫一個小小的問卷,您的兩分鐘將使 Flink 更強大!
問卷填寫鏈接: