開發與維運

生產實踐 | 基於 Flink 的短視頻生產消費監控

本文詳細介紹了實時監控類指標的數據流轉鏈路以及技術方案,大多數的實時監控類指標都可按照本文中的幾種方案實現。

短視頻生產消費監控

短視頻帶來了全新的傳播場域和節目形態,小屏幕、快節奏成為行業潮流的同時,也催生了新的用戶消費習慣,為創作者和商戶帶來收益。而多元化的短視頻也可以為品牌方提供營銷機遇。

其中對於垂類生態短視頻的生產消費熱點的監控分析目前成為了實時數據處理很常見的一個應用場景,比如對某個圈定的垂類生態下的視頻生產或者視頻消費進行監控,對熱點視頻生成對應的優化推薦策略,促進熱點視頻的生產或者消費,構建整個生產消費數據鏈路的閉環,從而提高創作者收益以及消費者留存。

本文將完整分析垂類生態短視頻生產消費數據的整條鏈路流轉方式,並基於 Flink 提供幾種對於垂類視頻生產消費監控的方案設計。通過本文,你可以瞭解到:

垂類生態短視頻生產消費數據鏈路閉環

實時監控短視頻生產消費的方案設計

不同監控量級場景下的代碼實現

flink 學習資料

項目簡介

垂類生態短視頻生產消費數據鏈路流轉架構圖如下,此數據流轉圖也適用於其他場景:

image.png

鏈路

在上述場景中,用戶生產和消費短視頻,從而客戶端、服務端以及數據庫會產生相應的行為操作日誌,這些日誌會通過日誌抽取中間件抽取到消息隊列中,我們目前的場景中是使用 Kafka 作為消息隊列;然後使用 flink 對垂類生態中的視頻進行生產或消費監控(內容生產通常是圈定垂類作者 id 池,內容消費通常是圈定垂類視頻 id 池),最後將實時聚合數據產出到下游;下游可以以數據服務,實時看板的方式展現,運營同學或者自動化工具最終會幫助我們分析當前垂類下的生產或者消費熱點,從而生成推薦策略。

方案設計

image.png

架構

其中數據源如下:

Kafka 為全量內容生產和內容消費的日誌。

Rpc/Http/Mysql/配置中心/Redis/HBase 為需要監控的垂類生態內容 id 池(內容生產則為作者 id 池,內容消費則為視頻 id 池),其主要是提供給運營同學動態配置需要監控的 id 範圍,其可以在 flink 中進行實時查詢,解析運營同學想要的監控指標範圍,以及監控的指標和計算方式,然後加工數據產出,可以支持隨時配置,實時數據隨時計算產出。

其中數據匯為聚類好的內容生產或者消費熱點話題或者事件指標:

Redis/HBase 主要是以低延遲(Redis 5ms p99,HBase 100ms p99,不同公司的服務能力不同)並且高 QPS 提供數據服務,給 Server 端或者線上用戶提供低延遲的數據查詢。

Druid/Mysql 可以做為 OLAP 引擎為 BI 分析提供靈活的上卷下鑽聚合分析能力,供運營同學配置可視化圖表使用。

Kafka 可以以流式數據產出,從而提供給下游繼續消費或者進行特徵提取。

廢話不多說,我們直接上方案和代碼,下述幾種方案按照監控 id 範圍量級區分,不同的量級對應著不同的方案,其中的代碼示例為 ProcessWindowFunction,也可以使用 AggregateFunction 代替,其中主要監控邏輯都相同。

方案 1

適合監控 id 數據量小的場景(幾千 id),其實現方式是在 flink 任務初始化時將需要監控的 id 池或動態配置中心的 id 池加載到內存當中,之後只需要在內存中判斷內容生產或者消費數據是否在這個監控池當中。

ProcessWindowFunction p = new ProcessWindowFunction<CommonModel, CommonModel, Long, TimeWindow>() {
    
    // 配置中心動態 id 池
    private Config<Set<Long>> needMonitoredIdsConfig;

    @Override
    public void open(Configuration parameters) throws Exception {
        this.needMonitoredIdsConfig = ConfigBuilder
                .buildSet("needMonitoredIds", Long.class);
    }

    @Override
    public void process(Long bucket, Context context, Iterable<CommonModel> iterable, Collector<CommonModel> collector) throws Exception {
        Set<Long> needMonitoredIds = needMonitoredIdsConfig.get();
        /**
         * 判斷 commonModel 中的 id 是否在 needMonitoredIds 池中
         */
    }
}

監控的 id 池可以按照固定或者可配置從而分出兩種獲取方式:第一種是在 flink 任務開始時就全部加載進內存中,這種方式適合監控 id 池不變的情況;第二種是使用動態配置中心,每次都從配置中心訪問到最新的監控 id 池,其可以滿足動態配置或者更改 id 池的需求,並且這種實現方式通常可以實時感知到配置更改,幾乎無延遲。

方案 2

適合監控 id 數據量適中(幾十萬 id),監控數據範圍會不定時發生變動的場景。其實現方式是在 flink 算子中定時訪問接口獲取最新的監控 id 池,以獲取最新監控數據範圍。

ProcessWindowFunction p = new ProcessWindowFunction<CommonModel, CommonModel, Long, TimeWindow>() {

    private long lastRefreshTimestamp;

    private Set<Long> needMonitoredIds;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.refreshNeedMonitoredIds(System.currentTimeMillis());
    }

    @Override
    public void process(Long bucket, Context context, Iterable<CommonModel> iterable, Collector<CommonModel> collector) throws Exception {
        long windowStart = context.window().getStart();
        this.refreshNeedMonitoredIds(windowStart);
        /**
         * 判斷 commonModel 中的 id 是否在 needMonitoredIds 池中
         */
    }

    public void refreshNeedMonitoredIds(long windowStart) {
        // 每隔 10 秒訪問一次
        if (windowStart - this.lastRefreshTimestamp >= 10000L) {
            this.lastRefreshTimestamp = windowStart;
            this.needMonitoredIds = Rpc.get(...)
        }
    }
}

根據上述代碼實現方式,按照時間間隔的方式刷新 id 池,其缺點在於不能實時感知監控 id 池的變化,所以刷新時間可能會和需求場景強耦合(如果 id 池會頻繁更新,那麼就需要縮小刷新時間間隔)。也可根據需求場景在每個窗口開始前刷新 id 池,這樣可保證每個窗口中的 id 池中的數據一直保持更新。

方案 3

方案 3 對方案 2 的一個優化(幾十萬 id,我們生產環境中最常用的)。其實現方式是在 flink 中使用 broadcast 算子定時訪問監控 id 池,並將 id 池以廣播的形式下發給下游參與計算的各個算子。其優化點在於:比如任務的並行度為 500,每 1s 訪問一次,採用方案 2 則訪問監控 id 池接口的 QPS 為 500,在使用 broadcast 算子之後,其訪問 QPS 可以減少到 1,可以大大減少對接口的訪問量,減輕接口壓力。

public class Example {

    @Slf4j
    static class NeedMonitorIdsSource implements SourceFunction<Map<Long, Set<Long>>> {

        private volatile boolean isCancel;

        @Override
        public void run(SourceContext<Map<Long, Set<Long>>> sourceContext) throws Exception {
            while (!this.isCancel) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    Set<Long> needMonitorIds = Rpc.get(...);
                    // 可以和上一次訪問的數據做比較查看是否有變化,如果有變化,才發送出去
                    if (CollectionUtils.isNotEmpty(needMonitorIds)) {
                        sourceContext.collect(new HashMap<Long, Set<Long>>() {{
                            put(0L, needMonitorIds);
                        }});
                    }
                } catch (Throwable e) {
                    // 防止接口訪問失敗導致的錯誤導致 flink job 掛掉
                    log.error("need monitor ids error", e);
                }
            }
        }

        @Override
        public void cancel() {
            this.isCancel = true;
        }
    }

    public static void main(String[] args) {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        InputParams inputParams = new InputParams(parameterTool);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        final MapStateDescriptor<Long, Set<Long>> broadcastMapStateDescriptor = new MapStateDescriptor<>(
                "config-keywords",
                BasicTypeInfo.LONG_TYPE_INFO,
                TypeInformation.of(new TypeHint<Set<Long>>() {
                }));

        /********************* kafka source *********************/
        BroadcastStream<Map<Long, Set<Long>>> broadcastStream = env
                .addSource(new NeedMonitorIdsSource()) // redis photoId 數據廣播
                .setParallelism(1)
                .broadcast(broadcastMapStateDescriptor);

        DataStream<CommonModel> logSourceDataStream = SourceFactory.getSourceDataStream(...);

        /********************* dag *********************/
        DataStream<CommonModel> resultDataStream = logSourceDataStream
                .keyBy(KeySelectorFactory.getStringKeySelector(CommonModel::getKeyField))
                .connect(broadcastStream)
                .process(new KeyedBroadcastProcessFunction<String, CommonModel, Map<Long, Set<Long>>, CommonModel>() {

                    private Set<Long> needMonitoredIds;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        this.needMonitoredIds = Rpc.get(...)
                    }

                    @Override
                    public void processElement(CommonModel commonModel, ReadOnlyContext readOnlyContext, Collector<CommonModel> collector) throws Exception {
                        // 判斷 commonModel 中的 id 是否在 needMonitoredIds 池中
                    }

                    @Override
                    public void processBroadcastElement(Map<Long, Set<Long>> longSetMap, Context context, Collector<CommonModel> collector) throws Exception {
                        // 需要監控的字段
                        Set<Long> needMonitorIds = longSetMap.get(0L);
                        if (CollectionUtils.isNotEmpty(needMonitorIds)) {
                            this.needMonitoredIds = needMonitorIds;
                        }
                    }
                });

        /********************* kafka sink *********************/
        SinkFactory.setSinkDataStream(...);
        
        env.execute(inputParams.jobName);
    }

}

方案 4

適合於超大監控範圍的數據(幾百萬,我們自己的生產實踐中使用擴量到 500 萬)。其原理是將監控範圍接口按照 id 按照一定規則分桶。flink 消費到日誌數據後將 id 按照 監控範圍接口 id 相同的分桶方法進行分桶 keyBy,這樣在下游算子中每個算子中就可以按照桶變量值,從接口中拿到對應桶的監控 id 數據,這樣 flink 中並行的每個算子只需要獲取到自己對應的桶的數據,可以大大減少請求的壓力。

public class Example {

    public static void main(String[] args) {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        InputParams inputParams = new InputParams(parameterTool);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        final MapStateDescriptor<Long, Set<Long>> broadcastMapStateDescriptor = new MapStateDescriptor<>(
                "config-keywords",
                BasicTypeInfo.LONG_TYPE_INFO,
                TypeInformation.of(new TypeHint<Set<Long>>() {
                }));

        /********************* kafka source *********************/

        DataStream<CommonModel> logSourceDataStream = SourceFactory.getSourceDataStream(...);

        /********************* dag *********************/
        DataStream<CommonModel> resultDataStream = logSourceDataStream
                .keyBy(KeySelectorFactory.getLongKeySelector(CommonModel::getKeyField))
                .timeWindow(Time.seconds(inputParams.accTimeWindowSeconds))
                .process(new ProcessWindowFunction<CommonModel, CommonModel, Long, TimeWindow>() {

                    private long lastRefreshTimestamp;

                    private Set<Long> oneBucketNeedMonitoredIds;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                    }

                    @Override
                    public void process(Long bucket, Context context, Iterable<CommonModel> iterable, Collector<CommonModel> collector) throws Exception {
                        long windowStart = context.window().getStart();
                        this.refreshNeedMonitoredIds(windowStart, bucket);
                        /**
                         * 判斷 commonModel 中的 id 是否在 needMonitoredIds 池中
                         */
                    }

                    public void refreshNeedMonitoredIds(long windowStart, long bucket) {
                        // 每隔 10 秒訪問一次
                        if (windowStart - this.lastRefreshTimestamp >= 10000L) {
                            this.lastRefreshTimestamp = windowStart;
                            this.oneBucketNeedMonitoredIds = Rpc.get(bucket, ...)
                        }
                    }
                });

        /********************* kafka sink *********************/
        SinkFactory.setSinkDataStream(...);

        env.execute(inputParams.jobName);
    }
}

總結

本文首先介紹了,在短視頻領域中,短視頻生產消費數據鏈路的整個閉環,並且其數據鏈路閉環一般情況下也適用於其他場景;以及對應的實時監控方案的設計和不同場景下的代碼實現,包括:

垂類生態短視頻生產消費數據鏈路閉環:用戶操作行為日誌的流轉,日誌上傳,實時計算,以及流轉到 BI,數據服務,最後數據賦能的整個流程

實時監控方案設計:監控類實時計算流程中各類數據源,數據匯的選型

監控 id 池在不同量級場景下具體代碼實現

學習資料

https://github.com/flink-china/flink-training-course/blob/master/README.md
https://ververica.cn/developers-resources/
https://space.bilibili.com/33807709

作者:YangYichao
來源:Flink 微信公眾號
原文鏈接:https://mp.weixin.qq.com/s/t_hbmx_xHly9y0nBcZmtnQ

Leave a Reply

Your email address will not be published. Required fields are marked *