作者:孫金城(金竹)
Flink 從 1.9.0 版本開始增加了對 Python 的支持(PyFlink),在剛剛發佈的 Flink 1.10 中,PyFlink 添加了對 Python UDFs 的支持,現在可以在 Table API/SQL 中註冊並使用自定義函數。PyFlink 的架構如何,適用於哪些場景?本文將詳細解析並進行 CDN 日誌分析的案例演示。
PyFlink 的必要性
Flink on Python and Python on Flink
PyFlink 是什麼?這個問題也許會讓人感覺問題的答案太明顯了,那就是 Flink + Python,也就是 Flink on Python。那麼到底 Flink on Python 意味著這什麼呢?那麼一個非常容易想到的方面就是能夠讓 Python 用享受到 Flink 的所有功能。其實不僅如此,PyFlink 的存在還有另外一個非常重要的意義就是,Python on Flink,我們可以將 Python 豐富的生態計算能力運行在 Flink 框架之上,這將極大的推動 Python 生態的發展。其實,如果你再仔細深究一下,你會發現這個結合並非偶然。
Python 生態和大數據生態
Pythoh 生態與大數據生態有密不可分的關係,我們先看看大家都在用 Python 解決什麼實際問題?通過一份用戶調查我們發現,大多數 Python 用戶正在解決 ”數據分析“,”機器學習“的問題,那麼這些問題場景在大數據領域也有很好的解決方案。那麼 Python 生態和大數據生態結合,拋開擴大大數據產品的受眾用戶之外,對 Python 生態一個特別重要到意義就是單機到分佈式的能力增強,我想,這也是大數據時代海量數據分析對 Python 生態的強需求。
Why Flink and Why Python
好了, Python 生態和大數據的結合是時代的要求,那麼 Flink 為啥選擇 Python 生態作為多語言支持的切入點,而不是 Go 或者 R 呢?作為用戶的你,為啥選擇 PyFlink 而不是 PySpark 或者 PyHive 呢?
首先我們說說選擇 Flink 的理由:
- 第一,最主要的是架構優勢, Flink 是純流架構的流批統一的計算引擎;
- 第二,從 ASF 的客觀統計看, Flink 是 2019 年度最活躍的開源項目,這意味著 Flink 鮮活的生命力;
- 第三, Flink 不僅僅是開源項目而且也經歷過無數次,各個大數據公司的生產環境的歷練,值得信賴。
那麼我們再來看看 Flink 在選擇多語言支持時候,為啥選擇了 Python 而不是其他語言呢?我們還是看一下數據統計,如下: Python 語言流行程度僅次於 Java 和 C,其實我們發現自18年開始 Python 的發展非常迅速,並且還在持續。那麼 Java/Scala 是 Flink 的默認語言,所以選擇 Python 來進行 Flink 多語言的支持似乎很合理。這些權威的統計信息,大家可以在我提供的鏈接進行查看更詳細的信息。
目前看 PyFlink 的產生是時代的必然,但僅僅想清楚 PyFlink 存在的意義還遠遠不夠,因為我們最終的目標是讓 Flink 和 Python 用戶受益,真真正正的解決實際的現實問題。所以,我們還需要繼續深入,一起探究 PyFlink 該如何落地?
PyFlink 架構
任何事情在想清楚之後,還要做明白,要將 PyFlink 落地,首要解決的是分析清楚要達成的核心目標和要達成目標解決的核心問題。那麼 PyFlink 的核心目標到底是什麼呢?
PyFlink 的核心目標
我們在前面的分析過程中已經提到過,這裡我們再具體化一下,PyFlink 的核心目標:
- 將 Flink 能力輸出到 Python 用戶,進而可以讓 Python 用戶使用所有的 Flink 能力。
- 將 Python 生態現有的分析計算功能運行到 Flink 上,進而增強 Python 生態對大數據問題的解決能力。
圍繞這 2 個核心的目標,我們再來分析,要達成這樣的目標,需要解決的核心問題是什麼?
Flink 功能 Python 化
為了 PyFlink 落地,我們需要在 Flink 上開發一套和現有 Java 一樣的 Python 的引擎嗎?答案是 NO,這在 Flink 1.8 之前已經嘗試過。我們做設計有一個很好的原則就是追求以最小的代價完成既定的目標,所以最好的方式是僅僅提供一層 Python API 複用現有的計算引擎。
那麼對於 Flink 而言我們要提供怎樣的 Python API 呢?那就是我們熟知的: High-level 的 TableAPI/SQL 和有狀態的 DataStream API。好,我們現在的思考越來越切近 Flink 內部了,接踵而來的問題就是,我們如何提供一套 Python 的 Table API 和 DataStream API 呢?核心要解決的問題是什麼呢?
■ Flink 功能 Python 化的核心問題
核心問題顯而易見是 Python VM 和 Java VM 的握手,他們之間要建立通訊,這是 Flink 多語言支持的核心問題。好,面對核心問題我們要進行技術選型. Here we go…
■ Flink 功能 Python 化的 VM 通訊技術選型
就當前的 Java VM 和 Python VM 通訊的問題而言,目前比較顯著的解決方案有 Apache Beam,一個著名的多語言多引擎支持項目,另外一個專門解決 Java VM 和 Python VM 通訊問題的 Py4J。我們從不同視角進行分析對比, 首先, Py4J 和 Beam 對比,就好像有穿山功能的穿山甲和一個力量強大的大象,要穿越一道牆,我們可以打個洞,也可以推到整面牆。所以在當前 VM 通訊的場景, Beam 顯得有些複雜。因為 Beam 在通用性上做了很多的努力,在極端情況會喪失一定程度的靈活性。
從另一個視角來看, Flink 本身有交互式編程的需求,比如 FLIP-36 ,同時還要在多語言支持的同時,確保各種語言的接口設計語義一致性,這些在 Beam 現有的架構下很難滿足。所以在這樣一種思考下,我們選擇 Py4J 作為 Java VM 和 Python VM 之間通訊的橋樑。
■ Flink 功能 Python 化的技術架構
其實如果我們解決了 Python VM 和 Java VM 通訊的問題,本質上是在努力達成我們第一個目標,就是將現有 Flink 功能輸出給 Python 用戶,也就是我們 Flink 1.9 所完成的工作,接下來我們看看 Flink 1.9 PyFlink API 的架構,如下:
我們利用Py4J解決通訊問題,在 PythonVM 啟動一個 Gateway,並且 Java VM 啟動一個 Gateway Server 用於接受 Python 的請求,同時在 Python API 裡面提供和 Java API 一樣的對象,比如 TableENV, Table,等等。這樣 Python 在寫 Python API 的時候本質是在調用 Java API。當然,在 Flink 1.9 中還解決了作業部署問題,我們可以用 Python 命令,Python shell 和 CLI 等多種方式進行作業提交。
那麼基於這樣的架構有怎樣的優勢呢?第一個就是簡單,並確保 Python API 語義和 Java API 的一致性,第二點, Python 作業可以達到和 Java 一樣的極致性能,那麼 Java 的性能怎樣呢?我想大家已經熟知,在去年雙 11 Flink Java API 已經具備了每秒25.51億次的數據處理的能力。
Python 生態分佈化
OK,在完成了現有 Flink 功能向 Python 用戶的輸出之後,接下來我們繼續探討,如何將 Python 生態功能引入 Flink 中,進而將 Python 功能分佈式化。如何達成?通常我們可以有如下2種做法:
- 選擇有代表性的 Python 類庫,將其 API 增加到 PyFlink 中,這種方式是一個漫長的過程,因為 Python 的生態庫太多了,但無論如何,我們在引入這些 APIs 之前,首要解決的問題是,解決 Python 的執行問題。
- 我們結合現有 Flink Table API 的現狀和現有 Python 類庫的特點,我們可以對現有所有的 Python 類庫功能視為 用戶自定義函數(UDF),集成到 Flink 中。這樣我們就找到了集成 Python 生態到 Flink 中的手段是將其視為 UDF,也就是我們 Flink 1.10 中的工作。那麼集成的核心問題是什麼?沒錯,剛才說過,是 Python UDF 的執行問題。
好,我們針對這個核心問題進行技術選型吧,Here we go…
■ Python 生態分佈化的 UDF 執行技術選型
解決 Python UDF 執行問題可不僅僅是 VM 之間通訊的問題了,它涉及到 Python 執行環境的管理,業務數據在 Java 和 Python 之間的解析, Flink State Backend 能力向 Python 的輸出, Python UDF 執行的監控等等,是一個非常複雜的問題。面對這樣複雜的問題,前面我們介紹過 Apache Beam ,支持多引擎多語言,無所不能的大象可以出場了,我們來看一下 Beam 是怎麼解決 Python UDF 執行問題的 :)
Beam 為了解決多語言和多引擎支持問題高度抽象了一個叫 Portability Framework 的架構,如下圖,Beam 目前可以支持 Java/Go/Python 等多種語言,其中圖下方 Beam Fu Runners 和 Execution 之間就解決了 引擎和 UDF 執行環境的問題。其核心是對利用 Protobuf 進行數據結構抽象,利用 gRPC 協議進行通訊,同時封裝了核心的 gRPC 服務。所以這時候 Beam 更像是一隻螢火蟲,照亮了 PyFlink 解決 UDF 執行問題之路。:)(多說一嘴,螢火蟲已經成為了 Aapche Beam 的吉祥物)。
我們接下來看看 Beam 到底提供了哪些 gRPC 服務。
如圖 Runner部分是 Java 的算子執行, SDK Worker部分是 Python 的執行環境, Beam已經抽象 Control/Data/State/Logging 等服務。並這些服務已經在 Beam 的 Flink runner 上穩定高效的運行了很久了。所以在 PyFlink UDF 執行上面我們可以站在巨人的肩膀上了:),這裡我們發現 Apache Beam 在 API 層面和在 UDF 的執行層面都有解決方案,而 PyFlink 在 API 層面採用了 Py4J 解決 VM 通訊問題,在 UDF 執行需求上採用了 Beam 的 Protability Framework 解決 UDF 執行環境問題。
這也表明了 PyFlink 在技術選型上嚴格遵循以最小的代價達成既定目標的原則,在技術選型上永遠會選擇最合適的,最符合 PyFlink 長期發展的技術架構。(BTW,與 Beam 的合作過程中,我也向 Beam 社區提交了20+的優化 patch)。
■ Python 生態分佈化的 UDF 技術架構
在 UDF 的架構中我們我既要考慮 Java VM 和 Python VM 的通訊問題,又要考慮在編譯階段和在運行階段的不同需求。圖中我們以綠色表示 Java VM 的行為,藍色表示 Python VM 的行為。首先我們看看編譯階段,也就是local的設計,在local的設計是純 API 的 mapping 調用,我們仍然要過 Py4J 來解決通訊問題,也就是如圖 Python 每執行一個 API 就會同步的調用 Java 所對應的 API 。
對 UDF 的支持上,需要添加 UDF 註冊的 API , register_function,但僅僅是註冊還不夠,用戶在自定義 Python UDF 的時候往往會依賴一些三方庫,所以我們還需要增加添加依賴的方法,那就是一系列的 add 方法,比如 add_Python_file()。在編寫 Python 作業的同時, Java API 也會同時被調用在提交作業之前,Java 端會構建JobGraph。然後通過 CLI 等多種方式將作業提交到集群進行運行。
我們再來看看運行時 Python 和 Java 的不同分工情況,首先在 Java 端與普通 Java 作業一樣, JobMaster 將作業分配給 TaskManger , TaskManager 會執行一個個 Task ,task裡面就涉及到了Java和Python的算子執行。在Python UDF的算子中我們會設計各種 gRPC 服務來完成 Java VM 和 Python VM 的各種通訊,比如 DataService 完成業務數據通訊, StateService 完成 Python UDF 對 Java Statebackend 的調用,當然還有 Logging 和 Metrics 等其他服務。
這些服務都是基於 Beam 的 Fn API 來構建的,最終在Python的 Worker 裡面運行用戶的 UDF,運行結束之後再利用對應的 gRPC 服務將結果返回給 Java 端的 PythonUDF 算子。當然 Python 的 worker 不僅僅是 Process 模式,可以是 Docker 模式甚至是 External 的服務集群。這種擴展機制,為後面 PyFlink 與 Python 生態的其他框架集成打下了堅實的基礎,在後面我們介紹 PyFlink 大圖的時候,我們會介紹這一點:)。好,這就是 PyFlink 在 1.10 中引入 Python UDF 支持的架構。那麼這樣的架構有怎樣的優勢呢?
首先,Beam 是一個成熟的多語言支持框架,基於 Beam 進行架構我們後面可以很容易進行其他語言的支持擴展。 同時 Beam 對 State 的服務抽象也方便 PyFlink 增加對 Stateful UDF 的支持。還有一個方面是方便維護,同一套框架由 Apache Beam 和 Apache Flink 兩個非常活躍的社區共同維護和優化 …
PyFlink 場景,怎麼用?
好了解了這麼多關於 PyFlink 的架構和架構背後的思考,我們還是以一個具體場景案例,來增加一些對 PyFlink 的體感吧!
PyFlink 適用的場景
在具體的案例之前我們先簡單分享一些 PyFlink 所能適用的業務場景。首先 PyFlink 既然是 Python+Flink,那其適用場景也可以從 java 和 Python 兩方面去分析,第一個 Java 所適用的場景 PyFlink 都適用。
- 第一個,事件驅動型,比如:刷單,監控等;
- 第二個,數據分析型的,比如:庫存,雙11大屏等;
- 第三個適用的場景是數據管道,也就是ETL場景,比如一些日誌的解析等;
- 第四個場景,機器學習,比如個性推薦等。
這些都可以嘗試使用 PyFlink 。除此之外還有 Python 生態特有的場景,比如科學計算等。那麼這麼多的應用場景,PyFlink 到底有哪些可用的 API 呢?
PyFlink 的安裝
使用具體的 API 開發之前,首先要安裝 PyFlink,目前 PyFlink 支持 pip install 進行安裝,這裡特別提醒一下具體命令是:pip install apache-Flink。
PyFlink 的 APIs
目前 PyFlink API 完全與 Java Table API 對齊,各種關係操作都支持,同時對 window 也有很好的支持,並且這裡稍微提一下就是 PyFlink 裡面有些易用性 API 比 SQL 還要強大,比如:這些對 columns 進行操作的 APIs。除了這些 APIs,PyFlink還提供多種定義 Python UDF 的方式。
PyFlink 的 UDF 定義
首先,可以擴展 ScalarFunction,這種方式可以提供更多的輔助功能,比如添加 Metrics 。除此之外 Python 語言所支持的任何方式的方法定義,在 PyFlink UDF 中都是支持的,比如:Lambda Function,Named Function 和 CallableFunction等。
當定義完方法後,我們用 PyFlink 所提供的 Decorators 進行打標,並描述 input 和 output 的數據類型就可以了。當然後面版本我們也可以根據 Python 語言的 type hint 特性再進一步簡化,進行類型推導。為了直觀,我們看一個具體的 UDF 定義的例子:
Python UDF 定義示例
我們定義兩個數相加的例子,首先導入必須的類,然後是剛才我們提到的幾種定義方式。這個簡單直接,我們閒話少敘,開始看看實際的案例吧:)
PyFlink 案例-阿里雲 CDN 實時日誌分析
我們這裡以一個阿里雲 CDN 實時日誌分析的例子來介紹如何用 PyFlink 解決實際的業務問題。CDN 我們都很熟悉,為了進行資源的下載加速。那麼 CDN 日誌的解析一般有一個通用的架構模式,就是首先要將各個邊緣節點的日誌數據進行採集,一般會採集到消息隊列,然後將消息隊列和實時計算集群進行集成進行實時的日誌分析,最後將分析的結果寫到存儲系統裡面。那麼我今天的案例將架構實例化,消息隊列採用 Kafka,實時計算採用Flink,最終將數據存儲到 MySQL 中。
■ 阿里雲 CDN 實時日誌分析需求說明
我們在來看看業務統計的需求,為了介紹方便,我們將實際的統計需求進行簡化,示例中只進行按地區分組,進行資源訪問量,下載量和下載速度的統計。數據格式我們只選取核心的字段,比如:uuid,表示唯一的日誌標示,client_ip 表示訪問來源,request_time 表示資源下載耗時, response_size 表示資源數據大小。其中我們發現我們需求是按地區分組,但是原始日誌裡面並沒有地區的字段信息,所以我們需要定義一個 Python UDF 根據 client_ip 來查詢對應的地區。好,我們首先看如何定義這個 UDF。
■ 阿里雲 CDN 實時日誌分析 UDF 定義
這裡我們用了剛才提到的 named function 的方式定義一個 ip_to_province() 的UDF,輸入是 ip 地址,輸出是地區名字字符串。我們這裡描述了輸入類型是一個字符串,輸出類型也是一個字符串。當然這裡面的查詢服務僅供演示,大家在自己的生產環境要替換為可靠的地域查詢服務。
import re
import json
from pyFlink.table import DataTypes
from pyFlink.table.udf import udf
from urllib.parse import quote_plus
from urllib.request import urlopen
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def ip_to_province(ip):
"""
format:
{
'ip': '27.184.139.25',
'pro': '河北省',
'proCode': '130000',
'city': '石家莊市',
'cityCode': '130100',
'region': '靈壽縣',
'regionCode': '130126',
'addr': '河北省石家莊市靈壽縣 電信',
'regionNames': '',
'err': ''
}
"""
try:
urlobj = urlopen( \
'http://whois.pconline.com.cn/ipJson.jsp?ip=%s' % quote_plus(ip))
data = str(urlobj.read(), "gbk")
pos = re.search("{[^{}]+\}", data).span()
geo_data = json.loads(data[pos[0]:pos[1]])
if geo_data['pro']:
return geo_data['pro']
else:
return geo_data['err']
except:
return "UnKnow"
■ 阿里雲 CDN 實時日誌分析 Connector 定義
我們完成了需求分析和 UDF 的定義,我們開始進行作業的開發了,按照通用的作業結構,需要定義 Source connector 來讀取 Kafka 數據,定義 Sink connector 來將計算結果存儲到 MySQL。最後是編寫統計邏輯。
在這特別說明一下,在 PyFlink 中也支持 SQL DDL 的編寫,我們用一個簡單的 DDL 描述,就完成了 Source Connector的開發。其中 connector.type 填寫 kafka。SinkConnector 也一樣,用一行DDL描述即可,其中 connector.type 填寫 jdbc。描述 connector 的邏輯非常簡單,我們再看看核心統計邏輯是否也一樣簡單:)
kafka_source_ddl = """
CREATE TABLE cdn_access_log (
uuid VARCHAR,
client_ip VARCHAR,
request_time BIGINT,
response_size BIGINT,
uri VARCHAR
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'access_log',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'format.type' = 'csv',
'format.ignore-parse-errors' = 'true'
)
"""
mysql_sink_ddl = """
CREATE TABLE cdn_access_statistic (
province VARCHAR,
access_count BIGINT,
total_download BIGINT,
download_speed DOUBLE
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/Flink',
'connector.table' = 'access_statistic',
'connector.username' = 'root',
'connector.password' = 'root',
'connector.write.flush.interval' = '1s'
)
"""
■ 阿里雲 CDN 實時日誌分析核心統計邏輯
首先從數據源讀取數據,然後需要先將 clien_ip 利用我們剛才定義的 ip_to_province(ip) 轉換為具體的地區。之後,在進行按地區分組,統計訪問量,下載量和資源下載速度。最後將統計結果存儲到結果表中。這個統計邏輯中,我們不僅使用了Python UDF,而且還使用了 Flink 內置的 Java AGG 函數,sum 和 count。
# 核心的統計邏輯
t_env.from_path("cdn_access_log")\
.select("uuid, "
"ip_to_province(client_ip) as province, " # IP 轉換為地區名稱
"response_size, request_time")\
.group_by("province")\
.select( # 計算訪問量
"province, count(uuid) as access_count, "
# 計算下載總量
"sum(response_size) as total_download, "
# 計算下載速度
"sum(response_size) * 1.0 / sum(request_time) as download_speed") \
.insert_into("cdn_access_statistic")
■ 阿里雲 CDN 實時日誌分析完整代碼
我們在整體看一遍完整代碼,首先是核心依賴的導入,然後是我們需要創建一個ENV,並設置採用的 planner(目前Flink支持Flink和blink兩套 planner)建議大家採用 blink planner。
接下來將我們剛才描述的 kafka 和 mysql 的 ddl 進行表的註冊。再將 Python UDF 進行註冊,這裡特別提醒一點,UDF所依賴的其他文件也可以在API裡面進行制定,這樣在job提交時候會一起提交到集群。然後是核心的統計邏輯,最後調用 executre 提交作業。這樣一個實際的CDN日誌實時分析的作業就開發完成了。我們再看一下實際的統計效果。
import os
from pyFlink.datastream import StreamExecutionEnvironment
from pyFlink.table import StreamTableEnvironment, EnvironmentSettings
from enjoyment.cdn.cdn_udf import ip_to_province
from enjoyment.cdn.cdn_connector_ddl import kafka_source_ddl, mysql_sink_ddl
# 創建Table Environment, 並選擇使用的Planner
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
env,
environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
# 創建Kafka數據源表
t_env.sql_update(kafka_source_ddl)
# 創建MySql結果表
t_env.sql_update(mysql_sink_ddl)
# 註冊IP轉換地區名稱的UDF
t_env.register_function("ip_to_province", ip_to_province)
# 添加依賴的Python文件
t_env.add_Python_file(
os.path.dirname(os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_udf.py")
t_env.add_Python_file(os.path.dirname(
os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_connector_ddl.py")
# 核心的統計邏輯
t_env.from_path("cdn_access_log")\
.select("uuid, "
"ip_to_province(client_ip) as province, " # IP 轉換為地區名稱
"response_size, request_time")\
.group_by("province")\
.select( # 計算訪問量
"province, count(uuid) as access_count, "
# 計算下載總量
"sum(response_size) as total_download, "
# 計算下載速度
"sum(response_size) * 1.0 / sum(request_time) as download_speed") \
.insert_into("cdn_access_statistic")
# 執行作業
t_env.execute("pyFlink_parse_cdn_log")
■ 阿里雲 CDN 實時日誌分析運行效果
我們採用 mock 的數據向 kafka 發送 CDN 日誌數據,右邊實時的按地區統計資源的訪問量,下載量和下載速度。這個示例的 mock 數據工具,源代碼和操作過程,在今天的直播後,會更新到我的博客當中。方便大家在自己的環境中進行體驗。
PyFlink 未來,會怎樣?
總體來看 PyFlink 的業務開發還是非常簡潔的,不用關心底層的實現細節,只需要按照SQL或者Table API的方式描述業務邏輯就行。那麼,我們再整體看看PyFlink的未來會怎樣呢?
PyFlink 本心驅動 Roadmap
PyFlink 的發展始終要以本心驅動,我們要圍繞將現有 Flink 功能輸出到 Python 用戶,將 Python 生態功能集成到Flink當中為目標。PyFlink的 Roadmap 如圖所示:首先解決 Python VM 和 Java VM 的通訊問題,然後將現有的 Table API 功能暴露給 Python 用戶,提供 Python Table API,這也就是 Flink 1.9 中所進行的工作,接下來我們要為將Python功能集成到Flink做準備就是集成 Apache Beam,提供 Python UDF 的執行環境,並增加Python 對其他類庫依賴的管理功能,為用戶提供 User-defined-Funciton 的接口定義,支持 Python UDF,這就是 Flink 1.10 所做的工作。
為了進一步擴大Python生態的分佈式功能,PyFlink 將提供 Pandas 的 Series 和 DataFram 的支持,也就是用戶可以在 PyFlink 中直接使用 Pandas 的UDF。同時為增強用戶的易用性,讓用戶有更多的方式使用 PyFlink,後續增加在 Sql Client 中使用 Python UDF。面對 Python 用戶的機器學習問題,增加 Python 的 ML pipeline API。監控 Python UDF 的執行情況對,對實際的生產業務非常關鍵,所以 PyFlink 會增加 Python UDF 的 Metric 管理。這些點將在 Flink 1.11 中將與用戶見面。
但這些功能只是 PyFlink 規劃的冰山一角,後續我們還要進行性能優化,圖計算API,Pandas on Flink 的 Pandas 原生 API 等等。。。進而完成不斷將 Flink 現有功能推向 Python 生態,將 Python 生態的強大功能不斷集成到 Flink 當中,進而完成 Python 生態分佈化的初衷。
PyFlink 1.11 預覽
我們快速的預覽一下即將與大家見面的 Flink 1.11 中的 PyFlink 的重點內容。
■ 功能
我們將視角由遠方拉近到 Flink 1.11 版本 PyFlink 的核心功能,PyFlink 會圍繞著 功能,性能和易用性不斷努力,在 1.11 在功能上會增加 Pandas UDF 的支持,這樣Pandas 生態的實用類庫功能可以在 PyFlink 中直接使用,比如累積分佈函數, CDF 等。
還會增加 ML Pipeline API 的支持,這樣大家可以利用 PyFlink 完成一些機器學習場景的業務需求,我這裡是一個使用 pyFlink 完成 KMeans 的示例。
■ 性能
在性能上 PyFlink 也會有更多的投入,我們利用 Codegen,CPython,優化序列化和反序列化的方式提高 PythonUDF 的執行性能,目前我們初步對 1.10 和 1.11 進行性能對比來看,1.11 將比 1.10 有近 15 倍的性能提升。
■ 易用性
在用戶的易用性上 PyFlink 會在 SQL DDL 和 SQL Client 中增加對 Python UDF 的支持。讓用戶有更多的方式選擇來使用 PyFlink。
PyFlink 大圖(使命願景)
今天已經介紹了很多,比如什麼是 PyFlink,PyFlink 的存在的意義,PyFlink API 架構,UDF 架構,以及架構背後的取捨和現有架構的優勢,並介紹了 CDN 的案例,介紹了 PyFlink 的 Roadmap,預覽了 Flink 1.11 版本中 PyFlink 的重點,那麼接下來還有什麼呢?
那麼最後我們再來看看 PyFlink 的未來會怎樣?在以 “Flink 功能 Python 化,Python 生態分佈化” 的使命驅動下,PyFlink 會有怎樣的佈局?我們快速分享一下:PyFlink 是 Apache Flink 的一部分,涉及到 Runtime 層面和 API 層面。
在這兩個層面 PyFlink 會有怎樣的發展? Runtime 層面,PyFlink 會構建解決 Java VM 和 Python VM 的通訊問題的 gRPC 通用服務,比如(Control/Data/State等)在這套框架之上會抽象出 Java 的 Python UDF 算子,Python 的執行容器構建,支持多種 Python 的 Execution,比如 Process,Docker 和 External,尤其值得強調的是 External 以Socket 的方式提供了無限的擴展能力,在後續的 Python 生態集成上至關重要。
API 層面,我們會使命驅動,將 Flink 上所以的 API 進行 Python 化,當然這也依託於引入 Py4J 的 VM 通訊框架之上,PyFlink 會逐漸增加各種 API 的支持,Python Table API,UDX 的接口 API,ML Pipeline,DataStream,CEP,Gelly,State,等Flink所具備的 Java APIs 和 Python 生態用戶的最愛 Pandas APIs 等。在這些 API 的基礎之上,PyFlink 還會不斷的進行生態系統的集成,比如 方便用戶開發的 Notebook 的集成,Zeppelin,Jupyter,並與阿里開源的 Alink 進行集成,目前 PyAlink 已經完全應用了 PyFlink 所提供的功能,後面還會和現有的 AI 系統平臺進行集成,比如大家熟知的 TensorFlow 等等。
所以此時我會發現使命驅動的力量會讓 PyFlink 的生命線不斷延續…當然這種生命的延續更需要更多的血液融入。這裡再次強調一下 PyFlink 的使命:“Flink 能力 Python 化,Python 生態分佈化”。目前 PyFlink 的核心貢獻者們正以這樣的使命而持續活躍在社區。
PyFlink 核心貢獻者及問題支持
在分享的最後,我想介紹一下目前 PyFlink 的核心貢獻者。
首先是付典,目前付典是 Flink 以及另外兩個 Apache 頂級項目的 Committer,在PyFlink 模塊做了巨大的貢獻。
第二位是黃興勃,目前專注 PyFlink 的 UDF 性能優化,曾經是阿里與安全算法挑戰賽的冠軍,在 AI 和中間件性能比賽中也有很好的成績。
第三位就是大家熟知的程鶴群,為大家做過多次分享,相信大家還記得他為大家帶來的《Flink 知識圖譜》分享。
第四位是鍾葳,關注 PyFlink 的 UDF 依賴管理和易用性工作,目前已經有很多的代碼貢獻。最後一個是我自己。大家後續在使用 PyFlink 的時候,如果有什麼問題都可以聯繫我們中的任何一位尋求支持。
當然遇到通用性問題還是建議大家郵件到 Flink 的用戶列表和中文用戶列表,這樣能問題共享。當然如果你遇到特別急的個別問題,也非常歡迎您郵件到剛才介紹的小夥伴郵箱,同時,為了問題的積累和有效的分享,更期望大家遇到問題可以在 Stackoverflow 進行提問題。首先搜索你遇到問題是否已經被解答過,如果沒有,請描述清楚,最後提醒大家要為問題打上 PyFlink 的 tags。這樣我們及時訂閱回覆您問題。
總結
今天深入剖析了 PyFlink 深層含義;介紹了 PyFlink API 架構是核心採用 Py4J 框架進行 VM 之間的通訊,API 的設計上 Python API 和 Java API 保持語義的一致;還介紹了 Python UDF 架構,以集成 Apache Beam 的 Portability Framework 的方式獲取高效穩定的 Python UDF 的支持,並且細緻分析了架構背後思考,對技術選型的取捨和現有架構的優勢;
然後介紹了 PyFlink 所適用的業務場景,並以阿里雲 CDN 日誌實時分析的案例讓大家對 PyFlink 的使用有一定的體感;
最後介紹了 PyFlink 的 Roadmap 和預覽了 Flink 1.11 版本中 PyFlink 的重點,預期 PyFlink 1.11 相對於1.10會有15倍以上的性能提升,最後和大家一起分享了 PyFlink 的使命,PyFlink 的使命是 ”Flink能力Python化,Python生態分佈化”。
留在最後的是提供給大家一種更有效的問題求助的方式,大家有什麼問題可以隨時拋給剛才向大家介紹的 PyFlink 小夥伴,那麼這些小夥伴已經在直播群裡了,接下來有什麼問題,我們可以一起探討。:)
作者介紹:
孫金城(金竹),2011 年加入阿里,9 年的阿里工作中,主導過很多內部核心系統,如,阿里集團行為日誌,阿里郎,雲轉碼,文檔轉換等。在 2016 年初開始瞭解 Apache Flink 社區,由初期的參與社區開發到後來逐漸主導具體模塊的開發,到負責 Apache Flink Python API(PyFlink) 的建設。 目前是 PMC member of Apache Flink and ALC(Beijing), 以及 Committer for Apache Flink, Apache Beam and Apache IoTDB。