Big Data

[Best Practice] Building a Real-Time Data Warehouse for the Online Education Industry with Realtime Compute for Apache Flink

Industry Background

  • Industry status:

    • Online education is a new mode of education that uses the internet, artificial intelligence, and other modern information technologies for interactive teaching and learning, and it is an important component of education services. Developing online education helps build a networked, digital, personalized, lifelong education system, and helps build a learning society in which "everyone can learn, anywhere, at any time."
  • The role of big data in this industry:

    • Build more accurate profiles of prospective customers, so marketing campaigns can match them with better services and improve conversion rates (an ROI lift is not guaranteed, since that also depends on external competition);
    • Evaluate all participants in the online education industry, including teachers, students, and institutions, more comprehensively;
    • Help the online education industry grow faster.

Business Scenario

A company has built an online education app in which training institutions publish live-streamed courses, recorded courses, exercises, study articles, and other content. Users can learn new material online, review what they have learned offline, and take quizzes or tests on completed content.
The business involves several parts:

  1. App: the application, the users' entry point
  2. Back-office systems:

    1. Teachers: analyze students' classroom participation and adapt teaching plans to each student.
    2. Operations engineers: monitor the network quality of live-streamed classes in real time via operational metrics.
    3. Business operations staff: run the platform based on statistics such as student registrations, learning quality, and order volume:

      1. student registration and course enrollment changes;
      2. review of students' learning quality;
      3. platform metrics, e.g. daily order counts.

Technical Architecture

(Architecture diagram)
Architecture overview:
Data ingestion: in this scenario the warehouse has two data sources: app tracking events sent to the Kafka message queue, and incremental change logs from business databases such as HBase. Note that a real-time warehouse is usually operated alongside the offline warehouse and shares one management stack with it, covering permissions, metadata management, scheduling, and similar systems.
Real-time warehouse architecture: in this scenario, the ETL and BI layers of the real-time warehouse are built entirely on Flink + Kafka. The raw log app_log_origin is collected directly from the client; after processing and dimension enrichment, the results are delivered to the business systems.

Business Metrics

  • Real-time intermediate layer

    • ETL cleansing of student operation logs (analyzing students' online signaling logs)

      • student picture-move operations
      • student picture-hover operations
      • student line-draw operations
      • audio play
      • audio pause
      • picture-text match: wrong
      • picture-text match: correct
    • ETL cleansing of student registration test-level logs
  • Student behavior analysis

    • classroom performance statistics for live (streamed) courses
    • study-duration statistics for offline (recorded) courses
  • Operations / network monitoring

    • network monitoring for live courses (audio)
    • network monitoring for live courses (video)
  • Business operations analysis

    • hourly registration counts by student level
    • daily course-consultant tracking statistics

Note: this case covers only the scenarios and metrics above; real deployments also track other metrics such as daily UV/PV, top-N popular teachers, and reviews of teaching quality and volume.

Business Code

Scenario 1: real-time cleansing of raw logs

ETL cleansing of student operation logs (analyzing students' online signaling logs)

During live classes, students do in-class exercises and quizzes, and their page clicks and similar operations produce raw tracking logs. To quickly gauge students' learning (classroom) performance, the business assigns scores to the different operations. To make downstream processing easier, the raw records (multi-level nested JSON) are cleansed, per operation type, into single-level flat JSON and written to Kafka.

  • Tracking data sample
-- Input
{
    "createTime":"",
    "data":{
        "userid":"",
        "roomid":"",
        "timestamp":"",
        "role":"",
        "msgid":"",
        "msg":{
            "msgtype":"",
            "msg_data":{
                "target_id":"",
                "target_type":"",
                "action":"",
                "sub_action":"",
                "page_index":""
            }
        }
    }
}
-- Output
{
    "messageCreateTime":"",
    "timeStamp":"",
    "messageTimeStamp":"",
    "userId":"",
    "roomId":"",
    "role":"",
    "msgId":"",
    "msgType":"",
    "targetId":"",
    "targetType":"",
    "action":"",
    "subAction":"",
    "pageIndex":"",
    "event":""
}
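The cleansing step turns the nested input record above into the flat output record. As a rough illustration outside the Flink job (a hypothetical Python helper, names ours), the flattening amounts to:

```python
import json

# Illustrative sketch only, not the production Flink job: flatten the
# nested tracking record into the single-layer shape of the output sample.
# The "timeStamp" and "event" fields of the output are added later by the
# per-event routing jobs, so they are omitted here.
def flatten_event(raw: str) -> dict:
    src = json.loads(raw)
    data = src["data"]
    msg_data = data["msg"]["msg_data"]
    return {
        "messageCreateTime": src["createTime"],
        "messageTimeStamp": data["timestamp"],
        "userId": data["userid"],
        "roomId": data["roomid"],
        "role": data["role"],
        "msgId": data["msgid"],
        "msgType": data["msg"]["msgtype"],
        "targetId": msg_data["target_id"],
        "targetType": msg_data["target_type"],
        "action": msg_data["action"],
        "subAction": msg_data["sub_action"],
        "pageIndex": msg_data["page_index"],
    }
```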

Input table

create table timeline_analysis_student_stream (
    messageKey VARBINARY,
    `message` VARBINARY,
    topic VARCHAR,
    `partition` INT,
    `offset` BIGINT,
    -- Event time
    `createTime` as cast(JSON_VALUE(`message`, '$.createTime')as VARCHAR),
    -- User ID
    `userid` as cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.userid') as BIGINT),
    -- Classroom ID
    `roomid` as cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.roomid') as BIGINT),
    -- Operation time
    `time_stamp` as cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.timestamp') as BIGINT),
    -- Role
    `role` as cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.role') as TINYINT),
    -- Message ID
    `msgid` as cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msgid') as BIGINT),
    -- Message type
    `msg_msgType` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msgtype') as VARCHAR),
    -- Message target ID
    `msg_msgData_targetId` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msg_data') as VARCHAR), '$.target_id') as VARCHAR),
    -- Message target type
    `msg_msgData_targetType` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msg_data') as VARCHAR), '$.target_type') as VARCHAR),
    -- Student action
    `msg_msgData_action` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msg_data') as VARCHAR), '$.action') as VARCHAR),
    -- Student sub-action
    `msg_msgData_subAction` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msg_data') as VARCHAR), '$.sub_action') as VARCHAR),
    -- PPT page index
    `msg_msgData_pageIndex` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msg_data') as VARCHAR), '$.page_index') as BIGINT)
) with (
    type = 'kafka011',
    topic = 'timeline_client_topic',
    `group.id` = 'timeline_analysis_student_consumer',
    ...
);

Output table

create table signal_student_classroom_internation (
   messageKey VARBINARY,
   `message` VARBINARY,
   PRIMARY KEY (messageKey)
) with (
    type = 'kafka011',
    topic = 'timeline_analysis_student',
    ...
);

Business code

  • Student picture-move operations

    • While studying parts of speech (adjectives/adverbs), an in-class exercise asks students to sort the word pictures on screen by dragging each picture into the right category bucket.
INSERT INTO signal_student_classroom_internation
SELECT
 cast(messageKey as VARBINARY) as messageKey,
 cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_MOVE_PICTURE"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
    msgid = '305' AND
    msg_msgType = '116' AND
    role = '2' AND
    msg_msgData_targetType = 'shape' AND
    msg_msgData_action = 'move';
  • Student picture-hover operations

    • When learning vocabulary, students also learn pronunciation; hovering the mouse over a picture triggers pronunciation teaching.
INSERT INTO signal_student_classroom_internation
SELECT
 cast(messageKey as VARBINARY) as messageKey,
 cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_HOVER_PICTURE"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
    msgid = '305' AND
    msg_msgType = '116' AND
    role = '2' AND
    msg_msgData_targetType = 'shape' AND
    msg_msgData_action = 'mouse' AND
    msg_msgData_subAction = 'over';
  • Student line-draw operations

    • Students draw lines to complete in-class picture-text matching exercises.
INSERT INTO signal_student_classroom_internation
SELECT
 cast(messageKey as VARBINARY) as messageKey,
 cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_LINE_DRAW"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
    msgid = '305' AND
    msg_msgType = '116' AND
    role = '2' AND
    msg_msgData_targetType = 'shape' AND
    msg_msgData_action = 'add';
  • Student audio-play operations

    • The student plays audio embedded in the courseware.

INSERT INTO signal_student_classroom_internation
SELECT
 cast(messageKey as VARBINARY) as messageKey,
 cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_AUDIO_PLAY"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
    msgid = '305' AND
    msg_msgType = '116' AND
    role = '2' AND
    msg_msgData_targetType = 'template' AND
    msg_msgData_action = 'audio' AND
    msg_msgData_subAction = 'start';
  • Student audio-pause operations

    • The student pauses audio embedded in the courseware.
INSERT INTO signal_student_classroom_internation
SELECT
 cast(messageKey as VARBINARY) as messageKey,
 cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_AUDIO_PAUSE"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
    msgid = '305' AND
    msg_msgType = '116' AND
    role = '2' AND
    msg_msgData_targetType = 'template' AND
    msg_msgData_action = 'audio' AND
    msg_msgData_subAction = 'pause';
  • Student picture-text match (wrong) operations

    • After a line-drawing attempt, the matching result is returned to the student; it affects the classroom performance score.
INSERT INTO signal_student_classroom_internation
SELECT
 cast(messageKey as VARBINARY) as messageKey,
 cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_MATCH_WRONG"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
    msgid = '305' AND
    msg_msgType = '116' AND
    role = '2' AND
    msg_msgData_targetId = 'match' AND
    msg_msgData_targetType = 'template' AND
    msg_msgData_action = 'match' AND
    msg_msgData_subAction = 'drop:wrong';
  • Student picture-text match (correct) operations

    • After a line-drawing attempt, the matching result is returned to the student; it affects the classroom performance score.
INSERT INTO signal_student_classroom_internation
SELECT
 cast(messageKey as VARBINARY) as messageKey,
 cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_MATCH_CORRECT"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
    msgid = '305' AND
    msg_msgType = '116' AND
    role = '2' AND
    msg_msgData_targetId = 'match' AND
    msg_msgData_targetType = 'template' AND
    msg_msgData_action = 'match' AND
    msg_msgData_subAction = 'drop:correct';
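All six jobs above share one pattern: a filter on (targetId, targetType, action, subAction) that tags the record with an event name. As a compact illustration of that routing (a hypothetical Python helper, not part of the Flink jobs; the rules mirror the WHERE clauses above), the logic is:

```python
# Each rule is ((target_id, target_type, action, sub_action), event);
# None acts as a wildcard. More specific rules are listed first.
RULES = [
    (("match", "template", "match", "drop:wrong"),   "STUDENT_MATCH_WRONG"),
    (("match", "template", "match", "drop:correct"), "STUDENT_MATCH_CORRECT"),
    ((None, "template", "audio", "start"),           "STUDENT_AUDIO_PLAY"),
    ((None, "template", "audio", "pause"),           "STUDENT_AUDIO_PAUSE"),
    ((None, "shape", "mouse", "over"),               "STUDENT_HOVER_PICTURE"),
    ((None, "shape", "move", None),                  "STUDENT_MOVE_PICTURE"),
    ((None, "shape", "add", None),                   "STUDENT_LINE_DRAW"),
]

def classify(target_id, target_type, action, sub_action):
    """Return the event name for a cleansed record, or None if no rule matches."""
    for (tid, ttype, act, sub), event in RULES:
        if ((tid is None or tid == target_id)
                and ttype == target_type
                and act == action
                and (sub is None or sub == sub_action)):
            return event
    return None
```

In the real pipeline each rule is a separate INSERT job, which keeps the jobs independently deployable; the table form above is only meant to make the rule set easy to scan.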

ETL cleansing of student registration test-level logs

When registering on the web or in the app, students take a placement test that assigns a level, so they can later study courses of the matching level. Flink cleanses the tracking logs on Kafka and writes the results to HBase.

  • Tracking data sample
{
    "id":"",
    "chinese_name":"",
    "english_name":"",
    "level":"",
    "pid":"",
    "create_time":"",
    "update_time":"",
    "dept_id":""
}

Input table

create table blink_stg_activity__channel_name_dictionary_da (
    messageKey VARBINARY,
    `message` VARBINARY,
    topic VARCHAR,
    `partition` INT,
    `offset` BIGINT,
    -- ID
    id as JSON_VALUE(`message`,'$.id'),
    -- Chinese name
    chinese_name as JSON_VALUE(`message`,'$.chinese_name'),
    -- English name
    english_name as JSON_VALUE(`message`,'$.english_name'),
    -- Test level
    level as JSON_VALUE(`message`,'$.level'),
    -- Unique ID
    pid as JSON_VALUE(`message`,'$.pid'),
    -- Creation time
    create_time as JSON_VALUE(`message`,'$.create_time'),
    -- Update time
    update_time as JSON_VALUE(`message`,'$.update_time'),
    -- Department ID
    dept_id as JSON_VALUE(`message`,'$.dept_id')
) with (
    type = 'kafka010',
    topic = 'blink_stg_activity__channel_name_dictionary_da',
    `group.id` = 'blink_stg_activity__channel_name_dictionary_da',
    ...
);

Output table

create table blink_stg_activity__channel_name_dictionary_da_sinkhbase (
    rowkey varchar,
    id varchar,
    chinese_name varchar,
    english_name varchar,
    level varchar,
    pid varchar,
    create_time varchar,
    update_time varchar,
    dept_id varchar,
    primary key (rowkey)
) with (
    type = 'cloudhbase',
    tableName = 'channel_name_dictionary',
    ...
);

Business code

insert into
    blink_stg_activity__channel_name_dictionary_da_sinkhbase
SELECT
    MD5(id) as rowkey,
    id ,
    chinese_name ,
    english_name ,
    level ,
    pid ,
    create_time ,
    update_time ,
    dept_id 
from
    blink_stg_activity__channel_name_dictionary_da;
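A note on the sink: the rowkey is MD5(id) rather than the raw id, so monotonically increasing ids are spread evenly across HBase regions instead of concentrating writes on one region. A minimal sketch of that rowkey derivation:

```python
import hashlib

# Derive an HBase rowkey by hashing the record id, mirroring MD5(id) in the
# SQL above. Hashing removes the ordering of sequential ids, which avoids
# region write hotspots.
def rowkey(record_id: str) -> str:
    return hashlib.md5(record_id.encode("utf-8")).hexdigest()
```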

Scenario 2: student behavior analysis

Classroom performance statistics for live (streamed) courses

Scenario 1 cleansed the student operation logs. This scenario consumes that cleansed data, groups it by user ID, web server ID, role, and operation event, opens a 1-minute tumbling window, and aggregates with count(event) to score each student's per-minute classroom performance in live (streamed) courses.

  • This metric is computed on top of the cleansed student operation logs
{
    "userId":"",
    "roomId":"",
    "role":"",
    "event":"",
    "timeStamp":""
}

Input table

create table timeline_analysis_student_mashup_stream (
    messageKey VARBINARY,
    `message` VARBINARY,
    topic VARCHAR,
    `partition` INT,
    `offset` BIGINT,
    -- User ID
    `userId` as cast(JSON_VALUE (`message`, '$.userId') as BIGINT),
    -- Web server ID
    `webserverId` as cast(JSON_VALUE (`message`, '$.roomId') as BIGINT),
    -- Role
    `role` as cast(JSON_VALUE (`message`, '$.role') as TINYINT),
    -- Operation event
    `event` as cast(JSON_VALUE (`message`, '$.event') as VARCHAR),
    -- Event time
    time_stamp as TO_TIMESTAMP(cast(JSON_VALUE (`message`, '$.timeStamp') as BIGINT)),
    WATERMARK wk FOR time_stamp AS WITHOFFSET (time_stamp, 0) -- define the watermark on the rowtime column
) with (
    type = 'kafka011',
    topic = 'timeline_analysis_student',
    `group.id` = 'timeline-analysis-student-mashup-consumer',
    ...
);

Output table

create table timeline_signal_analysis_mysql (
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    webserver_id BIGINT,
    user_id BIGINT,
    role TINYINT,
    event VARCHAR,
    event_count BIGINT,
    create_time TIMESTAMP
) with (
    type='RDS',
    tableName='timeline_signal_analysis',
    ...
);

Business code

  • Parsing student classroom performance

    • Students earn points for classroom behaviors such as raising a hand to answer a question; the points measure classroom performance.
insert into timeline_signal_analysis_mysql
select 
    TUMBLE_START(time_stamp,INTERVAL '1' MINUTE) as start_time,
    TUMBLE_END(time_stamp,INTERVAL '1' MINUTE) as end_time,
    webserverId as webserver_id,
    userId as user_id,
    role as role,
    event as event,
    COUNT(event) as event_count,
    CURRENT_TIMESTAMP as create_time
FROM timeline_analysis_student_mashup_stream
GROUP BY TUMBLE (time_stamp,INTERVAL '1' MINUTE),
    userId,
    webserverId,
    role,
    event;
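In effect, the query buckets each event into its one-minute window and counts per (user, web server, role, event) key. A rough Python equivalent of the windowed count (illustrative only; the real job runs on event time with watermarks and emits a row per closed window):

```python
from collections import Counter
from datetime import datetime

# Count events per 1-minute tumbling window and grouping key, mirroring
# TUMBLE(time_stamp, INTERVAL '1' MINUTE) + COUNT(event) in the SQL above.
def tumble_1min_counts(events):
    """events: iterable of (ts: datetime, user_id, webserver_id, role, event)."""
    counts = Counter()
    for ts, user_id, webserver_id, role, event in events:
        window_start = ts.replace(second=0, microsecond=0)  # 1-min tumble
        counts[(window_start, user_id, webserver_id, role, event)] += 1
    return counts
```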

Study-duration statistics for offline (recorded) courses

Records with subEvent = 'PPT_SUCCESS' (course completed) are extracted into a view, then joined back against the source table (a self-join) to widen the rows, and the job computes the difference between the 'PPT_SUCCESS' timestamp and the time the PPT first started playing.

  • Tracking data sample
{
    "classroom_id":"",
    "user_type":"",
    "user_id":"",
    "event_time":"",
    "sub_event":"",
    "extra":{
        "data_time":"",
        "msg":{
            "pptIndex":""
        }
    }
}

Input table

create table qos_log_kafka (
    messageKey VARBINARY,
    `message` VARBINARY,
    topic VARCHAR,
    `partition` INT,
    `offset` BIGINT,
    -- (Recorded) classroom ID
    `classroomId` as cast(JSON_VALUE(`message`, '$.classroom_id')as VARCHAR),
    -- User type
    `userType` as cast(JSON_VALUE(`message`, '$.user_type')as VARCHAR),
    -- User ID
    `userId` as cast(JSON_VALUE(`message`, '$.user_id')as BIGINT),
    -- Event time
    `eventTime` as cast(JSON_VALUE(`message`, '$.event_time')as BIGINT),
    -- Sub-event
    `subEvent` as cast(JSON_VALUE(`message`, '$.sub_event')as VARCHAR),
    -- Data time
    `extraDataTime` as cast(cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.extra')as VARCHAR), '$.data_time')as VARCHAR)as BIGINT),
    -- PPT page index
    `extraMsgIndex` as cast(JSON_VALUE(cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.extra')as VARCHAR), '$.msg')as VARCHAR), '$.pptIndex')as BIGINT)
) with (
    type = 'kafka011',
    topic = 'qos_log',
    ...
);

Output table

create table user_enter_classroom_take_time_mysql (
    user_id BIGINT,
    classroom_id VARCHAR,
    user_type VARCHAR,
    spend_time BIGINT,
    event_time TIMESTAMP,
    create_time TIMESTAMP
) with (
    type='rds',
    tableName='user_enter_classroom_take_time',
    ...
);

Business code

  • Time for a student to enter the classroom

    • For offline recorded courses, the PPT playback times are used to compute how long it takes the student to enter the classroom.
CREATE VIEW qos_log_kafka_view AS
SELECT 
    `userId`,
    `classroomId`,
    `userType`,
    `eventTime`,
     subEvent,
    `extraDataTime`
FROM qos_log_kafka
WHERE subEvent = 'PPT_SUCCESS';

insert into user_enter_classroom_take_time_mysql
SELECT
  a.userId,
  a.classroomId,
  a.userType,
  b.extraDataTime-a.extraDataTime, -- in milliseconds
  TO_TIMESTAMP(a.eventTime),
  CURRENT_TIMESTAMP
FROM qos_log_kafka a
JOIN qos_log_kafka_view b ON a.userId=b.userId AND a.classroomId=b.classroomId 
WHERE a.extraDataTime<b.extraDataTime;
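The join pairs every earlier record of a (user, classroom) pair with that pair's 'PPT_SUCCESS' record, so the SQL emits one row per earlier event. An illustrative Python sketch of the same idea that keeps only the largest gap, success time minus the earliest data_time, i.e. the full time-to-enter (a simplification of the SQL, not a line-for-line port):

```python
# rows: iterable of (user_id, classroom_id, sub_event, extra_data_time_ms)
def enter_classroom_spend_time(rows):
    earliest = {}  # (user, classroom) -> earliest data_time seen
    success = {}   # (user, classroom) -> data_time of 'PPT_SUCCESS'
    for user_id, classroom_id, sub_event, data_time in rows:
        key = (user_id, classroom_id)
        earliest[key] = min(earliest.get(key, data_time), data_time)
        if sub_event == "PPT_SUCCESS":
            success[key] = data_time
    # Gap between PPT success and the first recorded event, in milliseconds
    return {key: done - earliest[key]
            for key, done in success.items()
            if earliest[key] < done}
```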

Scenario 3: operations / network monitoring

Using the audio/video operations tracking events from students' live classes, the job groups by userId, agoraChannelId, classroomId, userType, event, and agoraAudioStateUid/agoraVideoStateUid, opens a 30-second tumbling window, and computes the audio/video quality of the last 30 seconds of each live class (packet-loss/error averages and totals). Downstream operations engineers monitor these numbers and adjust audio/video quality in real time to give users the best learning experience.

  • Tracking data sample
{
    "classroom_id":"",
    "user_type":"",
    "user_id":"",
    "agora_channel_id":"",
    "event":"",
    "agora_videoState":{
        "fr":"",
        "uid":""
    },
    "agora_audioState":{
        "lost":"",
        "uid":""
    },
    "messageCreateTime":""
}

Input table

create table qos_agora_record_kafka (
    messageKey VARBINARY,
    `message` VARBINARY,
    topic VARCHAR,
    `partition` INT,
    `offset` BIGINT,
    -- Live classroom ID
    `classroomId` as cast(JSON_VALUE(`message`, '$.classroom_id')as VARCHAR),
    -- User type
    `userType` as cast(JSON_VALUE(`message`, '$.user_type')as VARCHAR),
    -- User ID
    `userId` as cast(JSON_VALUE(`message`, '$.user_id')as BIGINT),
    -- Channel ID
    `agoraChannelId` as cast(JSON_VALUE(`message`, '$.agora_channel_id')as BIGINT),
    -- Event
    `event` as cast(JSON_VALUE(`message`, '$.event')as VARCHAR),
    -- Video fault record
    `agoraVideoStateFr` as cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.agora_videoState')as VARCHAR), '$.fr')as BIGINT),
    -- Video fault unique ID
    `agoraVideoStateUid` as cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.agora_videoState')as VARCHAR), '$.uid')as BIGINT),
    -- Audio loss record
    `agoraAudioStateLost` as cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.agora_audioState')as VARCHAR), '$.lost')as BIGINT),
    -- Audio loss unique ID
    `agoraAudioStateUid` as cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.agora_audioState')as VARCHAR), '$.uid')as BIGINT),
    -- Event time
    `messageCreateTime` as cast(JSON_VALUE(`message`, '$.messageCreateTime')as BIGINT),
    WATERMARK wk FOR messageCreateTime AS WITHOFFSET (messageCreateTime, 60000) -- define the watermark on the rowtime column
) with (
    type = 'kafka011',
    topic = 'agora_record',
    ...
);

Output table

create table user_av_mysql (
    -- Window start time
    start_time TIMESTAMP,
    -- Window end time
    end_time TIMESTAMP,
    -- User ID
    user_id BIGINT,
    web_server_id BIGINT,
    -- Live classroom ID
    classroom_id VARCHAR,
    -- User type
    user_type VARCHAR,
    extra_uid BIGINT,
    event VARCHAR,
    -- Error sum
    event_sum BIGINT,
    -- Error average
    event_avg DOUBLE,
    -- Error count
    event_count BIGINT,
    create_time TIMESTAMP
) with (
    type='rds',
    tableName='user_av_record',
    ...
);

Network monitoring for live courses (audio)

Business code

insert into user_av_mysql
select 
    TUMBLE_START(messageCreateTime, INTERVAL '30' SECOND) as start_time,
    TUMBLE_END(messageCreateTime, INTERVAL '30' SECOND) as end_time,
    CASE WHEN `userId` is NULL THEN -1 else userId END as user_id,
    CASE WHEN `agoraChannelId` is NULL THEN -1 else agoraChannelId END as web_server_id,
    CASE WHEN `classroomId` is NULL THEN -1 else classroomId END as classroom_id,
    userType as user_type,
    agoraAudioStateUid as extra_uid,
    CONCAT(event,'_AUDIO_STATE') as event,
    SUM(agoraAudioStateLost) as event_sum,
    AVG(agoraAudioStateLost) as event_avg,
    COUNT(event) as event_count,
    CURRENT_TIMESTAMP as create_time
FROM qos_agora_record_kafka
WHERE agoraAudioStateLost >= 0 AND userType = 'student'
GROUP BY TUMBLE (messageCreateTime, INTERVAL '30' SECOND),
    userId,
    agoraChannelId,
    classroomId,
    userType,
    event,
    agoraAudioStateUid;

Network monitoring for live courses (video)

Business code

insert into user_av_mysql
select 
    TUMBLE_START(messageCreateTime,INTERVAL '30' SECOND) as start_time,
    TUMBLE_END(messageCreateTime,INTERVAL '30' SECOND) as end_time,
    CASE WHEN `userId` is NULL THEN -1 else userId END as user_id,
    CASE WHEN `agoraChannelId` is NULL THEN -1 else agoraChannelId END as web_server_id,
    CASE WHEN `classroomId` is NULL THEN -1 else classroomId END as classroom_id,
    userType as user_type,
    agoraVideoStateUid as extra_uid,
    CONCAT(event,'_VIDEO_STATE') as event,
    SUM(agoraVideoStateFr) as event_sum,
    AVG(agoraVideoStateFr) as event_avg,
    COUNT(event) as event_count,
    CURRENT_TIMESTAMP as create_time
FROM qos_agora_record_kafka
WHERE agoraVideoStateFr >= 0 AND userType = 'student'
GROUP BY TUMBLE (messageCreateTime, INTERVAL '30' SECOND),
    userId,
    agoraChannelId,
    classroomId,
    userType,
    event,
    agoraVideoStateUid;
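Both monitoring queries follow the same shape: a 30-second tumbling window per key, producing the sum, average, and count of the loss/fault metric. A rough Python sketch of that aggregation (illustrative only; timestamps are epoch milliseconds, and the key is collapsed to a few fields):

```python
from collections import defaultdict

# samples: iterable of (ts_ms, user_id, channel_id, classroom_id, event, lost)
def qos_30s_stats(samples):
    acc = defaultdict(lambda: [0, 0])  # key -> [sum, count]
    for ts_ms, user_id, channel_id, classroom_id, event, lost in samples:
        if lost < 0:
            continue  # mirrors WHERE agoraAudioStateLost >= 0
        window_start = ts_ms - ts_ms % 30000  # 30-second tumble
        key = (window_start, user_id, channel_id, classroom_id, event)
        acc[key][0] += lost
        acc[key][1] += 1
    # (sum, average, count) per window+key, like SUM/AVG/COUNT in the SQL
    return {k: (s, s / n, n) for k, (s, n) in acc.items()}
```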

Scenario 4: business operations analysis

Hourly registration counts by student level

Students register through different channels (web ads, app ads, and so on). This scenario reads the registration logs and joins them with the students' placement-test level table (levels A/B/C/D) to show operations staff the hourly registration counts per level and channel, so promotion strategies can be adjusted in real time.

  • Tracking data samples
-- Student table
{
    "id":"",
    "channel_id":"",
    "update_time":""
}
-- User registration data
{
    "id":"",
    "name":"",
    "register_date_time":"",
    "status":""
}

-- Student test-level table: uses the result table of Scenario 1's "ETL cleansing of student registration test-level logs"

Input table

create table student_da_src (
    messageKey VARBINARY,
    `message` VARBINARY,
    topic VARCHAR,
    `partition` INT,
    `offset` BIGINT,
    `id` as JSON_VALUE (`message`, '$.id'), -- User ID
    `channel_id` as JSON_VALUE (`message`, '$.channel_id'), -- Channel ID
    `update_time` as JSON_VALUE (`message`, '$.update_time') -- Update time
) with (
    type = 'kafka010',
    topic = 'uc_account-student',
    ...
);
create table user_da_in (
    messageKey VARBINARY,
    `message` VARBINARY,
    topic VARCHAR,
    `partition` INT,
    `offset` BIGINT,
    `id` as JSON_VALUE (`message`, '$.id'), -- User ID
    `name` as JSON_VALUE (`message`, '$.name'), -- User name
    `register_date_time` as JSON_VALUE (`message`, '$.register_date_time'), -- Registration time
    `status` as JSON_VALUE (`message`, '$.status') -- Status
) with (
    type = 'kafka010',
    topic = 'uc_account-user',
    `group.id` = 'uc_account-user',
    ...
);
create table channel_da (
    rowkey varchar,
    id VARCHAR,
    `level` VARCHAR,
    primary key (rowkey),
    PERIOD FOR SYSTEM_TIME
) with (
    type = 'cloudhbase',
    tableName = 'databus:activity.channel',
    ...
);

Output table

create table sink_table (
    uk varchar,
    reg_date bigint,
    level varchar,
    leads bigint,
    primary key (uk)
) with (
    type = 'elasticsearch',
    index = 'vk_app_es_sign_csh',
    typeName = 'vk_app_es_sign_csh',
    ...
);

Business code

create view student_da_src_view as 
SELECT
   last_value(id) as id,
   last_value(update_time) as update_time,
   last_value(channel_id) as channel_id
from student_da_src
group by id;

create view user_da_in_view as 
SELECT
   last_value(id) as id,
   last_value(name) as name,
   last_value(register_date_time) as register_date_time,
   last_value(status) as status
from user_da_in
group by id;
 
insert into 
    sink_table 
SELECT
      case when level in ('A','B','C','D') then level else 'other' end as uk
     ,cast(date_format(register_date_time,'yyyyMMddHH') as bigint) as reg_date
     ,case when level in ('A','B','C','D') then level else 'other' end as levels
     ,COUNT(distinct t.id) AS leads
FROM 
     student_da_src_view t 
LEFT JOIN user_da_in_view  u ON u.id = t.id
LEFT JOIN channel_da FOR SYSTEM_TIME AS OF PROCTIME() ch ON ch.rowkey = MD5(t.channel_id)
where u.name not LIKE '%測試%' -- exclude test accounts ('測試' means 'test')
and u.name not LIKE 'DM\\_%' 
and u.name not LIKE '%test%' 
and u.status='NORMAL'
group by date_format(register_date_time,'yyyyMMddHH')
        ,case when level in ('A','B','C','D') then level else 'other' end
        ,concat(date_format(register_date_time,'yyyyMMddHH'),case when level in ('A','B','C','D') then level else 'other' end)
;
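The query above first deduplicates each stream by id (keeping the latest record via last_value), filters out test accounts, buckets any level outside A-D into 'other', and counts distinct students per registration hour and level. A simplified Python sketch of that flow (names and the test-account filter are simplified; the real filters also match 'DM\_%' and Chinese '測試'):

```python
from collections import Counter

# students: {student_id: channel_id}   (already latest per id)
# users:    {user_id: (name, register_hour, status)}
# levels:   {channel_id: level}
def hourly_level_counts(students, users, levels):
    counts = Counter()
    for sid, channel_id in students.items():
        user = users.get(sid)
        if user is None:
            continue  # LEFT JOIN miss: no registration record
        name, reg_hour, status = user
        if status != "NORMAL" or "test" in name.lower():
            continue  # simplified version of the NOT LIKE filters above
        level = levels.get(channel_id)
        if level not in ("A", "B", "C", "D"):
            level = "other"  # bucket unknown levels, as in the CASE WHEN
        counts[(reg_hour, level)] += 1
    return counts
```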

Daily course-consultant tracking statistics

First group by ID and keep the latest record per ID (deduplication); then, on top of the deduplicated data, run a global group aggregation that counts, per event date (day) and course-consultant ID, how many times each consultant confirmed "study progress / lesson booking" with students.

  • Tracking data sample
{
    "id":"",
    "leads_flow_event_id":"",
    "group_id":"",
    "cc_id":"",
    "student_id":"",
    "order_id":"",
    "leads_id":"",
    "confirm_date_time":"",
    "create_time":"",
    "update_time":"",
    "order_create_time":"",
    "canceled_date_time":"",
    "apply_refund_date":"",
    "status":""
}

Input table

create table cc_data_pack_order_info_src (
    `messageKey` VARBINARY,
    `message` VARBINARY,
    `topic` VARCHAR,
    `partition` INT,
    `offset` BIGINT,
    -- ID
    `id` as JSON_VALUE (`message`, '$.id'),
    -- Course consultant (CC) ID
    `cc_id` as JSON_VALUE (`message`, '$.cc_id'),
    -- Student ID
    `student_id` as JSON_VALUE (`message`, '$.student_id'),
    -- Confirmation time
    `confirm_date_time` as JSON_VALUE (`message`, '$.confirm_date_time'),
    -- Creation time
    `create_time` as JSON_VALUE (`message`, '$.create_time'),
    -- Update time
    `update_time` as JSON_VALUE (`message`, '$.update_time'),
    -- Order creation time
    `order_create_time` as JSON_VALUE (`message`, '$.order_create_time'),
    -- Order cancellation time
    `canceled_date_time` as JSON_VALUE (`message`, '$.canceled_date_time'),
    -- Refund application date
    `apply_refund_date` as JSON_VALUE (`message`, '$.apply_refund_date'),
    -- Status
    `status` as JSON_VALUE (`message`, '$.status')
) with (
    type = 'kafka010',
    topic = 'data_pack_order_info',
    `group.id` = 'data_pack_order_info',
    ...
);

Output table

CREATE TABLE index_sink (
  `cc_id` bigint(20) NOT NULL,
  `cc_index` bigint(10) NOT NULL, 
  `type` int(6) NOT NULL,
  `attribution_time` varchar NOT NULL,
  `update_time` timestamp NOT NULL,
  PRIMARY KEY (`cc_id`, `type`, `attribution_time`)
) WITH (
    type='rds',
    tableName='staff_index',
    ...
);

Business code


CREATE VIEW cc_data_pack_order_info_view as
select
    last_value (cc_id) as cc_id,
    last_value (confirm_date_time) as confirm_date_time,
    last_value (`status`) as `status`
from
    cc_data_pack_order_info_src
group by
    id;
    
    
insert into index_sink
select 
    cast(cc_id as bigint) as cc_id,
    count(*) as cc_index,
    cast(1 as int) as type,
    date_format(confirm_date_time,'yyyy-MM-dd') as attribution_time,
    current_timestamp as update_time
from
    cc_data_pack_order_info_view 
where 
    confirm_date_time is not null
    and `status` is not null
    and `status` = 3 
group by 
    date_format(confirm_date_time,'yyyy-MM-dd'), cc_id;
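The two statements above implement "dedupe, then count": the view keeps the latest state per order id, and the insert counts confirmed records per day and consultant. A compact Python sketch of the same semantics (illustrative only; later records for the same id override earlier ones, mimicking last_value):

```python
from collections import Counter

# records: iterable of (order_id, cc_id, confirm_date, status), in arrival order
def daily_cc_confirms(records):
    latest = {}  # order_id -> latest (cc_id, confirm_date, status)
    for order_id, cc_id, confirm_date, status in records:
        latest[order_id] = (cc_id, confirm_date, status)
    counts = Counter()
    for cc_id, confirm_date, status in latest.values():
        # mirrors: confirm_date_time is not null and status = 3
        if confirm_date is not None and status == 3:
            counts[(confirm_date, cc_id)] += 1
    return counts
```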
