開發與維運

使用Blink CEP實現差值聚合計算

使用Blink SQL+UDAF實現差值聚合計算介紹瞭如何使用Blink SQL+UDAF實現實時流上的差值聚合計算,後來在與@付典就業務需求和具體實現方式進行探討時,付典提出通過CEP實現的思路和方法。
本文介紹通過CEP實現實時流上的差值聚合計算。
感謝@付典在實現過程中的指導。筆者水平有限,若有紕漏,請批評指出。

一、客戶需求

電網公司每天採集各個用戶的電錶數據(格式如下表),其中data_date為電錶數據上報時間,cons_id為電錶id,r1為電錶度數,其他字段與計算邏輯無關,可忽略。為了後續演示方便,僅輸入cons_id=100000002的數據。

no(string) data_date(string) cons_id(string) org_no(string) r1(double)
101 20190716 100000002 35401 13.76
101 20190717 100000002 35401 14.12
101 20190718 100000002 35401 16.59
101 20190719 100000002 35401 18.89

表1:輸入數據
電網公司希望通過實時計算(Blink)對電錶數據處理後,每天得到每個電錶最近兩天(當天和前一天)的差值數據,結果類似如下表:

cons_id(string) data_date(string) subDegreeR1(double)
100000002 20190717 0.36
100000002 20190718 2.47
100000002 20190719 2.3

表2:期望的輸出數據

二、需求分析

根據業務需求以及CEP跨事件模式匹配的特性,定義兩個CEP事件e1和e2,輸出e2.r1-e1.r1即可得到差值。

三、CEP開發及測試結果

參考複雜事件處理(CEP)語句,CEP代碼如下:

CREATE TABLE input_dh_e_mp_read_curve (
    `no`                  VARCHAR,
    data_date             VARCHAR,
    cons_id               VARCHAR,
    org_no                VARCHAR,
    r1                    DOUBLE,
    ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss')
    ,WATERMARK wk FOR ts as withOffset(ts, 2000)
) WITH (
    type = 'datahub',
    endPoint = 'http://dh-cn-shanghai.aliyun-inc.com',
    roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole',
    project = 'jszc_datahub',
    topic = 'input_dh_e_mp_read_curve'
);

CREATE TABLE data_out(
    cons_id varchar
    ,data_date varchar
    ,subDegreeR1 DOUBLE
)with(
    type = 'print'
);

insert into data_out
select
    cons_id,
    data_date,
    subDegreeR1
from input_dh_e_mp_read_curve
MATCH_RECOGNIZE(
    PARTITION BY cons_id
    ORDER BY ts
    MEASURES
        e2.data_date as data_date,
        e2.r1 - e1.r1 as subDegreeR1
    ONE ROW PER MATCH
    AFTER MATCH SKIP TO NEXT ROW
    PATTERN(e1 e2)
    DEFINE
        e1 as TRUE,
        e2 as TRUE
);

由於使用了print connector,從對應的sink的taskmanager.out日誌中可以查看到輸出如下:

task-1> (+)100000002,20190717,0.35999999999999943
task-1> (+)100000002,20190718,2.4700000000000006

對比期望輸出(表2),20190717和20190718兩個窗口的數據均正確,表明業務邏輯正確,但此輸出與期望輸出有少許差異:
(1)20190719的數據沒有輸出,這是因為我們設置了watermark,測試環境下20190719之後沒有數據進來觸發20190719對應的窗口的結束。

四、其他說明

1、對比使用Blink SQL+UDAF實現差值聚合計算(1),我們可以看出使用CEP開發代碼非常簡潔,所以在跨事件處理的情況下CEP還是非常的合適。從另外一個方面講,同樣的需求有不同的實現方式,所以融會貫通Blink SQL中的各種語法,利用更合適的語法來實現業務需求,將可能大大提升工作效率和業務性能。
2、在實現本案例時,筆者發現使用CEP時有如下需要注意的地方:
(1)partiton by裡的字段(如本案的cons_id),默認會帶到輸出裡,若同時在MEASURES中定義,則可能會報類似如下錯誤:
13_47_33__08_03_2019.jpg
(2)define及其內容必須定義,否則前端頁面提示類似如下錯誤:
圖片.png

圖片.png

Leave a Reply

Your email address will not be published. Required fields are marked *