大數據

PyFlink 在聚美優品的應用實踐

大家好,我是來自聚美優品刷寶大數據部門的吳攀剛,本文將跟大家分享 PyFlink 在刷寶的應用,包括:背景介紹、架構演進、技術選型以及一個問題的解決思路分享。

刷寶是一款短視頻 APP,涵蓋短視頻、直播視頻等內容,為用戶提供快樂視頻和優質的主播。在來到聚美之前,我主要做離線數倉開發和數據開發,來到刷寶之後,部門也並沒有現成的實時框架,需要自行搭建。所以,當實時的需求來到我面前的時候,內心是忐忑的。

下面我將分享下,我與 PyFlink 的緣分。

1.背景介紹

業務場景

刷寶有許多重要的業務場景,其中之一是為用戶實時推薦短視頻。其中推薦的實時性,決定了用戶在視頻上的停留時長、觀看視頻時長、留存等指標,進而影響到廣告位的收益,比如廣告的單價等。

刷寶從 2019 年開始,業務飛速發展,截止到 2020 年 5 月份,用戶行為數據峰值每秒過百萬,每天有 200 億數據。這個業務量,對我們現有的技術架構、數據計算的實時性提出了挑戰。

實時化挑戰

我們的數據流程整個環節完成需要1小時左右時間,遠達不到實時的要求。如何更快速的根據用戶瀏覽習慣實時推薦相關視頻會對用戶觀看視頻時長、停留時長、留存等有重大的影響,比如在現有基礎上提升10-20%。

我們更期望數據的計算實時化,也就是將原有技術架構中的批量計算(hive)變成實時計算(Flink SQL),架構圖如下。

2.架構演進

架構演進

1.jpg

  • 第一層:最開始是離線計算,完成一次計算需要30分鐘,還不包括後續的模型處理;
  • 第二層:考慮實時計算後,我們打算採取 Flink 架構來處理,整體主件過程如圖;
  • 第三層:考慮到人力和時間等成本,還有技術人員技能匹配度,最終選擇第三層;

我們成員更多的是對 Python 和 SQL 熟悉,所以 PyFlink 更加適合我們。我們用 PyFlink 開發了 20 個業務作業,目前每秒過百萬,每天有 200 億,業務平穩運行(PyFlink 1.10)。

3.技術選型

面對實時化的業務和架構升級需求,我們團隊本身沒有 Spark、Flink 等框架的背景積累,但是一個偶然的機會,我們觀看了金竹老師的直播,瞭解到了 PyFlink 是 Flink 的 Python API 和我團隊現有的開發人員語言技能比較吻合。所以就想利用 PyFlink 進行業務的實時化升級。

2.jpg

看完金竹老師的分享,我對 PyFlink 有了一個簡單的瞭解,就和團隊同學一起規劃瞭解 PyFlink,進行技術選型。

初識與困難

雖然 PyFlink 和團隊的語言技能比較 match,但是其中還是涉及到很多 Flink 的環境、文檔、算子等的使用問題,遇到了很多困難:

  • PyFlink 的知識文檔、示例、答疑等都非常少,除了官網和阿里雲,基本無其他參考。
  • PyFlink 官方文檔缺少很多細節,比如:給了方法不給參數格式。
  • PyFlink 的內容不明確,官網上沒有明確具體寫出哪些 PyFlink 沒有,哪些有。沒法將 Flink 和 PyFlink 清晰的區分開。
  • PyFlink 本身等侷限性,比如:left/rigint Join 產生 retraction 無法寫入 Kafka,要寫入需要改寫 Flink SQL 讓流改為 append 模式,或者修改 kafka-connector 源碼支持 retraction。

所以一時感覺利用 PyFlink 的學習時間也比較漫長。大家比較擔心短時間內很難滿足業務開發。

機遇

在我和團隊擔心開發進度時候,我也一直關注 Flink 社區的動態,恰巧發現 Flink 社區在進行 “PyFlink 扶持計劃”,所以我和團隊都眼前一亮,填寫了 PyFlink 調查問卷。也和金竹老師進行了幾次郵件溝通。最終有幸參與了 PyFlink 社區扶持計劃。

4. OOM 報錯解決思路分享

其實瞭解下來 PyFlink 的開發是非常便捷的,在完成了第一個作業的開發之後,大家逐漸熟悉 PyFlink 的使用,3周左右就完成了 20 個業務邏輯的開發,進入了測試階段。這個快速一方面是團隊成員不斷的熟悉 PyFlink,一方面是由社區 PyFlink 團隊金竹/付典等老師的幫助和支持。這裡,不一一為大家分享全部內容,我這裡列舉一個具體的例子。

■ 背景:

從接觸到 Flink 開始,有個別 job,一直有 running beyond physical memory limits 問題。多次調整 tm 內存,修改 tm 和 slos 的比例,都沒用,最終還是會掛。最後妥協的方案是,增加自動重啟次數,定期重啟任務

■ 現象:

Flink job 通常會穩定運行5-6天,然後就報出這個錯誤。一直持續和反覆。

■ 詳細信息:

Closing TaskExecutor connection container_e36_1586139242205_122975_01_000011 because: Container [pid=45659,containerID=container_e36_1586139242205_122975_01_000011] is running beyond physical memory limits. Current usage: 4.0 GB of 4 GB physical memory used; 5.8 GB of 32 GB virtual memory used. Killing container.
    Dump of the process-tree for container_e36_1586139242205_122975_01_000011 :
    |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
    |- 45659 45657 45659 45659 (bash) 0 0 115814400 297 /bin/bash -c /usr/local/jdk//bin/java -Xms2764m -Xmx2764m -XX:MaxDirectMemorySize=1332m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/test.bin -Dlog.file=/data/emr/yarn/logs/application_1586139242205_122975/container_e36_1586139242205_122975_01_000011/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> /data/emr/yarn/logs/application_1586139242205_122975/container_e36_1586139242205_122975_01_000011/taskmanager.out 2> /data/emr/yarn/logs/application_1586139242205_122975/container_e36_1586139242205_122975_01_000011/taskmanager.err 
    |- 45705 45659 45659 45659 (java) 13117928 609539 6161567744 1048471 /usr/local/jdk//bin/java -Xms2764m -Xmx2764m -XX:MaxDirectMemorySize=1332m -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/test.bin -Dlog.file=/data/emr/yarn/logs/application_1586139242205_122975/container_e36_1586139242205_122975_01_000011/taskmanager.log -Dlogback.configurationFile=file:./logback.xml -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.YarnTaskExecutorRunner --configDir .

    Container killed on request. Exit code is 143
    Container exited with a non-zero exit code 143

我們的解決思路:

    1. 從內容上看是 oom 問題,所以一開始調整了 tm 大小,直接到最大內存,2調整 tm 和 slot 的比例,儘量做到 1v1.
    2. dump heap 的內存,分析佔用情況。
    3. 調整 backend state 的類型

結果:以上手段都失敗了,在持續一段時間後,依然一定報錯。

PyFlink 團隊處理思路:

1.分析當前作業的 state 情況,作業情況,作業環境參數情況。通過 flink-conf 可以看 backend state 情況,通過 flinkdashboard 可以知道作業圖和環境參數。

  1. 由於 1.10 中,rocksdb statebackend 佔用的內存默認為非 managed memory,通過在 PyFlink 作業中增加如下代碼,可以將其設置為 managed memory:env.get_state_backend()._j_rocks_db_state_backend.getMemoryConfiguration().setUseManagedMemory(True)
  2. 為了分析 OOM 是否是由於 rocksdb statebackend 佔用的內存持續增長導致的,開啟了關於 rocksdb 的監控,因為我們使用的是 rocksdb,這裡需要在 flink-conf 中增加如下配置:
state.backend.rocksdb.metrics.block-cache-capacity: true
state.backend.rocksdb.metrics.block-cache-usage: true
                            state.backend.rocksdb.metrics.num-running-compactions: true
state.backend.rocksdb.metrics.num-running-flushes: true
state.backend.rocksdb.metrics.size-all-mem-tables: true

然後通過自建的 metrics 系統來收集展示和分析,我們使用的 grafana。

  1. 通過前面的步驟,觀察到 rocksdb 的內存基本是穩定的,內存佔用符合預期,懷疑是“rocksdb 超用了一點點,或者是 jvm overhead 不夠大”導致的。這兩種問題,都可以通過調整 jvm overhead 的相關參數來解決。於是在 flink-conf 中添加了配置:
taskmanager.memory.jvm-overhead.min: 1024m

taskmanager.memory.jvm-overhead.max: 2048m

用大佬的原話:rocksdb 超用了一點點,或者是 jvm overhead 不夠大,這兩種情況調大 jvm overhead 應該都能解決。

  1. 調整 flink.size 的大小,讓 flink 自動計算出 process.size,這部分在 flink-conf:
 taskmanager.memory.flink.size: 1024m

完成所有調整後,經歷了14天的等待,job 運行正常,這裡充分說明了問題被解決了。同時開始觀察 rocksdb 的 metrics 情況,發現 native 內存會超用一些,但是 rocksdb 整體保持穩定的。目前能判斷出某個地方用到的 native 內存比 flink 預留的多,大概率是用戶代碼或者第三方依賴,所以加大下 jvm-overhead 大數值,能解決問題。

  1. 最終需要修改的參數有:

1) 在 pyflink 作業中增加如下代碼:

env.get_state_backend()._j_rocks_db_state_backend.getMemoryConfiguration().setUseManagedMemory(True)

2) flink-conf 修改或增加:

taskmanager.memory.jvm-overhead.min: 1024m
taskmanager.memory.jvm-overhead.max: 2048m
taskmanager.memory.process.size: 6144m

其實針對這個業務升級,老闆為了不影響最終的業務上線,起初我們準備了2套方案同時進行:

  • 基於某個雲平臺進行平臺搭建和開發;
  • 基於開源 PyFlink 進行代碼開發;

兩個方案同時進行,最終我們團隊基於 PyFlink 開發快速的完成了業務開發和測試。最終達到了我前面所說的每秒百萬/每天200億的穩定業務支撐。

重點,重點,重點,參與這個業務升級的開發只有2個人。

5.總結和展望

通過 PyFlink 的學習,刷寶大數據團隊,在短時間能有了實時數據開發的能力。目前穩定運行了 20+PyFlink 任務,我們對接了多個需求部門,如推薦部門、運營、廣告等;在多種場景下,模型畫像計算、AB 測試系統、廣告推薦、用戶召回系統等,使用了 PyFlink。為我們的業務提供了堅實穩定的實時數據。

此外,我們將搭建 Flink on Zeppelin 這樣的實時計算平臺,擴大 Flink 開發用戶群體,進一步簡化 Flink 開發成本。Flink 1.11 版本也準備上線,Python UDF 功能會有進一步的優化,Pandas 模塊也會被引入。假如讀者和我們一樣,期望能快速擁有實時的能力,以 Python 語言為主,並且還有數據開發/數倉的能力,PyFlink 將是不二之選。

如果您也對 PyFlink 社區扶持計劃感興趣,可以填寫下方問卷,與我們一起共建 PyFlink 生態。

3.jpg

Leave a Reply

Your email address will not be published. Required fields are marked *