使用Blink SQL+UDAF實現差值聚合計算介紹瞭如何使用Blink SQL+UDAF實現實時流上的差值聚合計算,後來在與@付典就業務需求和具體實現方式進行探討時,付典提出通過CEP實現的思路和方法。
本文介紹通過CEP實現實時流上的差值聚合計算。
感謝@付典在實現過程中的指導。筆者水平有限,若有紕漏,請批評指出。
一、客戶需求
電網公司每天採集各個用戶的電錶數據(格式如下表),其中data_date為電錶數據上報時間,cons_id為電錶id,r1為電錶度數,其他字段與計算邏輯無關,可忽略。為了後續演示方便,僅輸入cons_id=100000002的數據。
no(string) | data_date(string) | cons_id(string) | org_no(string) | r1(double) |
---|---|---|---|---|
101 | 20190716 | 100000002 | 35401 | 13.76 |
101 | 20190717 | 100000002 | 35401 | 14.12 |
101 | 20190718 | 100000002 | 35401 | 16.59 |
101 | 20190719 | 100000002 | 35401 | 18.89 |
表1:輸入數據
電網公司希望通過實時計算(Blink)對電錶數據處理後,每天得到每個電錶最近兩天(當天和前一天)的差值數據,結果類似如下表:
cons_id(string) | data_date(string) | subDegreeR1(double) |
---|---|---|
100000002 | 20190717 | 0.36 |
100000002 | 20190718 | 2.47 |
100000002 | 20190719 | 2.3 |
表2:期望的輸出數據
二、需求分析
根據業務需求以及CEP跨事件模式匹配的特性,定義兩個CEP事件e1和e2,輸出e2.r1-e1.r1即可得到差值。
三、CEP開發及測試結果
參考複雜事件處理(CEP)語句,CEP代碼如下:
CREATE TABLE input_dh_e_mp_read_curve (
`no` VARCHAR,
data_date VARCHAR,
cons_id VARCHAR,
org_no VARCHAR,
r1 DOUBLE,
ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss')
,WATERMARK wk FOR ts as withOffset(ts, 2000)
) WITH (
type = 'datahub',
endPoint = 'http://dh-cn-shanghai.aliyun-inc.com',
roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole',
project = 'jszc_datahub',
topic = 'input_dh_e_mp_read_curve'
);
CREATE TABLE data_out(
cons_id varchar
,data_date varchar
,subDegreeR1 DOUBLE
)with(
type = 'print'
);
insert into data_out
select
cons_id,
data_date,
subDegreeR1
from input_dh_e_mp_read_curve
MATCH_RECOGNIZE(
PARTITION BY cons_id
ORDER BY ts
MEASURES
e2.data_date as data_date,
e2.r1 - e1.r1 as subDegreeR1
ONE ROW PER MATCH
AFTER MATCH SKIP TO NEXT ROW
PATTERN(e1 e2)
DEFINE
e1 as TRUE,
e2 as TRUE
);
由於使用了print connector,從對應的sink的taskmanager.out日誌中可以查看到輸出如下:
task-1> (+)100000002,20190717,0.35999999999999943
task-1> (+)100000002,20190718,2.4700000000000006
對比期望輸出(表2),20190717和20190718兩個窗口的數據均正確,表明業務邏輯正確,但此輸出與期望輸出有少許差異:
(1)20190719的數據沒有輸出,這是因為我們設置了watermark,測試環境下20190719之後沒有數據進來觸發20190719對應的窗口的結束。
四、其他說明
1、對比使用Blink SQL+UDAF實現差值聚合計算(1),我們可以看出使用CEP開發代碼非常簡潔,所以在跨事件處理的情況下CEP還是非常的合適。從另外一個方面講,同樣的需求有不同的實現方式,所以融會貫通Blink SQL中的各種語法,利用更合適的語法來實現業務需求,將可能大大提升工作效率和業務性能。
2、在實現本案例時,筆者發現使用CEP時有如下需要注意的地方:
(1)partiton by裡的字段(如本案的cons_id),默認會帶到輸出裡,若同時在MEASURES中定義,則可能會報類似如下錯誤:
(2)define及其內容必須定義,否則前端頁面提示類似如下錯誤: