開發與維運

Flink+Hologres億級用戶實時UV精確去重最佳實踐

UV、PV計算,因為業務需求不同,通常會分為兩種場景:

  • 離線計算場景:以T+1為主,計算曆史數據
  • 實時計算場景:實時計算日常新增的數據,對用戶標籤去重

針對離線計算場景,Hologres基於RoaringBitmap,提供超高基數的UV計算,只需進行一次最細粒度的預聚合計算,也只生成一份最細粒度的預聚合結果表,就能達到亞秒級查詢。具體詳情可以參見往期文章>>Hologres如何支持超高基數UV計算(基於RoaringBitmap實現)

對於實時計算場景,可以使用Flink+Hologres方式,並基於RoaringBitmap,實時對用戶標籤去重。這樣的方式,可以較細粒度的實時得到用戶UV、PV數據,同時便於根據需求調整最小統計窗口(如最近5分鐘的UV),實現類似實時監控的效果,更好的在大屏等BI展示。相較於以天、周、月等為單位的去重,更適合在活動日期進行更細粒度的統計,並且通過簡單的聚合,也可以得到較大時間單位的統計結果。

主體思想

  1. Flink將流式數據轉化為表與維表進行JOIN操作,再轉化為流式數據。此舉可以利用Hologres維表的insertIfNotExists特性結合自增字段實現高效的uid映射。
  2. Flink把關聯的結果數據按照時間窗口進行處理,根據查詢維度使用RoaringBitmap進行聚合,並將查詢維度以及聚合的uid存放在聚合結果表,其中聚合出的uid結果放入Hologres的RoaringBitmap類型的字段中。
  3. 查詢時,與離線方式相似,直接按照查詢條件查詢聚合結果表,並對其中關鍵的RoaringBitmap字段做or運算後並統計基數,即可得出對應用戶數。
  4. 處理流程如下圖所示

0.jpeg

方案最佳實踐

1.創建相關基礎表

1)創建表uid_mapping為uid映射表,用於映射uid到32位int類型。

  • RoaringBitmap類型要求用戶ID必須是32位int類型且越稠密越好(即用戶ID最好連續)。常見的業務系統或者埋點中的用戶ID很多是字符串類型或Long類型,因此需要使用uid_mapping類型構建一張映射表。映射表利用Hologres的SERIAL類型(自增的32位int)來實現用戶映射的自動管理和穩定映射。
  • 由於是實時數據, 設置該表為行存表,以提高Flink維表實時JOIN的QPS。
BEGIN;
CREATE TABLE public.uid_mapping (
uid text NOT NULL,
uid_int32 serial,
PRIMARY KEY (uid)
);
--將uid設為clustering_key和distribution_key便於快速查找其對應的int32值
CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid');
CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid');
CALL set_table_property('public.uid_mapping', 'orientation', 'row');
COMMIT;

2)創建表dws_app為基礎聚合表,用於存放在基礎維度上聚合後的結果。

  • 使用RoaringBitmap前需要創建RoaringBitmap extention,同時也需要Hologres實例為0.10版本
CREATE EXTENSION IF NOT EXISTS roaringbitmap;
  • 為了更好性能,建議根據基礎聚合表數據量合理的設置Shard數,但建議基礎聚合表的Shard數設置不超過計算資源的Core數。推薦使用以下方式通過Table Group來設置Shard數
--新建shard數為16的Table Group,
--因為測試數據量百萬級,其中後端計算資源為100core,設置shard數為16
BEGIN;
CREATE TABLE tg16 (a int);                             --Table Group哨兵表
call set_table_property('tg16', 'shard_count', '16'); 
COMMIT;
  • 相比離線結果表,此結果表增加了時間戳字段,用於實現以Flink窗口週期為單位的統計。結果表DDL如下:
BEGIN;
create table dws_app(
  country text,
  prov text,
  city text, 
  ymd text NOT NULL,  --日期字段
  timetz TIMESTAMPTZ,  --統計時間戳,可以實現以Flink窗口週期為單位的統計
  uid32_bitmap roaringbitmap, -- 使用roaringbitmap記錄uv
  primary key(country, prov, city, ymd, timetz)--查詢維度和時間作為主鍵,防止重複插入數據
);
CALL set_table_property('public.dws_app', 'orientation', 'column');
--日期字段設為clustering_key和event_time_column,便於過濾
CALL set_table_property('public.dws_app', 'clustering_key', 'ymd');
CALL set_table_property('public.dws_app', 'event_time_column', 'ymd');
--等價於將表放在shard數為16的table group
call set_table_property('public.dws_app', 'colocate_with', 'tg16');
--group by字段設為distribution_key
CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city');
COMMIT;

2.Flink實時讀取數據並更新dws_app基礎聚合表

完整示例源碼請見alibabacloud-hologres-connectors examples

1)Flink 流式讀取數據源(DataStream),並轉化為源表(Table)

//此處使用csv文件作為數據源,也可以是kafka等
DataStreamSource odsStream = env.createInput(csvInput, typeInfo);
// 與維表join需要添加proctime字段,詳見https://help.aliyun.com/document_detail/62506.html
Table odsTable =
    tableEnv.fromDataStream(
    odsStream,
    $("uid"),
    $("country"),
    $("prov"),
    $("city"),
    $("ymd"),
    $("proctime").proctime());
// 註冊到catalog環境
tableEnv.createTemporaryView("odsTable", odsTable);

2)將源表與Hologres維表(uid_mapping)進行關聯

其中維表使用insertIfNotExists參數,即查詢不到數據時自行插入,uid_int32字段便可以利用Hologres的serial類型自增創建。

// 創建Hologres維表,其中nsertIfNotExists表示查詢不到則自行插入
String createUidMappingTable =
    String.format(
    "create table uid_mapping_dim("
    + "  uid string,"
    + "  uid_int32 INT"
    + ") with ("
    + "  'connector'='hologres',"
    + "  'dbname' = '%s'," //Hologres DB名
    + "  'tablename' = '%s',"//Hologres 表名
    + "  'username' = '%s'," //當前賬號access id
    + "  'password' = '%s'," //當前賬號access key
    + "  'endpoint' = '%s'," //Hologres endpoint
    + "  'insertifnotexists'='true'"
    + ")",
    database, dimTableName, username, password, endpoint);
tableEnv.executeSql(createUidMappingTable);
// 源表與維表join
String odsJoinDim =
    "SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32"
    + "  FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"
    + "  ON ods.uid = dim.uid";
Table joinRes = tableEnv.sqlQuery(odsJoinDim);

3)將關聯結果轉化為DataStream,通過Flink時間窗口處理,結合RoaringBitmap進行聚合

DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource =
    source
    // 篩選需要統計的維度(country, prov, city, ymd)
    .keyBy(0, 1, 2, 3)
    // 滾動時間窗口;此處由於使用讀取csv模擬輸入流,採用ProcessingTime,實際使用中可使用EventTime
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    // 觸發器,可以在窗口未結束時獲取聚合結果
    .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
    .aggregate(
    // 聚合函數,根據key By篩選的維度,進行聚合
    new AggregateFunction<
        Tuple5<String, String, String, String, Integer>,
        RoaringBitmap,
        RoaringBitmap>() {
            @Override
            public RoaringBitmap createAccumulator() {
                return new RoaringBitmap();
            }
            @Override
            public RoaringBitmap add(
                Tuple5<String, String, String, String, Integer> in,
                RoaringBitmap acc) {
                // 將32位的uid添加到RoaringBitmap進行去重
                acc.add(in.f4);
                return acc;
            }
            @Override
            public RoaringBitmap getResult(RoaringBitmap acc) {
                return acc;
            }
            @Override
            public RoaringBitmap merge(
                RoaringBitmap acc1, RoaringBitmap acc2) {
                return RoaringBitmap.or(acc1, acc2);
            }
     },
    //窗口函數,輸出聚合結果
    new WindowFunction<
        RoaringBitmap,
        Tuple6<String, String, String, String, Timestamp, byte[]>,
        Tuple,
        TimeWindow>() {
            @Override
            public void apply(
                Tuple keys,
                TimeWindow timeWindow,
                Iterable<RoaringBitmap> iterable,
                Collector<
                Tuple6<String, String, String, String, Timestamp, byte[]>> out)
                throws Exception {
                RoaringBitmap result = iterable.iterator().next();
                // 優化RoaringBitmap
                result.runOptimize();
                // 將RoaringBitmap轉化為字節數組以存入Holo中
                byte[] byteArray = new byte[result.serializedSizeInBytes()];
                result.serialize(ByteBuffer.wrap(byteArray));
                // 其中 Tuple6.f4(Timestamp) 字段表示以窗口長度為週期進行統計,以秒為單位
                out.collect(
                    new Tuple6<>(
                        keys.getField(0),
                        keys.getField(1),
                        keys.getField(2),
                        keys.getField(3),
                        new Timestamp(
                            timeWindow.getEnd() / 1000 * 1000),
                        byteArray));
        }
    });

4)寫入結果表

需要注意的是,Hologres中RoaringBitmap類型在Flink中對應Byte數組類型

// 計算結果轉換為表
Table resTable =
    tableEnv.fromDataStream(
        processedSource,
        $("country"),
        $("prov"),
        $("city"),
        $("ymd"),
        $("timest"),
        $("uid32_bitmap"));
// 創建Hologres結果表, 其中Hologres的RoaringBitmap類型通過Byte數組存入
String createHologresTable =
    String.format(
        "create table sink("
        + "  country string,"
        + "  prov string,"
        + "  city string,"
        + "  ymd string,"
        + "  timetz timestamp,"
        + "  uid32_bitmap BYTES"
        + ") with ("
        + "  'connector'='hologres',"
        + "  'dbname' = '%s',"
        + "  'tablename' = '%s',"
        + "  'username' = '%s',"
        + "  'password' = '%s',"
        + "  'endpoint' = '%s',"
        + "  'connectionSize' = '%s',"
        + "  'mutatetype' = 'insertOrReplace'"
        + ")",
    database, dwsTableName, username, password, endpoint, connectionSize);
tableEnv.executeSql(createHologresTable);
// 寫入計算結果到dws表
tableEnv.executeSql("insert into sink select * from " + resTable);

3.數據查詢

查詢時,從基礎聚合表(dws_app)中按照查詢維度做聚合計算,查詢bitmap基數,得出group by條件下的用戶數

  • 查詢某天內各個城市的uv
--運行下面RB_AGG運算查詢,可執行參數先關閉三階段聚合開關(默認關閉),性能更好
set hg_experimental_enable_force_three_stage_agg=off  

SELECT  country
        ,prov
        ,city
        ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM    dws_app
WHERE   ymd = '20210329'
GROUP BY country
         ,prov
         ,city
;

  • 查詢某段時間內各個省份的uv
--運行下面RB_AGG運算查詢,可執行參數先關閉三階段聚合開關(默認關閉),性能更好
set hg_experimental_enable_force_three_stage_agg=off 

SELECT  country
        ,prov
        ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM    dws_app
WHERE   time > '2021-04-19 18:00:00+08' and time < '2021-04-19 19:00:00+08'
GROUP BY country
         ,prov
;

Leave a Reply

Your email address will not be published. Required fields are marked *