開發與維運

網易遊戲基於 Flink 的流式 ETL 建設

網易遊戲資深開發工程師林小鉑為大家帶來網易遊戲基於 Flink 的流式 ETL 建設的介紹。內容包括:

  1. 業務背景
  2. 專用 ETL
  3. EntryX 通用 ETL
  4. 調優實踐
  5. 未來規劃

一. 業務背景

網易遊戲 ETL 服務概況

網易遊戲的基礎數據主要日誌方式採集,這些日誌通常是非結構化或半結構化數據,需要經過數據集成 ETL 才可以入庫至實時或離線的數據倉庫。此後,業務用戶才可以方便地用 SQL 完成大部分數據計算,包括實時的 Flink SQL 和離線的 Hive 或 Spark。

image.png

網易遊戲數據集成的數據流與大多數公司大同小異,主要有遊戲客戶端日誌、遊戲服務端日誌和其他周邊基礎的日誌,比如 Nginx access log、數據庫日誌等等。這些日誌會被採集到統一的 Kafka 數據管道,然後經由 ETL 入庫服務寫入到 Hive 離線數據倉庫或者 Kafka 實時數據倉庫。

這是很常見的架構,但在我們在需求方面是有一些比較特殊的情況。

網易遊戲流式 ETL 需求特點

image.png

首先,不同於互聯網、金融等行業基本常用 MySQL、Postgres 等的關係型數據庫,遊戲行業常常使用 MongoDB 這類 schema-free 的文檔型數據庫。這給我們 ETL 服務帶來的問題是並沒有一個線上業務的準確的 schema 可以依賴,在實際數據處理中,多字段或少字段,甚至一個字段因為玩法迭代變更為完全不同的格式,這樣的情況都是可能發生的。這樣的數據異構問題給我們 ETL 的數據清洗帶來了比較高的成本。

其次,也是由於數據庫選型的原因,大部分業務的數據庫模式都遵循了反範式設計,會刻意以複雜內嵌的字段來避免表間的 join。這種情況給我們帶來的一個好處是,在數據集成階段我們不需要去實時地去 join 多個數據流,壞處則是數據結構可能會非常複雜,多層嵌套十分常見。

然後,由於近年來實時數倉的流行,我們也同樣在逐步建設實時數據倉庫,所以複用現有的 ETL 管道,提取轉換一次,加載到實時離線兩個數據倉庫,成為一個很自然的發展方向。

最後,我們的日誌類型多且變更頻繁,比如一個玩法複雜的遊戲,可能有 1,000 個以上的日誌類型,每兩週可能就會有一次發版。在這樣的背景下 ETL 出現異常數據是不可避免的。因此我們需要提供完善的異常處理,讓業務可以及時得知數據異常和通過流程修復數據。

日誌分類及特點

image.png

為了更好地針對不同業務使用模式優化,我們對不同日誌類型的業務提供了不同的服務。我們的日誌通常分為三個類型:運營日誌、業務日誌和程序日誌。

運營日誌記錄的是玩家行為事件,比如登錄帳號、領取禮包等。這類日誌是最為重要日誌,有固定的格式,也就是特定 header + json 的文本格式。數據的主要用途是做數據報表、數據分析還有遊戲內的推薦,比如玩家的組隊匹配推薦。

業務日誌記錄的是玩家行為以外的業務事件,這個就比較廣泛,比如 Nginx access log、CDN 下載日誌等等,這些完全沒有固定格式,可能是二進制也可能是文本。主要用途類似於運營日誌,但更加豐富和定製化。

程序日誌記錄是程序的運行情況,也就是平時我們通過日誌框架打的 INFO、ERROR 這類日誌。程序日誌主要用途是檢索定位運行問題,通常是寫入 ES,但有時數量過大或者需要提取指標分析時,也會寫入數據倉庫。

網易遊戲 ETL 服務剖析

image.png

針對這些日誌分類,我們具體提供了三類 ETL 入庫的服務。首先是運營日誌專用的 ETL,這會根據運營日誌的模式進行定製化。然後是通用的面向文本日誌的 EntryX ETL 服務,它會服務於運營日誌以外的所有日誌。最後是 EntryX 無法支持的特殊 ETL 需求,比如有加密或者需要進行特殊轉換的數據,這種情況下我們就會針對性地開發 ad-hoc 作業來處理。

二. 運營日誌專用 ETL

運營日誌 ETL 發展歷程

image.png

運營日誌 ETL 服務有著一個比較久的歷史。大概在 2013 年,網易遊戲就建立了基於 Hadoop Streaming + Python 預處理/後處理的第一版離線 ETL 框架。這套框架是平穩運行了多年。

在 2017 年的時候,隨著 Spark Streaming 的嶄露頭角,我們開發了基於 Spark Streaming 的第二個版本,相當於一個 POC,但因為微批調優困難且小文件多等問題沒有上線應用。

時間來到 2018 年,當時 Flink 已經比較成熟,我們也決定將業務遷移到 Flink 上,所以我們很自然地開發了基於 Flink DataStream 的第三版運營日誌 ETL 服務。這裡面比較特殊的一點就是,因為長久以來我們業務方積累了很多 Python 的 ETL 腳本,然後新版最重要的一點就是要支持這些 Python UDF 的無縫遷移。

運營日誌 ETL 架構

接下來看下兩個版本的架構對比。

image.png

在早期 Hadoop Streaming 的版本里面,數據首先會被 dump 到 HDFS 上,然後 Hadoop Streaming 啟動 Mapper 來讀取數據並通過標準輸入的方式傳遞給 Python 腳本。Python 腳本里面會分為三個模塊:首先預處理 UDF,這裡通常會進行基於字符串的替換,一般用作規範化數據,比如有些海外合作廠商的時間格式可能跟我們不同,那麼就可以在這裡進行統一。預處理完的數據會進入通用的解析/轉換模塊,這裡我們會根據運營日誌的格式來解析數據,並進行通用轉換,比如濾掉測試服數據。通用模塊之後,最後還有一個後處理模塊進行鍼對字段的轉換,比如常見的匯率轉換。之後數據會通過標準輸出返回給 Mapper,然後 Mapper 再將數據批量寫到 Hive 目錄中。

我們用 Flink 重構後,數據源就由 HDFS 改為直接對接 Kafka,而 IO 模塊則用 Flink 的 Source/Sink Operator 來代替原本的 Mapper,然後中間通用模塊可以直接重寫為 Java,剩餘的預處理和後處理則是我們需要支持 Python UDF 的地方。

Python UDF 實現

image.png

在具體實現上,我們在 Flink ProcessFunction 之上加入了 Runner 層,Runner 層負責跨語言的執行。技術選型上是選了 Jython,而沒有選擇 Py4j,主要因為 Jython 可以直接在 JVM 裡面去完成計算,不需要額外啟動 Python 進程,這樣開發和運維管理成本都比較低。而 Jython 帶來的限制,比如不支持 pandas 等基於 c 的庫,這些對於我們的 Python UDF 來說都是可接受的。

整個調用鏈是,ProcessFunction 在 TaskManager 被調用時會在 open 函數延遲初始化 Runner,這是因為 Jython 是不可序列化的。Runner 初始化時會負責資源準備,包括將依賴的模塊加入 PYTHONPATH,然後根據配置反射調用 UDF 函數。

調用時,對於預處理 UDF Runner 會把字符串轉化為 Jython 的 PyUnicode 類型,而對於後處理 UDF 則會把解析後的 Map 對象轉為 Jython 的 PyDcitionary,分別作為兩者的輸入。UDF 可以調用其他模塊進行計算,最終返回 PyObject,然後 Runner 再將其轉換成 Java String 或者 Map,返回給 ProcessFunction 輸出。

運營日誌 ETL 運行時

image.png

剛剛是 UDF 模塊的局部視圖,我們再來看下整體的 ETL 作業視圖。首先在我們提供了通用的 Flink jar,當我們生成並提交 ETL 作業到作業平臺時,調度器會執行通用的 main 函數構建 Flink JobGraph。這時會從我們的配置中心,也就是 ConfigServer,拉取 ETL 配置。ETL 配置中包含使用到的 Python 模塊,後端服務會掃描其中引用到的其他模塊,把它們統一作為資源文件通過 YARN 分發功能上傳到 HDFS 上。在 Flink JobManager 和 TaskManager 啟動時,這些 Python 資源會被 YARN 自動同步到工作目錄上備用。這就是整個作業初始化的過程。

然後因為 ETL 規則的小變更是很頻繁的,比如新增一個字段或者變更一下過濾條件,如果我們每次變更都需要重啟作業,那麼作業重啟帶來的不可用時間會對我們的下游用戶造成比較糟糕的體驗。因此,我們對變更進行了分類,對於一些不影響 Flink JobGraph 的輕量級變更支持熱更新。實現的方式是每個 TaskManager 啟動一個熱更新線程,定時輪詢配置中心同步配置。

三. EntryX 通用 ETL

接下來介紹我們的通用 ETL 服務 EntryX。這裡的通用可以分為兩層意義,首先是數據格式上的通用,支持非結構化到結構化的各種文本數據,其次是用戶群體的通用,目標用戶覆蓋數據分析、數據開發等傳統用戶,和業務程序、策劃這些數據背景較弱的用戶。

EntryX 基本概念

image.png

先介紹 EntryX 的三個基本概念,Source、StreamingTable 和 Sink。用戶需要分別配置這個三個模塊,系統會根據這些自動生成 ETL 作業。

Source 是 ETL 作業的輸入源,通常是從業務端採集而來的原始日誌 topic,或者是經過分發過濾後的 topic。這些 topic 可能只包含一種日誌,但更多情況下會包含多種異構日誌。

接下來 StreamingTable,一個比較通俗的名稱就是流表。流表定義了 ETL 管道的主要元數據,包括如何轉換數據,還有根據轉換好的數據定義的流表 schema,將數據 schema 化。流表 schema 是最為關鍵的概念,它相當於 Table DDL,主要包括字段名、字段數據類型、字段約束和表屬性等。為了更方便對接上下游,流表 schema 使用的是自研的 SQL-Like 的類型系統,裡面會支持我們一些拓展的數據類型,比如 JSON 類型。

最後 Sink 負責流表到目標存儲的物理表的映射,比如映射到目標 Hive 表。這裡主要需要 schema 的映射關係,比如流表哪個字段映射到目標表哪個字段,流表哪個字段用作目標 Hive 表分區字段。在底層,系統會自動根據 schema 映射關係來提取字段,並將數據轉換為目標表的存儲格式,加載到目標表。

EntryX ETL 管道

image.png

再來看下 EntryX ETL 管道的具體實現。藍色部分是外部存儲系統,而綠色部分則是 EnrtyX 的內部模塊。

數據首先從對接採集的原始數據 Topic 流入,經過 Source 攝入到 Filter。Filter 負責根據關鍵詞過濾數據,通常來說我們要求過濾完的數據是有相同 schema 的。經過這兩步數據完成 Extract,來到 Transform 階段。

Transform 第一步是解析數據,也就是這裡的 Parser。Parser 支持 JSON/Regex/Csv 三種解析,基本可以覆蓋所有案例。第二步是對數據進行轉換,這是由 Extender 負責的。Extender 通過內置函數或 UDF 計算衍生字段,最常見的是將 JSON 對象拉平展開,提取出內嵌字段。最後是 Formatter,Formatter 會根據之前用戶定義的字段邏輯類型,將字段的值轉為對應的物理類型。比如一個邏輯類型為 BIGINT 的字段,我們在這裡會統一轉為 Java long 的物理類型。

數據完成 Transform 之後來到最後的 Load 階段。Load 第一步是決定數據應該加載到哪個表。Splitter 模塊會根據每個表的入庫條件(也就是一個表達式)來分流數據,然後再到第二步的 Loader 來負責將數據寫到具體的外部存儲系統。目前我們支持 Hive/Kafka 兩種存儲,Hive 支持 Text/Parquet/JSON 三種格式,而 Kafka 支持 JSON 和 Avro 兩種格式。

實時離線統一 Schema

image.png

在 Entryx 的設計裡數據可以被寫入實時和離線兩個數據倉庫,也就是說同一份數據,但在不同的存儲系統中以不同格式表示。從 Flink SQL 的角度來說是 schema 部分相同,但 connector 和 format 不同的兩個表。而 schema 部分經常會隨業務變更,而 connector 和 format(也就是存儲系統和存儲格式)是相對穩定的。那麼一個很自然的想法就是,能不能將 schema 部分提取出來獨立維護?實際上,這個抽象的 schema 已經存在了,就是我們在 ETL 提取的流表 schema。

在 EntryX 裡面,流表 schema 是與序列化器、存儲系統無關的 schema,作為 Single Source of Truth。基於流表 schema,加上存儲系統信息和存儲格式信息,我們就可以衍生出具體的物理表的 DDL。目前我們主要是支持 Hive/Kafka,如果之後要拓展至支持 ES/HBase 表也是非常方便。

實時數據倉庫集成

image.png

EntryX 一個重要的定位是作為實時倉庫的統一入口。剛剛其實已經多次提到 Kafka 表,但還沒有說實時數倉是怎麼做的。實時數倉的常見問題是 Kafka 並沒有原生支持 schema 元數據的持久化。目前社區的主流解決方案是基於 Hive MetaStore 來保存 Kafka 表的元數據,並複用 HiveCatalog 來直接對接到 Flink SQL。

但這對於我們來說使用 Hive MetaStore 主要有幾個問題:一是在實時作業裡引入 Hive 依賴並與 Hive 耦合,這是很重的依賴,導致定義的表很難被其他組件複用,包括 Flink DataStream 用戶;二是我們已經有 Kafka SaaS 平臺 Avatar 來管理物理 schema,比如 Avro schema,如果再引入 Hive MetaStore 會導致元數據的割裂。因此,我們是拓展了 Avatar 平臺的 schema 註冊中心,同時支持邏輯 schema 和物理 schema。

那麼實時數倉和 EntryX 的集成關係是:首先我們有 EntryX 的流表 schema,在新建 Sink 的時候調用 Avatar 的 schema 接口,根據映射關係生成邏輯 schema,而 Avatar 再根據 Flink SQL 類型與物理類型的映射關係生成 topic 的物理 schema。

與 Avatar schema 註冊中心配套的還有我們自研的 KafkaCatalog,它負責讀取 topic 的邏輯和物理 schema 來生成 Flink SQL 的 TableSource 或 TableSink。而對於一些 Flink SQL 以外的用戶,比如 Flink DataStream API 的用戶,他們也可以直接讀取物理 schema 來享受到數據倉庫的便利。

EntryX 運行時

和運營日誌 ETL 類似,在 EntryX 運行時,系統會基於通用的 jar 和配置生成 Flink 作業,但這裡有兩種情況需要特別處理。

image.png

首先是一個 Kafka topic 往往有幾十甚至上千種日誌,那麼對應其實有也幾十甚至上千的流表,如果每個流表都單獨運行在一個作業裡,那麼一個 topic 會可能會被讀上千遍,這是非常大的浪費。因此,在作業運行時提供一個優化策略,可以將同個 source 的不同流表合併到一個作業裡跑。比如圖中,某個手游上傳了 3 種日誌到 Kafka,用戶分別配置了玩家註冊、玩家登錄、領取禮包三個流表,那麼我們可以這三個流表合併起來到一個作業,共享同一個 Kafka Source。

另外的一個優化是,一般情況下我們可以按照之前“提取轉換一次,加載一次”的思路來將數據同時寫到 Hive 和 Kafka,但是由於 Hive 或者說 HDFS 畢竟是離線系統,實時性比較差,寫入在一些負載比較高的 HDFS 老集群經常會出現反壓,同時阻塞上游,導致 Kafka 的寫入也受到影響。在這種情況下,我們通常要分離加載到實時和離線的 ETL 管道,具體會取決於業務的 SLA 還有 HDFS 的性能。

四.調優實踐

接下來給大家分享下我們在 ETL 建設中的調優實踐經驗。

HDFS 寫入調優

image.png

首先是 HDFS 寫入的調優。流式寫入 HDFS 場景中老生常談的一個問題便是小文件過多。通常來說小文件和實時性是魚與熊掌不可兼得。如果要延遲低,那麼我們需要頻繁地滾動文件來提交數據,必然導致小文件過多。

小文件過多主要造成兩個問題:一從 HDFS 集群管理角度看,小文件會佔用大量的文件數和 block 數,浪費 NameNode 內存;二是從用戶角度看,讀寫效率都會降低,因為寫的時候要更頻繁地調用 RPC 和 flush 數據,造成更多的阻塞,有時甚至造成 checkpoint 超時,而讀時則需要打開更多的文件才能讀完數據。

HDFS 寫入調優 - 數據流預分區

我們在優化小文件問題時做的一點調優是對數據流先做一遍預分區,具體來說,便是在 Flink 作業內部先基於目標 Hive 表進行一次 keyby 分區,讓同一個表的數據儘量集中在少數的幾個 subtask 上。

image.png

舉個例子,假設 Flink 作業並行度為 n,而目標 Hive 分區數為 m 個。因為每個 subtask 都有可能讀到任意分區的數據,在默認的各 subtask 完全並行的情況下,每個 subtask 都會寫所有分區,造成總體的寫入文件數是 n * m。假設 n 是 100,m 是 1000,按 10 分鐘滾一次文件算,每天會造成 14,400,000 個文件,這對於很多老集群來說是非常大的壓力。

如果經過數據流分區的優化之後,我們就可以限制住 Flink 並行度帶來的增長。比如我們 keyby hive 表字段,並加入範圍為 0-s 整數的鹽來避免數據傾斜,那麼分區最多會被 s 個 subtask 讀寫。假設 s 是 5,比起原先 n 是 100,那麼我們就將原本的文件數降低為原來 20 分之一。

基於 OperatorState 的 SLA 統計

image.png

第二個我想分享的是我們的 SLA 統計工具。背景是我們的用戶經常會通過 Web UI 來進行調試和問題的排查,比如不同 subtask 的輸入輸出數目,但這些 metric 會因為作業重啟或者 failover 而重置,因此我們開發了基於 OperatorState 的 SLA-Utils 工具來統計數據的輸入和分類輸出。這個工具設計得非常輕量級,可以很容易集成到我們自己的服務或者用戶的作業裡面。

在 SLA-Utils 裡面,我們支持了三種 metric。首先是標準的 metric,有 recordsIn/recordsOut/recordsDropped/recordsErrored,分別對應輸入記錄數/正常輸出記錄數/被過濾掉的記錄數/處理異常的記錄數。通常來說 recordsIn 就等於後面三者的總和。第二種用戶可以自定義的 metric,通常可以用於記錄更詳細的原因,比如是 recordsEventTimeDropped 代表數據是因為 event time 被過濾的。

那麼上述兩種 metric 靜態的,也就是說 metric key 在作業運行前就要確定,此外 SLA-Utils 還支持在運行時動態註冊的 TTL metric。這種 metric 通常有動態生成的日期作為前綴,在經過 TTL 的時間之後被自動清理。TTL metric 主要可以用於做天級別時間窗口的統計。這裡比較特別的一點是,因為 OperatorState 是不支持 TTL 的,SLA-Utils 是在每次進行 checkpoint 快照的時候進行一次過濾,剔除掉過期的 metric,以實現 TTL 的效果。

那麼在 State 保存了 SLA 指標之後要做的就是暴露給用戶。我們目前的做法是通過 Accumulater 的方式來暴露,優點是 Web UI 有支持,開箱即用,同時 Flink 可以自動合併不同的 subtask 的 metric。缺點在於沒有辦法利用 metric reporter 來 push 到監控系統,同時因為 Acuumulater 是不能在運行時動態註銷的,所以使用 TTL metric 會有內存洩漏的風險。因此,在未來我們也考慮支持 metric group 來避免這些問題。

數據容錯及恢復

最後再分享下我們在數據容錯和恢復上的實踐。

image.png

以很多最佳實踐相似,我們用 SideOutput 來收集 ETL 各環節中出錯的數據,彙總到一個統一的錯誤流。錯誤記錄中包含我們預設的錯誤碼、原始輸入數據以及錯誤類和錯誤信息。一般情況下,錯誤數據會被分類寫入 HDFS,用戶通過監控 HDFS 目錄可以得知數據是否正常。

image.png

那麼存儲好異常數據後,下一步就是要恢復數據。這通常有兩種情況。

一是數據格式異常,比如日誌被截斷導致不完整或者時間戳不符合約定格式,這種情況下我們一般通過離線批作業來修復數據,重新回填到原有的數據管道。

二是 ETL 管道異常,比如數據實際的 schema 有變更但流表配置沒有更新,可能會導致某個字段都是空值,這時我們的處理辦法是:首先更新線上的流表配置為最新,保證不再產生更多異常數據,這時 Hive 裡面仍有部分分區是異常的。然後,我們發佈一個獨立的補數作業來專門修復異常的數據,輸出的數據會寫到一個臨時的目錄,並在 hive metastore 上切換 partition 分區的 location 來替換掉原來的異常目錄。因此這樣的一個補數流程對離線查詢的用戶來說是透明的。最後我們再在合適的時間替換掉異常分區的數據並恢復 location。

五.未來規劃

最後介紹下我們的未來規劃。

image.png

  • 第一個是數據湖的支持。目前我們的日誌絕大多數都是 append 類型,不過隨著 CDC 和 Flink SQL 業務的完善,我們可能會有更多的 update、delete 的需求,因此數據湖是一個很好的選擇。
  • 第二個會提供更加豐富的附加功能,比如實時的數據去重和小文件的自動合併。這兩個都是對業務方非常實用的功能。
  • 最後是一個支持 PyFlink。目前我們的 Python 支持只覆蓋到數據集成階段,後續數據倉庫的 Python 支持我們是希望通過 PyFlink 來實現。

最新活動推薦

僅需99元即可體驗阿里雲基於 Apache Flink 構建的企業級產品-實時計算 Flink 版!點擊下方鏈接瞭解活動詳情:https://www.aliyun.com/product/bigdata/sc?utm_content=g_1000250506

社區二維碼.jpg

Leave a Reply

Your email address will not be published. Required fields are marked *