行業背景
-
行業現狀:
- 在線教育是運用互聯網、人工智能等現代信息技術進行教與學互動的新型教育方式,是教育服務的重要組成部分。發展在線教育,有利於構建網絡化、數字化、個性化、終身化的教育體系,有利於建設“人人皆學、處處能學、時時可學”的學習型社會。
-
大數據在其行業中的作用:
- 對未來客戶的畫像更加精準,營銷推廣時可以對接更好的服務並提升成交轉化率(提升ROI不一定,這涉及到外部競爭);
- 更全面地評估老師、學生、機構、行業等在線教育行業的各個參與者;
- 大數據幫助在線教育行業更快發展
業務場景
某公司開發了一個在線教育類APP,培訓機構可以在APP中發布直播課程、離線課程、習題、學習文章等內容。用戶可在線學習新知識,離線鞏固已學知識,並對學過的內容進行課後練習/測試。
業務的構建涉及到幾部分:
- APP:應用程序,用戶訪問入口
-
後臺系統:
- 教學老師:通過分析學生課堂參與情況,提供不同的授課方案,因材施教。
- 運維人員:通過運維監控指標,實時監控在線教育直播網絡質量。
-
運營人員:根據學生註冊、學習質量、平臺成單量等統計信息針對性開展平臺運營工作:
- 學生辦理註冊、增刪課程等操作;
- 學生學習質量審核;
- 平臺指標查看,如平臺日成單量統計。
技術架構
架構解析:
數據採集:該場景中,數倉的數據來源有兩部分:app的埋點至消息隊列 Kafka 以及 hbase 等業務數據庫的增量日誌。值得注意的一點是,實時數倉往往和離線數倉配合使用,共享一套管控系統,如權限/元數據管理/調度等系統。
實時數倉架構:該場景中,整個實時數倉的ETL和BI部分的構建,全部通過 Flink + Kafka 完成,原始日誌app_log_origin是從客戶端直接收集上來的。然後數據處理,加維等操作後,最終輸入到業務系統。
業務指標
-
實時數據中間層
-
學生操作日誌 ETL 清洗(分析學生操作在線信令日誌)
- 獲取學生移動圖片操作
- 獲取學生 hover 圖片操作
- 獲取學生畫線操作
- 音頻播放
- 音頻暫停
- 圖文匹配錯誤
- 圖文匹配正確
- 學生註冊考試等級日誌 ETL 清洗
-
-
學生行為分析
- 學生在線(直播)課程課堂表現統計
- 學生離線(錄播)課程學習時長統計
-
運維/網絡監控
- 直播課程(音頻)網絡監控
- 直播課程(視頻)網絡監控
-
運營分析
- 每小時不同 level 的學生註冊人數統計
- 每日課程顧問追蹤統計
說明:該案例中僅包含以上場景及指標,在實際的應用場景下還包括日uv/pv,topN熱門授課教師,教師授課質量、數量審核等其他指標。
業務代碼
場景一:對原始日誌進行實時數據清洗
學生操作日誌 ETL 清洗(分析學生操作在線信令日誌)
學生在直播課程中,會做一些隨堂練習/測試,通過頁面點擊等操作形成原始埋點日誌,為了更快地感知學生的學習表現(課堂表現),業務方針對不同的操作進行計分處理。為了下游有效地對數據進行處理,針對學生不同的操作,將原始數據(多層 JSON 數據)進行清洗(單層 JSON 數據),寫入 kafka 中。
- 埋點數據樣例
--輸入
{
"createTime":"",
"data":{
"userid":"",
"roomid":"",
"timestamp":"",
"role":"",
"msgid":"",
"msg":{
"msgtype":"",
"msg_data":{
"target_id":"",
"target_type":"",
"action":"",
"sub_action":"",
"page_index":""
}
}
}
}
--輸出
{
"messageCreateTime":"",
"timeStamp":"",
"messageTimeStamp":"",
"userId":"",
"roomId":"",
"role":"",
"msgId":"",
"msgType":"",
"targetId":"",
"targetType":"",
"action":"",
"subAction":"",
"pageIndex":"",
"event":""
}
輸入表
-- Kafka source: raw student in-class signaling (埋点) events.
-- `message` is a multi-level JSON blob; every business field below is a
-- computed column that drills into it with nested JSON_VALUE calls.
-- JSON_VALUE returns VARCHAR, hence the repeated intermediate casts.
create table timeline_analysis_student_stream (
messageKey VARBINARY,
`message` VARBINARY,
topic VARCHAR,
`partition` INT,
`offset` BIGINT,
-- event creation time (top-level createTime, kept as a string)
`createTime` as cast(JSON_VALUE(`message`, '$.createTime')as VARCHAR),
-- user ID
`userid` as cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.userid') as BIGINT),
-- classroom ID
`roomid` as cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.roomid') as BIGINT),
-- operation time (epoch; presumably milliseconds — TODO confirm with producer)
`time_stamp` as cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.timestamp') as BIGINT),
-- role (downstream jobs filter on role = 2 for students)
`role` as cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.role') as TINYINT),
-- message ID
`msgid` as cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msgid') as BIGINT),
-- message type
`msg_msgType` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msgtype') as VARCHAR),
-- target ID of the message
`msg_msgData_targetId` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msg_data') as VARCHAR), '$.target_id') as VARCHAR),
-- target type of the message
`msg_msgData_targetType` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msg_data') as VARCHAR), '$.target_type') as VARCHAR),
-- student action
`msg_msgData_action` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msg_data') as VARCHAR), '$.action') as VARCHAR),
-- student sub-action
`msg_msgData_subAction` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msg_data') as VARCHAR), '$.sub_action') as VARCHAR),
-- PPT page index
`msg_msgData_pageIndex` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msg_data') as VARCHAR), '$.page_index') as BIGINT)
) with (
type = 'kafka011',
topic = 'timeline_client_topic',
`group.id` = 'timeline_analysis_student_consumer',
...
);
輸出表
-- Kafka sink: flattened (single-level JSON) student classroom events,
-- keyed by the original messageKey so the partitioning is preserved.
create table signal_student_classroom_internation (
messageKey VARBINARY,
`message` VARBINARY,
PRIMARY KEY (messageKey)
) with (
type = 'kafka011',
topic = 'timeline_analysis_student',
...
);
業務代碼
-
獲取學生移動圖片操作
- 當學生學習詞性(形容詞/副詞),課堂小練習讓學生將屏幕中出現的單詞圖片進行分類,學生需要移動圖片進入不同的分類桶中。
-- Flatten "student moved a picture" events into single-level JSON.
-- NOTE(review): msgType is emitted without quotes in the JSON payload;
-- this stays valid only while msgtype values are numeric — confirm.
INSERT INTO signal_student_classroom_internation
SELECT
cast(messageKey as VARBINARY) as messageKey,
cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_MOVE_PICTURE"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
-- msgid is BIGINT and role is TINYINT: compare against numeric
-- literals, not strings, to avoid an implicit cast on every row.
msgid = 305 AND
msg_msgType = '116' AND
role = 2 AND
msg_msgData_targetType = 'shape' AND
msg_msgData_action = 'move';
-
獲取學生 hover 圖片操作
- 當學生學習單詞時,需要學習單詞讀音,當學生鼠標懸停到圖片時進行發音教學。
-- Flatten "student hovered over a picture" events (pronunciation teaching
-- is triggered on hover) into single-level JSON.
INSERT INTO signal_student_classroom_internation
SELECT
cast(messageKey as VARBINARY) as messageKey,
cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_HOVER_PICTURE"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
-- numeric columns compared with numeric literals (msgid BIGINT, role TINYINT)
msgid = 305 AND
msg_msgType = '116' AND
role = 2 AND
msg_msgData_targetType = 'shape' AND
msg_msgData_action = 'mouse' AND
msg_msgData_subAction = 'over';
-
獲取學生畫線操作
- 學生通過畫線來進行隨堂圖文匹配練習。
-- Flatten "student drew a line" events (picture/word matching exercise)
-- into single-level JSON.
INSERT INTO signal_student_classroom_internation
SELECT
cast(messageKey as VARBINARY) as messageKey,
cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_LINE_DRAW"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
-- numeric columns compared with numeric literals (msgid BIGINT, role TINYINT)
msgid = 305 AND
msg_msgType = '116' AND
role = 2 AND
msg_msgData_targetType = 'shape' AND
msg_msgData_action = 'add';
-
獲取學生音頻播放操作
- 學生播放課件中的音頻。
-- Flatten "student started courseware audio" events into single-level JSON.
INSERT INTO signal_student_classroom_internation
SELECT
cast(messageKey as VARBINARY) as messageKey,
cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_AUDIO_PLAY"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
-- numeric columns compared with numeric literals (msgid BIGINT, role TINYINT)
msgid = 305 AND
msg_msgType = '116' AND
role = 2 AND
msg_msgData_targetType = 'template' AND
msg_msgData_action = 'audio' AND
msg_msgData_subAction = 'start';
-
獲取學生音頻暫停操作
- 學生暫停課件中的音頻。
-- Flatten "student paused courseware audio" events into single-level JSON.
INSERT INTO signal_student_classroom_internation
SELECT
cast(messageKey as VARBINARY) as messageKey,
cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_AUDIO_PAUSE"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
-- numeric columns compared with numeric literals (msgid BIGINT, role TINYINT)
msgid = 305 AND
msg_msgType = '116' AND
role = 2 AND
msg_msgData_targetType = 'template' AND
msg_msgData_action = 'audio' AND
msg_msgData_subAction = 'pause';
-
獲取學生圖文匹配錯誤操作
- 連線操作後,返回給學生連線結果。會影響課堂表現分數。
-- Flatten "student matching answer wrong" events into single-level JSON.
-- These feed the classroom-performance score downstream.
INSERT INTO signal_student_classroom_internation
SELECT
cast(messageKey as VARBINARY) as messageKey,
cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_MATCH_WRONG"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
-- numeric columns compared with numeric literals (msgid BIGINT, role TINYINT)
msgid = 305 AND
msg_msgType = '116' AND
role = 2 AND
msg_msgData_targetId = 'match' AND
msg_msgData_targetType = 'template' AND
msg_msgData_action = 'match' AND
msg_msgData_subAction = 'drop:wrong';
-
獲取學生圖文匹配正確操作
- 連線操作後,返回給學生連線結果。會影響課堂表現分數。
-- Flatten "student matching answer correct" events into single-level JSON.
-- These feed the classroom-performance score downstream.
INSERT INTO signal_student_classroom_internation
SELECT
cast(messageKey as VARBINARY) as messageKey,
cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_MATCH_CORRECT"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
-- numeric columns compared with numeric literals (msgid BIGINT, role TINYINT)
msgid = 305 AND
msg_msgType = '116' AND
role = 2 AND
msg_msgData_targetId = 'match' AND
msg_msgData_targetType = 'template' AND
msg_msgData_action = 'match' AND
msg_msgData_subAction = 'drop:correct';
學生註冊考試等級日誌 ETL 清洗
學生在 WEB/APP 頁面註冊時需要考試測評等級,以便後期學習對應 Level 的課程,通過 Flink 做數據清洗,將埋點到 kafka 上的日誌清洗後輸出到 HBase。
- 埋點數據樣例
{
"id":"",
"chinese_name":"",
"english_name":"",
"level":"",
"pid":"",
"create_time":"",
"update_time":"",
"dept_id":""
}
輸入表
-- Kafka source: student registration / exam-level change log.
-- All fields are extracted from a single-level JSON payload;
-- JSON_VALUE returns VARCHAR, so every value stays a string here.
create table blink_stg_activity__channel_name_dictionary_da (
messageKey VARBINARY,
`message` VARBINARY,
topic VARCHAR,
`partition` INT,
`offset` BIGINT,
-- record ID
id as JSON_VALUE(`message`,'$.id'),
-- Chinese name
chinese_name as JSON_VALUE(`message`,'$.chinese_name'),
-- English name
english_name as JSON_VALUE(`message`,'$.english_name'),
-- exam level (original comment said "測試登記", presumably a typo for 測試等級)
level as JSON_VALUE(`message`,'$.level'),
-- unique identifier ID
pid as JSON_VALUE(`message`,'$.pid'),
-- creation time
create_time as JSON_VALUE(`message`,'$.create_time'),
-- update time
update_time as JSON_VALUE(`message`,'$.update_time'),
-- department ID
dept_id as JSON_VALUE(`message`,'$.dept_id')
) with (
type = 'kafka010',
topic = 'blink_stg_activity__channel_name_dictionary_da',
`group.id` = 'blink_stg_activity__channel_name_dictionary_da',
...
);
輸出表
-- HBase sink for the cleaned registration-level records; rowkey is
-- MD5(id) (see the insert below), which spreads writes across regions.
create table blink_stg_activity__channel_name_dictionary_da_sinkhbase (
rowkey varchar,
id varchar,
chinese_name varchar,
english_name varchar,
level varchar,
pid varchar,
create_time varchar,
update_time varchar,
dept_id varchar,
primary key (rowkey)
) with (
type = 'cloudhbase',
tableName = 'channel_name_dictionary',
...
);
業務代碼
-- Mirror the registration-level stream into HBase, one row per record,
-- keyed by MD5(id) so lexicographically close ids do not hot-spot a region.
INSERT INTO blink_stg_activity__channel_name_dictionary_da_sinkhbase
SELECT
    MD5(id) AS rowkey,
    id,
    chinese_name,
    english_name,
    level,
    pid,
    create_time,
    update_time,
    dept_id
FROM blink_stg_activity__channel_name_dictionary_da;
場景二:學生行為分析
學生在線(直播)課程課堂表現統計
場景一中針對學生操作日誌進行了清洗,該場景消費其清洗之後的數據,針對不同的用戶 ID、Web 服務端 ID、角色、操作事件進行分組,開 1min 窗口,通過 count(event)聚合進行計分,求得每分鐘學生在線(直播)課程的課堂表現。
- 該指標上游數據是在學生操作日誌 ETL 清洗的基礎上進行統計
{
"userId":"",
"roomId":"",
"role":"",
"event":"",
"timeStamp":""
}
輸入表
-- Kafka source: the cleaned single-level JSON events produced by the
-- scenario-1 ETL jobs (topic 'timeline_analysis_student').
create table timeline_analysis_student_mashup_stream (
messageKey VARBINARY,
`message` VARBINARY,
topic VARCHAR,
`partition` INT,
`offset` BIGINT,
-- user ID
`userId` as cast(JSON_VALUE (`message`, '$.userId') as BIGINT),
-- Web server ID
-- NOTE(review): this column reads '$.roomId' from the payload — confirm
-- that roomId and webserverId really are the same identifier here.
`webserverId` as cast(JSON_VALUE (`message`, '$.roomId') as BIGINT),
-- role
`role` as cast(JSON_VALUE (`message`, '$.role') as TINYINT),
-- operation event (e.g. STUDENT_MOVE_PICTURE)
`event` as cast(JSON_VALUE (`message`, '$.event') as VARCHAR),
-- event time (rowtime), converted from an epoch BIGINT
time_stamp as TO_TIMESTAMP(cast(JSON_VALUE (`message`, '$.timeStamp') as BIGINT)),
WATERMARK wk FOR time_stamp AS WITHOFFSET (time_stamp, 0)--watermark on the rowtime; offset 0 = no out-of-orderness tolerated
) with (
type = 'kafka011',
topic = 'timeline_analysis_student',
`group.id` = 'timeline-analysis-student-mashup-consumer',
...
);
輸出表
-- RDS (MySQL) sink: per-minute classroom engagement counts per
-- (window, webserver, user, role, event).
create table timeline_signal_analysis_mysql (
start_time TIMESTAMP,
end_time TIMESTAMP,
webserver_id BIGINT,
user_id BIGINT,
role TINYINT,
event VARCHAR,
event_count BIGINT,
create_time TIMESTAMP
) with (
type='RDS',
tableName='timeline_signal_analysis',
...
);
業務代碼
-
學生課堂表現解析
- 學生在課堂中舉手回答問題等行為進行積分,以此衡量學生課堂表現。
-- Per-minute classroom engagement: tumble the cleaned event stream into
-- 1-minute event-time windows and count events per
-- (user, webserver, role, event) — the count is the engagement score input.
INSERT INTO timeline_signal_analysis_mysql
SELECT
    TUMBLE_START(time_stamp, INTERVAL '1' MINUTE) AS start_time,
    TUMBLE_END(time_stamp, INTERVAL '1' MINUTE) AS end_time,
    webserverId AS webserver_id,
    userId AS user_id,
    role AS role,
    event AS event,
    COUNT(event) AS event_count,
    CURRENT_TIMESTAMP AS create_time
FROM timeline_analysis_student_mashup_stream
GROUP BY
    TUMBLE (time_stamp, INTERVAL '1' MINUTE),
    userId,
    webserverId,
    role,
    event;
學生離線(錄播)課程學習時長統計
通過 subEvent = 'PPT_SUCCESS' 將完成課程的事件整理出來,通過自關聯的方式,和源表進行 JOIN 打寬,計算 'PPT_SUCCESS' 的時間點與最初播放 PPT 的時間差值。
- 埋點數據樣例
{
"classroom_id":"",
"user_type":"",
"user_id":"",
"event_time":"",
"sub_event":"",
"extra":{
"data_time":"",
"msg":{
"pptIndex":""
}
}
}
輸入表
-- Kafka source: QoS events for recorded (offline) courses; fields are
-- extracted from a two-level JSON payload with nested JSON_VALUE calls.
create table qos_log_kafka (
messageKey VARBINARY,
`message` VARBINARY,
topic VARCHAR,
`partition` INT,
`offset` BIGINT,
-- (recorded-course) classroom ID
`classroomId` as cast(JSON_VALUE(`message`, '$.classroom_id')as VARCHAR),
-- user type (downstream filters on 'student')
`userType` as cast(JSON_VALUE(`message`, '$.user_type')as VARCHAR),
-- user ID
`userId` as cast(JSON_VALUE(`message`, '$.user_id')as BIGINT),
-- event time (epoch; presumably milliseconds — TODO confirm)
`eventTime` as cast(JSON_VALUE(`message`, '$.event_time')as BIGINT),
-- sub-event, e.g. 'PPT_SUCCESS' when the PPT has fully loaded
`subEvent` as cast(JSON_VALUE(`message`, '$.sub_event')as VARCHAR),
-- data timestamp carried in the extra payload (milliseconds, per the
-- spend-time calculation below)
`extraDataTime` as cast(cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.extra')as VARCHAR), '$.data_time')as VARCHAR)as BIGINT),
-- PPT page index
`extraMsgIndex` as cast(JSON_VALUE(cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.extra')as VARCHAR), '$.msg')as VARCHAR), '$.pptIndex')as BIGINT)
) with (
type = 'kafka011',
topic = 'qos_log',
...
);
輸出表
-- RDS (MySQL) sink: how long each user took from first PPT activity to
-- PPT_SUCCESS (spend_time is in milliseconds, see the insert below).
create table user_enter_classroom_take_time_mysql (
user_id BIGINT,
classroom_id VARCHAR,
user_type VARCHAR,
spend_time BIGINT,
event_time TIMESTAMP,
create_time TIMESTAMP
) with (
type='rds',
tableName='user_enter_classroom_take_time',
...
);
業務代碼
-
學生進入教室時長
- 離線錄播課程,通過 PPT 的播放時間來計算學生進入教室的時長。
-- Completion events only: keep the rows whose subEvent marks the PPT as
-- fully loaded; the join below uses them as the "end" side of the timing.
CREATE VIEW qos_log_kafka_view AS
SELECT
    userId,
    classroomId,
    userType,
    eventTime,
    subEvent,
    extraDataTime
FROM qos_log_kafka
WHERE subEvent = 'PPT_SUCCESS';
-- Classroom-entry duration: join every qos event of a (user, classroom)
-- against that pair's PPT_SUCCESS completion event and take the
-- difference of the data timestamps.
-- NOTE(review): this is an unbounded dual-stream join with no window or
-- time constraint — operator state grows without bound; confirm a state
-- TTL is configured for this job.
insert into user_enter_classroom_take_time_mysql
SELECT
a.userId,
a.classroomId,
a.userType,
b.extraDataTime-a.extraDataTime,--milliseconds
TO_TIMESTAMP(a.eventTime),
CURRENT_TIMESTAMP
FROM qos_log_kafka a
JOIN qos_log_kafka_view b ON a.userId=b.userId AND a.classroomId=b.classroomId
WHERE a.extraDataTime<b.extraDataTime;
場景三:運維/網絡監控
通過學生直播課程中,視頻/音頻運維埋點信息計算,以userId, agoraChannelId,classroomId, userType, event,agoraAudioStateUid/agoraVideoStateUid進行分組,開 30s 的滾動窗口,求最近 30s 直播課的視頻/音頻質量(丟包/異常平均值、總次數),供下游運維同學監控,實時調整音頻/視頻質量,給用戶最佳的學習體驗。
- 埋點數據樣例
{
"classroom_id":"",
"user_type":"",
"user_id":"",
"agora_channel_id":"",
"event":"",
"agora_videoState":{
"fr":"",
"uid":""
},
"agora_audioState":{
"lost":"",
"uid":""
},
"messageCreateTime":""
}
輸入表
-- Kafka source: live-course audio/video QoS events from the Agora SDK;
-- fields are extracted from a two-level JSON payload.
create table qos_agora_record_kafka (
messageKey VARBINARY,
`message` VARBINARY,
topic VARCHAR,
`partition` INT,
`offset` BIGINT,
-- live classroom ID
`classroomId` as cast(JSON_VALUE(`message`, '$.classroom_id')as VARCHAR),
-- user type (downstream filters on 'student')
`userType` as cast(JSON_VALUE(`message`, '$.user_type')as VARCHAR),
-- user ID
`userId` as cast(JSON_VALUE(`message`, '$.user_id')as BIGINT),
-- channel ID
`agoraChannelId` as cast(JSON_VALUE(`message`, '$.agora_channel_id')as BIGINT),
-- event name
`event` as cast(JSON_VALUE(`message`, '$.event')as VARCHAR),
-- video fault record (fr)
`agoraVideoStateFr` as cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.agora_videoState')as VARCHAR), '$.fr')as BIGINT),
-- unique ID of the video fault source
`agoraVideoStateUid` as cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.agora_videoState')as VARCHAR), '$.uid')as BIGINT),
-- audio packet-loss record
`agoraAudioStateLost` as cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.agora_audioState')as VARCHAR), '$.lost')as BIGINT),
-- unique ID of the audio-loss source
`agoraAudioStateUid` as cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.agora_audioState')as VARCHAR), '$.uid')as BIGINT),
-- event time (rowtime)
`messageCreateTime` as cast(JSON_VALUE(`message`, '$.messageCreateTime')as BIGINT),
WATERMARK wk FOR messageCreateTime AS WITHOFFSET (messageCreateTime, 60000)--watermark on the rowtime; tolerates 60s of out-of-orderness
) with (
type = 'kafka011',
topic = 'agora_record',
...
);
輸出表
-- RDS (MySQL) sink: 30-second audio/video QoS aggregates per user,
-- shared by the audio and video monitoring jobs below.
create table user_av_mysql (
-- window start time
start_time TIMESTAMP,
-- window end time
end_time TIMESTAMP,
-- user ID
user_id BIGINT,
web_server_id BIGINT,
-- live classroom ID
classroom_id VARCHAR,
-- user type
user_type VARCHAR,
extra_uid BIGINT,
event VARCHAR,
-- sum of anomaly values (packet loss / fault records) in the window
event_sum BIGINT,
-- average anomaly value in the window
event_avg DOUBLE,
-- number of anomaly events in the window
event_count BIGINT,
create_time TIMESTAMP
) with (
type='rds',
tableName='user_av_record',
...
);
直播課程(音頻)網絡監控
業務代碼
-- Audio QoS per 30s tumbling window: packet-loss sum/avg/count per
-- (user, channel, classroom, userType, event, audio uid), students only.
insert into user_av_mysql
select
TUMBLE_START(messageCreateTime, INTERVAL '30' SECOND) as start_time,
TUMBLE_END(messageCreateTime, INTERVAL '30' SECOND) as end_time,
-- COALESCE replaces the CASE WHEN ... IS NULL ladders and keeps the
-- branch types consistent: classroomId is VARCHAR, so its null-default
-- must be the string '-1', not the integer -1 as originally written.
COALESCE(userId, -1) as user_id,
COALESCE(agoraChannelId, -1) as web_server_id,
COALESCE(classroomId, '-1') as classroom_id,
userType as user_type,
agoraAudioStateUid as extra_uid,
CONCAT(event,'_AUDIO_STATE') as event,
SUM(agoraAudioStateLost) as event_sum,
AVG(agoraAudioStateLost) as event_avg,
COUNT(event) as event_count,
CURRENT_TIMESTAMP as create_time
FROM qos_agora_record_kafka
WHERE agoraAudioStateLost >= 0 AND userType = 'student'
GROUP BY TUMBLE (messageCreateTime, INTERVAL '30' SECOND),
userId,
agoraChannelId,
classroomId,
userType,
event,
agoraAudioStateUid;
直播課程(視頻)網絡監控
業務代碼
-- Video QoS per 30s tumbling window: fault-record sum/avg/count per
-- (user, channel, classroom, userType, event, video uid), students only.
insert into user_av_mysql
select
TUMBLE_START(messageCreateTime,INTERVAL '30' SECOND) as start_time,
TUMBLE_END(messageCreateTime,INTERVAL '30' SECOND) as end_time,
-- COALESCE replaces the CASE WHEN ... IS NULL ladders and keeps the
-- branch types consistent: classroomId is VARCHAR, so its null-default
-- must be the string '-1', not the integer -1 as originally written.
COALESCE(userId, -1) as user_id,
COALESCE(agoraChannelId, -1) as web_server_id,
COALESCE(classroomId, '-1') as classroom_id,
userType as user_type,
agoraVideoStateUid as extra_uid,
CONCAT(event,'_VIDEO_STATE') as event,
SUM(agoraVideoStateFr) as event_sum,
AVG(agoraVideoStateFr) as event_avg,
COUNT(event) as event_count,
CURRENT_TIMESTAMP as create_time
FROM qos_agora_record_kafka
WHERE agoraVideoStateFr >= 0 AND userType = 'student'
GROUP BY TUMBLE (messageCreateTime, INTERVAL '30' SECOND),
userId,
agoraChannelId,
classroomId,
userType,
event,
agoraVideoStateUid;
場景四:運營分析
每小時不同 level 的學生註冊人數統計
學生通過不同渠道(Web 廣告輸入、App 廣告輸入等)進行註冊,本場景會讀取註冊端日誌,並關聯用戶註冊時的考試等級表(分為 A/B/C/D 四個 level),以此展現給運營人員,每小時不同 level&渠道 的學生註冊人數,實時的調整運營推廣策略。
- 埋點數據樣例
--學生表
{
"id":"",
"channel_id":"",
"update_time":""
}
--用戶註冊數據
{
"id":"",
"name":"",
"register_date_time":"",
"status":""
}
--學生測試等級表:使用場景一“學生註冊考試等級日誌ETL清洗”的結果表
輸入表
-- Kafka source: student records (id → registration channel) from the
-- account system change log.
create table student_da_src (
messageKey VARBINARY,
`message` VARBINARY,
topic VARCHAR,
`partition` INT,
`offset` BIGINT,
`id` as JSON_VALUE (`message`, '$.id'),--user ID
`channel_id` as JSON_VALUE (`message`, '$.channel_id'),--registration channel ID
`update_time` as JSON_VALUE (`message`, '$.update_time')--update time
) with (
type = 'kafka010',
topic = 'uc_account-student',
...
);
-- Kafka source: user registration records from the account system.
create table user_da_in (
messageKey VARBINARY,
`message` VARBINARY,
topic VARCHAR,
`partition` INT,
`offset` BIGINT,
`id` as JSON_VALUE (`message`, '$.id'),--user ID
`name` as JSON_VALUE (`message`, '$.name'),--user name
`register_date_time` as JSON_VALUE (`message`, '$.register_date_time'),--registration time
`status` as JSON_VALUE (`message`, '$.status')--account status, e.g. 'NORMAL'
) with (
type = 'kafka010',
topic = 'uc_account-user',
`group.id` = 'uc_account-user',
...
);
-- HBase dimension table: channel → exam level, written by the scenario-1
-- registration-level ETL. PERIOD FOR SYSTEM_TIME declares it usable in
-- temporal (FOR SYSTEM_TIME AS OF) lookup joins.
create table channel_da (
rowkey varchar,
id VARCHAR,
`level` VARCHAR,
primary key (rowkey),
PERIOD FOR SYSTEM_TIME
) with (
type = 'cloudhbase',
tableName = 'databus:activity.channel',
...
);
輸出表
-- Elasticsearch sink: hourly registration counts per exam level; uk is
-- the document id, so it must be unique per (hour, level) bucket.
create table sink_table (
uk varchar,
reg_date bigint,
level varchar,
leads bigint,
primary key (uk)
) with (
type = 'elasticsearch',
index = 'vk_app_es_sign_csh',
typeName = 'vk_app_es_sign_csh',
...
);
業務代碼
-- Deduplicate the student change log: keep only the latest record per id.
create view student_da_src_view as
SELECT
last_value(id) as id,
last_value(update_time) as update_time,
last_value(channel_id) as channel_id
from student_da_src
group by id;
-- Deduplicate the user change log: keep only the latest record per id.
create view user_da_in_view as
SELECT
last_value(id) as id,
last_value(name) as name,
last_value(register_date_time) as register_date_time,
last_value(status) as status
from user_da_in
group by id;
-- Hourly registration counts per exam level. uk is the Elasticsearch
-- document id and must be unique per (hour, level): build it from both
-- grouping keys. The original emitted only the level bucket as uk, so
-- each hour's document overwrote the previous hour's.
insert into
sink_table
SELECT
concat(date_format(register_date_time,'yyyyMMddHH'),case when level in ('A','B','C','D') then level else 'other' end) as uk
,cast(date_format(register_date_time,'yyyyMMddHH') as bigint) as reg_date
,case when level in ('A','B','C','D') then level else 'other' end as levels
,COUNT(distinct t.id) AS leads
FROM
student_da_src_view t
LEFT JOIN user_da_in_view u ON u.id = t.id
LEFT JOIN channel_da FOR SYSTEM_TIME AS OF PROCTIME() ch ON ch.rowkey = MD5(t.channel_id)
-- name filters drop internal/test accounts; note they also turn the
-- LEFT JOIN on user_da_in_view into an inner join (NULL names fail LIKE).
where u.name not LIKE '%測試%'
and u.name not LIKE 'DM\\_%'
and u.name not LIKE '%test%'
and u.status='NORMAL'
group by date_format(register_date_time,'yyyyMMddHH')
,case when level in ('A','B','C','D') then level else 'other' end
,concat(date_format(register_date_time,'yyyyMMddHH'),case when level in ('A','B','C','D') then level else 'other' end)
;
每日課程顧問追蹤統計
首先通過 ID 進行分組,求出相同 ID 的最新消息(達到去重效果),在最新消息的基礎上使用全局Group聚合,根據事件時間(天)、課程顧問 ID 統計每天每位課程顧問找學生確認“學習進度/約課”的次數。
- 埋點數據樣例
{
"id":"",
"leads_flow_event_id":"",
"group_id":"",
"cc_id":"",
"student_id":"",
"order_id":"",
"leads_id":"",
"confirm_date_time":"",
"create_time":"",
"update_time":"",
"order_create_time":"",
"canceled_date_time":"",
"apply_refund_date":"",
"status":""
}
輸入表
-- Kafka source: course-consultant order/lead tracking change log.
create table cc_data_pack_order_info_src (
`messageKey` VARBINARY,
`message` VARBINARY,
`topic` VARCHAR,
`partition` INT,
`offset` BIGINT,
-- record ID
`id` as JSON_VALUE (`message`, '$.id'),
-- (Course Consultant) consultant ID
`cc_id` as JSON_VALUE (`message`, '$.cc_id'),
-- student ID
`student_id` as JSON_VALUE (`message`, '$.student_id'),
-- confirmation time (progress / booking confirmation)
`confirm_date_time` as JSON_VALUE (`message`, '$.confirm_date_time'),
-- creation time
`create_time` as JSON_VALUE (`message`, '$.create_time'),
-- update time
`update_time` as JSON_VALUE (`message`, '$.update_time'),
-- order creation time
`order_create_time` as JSON_VALUE (`message`, '$.order_create_time'),
-- order cancellation time
`canceled_date_time` as JSON_VALUE (`message`, '$.canceled_date_time'),
-- refund application date (original comment said "付款時間"/payment time,
-- which contradicts the field name — confirm with the source system)
`apply_refund_date` as JSON_VALUE (`message`, '$.apply_refund_date'),
-- status
`status` as JSON_VALUE (`message`, '$.status')
) with (
type = 'kafka010',
topic = 'data_pack_order_info',
`group.id` = 'data_pack_order_info',
...
);
輸出表
-- RDS (MySQL) sink: per-consultant daily metric values, keyed by
-- (consultant, metric type, attribution day).
CREATE TABLE index_sink (
`cc_id` bigint(20) NOT NULL,
-- metric value (the confirmation-count job writes a count here)
`cc_index` bigint(10) NOT NULL,
-- metric type discriminator (the job below writes type = 1)
`type` int(6) NOT NULL,
`attribution_time` varchar NOT NULL,
`update_time` timestamp NOT NULL,
PRIMARY KEY (`cc_id`, `type`, `attribution_time`)
) WITH (
type='rds',
tableName='staff_index',
...
);
業務代碼
-- Latest state per order id: last_value over group by id deduplicates
-- the change log down to each record's newest version.
CREATE VIEW cc_data_pack_order_info_view as
select
last_value (cc_id) as cc_id,
last_value (confirm_date_time) as confirm_date_time,
last_value (`status`) as `status`
from
cc_data_pack_order_info_src
group by
id;
-- Daily per-consultant confirmation counts over the deduplicated
-- (latest-per-id) order stream.
insert into index_sink
select
cast(cc_id as bigint) as cc_id,
count(*) as cc_index,
cast(1 as int) as type,
date_format(confirm_date_time,'yyyy-MM-dd') as attribution_time,
current_timestamp as update_time
from
cc_data_pack_order_info_view
where
confirm_date_time is not null
and `status` is not null
-- status comes from JSON_VALUE and is VARCHAR: compare against the
-- string '3' rather than the integer 3 to avoid a per-row implicit
-- numeric cast (which fails on any non-numeric status value).
and `status` = '3'
group by
date_format(confirm_date_time,'yyyy-MM-dd'), cc_id;