一、背景
Flink 1.13 已於近期正式發佈,超過 200 名貢獻者參與了 Flink 1.13 的開發,提交了超過 1000 個 commits,完成了若干重要功能。其中,PyFlink 模塊在該版本中也新增了若干重要功能,比如支持了 state、自定義 window、row-based operation 等。隨著這些功能的引入,PyFlink 功能已經日趨完善,用戶可以使用 Python 語言完成絕大多數類型Flink作業的開發。接下來,我們詳細介紹如何在 Python DataStream API 中使用 state & timer 功能。
二、state 功能介紹
作為流計算引擎,state 是 Flink 中最核心的功能之一。
- 在 1.12 中,Python DataStream API 尚不支持 state,用戶使用 Python DataStream API 只能實現一些簡單的、不需要使用 state 的應用;
- 而在 1.13 中,Python DataStream API 支持了此項重要功能。
state 使用示例
如下是一個簡單的示例,說明如何在 Python DataStream API 作業中使用 state:
from pyflink.common import WatermarkStrategy, Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import NumberSequenceSource
from pyflink.datastream.functions import RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor
class MyMapFunction(MapFunction):
def open(self, runtime_context: RuntimeContext):
state_desc = ValueStateDescriptor('cnt', Types.LONG())
# 定義value state
self.cnt_state = runtime_context.get_state(state_desc)
def map(self, value):
cnt = self.cnt_state.value()
if cnt is None:
cnt = 0
new_cnt = cnt + 1
self.cnt_state.update(new_cnt)
return value[0], new_cnt
def state_access_demo():
# 1. 創建 StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
# 2. 創建數據源
seq_num_source = NumberSequenceSource(1, 100)
ds = env.from_source(
source=seq_num_source,
watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
source_name='seq_num_source',
type_info=Types.LONG())
# 3. 定義執行邏輯
ds = ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
.key_by(lambda a: a[0]) \
.map(MyMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.LONG()]))
# 4. 將打印結果數據
ds.print()
# 5. 執行作業
env.execute()
if __name__ == '__main__':
state_access_demo()
在上面的例子中,我們定義了一個 MapFunction,該 MapFunction 中定義了一個名字為 “cnt_state” 的 ValueState,用於記錄每一個 key 出現的次數。
說明:
- 除了 ValueState 之外,Python DataStream API 還支持 ListState、MapState、ReducingState,以及 AggregatingState;
- 定義 state 的 StateDescriptor 時,需要聲明 state 中所存儲的數據的類型(TypeInformation)。另外需要注意的是,當前 TypeInformation 字段並未被使用,默認使用 pickle 進行序列化,因此建議將 TypeInformation 字段定義為 Types.PICKLED_BYTE_ARRAY() 類型,與實際所使用的序列化器相匹配。這樣的話,當後續版本支持使用 TypeInformation 之後,可以保持後向兼容性;
- state 除了可以在 KeyedStream 的 map 操作中使用,還可以在其它操作中使用;除此之外,還可以在連接流中使用 state,比如:
ds1 = ... # type DataStream
ds2 = ... # type DataStream
ds1.connect(ds2) \
.key_by(key_selector1=lambda a: a[0], key_selector2=lambda a: a[0]) \
.map(MyCoMapFunction()) # 可以在MyCoMapFunction中使用state
可以使用 state 的 API 列表如下:
操作 | 自定義函數 | |
---|---|---|
KeyedStream | map | MapFunction |
flat_map | FlatMapFunction | |
reduce | ReduceFunction | |
filter | FilterFunction | |
process | KeyedProcessFunction | |
ConnectedStreams | map | CoMapFunction |
flat_map | CoFlatMapFunction | |
process | KeyedCoProcessFunction | |
WindowedStream | apply | WindowFunction |
process | ProcessWindowFunction |
state 工作原理
上圖是 PyFlink 中,state 工作原理的架構圖。從圖中我們可以看出,Python 自定義函數運行在 Python worker 進程中,而 state backend 運行在 JVM 進程中(由 Java 算子來管理)。當 Python 自定義函數需要訪問 state 時,會通過遠程調用的方式,訪問 state backend。
我們知道,遠程調用的開銷是非常大的,為了提升 state 讀寫的性能,PyFlink 針對 state 讀寫做了以下幾個方面的優化工作:
- Lazy Read:
對於包含多個 entry 的 state,比如 MapState,當遍歷 state 時,state 數據並不會一次性全部讀取到 Python worker 中,只有當真正需要訪問時,才從 state backend 讀取。
- Async Write:
當更新 state 時,更新後的 state,會先存儲在 LRU cache 中,並不會同步地更新到遠端的 state backend,這樣做可以避免每次 state 更新操作都訪問遠端的 state backend;同時,針對同一個 key 的多次更新操作,可以合併執行,儘量避免無效的 state 更新。
- LRU cache:
在 Python worker 進程中維護了 state 讀寫的 cache。當讀取某個 key 時,會先查看其是否已經被加載到讀 cache 中;當更新某個 key 時,會先將其存放到寫 cache 中。針對頻繁讀寫的 key,LRU cache 可以避免每次讀寫操作,都訪問遠端的 state backend,對於有熱點 key 的場景,可以極大提升 state 讀寫性能。
- Flush on Checkpoint:
為了保證 checkpoint 語義的正確性,當 Java 算子需要執行 checkpoint時,會將 Python worker中的寫 cache 都 flush 回 state backend。
其中 LRU cache 可以細分為二級,如下圖所示:
說明:
- 二級 cache 為 global cache,二級 cache 中的讀 cache 中存儲著當前 Python worker 進程中所有緩存的原始 state 數據(未反序列化);二級 cache 中的寫 cache 中存儲著當前 Python worker 進程中所有創建的 state 對象。
- 一級 cache 位於每一個 state 對象內,在 state 對象中緩存著該 state 對象已經從遠端的 state backend 讀取的 state 數據以及待更新回遠端的 state backend 的 state 數據。
工作流程:
- 當在 Python UDF 中,創建一個 state 對象時,首先會查看當前 key 所對應的 state 對象是否已經存在(在二級 cache 中的 “Global Write Cache” 中查找),如果存在,則返回對應的 state 對象;如果不存在,則創建新的 state 對象,並存入 “Global Write Cache”;
- state 讀取:當在 Python UDF 中,讀取 state 對象時,如果待讀取的 state 數據已經存在(一級 cache),比如對於 MapState,待讀取的 map key/map value 已經存在,則直接返回對應的 map key/map value;否則,訪問二級 cache,如果二級 cache 中也不存在待讀取的 state 數據,則從遠端的 state backend 讀取;
- state 寫入:當在 Python UDF 中,更新 state 對象時,先寫到 state 對象內部的寫 cache 中(一級 cache);當 state 對象中待寫回 state backend 的 state 數據的大小超過指定閾值或者當遇到 checkpoint 時,將待寫回的 state 數據寫回遠端的 state backend。
state 性能調優
通過前一節的介紹,我們知道 PyFlink 使用了多種優化手段,用於提升 state 讀寫的性能,這些優化行為可以通過以下參數配置:
配置 | 說明 |
---|---|
python.state.cache-size | Python worker 中讀 cache 以及寫 cache 的大小。(二級 cache)需要注意的是:讀 cache、寫 cache是獨立的,當前不支持分別配置讀 cache 以及寫 cache 的大小。 |
python.map-state.iterate-response-batch-size | 當遍歷 MapState 時,每次從 state backend 讀取並返回給 Python worker 的 entry 的最大個數。 |
python.map-state.read-cache-size | 一個 MapState 的讀 cache 中最大允許的 entry 個數(一級 cache)。當一個 MapState 中,讀 cache 中的 entry 個數超過該閾值時,會通過 LRU 策略從讀 cache 中刪除最近最少訪問過的 entry。 |
python.map-state.write-cache-size | 一個 MapState 的寫 cache 中最大允許的待更新 entry 的個數(一級 cache)。當一個 MapState 中,寫 cache 中待更新的 entry 的個數超過該閾值時,會將該 MapState 下所有待更新 state 數據寫回遠端的 state backend。 |
需要注意的是,state 讀寫的性能不僅取決於以上參數,還受其它因素的影響,比如:
- 輸入數據中 key 的分佈:
輸入數據的 key 越分散,讀 cache 命中的概率越低,則性能越差。
- Python UDF 中 state 讀寫次數:
state 讀寫可能涉及到讀寫遠端的 state backend,應該儘量優化 Python UDF 的實現,減少不必要的 state 讀寫。
- checkpoint interval:
為了保證 checkpoint 語義的正確性,當遇到 checkpoint 時,Python worker 會將所有緩存的待更新 state 數據,寫回 state backend。如果配置的 checkpoint interval 過小,則可能並不能有效減少 Python worker 寫回 state backend 的數據量。
- bundle size / bundle time:
當前 Python 算子會將輸入數據劃分成多個批次,發送給 Python worker 執行。當一個批次的數據處理完之後,會強制將 Python worker 進程中的待更新 state 寫回 state backend。與 checkpoint interval 類似,該行為也可能會影響 state 寫性能。批次的大小可以通過 python.fn-execution.bundle.size 和 python.fn-execution.bundle.time 參數控制。
三、timer 功能介紹
timer 使用示例
除了 state 之外,用戶還可以在 Python DataStream API 中使用定時器 timer。
import datetime
from pyflink.common import Row, WatermarkStrategy
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.table import StreamTableEnvironment
class CountWithTimeoutFunction(KeyedProcessFunction):
def __init__(self):
self.state = None
def open(self, runtime_context: RuntimeContext):
self.state = runtime_context.get_state(ValueStateDescriptor(
"my_state", Types.ROW([Types.STRING(), Types.LONG(), Types.LONG()])))
def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
# retrieve the current count
current = self.state.value()
if current is None:
current = Row(value.f1, 0, 0)
# update the state's count
current[1] += 1
# set the state's timestamp to the record's assigned event time timestamp
current[2] = ctx.timestamp()
# write the state back
self.state.update(current)
# schedule the next timer 60 seconds from the current event time
ctx.timer_service().register_event_time_timer(current[2] + 60000)
def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
# get the state for the key that scheduled the timer
result = self.state.value()
# check if this is an outdated timer or the latest timer
if timestamp == result[2] + 60000:
# emit the state on timeout
yield result[0], result[1]
class MyTimestampAssigner(TimestampAssigner):
def __init__(self):
self.epoch = datetime.datetime.utcfromtimestamp(0)
def extract_timestamp(self, value, record_timestamp) -> int:
return int((value[0] - self.epoch).total_seconds() * 1000)
if __name__ == '__main__':
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
t_env.execute_sql("""
CREATE TABLE my_source (
a TIMESTAMP(3),
b VARCHAR,
c VARCHAR
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10'
)
""")
stream = t_env.to_append_stream(
t_env.from_path('my_source'),
Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()]))
watermarked_stream = stream.assign_timestamps_and_watermarks(
WatermarkStrategy.for_monotonous_timestamps()
.with_timestamp_assigner(MyTimestampAssigner()))
# apply the process function onto a keyed stream
watermarked_stream.key_by(lambda value: value[1])\
.process(CountWithTimeoutFunction()) \
.print()
env.execute()
在上述示例中,我們定義了一個 KeyedProcessFunction,該 KeyedProcessFunction 記錄每一個 key 出現的次數,當一個 key 超過 60 秒沒有更新時,會將該 key 以及其出現次數,發送到下游節點。
除了 event time timer 之外,用戶還可以使用 processing time timer。
timer 工作原理
timer 的工作流程是這樣的:
- 與 state 訪問使用單獨的通信信道不同,當用戶註冊 timer 之後,註冊消息通過數據通道發送到 Java 算子;
- Java 算子收到 timer 註冊消息之後,首先檢查待註冊 timer 的觸發時間,如果已經超過當前時間,則直接觸發;否則的話,將 timer 註冊到 Java 算子的 timer service 中;
- 當 timer 觸發之後,觸發消息通過數據通道發送到 Python worker,Python worker 回調用戶 Python UDF 中的的 on_timer 方法。
需要注意的是:由於 timer 註冊消息以及觸發消息通過數據通道異步地在 Java 算子以及 Python worker 之間傳輸,這會造成在某些場景下,timer 的觸發可能沒有那麼及時。比如當用戶註冊了一個 processing time timer,當 timer 觸發之後,觸發消息通過數據通道傳輸到 Python UDF 時,可能已經是幾秒中之後了。
四、總結
在這篇文章中,我們主要介紹瞭如何在 Python DataStream API 作業中使用 state & timer,state & timer 的工作原理以及如何進行性能調優。接下來,我們會繼續推出 PyFlink 系列文章,幫助 PyFlink 用戶深入瞭解 PyFlink 中各種功能、應用場景以及最佳實踐等。
另外,阿里雲實時計算生態團隊長期招聘優秀大數據人才(包括實習 + 社招),我們的工作包括:
- 實時機器學習:支持機器學習場景下實時特徵工程和 AI 引擎配合,基於 Apache Flink 及其生態打造實時機器學習的標準,推動例如搜索、推薦、廣告、風控等場景的全面實時化;
- 大數據 + AI 一體化:包括編程語言一體化 (PyFlink 相關工作),執行引擎集成化 (TF on Flink),工作流及管理一體化(Flink AI Flow)。
如果你對開源、大數據或者 AI 感興趣,請發簡歷到:[email protected]
此外,也歡迎大家加入 “PyFlink交流群”,交流 PyFlink 相關的問題。
活動推薦
阿里雲基於 Apache Flink 構建的企業級產品-實時計算 Flink 版現開啟6月限時活動:
0元試用實時計算 Flink 全託管版本(包年包月、10CU)即可有機會獲得 Flink 獨家定製T恤;另包3個月及以上還有85折優惠!
瞭解活動詳情:https://www.aliyun.com/product/bigdata/sc