摘要:Apache Flink 是一個分佈式大數據處理引擎,可對有限數據流和無限數據流進行有狀態計算。可部署在各種集群環境,對各種大小的數據規模進行快速計算。滴滴基於 Apache Flink 做了大量的優化,也增加了更多的功能,比如擴展 DDL、內置消息格式解析、擴展 UDX 等,使得 Flink 能夠在滴滴的業務場景中發揮更大的作用。本文中,滴滴出行實時計算負責人、高級技術專家樑李印分享了 Apache Flink 在滴滴的應用與實踐。主要內容為:
- 服務化概述
- StreamSQL 實踐
- 平臺化建設
- 挑戰及規則
一、服務化概述
滴滴大數據服務架構
滴滴目前基於開源的大數據生態構建了一個比較完整的大數據體系,這套體系包括了離線、實時、OLAP、HBase 生態、檢索以及消息隊列等。滴滴大數據體系在 Flink 的基礎之上著力發展 StreamSQL,後面也會對此進行詳細介紹。
滴滴流計算髮展歷程
如下圖所示的是滴滴流計算的發展歷程。在 2017 年之前,滴滴的流計算基本上採用的都是業務方自建小集群的方式,並且小集群的選型也是多種多樣的,包括了 Storm、Jstorm、Spark Streaming 以及 Samza 等。滴滴的流計算技術從 2017 年開始收斂,開始主要基於 Spark Streaming 來構建服務化和平臺化的大集群。
- 從 2017 年開始,滴滴開始引入了 Flink,這是因為一些對於延遲要求特別高的業務,Spark Streaming 是無法支持的。
- 2018 年,滴滴流計算開始重點支持 StreamSQL 即 Flink SQL 的 SQL 化服務,另外一方面滴滴也在 Flink CEP 投入了一定精力來解決實際應用中的一些問題。
- 到 2019 年為止,滴滴基本上完成了流計算引擎的統一,除了少量殘留的歷史業務之外,現在絕大多數業務都是以 Flink 為基礎的,目前在滴滴通過 SQL 開發的任務也已經超過了 50%,SQL 開發成為了主流方式。
滴滴流計算業務規模
因為滴滴採用集中式業務管理,因此流計算所支持和服務的業務線達到了 50 多條。流計算集群的規模大致在千臺級別,目前流計算任務數達到了 3 千多個,其中絕大多數是使用 SQL 開發的,集群每天處理的數據量會達到上萬億條。
滴滴流計算業務場景
滴滴流計算的業務場景主要可以劃分為四個方面:實時監控、實時同步、實時特徵以及實時業務。
- 實時監控,包括對於業務指標、導航及 POI 準確率、業務健康度以及車聯網等進行實時監控。
- 實時同步,指的是數據實時地從一個地方轉移到另外一個地方去,這部分包括了業務日誌、數據庫日誌、埋點以及軌跡數據等,其中軌跡屬於滴滴比較有特點的數據,這部分數據會放入到 HBase 中去。
- 實時特徵,在滴滴中也屬於一個比較關鍵的業務,因為其會直接影響派單、導航等的準確性,實時特徵包括司機乘客特徵、上下車特徵、導航軌跡特徵以及工單特徵等。
- 實時業務,主要指的是會實時影響用戶的流計算場景,包括司乘位置語義同步,也就是當司機接單之後能夠知道乘客位置的動態變化,同時乘客也能夠知道司機的位置變化,以及高危的行程檢測等,比如在某個紅綠燈處等待時間較長,就可能出現了異常情況需要客服介入處理,除此之外還包括個性化和服務檢測等。
滴滴流計算多集群體系
隨著業務的不斷髮展,滴滴的機房也越來越多。基於這樣的情況,滴滴希望為業務提供一個多機房的統一視圖。因此,滴滴選擇在 YARN 的基礎之上構建一個路由層,這個路由層的職責是屏蔽多個物理集群,為業務方提供一個單一的邏輯集群,通過 YARN 的劃分來確定哪個業務運行在哪個機房中。在物理集群內部,又實現了隔離,因為一些業務比較重要,不希望受到其他業務的影響,因此可能需要專門劃分出一批機器來管理這些業務。
此外,滴滴對於 YARN 的調度也做了一些定製,因為在滴滴內部,YARN 的實時和離線是完全分開的,而兩者的差異也是比較大的,離線作業需要將機器資源全部用起來,吞吐越大越好,而實時作業卻不同,所追求的是均衡,因此機器的 Load 對它的影響會比較高。
因此,滴滴一方面將 YARN 的調度改為基於 CPU 進行調度,另外一方面還做了繁忙節點的智能過濾,並且還實現了動態資源推薦,這裡會為用戶提供參數的推薦配置,而最終是否調整則取決於用戶。
二、StreamSQL 實踐
StreamSQL 的優勢
StreamSQL 的實踐也是滴滴最近一年以來比較重要的工作。StreamSQL 是在 Flink SQL 基礎之上對於功能進行增加和完善所形成的產品。使用 StreamSQL 的優勢主要包括以下 5 點:
- 描述性語言,業務方不需要關注底層實現,只要將業務邏輯描述出來就能夠達到所想要的效果。
- 接口穩定,雖然版本一直在升級迭代,但只要 SQL 語法不變,對於用戶而言就是完全透明的。
- 問題易排查,因為代碼可讀性比較強,因此用戶只要看懂一些語法邏輯就能夠快速排查出問題。
- 批流一體化,今天在滴滴批處理大部分使用的是 Hive SQL 和 Spark SQL,如果流處理也是用 SQL,那麼在 SQL 與 SQL 之間能夠實現相互結合,比如實現共享 EDF、共享 Meta Store,甚至共享一些語法等,最終實現批流一體化的效果。
- 入門門檻低,StreamSQL 的學習入門的門檻比較低,因此受到了廣大開發者的歡迎。
完善 DDL
滴滴在 StreamSQL 上也做了很多事情,其中之一就是完善 DDL。社區版本的 StreamSQL 在 DDL 這部分是不完善的,而在滴滴卻將這部分實現了打通,比如打通了上游和下游的消息隊列、實時存儲和離線存儲等,業務方或者用戶方只需要創建一個 Source 或 Sink 就可以將上游和下游描述出來。
內置消息格式解析
消息中數據的提取一般而言都比較複雜,比如 Binlog 日誌的數據格式是非常複雜的,如果每個用戶都實現一次數據提取是非常困難的,因此滴滴在 StreamSQL 中內置了消息格式解析,用戶只需要創建一個 Source,並指定類型為 Binlog 就可以將相應的字段創建完成,包括數據庫名稱、表名稱以及業務屬性等,因此使用起來非常方便。
此外,滴滴在 StreamSQL 中還內置了數據去重功能。上游在數據採集等環節往往容易造成數據重複,數據重複往往會對於下游如數據監控等造成影響,因此 StreamSQL 中內置了數據去重功能,業務方只需要進行簡單配置,就能夠將重複的數據清洗掉。
除了數據庫日誌 Binlog 之外,滴滴還有很多自己定義的業務日誌,包含了很多內部的業務字段,對於這些,滴滴在 StreamSQL 中也實現了自動提取日誌頭以及提取所有業務字段,組裝成 Map 的功能。此外,滴滴的 StreamSQL 還支持通過 JSONPath 指定所需字段,而不需要通過 UDF 再獲取字段,因此大大降低了使用複雜度。
擴展 UDX
一方面,滴滴對於 StreamSQL 的原生設計進行了一些擴展,主要是在 JSON 和 Map 的操作上,因為這兩種類型在滴滴的應用非常廣泛,因此滴滴的 StreamSQL 對於這兩者的應用方法做了一些擴充。另外一方面,滴滴的 StreamSQL 還支持了自定義 UDX,用戶可以自己實現一個 Function,通過類似於 Hive Function 這樣的方式指定一個 Jar 包來應用到業務裡面。第三方面,滴滴的 StreamSQL 還兼容了 Hive UDX,如果用戶原來使用的 Hive SQL,而又想要將任務實時化,基本上邏輯不需要發生太大變化。與此同時,引用的 UDX 也不需要做什麼改動,只需要加以引用就可以使用了,這樣一來就有助於實現批流一體化。
Join 能力
在 Join 能力方面,滴滴的 StreamSQL 也花了很多精力來進行支持。第一方面所支持的就是 Join 比較長的跨度能力,比如滴滴的順風車業務可能在發單到接單的跨度能夠達到一個星期的時間,在這段時間之內如果 Join 都基於內存實現是不太可能的,因此滴滴在這種場景下 Join 都是放到狀態裡面去,通過 TTL 窗口實現。比如順風車的 TTL 需要配成 7 天,7 天之後自動過期,這樣就可以實現窗口的能力。
在維表 Join 方面,滴滴的 StreamSQL 則支持了 HBase、滴滴內部研發的 KV Store 以及 MySQL 等的 Join,這部分解決的問題包括字段補全,以及將司機 ID、司機姓名、明細等 Join 在一起方便下一個任務使用。
三、平臺化建設
StreamSQL IDE
在平臺化建設方面,滴滴目前已經構建了一站式 StreamSQL 開發平臺。
首先,滴滴提供了 StreamSQL 的 IDE。因為目前大部分業務是基於 SQL 開發的,因此 IDE 需要承擔非常重要的職責,在 StreamSQL IDE 裡面除了提供了本身的 SQL 之外,還提供了 SQL 模板,如果用戶想要開發流式 SQL 時不需要從零開始,只需要選擇一個 SQL 模板,並在這個模板之上進行修修改改即可達到期望的結果。
此外,StreamSQL IDE 還提供了 UDF 的庫,相當於一個庫如果不知道具有什麼含義以及如何使用,用戶只需要在 IDE 上搜索到這個庫,就能夠找到使用說明以及使用案例。
IDE 所提供的第三個附加功能就是語法檢測和智能提示,用戶在寫 SQL 的過程中能夠為用戶做出實時的語法檢測並給出一定的反饋,與此同時,也能夠對一些表和字段等提供一些智能提示的能力,使用起來也非常方便。
StreamSQL IDE 的第四種能力是 Debug,這種能力對於流計算而言是非常重要的,如果沒有這種能力可能需要先開發、再提交、運行一段時間之後再通過結果來看是否存在問題,因此 StreamSQL IDE 提供了在線的 Debug 能力,用戶可以直接上傳一個文件,並構造一個大約 100 行的數據,運行之後立即能夠在控制檯上打印出 SQL 輸出結果,並校驗是否正確。
除了上傳數據之外,StreamSQL IDE 還支持採樣 Topic 中數據進行驗證。
StreamSQL IDE 的最後一個功能就是版本管理,因為業務版本需要不斷升級,而升級時也可能需要回退,因此 StreamSQL IDE 也提供了版本管理功能。
任務管控
如今,滴滴的所有流計算任務都是通過 Web 化入口提交的,從一開始的時候就沒有暴露出客戶端,這也保證任務的發展是全部可控的。除了 Web 化入口之外,滴滴也提供了全流程的任務生命週期管理,包括任務的提交、任務的停止以及任務的升級、回滾等。因為客戶端是流計算團隊自己控制的,因此參數可以自己設置,如果需要調整也可以直接在 Web 化的入口進行統一的修改。
任務運維
滴滴在任務運維方面所做的優化大致包括四個方面,首先是日誌檢索,因為 Flink 的日誌是打在本地的,而通過 Flink UI 來看日誌的體驗是非常糟糕的,所以滴滴對於 Flink 的日誌採集到了 Elasticsearch 集群上去,通過 Web 化界面來檢索日誌,方便大家調查問題。
第二個是指標監控,因為 Flink 的指標非常多,因此通過 Flink UI 來看指標的體驗也非常差,因此構建了一個 Web 的報表平臺,通過將指標採集到 Druid 中去,通過查詢 Druid 來查詢報表。
第三點就是報警,這部分比較有意思,因為基本上所有的系統都會提供報警能力,但是報警裡面就會涉及到發報警和收報警的兩部分人員,對於發報警的人而言,都是希望報警信息儘量多發,這樣可以免責;而對於收報警的人而言,則希望儘量少收到報警,因為看報警信息也需要花費時間和精力,因此在兩方面需要做到很好的平衡,既能夠將該發的報警信息發出去,又不能亂髮報警信息。
最後一點,滴滴在實時任務的運維方面還提供了血緣追蹤的能力,因為實時任務和離線任務不同,其任務鏈路非常長,從採集到消息通道再到流計算以及下游的存儲之間存在四到五個環節,這裡面如果無法追蹤,那麼在查找問題時將會是災難性的,因此需要進行血緣追蹤。
Meta 化建設
滴滴目前在做的一件事情就是 Meta 化建設,流計算任務的開發需要先定義 DDL,再寫 DML,這樣的代價是比較大的。而無論是通過 Hive SQL 還是 Spark SQL, DDL 都已經在 Meta Store 裡面了,其實是不需要重複寫的。為了解決上述的問題,滴滴也正在做實時 Meta 的相關工作,希望將實時的數據比如 Kafka 的流定義成一個實時表存儲到 Meta Store 裡面去,用戶只需要寫 DML 語句即可。
在任務提交的時候,系統會自動地將 DDL 語句補充上去,變成完成的 StreamSQL 進行提交。這是滴滴目前正在實現中的一個功能,這樣的功能實現之後可以進一步降低 Meta 數據量,同時也能夠進一步打通批處理業務和流處理業務,使得兩者的開發步驟更加趨近相同。
批流一體化
雖然 Flink 具備批流的核心能力,但是在滴滴內部還沒有完全實現。滴滴首先希望能夠在產品上實現批流一體化,上面提到通過 Meta 化建設之後能夠實現整個滴滴只有一個 Meta Store,無論是 Hive 的表、 Kafka 的 Topic 還是下游的 HBase 或者 ES,都能夠定義到 Meta Store 裡面去, Hive 能夠查詢 Meta Store, Spark、 Presto 以及 Flink 也都能夠查詢 Meta Store,這樣使得整個 SQL 開發起來是完全一致的。進一步來說,可以根據 SQL 消費的 Source 來判斷到底是批計算任務還是流計算任務,如此實現產品的批流一體化體驗。
四、挑戰與規劃
面臨的挑戰
滴滴實時計算所面臨的挑戰主要有三點:
- 大狀態管理
Flink 中一個比較重要的特點就是具有狀態,而狀態有時候會非常大,而且時間比較長,因此數據對齊需要非常大的開銷。
另外一點,在做 Checkpoint 的時候,磁盤 IO 會變大,進而引起機器負載升高,從而影響任務運行,因此 Checkpoint 能不能用好還是會影響服務穩定性的。因為 Checkpoint 是一個黑盒,那麼想要看到 Checkpoint 內部的東西去做一些修改和驗證,就需要狀態診斷的能力。
第三點還需要提供全鏈路的 Exactly Once 的能力。
- 業務高可用
在滴滴內部,越來越多的業務將 Java 或者 Golang 寫的業務搬到 Flink 上去,讓 Flink 來解決容錯和擴展問題以及編程模型問題,這樣就需要原來本地開發所有的診斷手段在 Flink 上都能夠具備,比如在業務不能停止的情況下做透明升級,以及一旦出現問題之後如何快速解決等,因此快速診斷也是非常重要的能力。
在之後還需要資源伸縮的能力,因為業務存在早高峰和晚高峰還有平峰,而業務量還在增長,還需要保證活動或者節假日的流量速增情況下的任務還處於穩定狀態,這些都是需要解決的問題。
3.多語言
雖然今天在滴滴大部分實時任務都是通過 SQL 來開發的,但是依舊不能100%覆蓋全部的場景,有些場景下是需要寫代碼的。
Flink 提供了 Java 和 Scala 這兩種 API,但這對於業務人員而言依然是不夠的,因為業務大部分是 Go 語言系或者 Python 語言系的,因此滴滴希望根據社區來提供多語言的開發 Flink 的能力,比如寫 SQL,而 UDF 也可以通過多語言來開發。
未來規劃
最後為大家介紹滴滴在 Flink 應用和實踐方面的未來規劃。
- 首先,需要提供高可用的流計算服務,使得 Flink 能夠具備支撐完整線上能力的機制。
- 其次,滴滴也在探索實時機器學習,原來任務是以天級別進行更新的,而在現在的模型中,藉助 Flink 的能力已經能夠做到10分鐘到15分鐘的更新,在下一步將會進一步實現秒級別的更新。
- 最後一點就是實時數倉,能否實現數倉報表的實時化,能否使得指標口徑能夠和離線保持一致,是否能夠使得實時業務和離線業務相互補數據,都時滴滴實時計算未來的挑戰。
作者介紹:
樑李印,2010 年至 2014 年,阿里雲 Hadoop 集群(雲梯 I)負責人之一;2014 年至 2016 年,阿里雲分佈式圖計算框架 (MaxCompute Graph) 研發;2017 年至 2019 年,滴滴出行負責實時計算及 OLAP 建設,目前負責滴滴大數據架構部。