開發與維運

使用Blink SQL+UDAF實現差值聚合計算

本案例根據某電網公司的真實業務需求,通過Blink SQL+UDAF實現實時流上的差值聚合計算,通過本案例,讓讀者熟悉UDAF編寫,並理解UDAF中的方法調用關係和順序。
感謝@軍長在實現過程中的指導。筆者水平有限,若有紕漏,請批評指出。

一、客戶需求

電網公司每天採集各個用戶的電錶數據(格式如下表),其中data_date為電錶數據上報時間,cons_id為電錶id,r1為電錶度數,其他字段與計算邏輯無關,可忽略。為了後續演示方便,僅輸入cons_id=100000002的數據。

no(string) data_date(string) cons_id(string) org_no(string) r1(double)
101 20190716 100000002 35401 13.76
101 20190717 100000002 35401 14.12
101 20190718 100000002 35401 16.59
101 20190719 100000002 35401 18.89

表1:輸入數據
電網公司希望通過實時計算(Blink)對電錶數據處理後,每天得到每個電錶最近兩天(當天和前一天)的差值數據,結果類似如下表:

cons_id(string) data_date(string) subDegreeR1(double)
100000002 20190717 0.36
100000002 20190718 2.47
100000002 20190719 2.3

表2:期望的輸出數據

二、需求分析

根據客戶的需求,比較容易得到兩種解決方案:1、通過over窗口(2 rows over window)開窗進行差值聚合;2、通過hop窗口(sliding=1天,size=2天)進行差值聚合。
over窗口和hop窗口均是Blink支持的標準窗口,使用起來非常簡單。本需求的最大難點在於差值聚合,Blink支持SUM、MAX、MIN、AVG等內置的聚合函數,但沒有滿足業務需求的差值聚合函數,因此需要通過自定義聚合函數(UDAF)來實現。

三、UDAF開發

實時計算自定義函數開發搭建環境請參考UDX概述(https://help.aliyun.com/document_detail/69463.html?spm=a2c4g.11186623.6.665.76881509l0zB6B),在此不再贅述。本案例使用Blink2.2.7版本,下面簡要描述關鍵代碼的編寫。
完整代碼(為了方便上傳,使用了txt格式):SubtractionUdaf.txt
1、在com.alibaba.blink.sql.udx.SubtractionUdaf包中創建一個繼承AggregateFunction類的SubtractionUdaf類。

public class SubtractionUdaf extends AggregateFunction<Double, SubtractionUdaf.Accum> 

其中Double是UDAF輸出的類型,在本案例中為相鄰兩天的電錶差值度數。SubtractionUdaf.Accum是內部自定義的accumulator數據結構。
2、定義accumulator數據結構,用戶保存UDAF的狀態。

    public static class Accum {
        private long currentTime;//最新度數的上報時間
        private double oldDegree;//前一次度數
        private double newDegree;//當前最新度數
        private long num;   //accumulator中已經計算的record數量,主要用於merge
        private List<Tuple2<Double, Long>> listInput;//緩存所有的輸入,主要用於retract
    }

3、實現createAccumulator方法,初始化UDAF的accumulator

    //初始化udaf的accumulator
    public SubtractionUdaf.Accum createAccumulator() {
        SubtractionUdaf.Accum acc = new SubtractionUdaf.Accum();
        acc.currentTime = 0;
        acc.oldDegree = 0.0;
        acc.newDegree = 0.0;
        acc.num = 0;
        acc.listInput = new ArrayList<Tuple2<Double, Long>>();
        return acc;
    }

4、實現getValue方法,用於通過存放狀態的accumulator計算UDAF的結果,本案例需求是計算新舊數據兩者的差值。

    public Double getValue(SubtractionUdaf.Accum accumulator) {
        return accumulator.newDegree - accumulator.oldDegree;
    }

5、實現accumulate方法,用於根據輸入數據更新UDAF存放狀態的accumulator。考慮到數據可能亂序以及可能的retract,數據數據包括了對應的度數iValue,還包括上報度數的時間(構造的事件時間ts)。

    public void accumulate(SubtractionUdaf.Accum accumulator, double iValue, long ts) {
        System.out.println("method : accumulate" );
        accumulator.listInput.add(Tuple2.of(Double.valueOf(iValue),Long.valueOf(ts)));
        Collections.sort(accumulator.listInput,this.comparator);//按照時間排序
        accumulator.num ++;
        if(accumulator.listInput.size() == 1){
            accumulator.newDegree = iValue;
            accumulator.oldDegree = 0.0;
            accumulator.currentTime = ts;
        }else {//處理可能存在的數據亂序問題
            accumulator.newDegree = accumulator.listInput.get(0).f0;
            accumulator.currentTime = accumulator.listInput.get(0).f1;
            accumulator.oldDegree = accumulator.listInput.get(1).f0;
        }
    }

其中accumulator為UDAF的狀態,iValue和ts為實際的輸入數據。
注意需要處理可能存在的輸入數據亂序問題。
6、實現retract方法,用於在某些優化場景下(如使用over窗口)對retract的數據進行處理。

    public void retract(SubtractionUdaf.Accum accumulator, double iValue, long ts) throws Exception{
        if(accumulator.listInput.contains(Tuple2.of(iValue, ts))){
            if(accumulator.listInput.indexOf(Tuple2.of(iValue, ts)) == 0){//retract的是最新值
                accumulator.listInput.remove(0);
                accumulator.num--;
                if(accumulator.listInput.isEmpty()){
                    accumulator.currentTime = 0;
                    accumulator.oldDegree = 0.0;
                    accumulator.newDegree = 0.0;
                }else if(accumulator.listInput.size() == 1) {
                    accumulator.currentTime = accumulator.listInput.get(0).f1;
                    accumulator.newDegree = accumulator.listInput.get(0).f0;
                    accumulator.oldDegree = 0.0;
                }else{
                    accumulator.currentTime = accumulator.listInput.get(0).f1;
                    accumulator.newDegree = accumulator.listInput.get(0).f0;
                    accumulator.oldDegree = accumulator.listInput.get(1).f0;
                }
            } else if(accumulator.listInput.indexOf(Tuple2.of(iValue, ts)) == 1){//retract的是次新值
                accumulator.listInput.remove(1);
                accumulator.num--;
                if(accumulator.listInput.size() == 1){
                    accumulator.oldDegree = 0.0;
                }else {
                    accumulator.oldDegree = accumulator.listInput.get(1).f0;
                }
            }else {//retract的是其他值
                accumulator.listInput.remove(Tuple2.of(iValue, ts));
                accumulator.num--;
            }
        }else {
            throw new Exception("Cannot retract a unexist record : iValue = "+ iValue + "timestamp = "+ ts);
        }
    }

需要考慮retract的是最新的數據還是次新的數據,需要不同的邏輯處理。
7、實現merge方法,用於某些優化場景(如使用hop窗口)。

    public void merge(SubtractionUdaf.Accum accumulator, Iterable<SubtractionUdaf.Accum> its) {
        int i = 0;
        System.out.println("method : merge" );
        System.out.println("accumulator : "+ accumulator.newDegree);
        System.out.println("accumulator : "+ accumulator.currentTime);

        for (SubtractionUdaf.Accum entry : its) {
            if(accumulator.currentTime < entry.currentTime){
                if(entry.num > 1){
                    accumulator.currentTime = entry.currentTime;
                    accumulator.oldDegree = entry.oldDegree;
                    accumulator.newDegree = entry.newDegree;
                    accumulator.num += entry.num;
                    accumulator.listInput.addAll(entry.listInput);
                }else if(entry.num == 1){
                    accumulator.currentTime = entry.currentTime;
                    accumulator.oldDegree = accumulator.newDegree;
                    accumulator.newDegree = entry.newDegree;
                    accumulator.num ++;
                    accumulator.listInput.addAll(entry.listInput);
                }
            }else{
                if(accumulator.num > 1){
                    accumulator.num += entry.num;
                    accumulator.listInput.addAll(entry.listInput);
                }else if(accumulator.num == 1){
                    accumulator.oldDegree = entry.newDegree;
                    accumulator.num += entry.num;
                    accumulator.listInput.addAll(entry.listInput);
                }else if(accumulator.num == 0){
                    accumulator.currentTime = entry.currentTime;
                    accumulator.oldDegree = entry.oldDegree;
                    accumulator.newDegree = entry.newDegree;
                    accumulator.num = entry.num;
                    accumulator.listInput.addAll(entry.listInput);
                }
            }
            Collections.sort(accumulator.listInput,this.comparator);
            System.out.println("merge : "+i);
            System.out.println("newDegree : "+entry.newDegree);
            System.out.println("oldDegree = "+entry.oldDegree);
            System.out.println("currentTime : "+entry.currentTime);
        }
    }

需要考慮merge的是否是比當前新的數據,需要不同的處理邏輯。
8、其他方面,考慮到需要對輸入度數按照事件時間排序,在open方法中實例化了自定義的Comparator類,對accumulator數據結構中的inputList按事件時間的降序排序。

    public void open(FunctionContext context) throws Exception {
        //定義record的先後順序,用於listInput的排序,時間越新的record在list中越前面
        this.comparator = new Comparator<Tuple2<Double, Long>>() {
            public int compare( Tuple2<Double, Long> o1, Tuple2<Double, Long> o2) {
                if (Long.valueOf(o1.f1) < Long.valueOf(o2.f1)) {
                    return 1;
                } else if (Long.valueOf(o1.f1) > Long.valueOf(o2.f1)) {
                    return -1;
                }else {
                    return 0;
                }
            }
        };
    }

請參考[使用IntelliJ IDEA開發自定義函數]()完成UDAF編譯、打包,並參考UDX概述完成資源的上傳和引用。

四、SQL開發及測試結果

(一)over窗口

SQL代碼如下,語法檢查、上線、啟動作業(選擇當前啟動位點)。並將表1數據上傳至datahub。

CREATE FUNCTION OverWindowSubtractionUdaf as 'com.alibaba.blink.sql.udx.SubtractionUdaf';

CREATE TABLE input_dh_e_mp_read_curve (
    `no`                  VARCHAR,
    data_date             VARCHAR,
    cons_id               VARCHAR,
    org_no                VARCHAR,
    r1                    DOUBLE,
    ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss')
    ,WATERMARK wk FOR ts as withOffset(ts, 2000)
) WITH (
    type = 'datahub',
    endPoint = 'http://dh-cn-shanghai.aliyun-inc.com',
    roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole',
    project = 'jszc_datahub',
    topic = 'input_dh_e_mp_read_curve'
);
CREATE TABLE data_out(
    cons_id varchar
    ,data_date varchar
    ,subDegreeR1 DOUBLE
)with(
    type = 'print'
);

INSERT into data_out    
SELECT
    cons_id
    ,last_value(data_date) OVER (
        PARTITION BY cons_id 
        ORDER BY ts 
        ROWS BETWEEN 1 preceding AND CURRENT ROW) as data_date
    ,OverWindowSubtractionUdaf(r1,unix_timestamp(ts)) OVER (
        PARTITION BY cons_id 
        ORDER BY ts 
        ROWS BETWEEN 1 preceding AND CURRENT ROW) as data_date
FROM input_dh_e_mp_read_curve

由於使用了print connector,從對應的sink的taskmanager.out日誌中可以查看到輸出如下(已忽略其他debug日誌):

task-1> (+)100000002,20190716,13.76
task-1> (+)100000002,20190717,0.35999999999999943
task-1> (+)100000002,20190718,2.4700000000000006

對比期望輸出(表2),20190717和20190718兩個窗口的數據均正確,表明業務邏輯正確,但此輸出與期望輸出有少許差異:
(1)20190716輸出為13.76,這是因為第一個over窗口只有一條數據導致的,這種數據可以在業務層過濾掉;
(2)20190719的數據沒有輸出,這是因為我們設置了watermark,測試環境下20190719之後沒有數據進來觸發20190719對應的窗口的結束。

(二)hop窗口

SQL代碼如下:語法檢查、上線、啟動作業(選擇當前啟動位點)。並將表1數據上傳至datahub。

CREATE FUNCTION HopWindowSubtractionUdaf as 'com.alibaba.blink.sql.udx.SubtractionUdaf';

CREATE TABLE input_dh_e_mp_read_curve (
    `no`                  VARCHAR,
    data_date             VARCHAR,
    cons_id               VARCHAR,
    org_no                VARCHAR,
    r1                    DOUBLE,
    ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss')
    ,WATERMARK wk FOR ts as withOffset(ts, 2000)
) WITH (
    type = 'datahub',
    endPoint = 'http://dh-cn-shanghai.aliyun-inc.com',
    roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole',
    project = 'jszc_datahub',
    topic = 'input_dh_e_mp_read_curve'
);
CREATE TABLE data_out(
    cons_id varchar
    ,data_date varchar
    ,subDegreeR1 DOUBLE
)with(
    type = 'print'
);
INSERT into data_out    
SELECT
    cons_id
    ,DATE_FORMAT(HOP_end(ts, INTERVAL '1' day,INTERVAL '2' day), 'yyyyMMdd')
    ,HopWindowSubtractionUdaf(r1,unix_timestamp(ts))
FROM input_dh_e_mp_read_curve
group by hop(ts, INTERVAL '1' day,INTERVAL '2' day),cons_id;

由於使用了print connector,從對應的sink的taskmanager.out日誌中可以查看到輸出如下(已忽略其他debug日誌):

task-1> (+)100000002,20190716,13.76
task-1> (+)100000002,20190717,0.35999999999999943
task-1> (+)100000002,20190718,2.4700000000000006

對比期望輸出(表2),20190717和20190718兩個窗口的數據均正確,表明業務邏輯正確,但此輸出與期望輸出有少許差異:
(1)20190716輸出為13.76,這是因為第一個hop窗口只有一條數據導致的,這種數據可以在業務層過濾掉;
(2)20190719的數據沒有輸出,這是因為我們設置了watermark,測試環境下20190719之後沒有數據進來觸發20190719對應的窗口的結束。

五、幾點思考

1、關於UDAF內部方法的調用關係和順序

UDAF中主要有createAccumulator、getValue、accumulate、retract和merge方法,其調用關係和順序並不是完全確定,而是與Blink底層優化、Blink版本、開窗類型(如hop還是over窗口)等相關。
比較確定的是一次正常(沒有failover)的作業,createAccumulator方法只在作業啟動時調用一次,accumulate方法在每條數據輸入時調用一次,在觸發數據輸出時會調用一次getValue(並不代表只調用一次)。
而retract方法和merge方法則跟具體的優化方式或開窗類型有關,本案例中over窗口調用retract方法而不調用merge方法,hop窗口調用merge方法而不調用retract方法。
大家可以增加日誌,觀察這幾個方法的調用順序,還是蠻有意思的。

2、如何知道需要實現UDAF中的哪些方法

UDAF中必須實現createAccumulator、getValue、accumulate方法,可選擇實現retract和merge方法。
一般情況下,可先實現createAccumulator、getValue、accumulate三個方法,然後編寫SQL後進行語法檢查,SQL編譯器會提示是否需要retract或merge方法。
比如,如果沒有實現retract方法,在使用over窗口時,語法檢查會報類似如下錯誤:

org.apache.flink.table.api.ValidationException: Function class 'com.alibaba.blink.sql.udx.SubtractionUdaf' does not implement at least one method named 'retract' which is public, not abstract and (in case of table functions) not static.

比如,如果沒有實現merge方法,在使用over窗口時,語法檢查會報類似如下錯誤:

org.apache.flink.table.api.ValidationException: Function class 'com.alibaba.blink.sql.udx.SubtractionUdaf' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static.

3、本案例存在優化空間的地方

(1)本案例沒有考慮數據缺失的問題,比如因為某種原因(網絡問題、數據採集問題等)缺少20190717的數據。這種情況下會是什麼樣的結果?大家可以自行測試下;
(2)本案例使用了一個List,然後通過Collections.sort方法進行排序,這不是很優的方法,如果用優先級隊列(priority queue)性能應該會更好;

Leave a Reply

Your email address will not be published. Required fields are marked *