
Apache Flink "Shuo Dao" Series – Integrating Pandas with PyFlink (1+1 != 2)

Opening Remarks

What does this "Shuo Dao" talk about? It talks about a saying every Alibaba person knows well: "Because of trust, things become simple"! This is the daily work and life everyone longs for! The words look simple and the truth behind them is plain, and although it is widely accepted, people vary greatly in their ability to actually live by it. "Because of trust, things become simple" is not a constraint on us, but a direction pointing the way to happiness... Always be the stepping stone on others' road to success... when they reach the summit of their life, the stepping stone takes on that height as well!


Python has evolved into one of the most important programming languages for many fields of data processing. Its popularity has grown so much that it has pretty much become the default data processing language for data scientists. On top of that, there is a plethora of Python-based data processing tools, such as NumPy, Pandas, and scikit-learn, that have gained additional popularity thanks to their flexibility and powerful functionality.


Pic source: VanderPlas 2017, slide 52[1]

To meet these user needs, the Flink community wants to make better use of these tools. To that end, it has put considerable effort into integrating Pandas with PyFlink in the latest release, Flink 1.11. The added features include support for Pandas UDFs and conversion between Pandas DataFrame and Table. Pandas UDFs not only greatly improve the execution performance of Python UDFs, but also make it more convenient for users to leverage libraries such as Pandas and NumPy inside a Python UDF. In addition, support for converting between Pandas DataFrame and Table lets users switch processing engines seamlessly, without an intermediate connector. In the remainder of this article, we will walk through how these features work with a step-by-step example.

Note: Currently, only Scalar Pandas UDFs are supported in PyFlink.

Pandas UDF in Flink 1.11

Using scalar Python UDFs was already possible in Flink 1.10, as described in a previous article (《一小時吃透PyFlink》) on the Flink blog. Scalar Python UDFs work in three primary steps:

  1. The Java operator serializes one input row to bytes and sends them to the Python worker;
  2. The Python worker deserializes the input row and evaluates the Python UDF with it;
  3. The resulting row is serialized and sent back to the Java operator.
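
For reference, a plain (row-at-a-time) scalar Python UDF in this decorator API looks like the minimal sketch below; the function name and conversion logic are illustrative only, not taken from this article:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

# A plain scalar Python UDF: invoked once per input row, so each row is
# serialized to the Python worker and each result serialized back,
# following the three steps above.
@udf(input_types=[DataTypes.FLOAT()], result_type=DataTypes.FLOAT())
def fahrenheit_to_celsius(temp_f):
    return (temp_f - 32.0) * 5.0 / 9.0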

While support for Python UDFs in PyFlink greatly improved the user experience, it had some drawbacks, namely:

  • High serialization/deserialization overhead;
  • Difficulty leveraging popular Python libraries used by data scientists, such as Pandas or NumPy, which provide high-performance data structures and functions but cannot be used effectively from an ordinary UDF.

Pandas UDFs were introduced to address these drawbacks. With a Pandas UDF, a batch of rows is transferred between the JVM and the Python VM in a columnar format (the Arrow memory format). The batch of rows is converted into a collection of Pandas Series and handed to the Pandas UDF, which can then leverage popular Python libraries (such as Pandas, NumPy, etc.) in its implementation.


The performance of vectorized UDFs is usually much higher than that of normal Python UDFs, because the serialization/deserialization overhead is minimized thanks to Apache Arrow. Handling pandas.Series as input/output also lets us take full advantage of the Pandas and NumPy libraries, making vectorized UDFs a popular solution for parallelizing machine learning and other large-scale, distributed data science workloads (e.g. feature engineering, distributed model application).
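
As a minimal, Flink-free sketch of the idea (names and data are illustrative), a vectorized function receives whole columns as pandas.Series and processes a batch of rows in a single call:

import numpy as np
import pandas as pd

# Plain-Python sketch: the function sees an entire Arrow batch as one
# pandas.Series, so a single vectorized NumPy expression replaces one
# Python call per row.
def vectorized_to_celsius(temp_f: pd.Series) -> pd.Series:
    return np.round((temp_f - 32.0) * 5.0 / 9.0, 1)

batch = pd.Series([98.0, 99.5, 100.2])
print(vectorized_to_celsius(batch))  # 36.7, 37.5, 37.9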

Conversion between PyFlink Table and Pandas DataFrame

Pandas DataFrame is the de-facto standard for working with tabular data in the Python community, while PyFlink Table is Flink's representation of tabular data in Python. Enabling conversion between the two allows switching seamlessly between PyFlink and Pandas when processing data in Python: users can process data with one execution engine and switch to another effortlessly. For example, if users already have a Pandas DataFrame at hand and want to perform an expensive transformation, they can easily convert it to a PyFlink Table and leverage the Flink engine for distributed computation. Conversely, users can convert a PyFlink Table to a Pandas DataFrame and use the rich functionality of the Pandas ecosystem for further processing.

Examples

Using Python in Apache Flink requires installing PyFlink. PyFlink is available through PyPI and can be easily installed using pip:

Check the Python version:

$ python --version
Python 3.7.6

Note: Python 3.5 or higher is required to install and run PyFlink.

If you are not on Python 3.5+ at the moment, you can use virtualenv:

$ pip install virtualenv
$ virtualenv --python /usr/local/bin/python3 py37
$ source py37/bin/activate

Then install PyFlink:

$ python -m pip install apache-flink

Using Pandas UDF

Pandas UDFs take pandas.Series as input and return a pandas.Series of the same length as the input. Pandas UDFs can be used in exactly the same places as non-Pandas functions. To mark a UDF as a Pandas UDF, you only need to add the extra parameter udf_type='pandas' in the udf decorator:


import pandas as pd

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
     result_type=DataTypes.FLOAT(), udf_type='pandas')
def interpolate(id, temperature):
    # takes id: pandas.Series and temperature: pandas.Series as input
    df = pd.DataFrame({'id': id, 'temperature': temperature})

    # use interpolate() to interpolate the missing temperature
    interpolated_df = df.groupby('id').apply(
        lambda group: group.interpolate(limit_direction='both'))

    # output temperature: pandas.Series
    return interpolated_df['temperature']
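
Outside of Flink, the body of this UDF behaves as in the following plain-pandas sketch (the sample values are illustrative):

import pandas as pd

# One batch with a missing temperature for device id 1: interpolate()
# fills the gap linearly within each id group (98.0, NaN, 100.0 -> 99.0).
df = pd.DataFrame({'id': [1, 1, 1, 2],
                   'temperature': [98.0, None, 100.0, 99.0]})
interpolated = df.groupby('id').apply(
    lambda group: group.interpolate(limit_direction='both'))
print(interpolated['temperature'].tolist())  # [98.0, 99.0, 100.0, 99.0]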

The Pandas UDF above uses the Pandas DataFrame.interpolate() function to fill in the missing temperature data for each equipment id. This is a common IoT scenario in which each device reports its id and temperature for analysis, but the temperature field may be null for various reasons. Below is a complete example of how to use the Pandas UDF in PyFlink.


from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf
import pandas as pd

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
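# Let the Python worker use Flink managed memory (recommended when running Pandas UDFs in Flink 1.11)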
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)

@udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()],
     result_type=DataTypes.FLOAT(), udf_type='pandas')
def interpolate(id, temperature):
    # takes id: pandas.Series and temperature: pandas.Series as input
    df = pd.DataFrame({'id': id, 'temperature': temperature})

    # use interpolate() to interpolate the missing temperature
    interpolated_df = df.groupby('id').apply(
        lambda group: group.interpolate(limit_direction='both'))

    # output temperature: pandas.Series
    return interpolated_df['temperature']

t_env.register_function("interpolate", interpolate)

my_source_ddl = """
    create table mySource (
        id INT,
        temperature FLOAT 
    ) with (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/tmp/input'
    )
"""

my_sink_ddl = """
    create table mySink (
        id INT,
        temperature FLOAT 
    ) with (
        'connector.type' = 'filesystem',
        'format.type' = 'csv',
        'connector.path' = '/tmp/output'
    )
"""

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)

t_env.from_path('mySource')\
    .select("id, interpolate(id, temperature) as temperature") \
    .insert_into('mySink')

t_env.execute("pandas_udf_demo")

To submit the job:

  • First, prepare the input data in the "/tmp/input" file. For example:

$ echo -e "1,98.0\n1,\n1,100.0\n2,99.0" > /tmp/input

  • Next, run this example on the command line:

$ python pandas_udf_demo.py

This command builds and runs the Python Table API program in a local mini-cluster. You can also submit the Python Table API program to a remote cluster using a different command line; see the job submission examples here: https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html#job-submission-examples
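
For instance, assuming a cluster is already running and the flink CLI is available, remote submission looks roughly like the following sketch (see the linked documentation for the full set of options):

$ ./bin/flink run --python pandas_udf_demo.py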

  • Finally, you can see the execution result on the command line. You will find that all of the empty temperature values have been interpolated:

$ cat /tmp/output
1,98.0
1,99.0
1,100.0
2,99.0

Conversion between PyFlink Table and Pandas DataFrame

You can use the from_pandas() method to create a PyFlink Table from a Pandas DataFrame, and the to_pandas() method to convert a PyFlink Table to a Pandas DataFrame. Note that to_pandas() executes the table and collects the result to the client, so it is best suited to result sets that fit in memory.


from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
import pandas as pd
import numpy as np

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Create a PyFlink Table
pdf = pd.DataFrame(np.random.rand(1000, 2))
table = t_env.from_pandas(pdf, ["a", "b"]).filter("a > 0.5")

# Convert the PyFlink Table to a Pandas DataFrame
pdf = table.to_pandas()
print(pdf)

Conclusion & Upcoming work

In this article, we introduced the integration of Pandas in Flink 1.11, including Pandas UDFs and the conversion between Table and Pandas DataFrame. In fact, many practical features were added to PyFlink in the latest Apache Flink release, such as support for user-defined table functions and user-defined metrics for Python UDFs. What's more, starting from Flink 1.11 you can build PyFlink with Cython support and "Cythonize" your Python UDFs to substantially improve code execution speed (up to 30x faster compared to the Python UDFs in Flink 1.10).


Future work by the community will focus on adding more features and bringing additional optimizations in follow-up releases. Such additions include a Python DataStream API and deeper integration with the Python ecosystem, such as support for distributed Pandas in Flink. Stay tuned for more updates in the upcoming releases!


Summary

I am very fond of, and keep putting into practice, Xiaoyaozi's (Daniel Zhang's) classic line:
Going from "seeking common ground while preserving differences" to "seeking common ground while respecting differences" is the road we must travel in Finding Our Fellow Travelers.

======= The PyFlink team welcomes you... ==========

About the team:
The Alibaba real-time computing team focuses on Apache Flink and its surrounding ecosystem. Its work covers all Flink-related areas, building general-purpose real-time computing solutions around Flink. The team serves all BUs within the Alibaba economy, external Alibaba Cloud customers, and users in the Flink community.
The Flink ecosystem team is one of the core teams of Alibaba's real-time computing department. Starting from application scenarios, it builds the complete Flink ecosystem top-down (e.g. Flink AI Flow, TF on Flink, and multi-language support such as PyFlink) and improves and refines the core features of the Flink engine.

Work directions:
Flink AI; Python/R/Go language support for Flink; and integration of Python ecosystem libraries with Flink, such as the development of distributed Pandas! Details here.

Responsibilities:

  1. Develop a thorough understanding of Flink's application scenarios and build sound technical solutions, including analyzing, abstracting, and improving the Flink engine's core features and APIs, and designing and implementing services around Flink.
  2. Support customers inside and outside the group with the Flink engine and its ecosystem.

Experience and qualifications:

  1. Experience with big data / engineering projects (distributed systems, Python ecosystem library development, etc.); open-source project Committers / PMC members preferred.
  2. Experience independently designing and developing medium-to-large system frameworks, modules, and services, with a deep understanding of performance, interfaces, scalability, compatibility, and high availability.
  3. Development experience with any of the following projects is a plus: Flink, Spark, Kafka, Pulsar, NumPy, Pandas, PyTorch, etc.
  4. Strong communication skills; experience in international open-source communities is a plus.
  5. A spirit of pursuing technical excellence.

Feel free to get in touch. DingTalk: Jinzhu (金竹); WeChat: 18158190225
