行業背景
-
行業現狀:
金融是現代經濟的核心。我國金融業在市場化改革和對外開放中不斷髮展,金融總量大幅增長。金融穩定直接關係到國家經濟發展的前途和命運,金融業是國民經濟發展的晴雨表。對我國金融業發展現狀進行客觀分析,對金融業發展趨勢進行探索,有助於消除金融隱患,使金融業朝著健康、有序方向發展。 - 大數據在其行業中的作用:
- 金融服務和產品創新:藉助社交網絡等平臺產生的海量用戶和數據記錄著用戶群體的興趣和偏好情緒等信息, 對客戶行為模式進行分析,可以帶來更貼近客戶需求的產品創新。
- 增強用戶體驗:通過大數據分析對客戶進行畫像,結合客戶畫像特徵,為用戶提供個性化服務,增強用戶體驗。
業務場景
某保險公司開發了個金融類APP,該公司在APP中會投放保險廣告、發佈優惠活動,用戶通過APP進行投保等操作。
業務的構建涉及到幾個端:
- APP:應用程序,用戶訪問入口,用戶通過APP點擊瀏覽保險廣告、優惠活動等,並進行投保下單。
-
後臺系統:
a.運營人員: (1)根據用戶提交的訂單統計指定時間段的總投保人數和總投保金額,輔助優化運營方案。 (2)對用戶的日常行為做出分析,分析出每個用戶比較關注的信息,作為推薦系統的數據來源。 b.業務經理: 對重點客戶的投保金額變動進行監控,將投保金額變動較大的重點客戶推送給業務經理,業務經理針對性開展客戶挽留等操作。
技術架構
架構解析
數據採集:該場景中,數倉的數據主要來源於APP等系統的埋點信息,被實時採集至DATAHUB作為Flink的輸入數據。
實時數倉架構:該場景中,整個實時數倉的ETL和BI部分的構建,全部通過Flink完成,Flink實時讀取DATAHUB的數據進行處理,並與維表進行關聯查詢等操作,最終實時統計的結果輸入到下游數據庫RDS中。
業務指標
-
運營數據分析
每小時的投保人數 每小時的總保費 每小時總保單數
-
用戶行為監控
用戶原投保金額 用戶現投保金額
-
用戶行為分析
用戶最後訪問的頁面類型 用戶最後訪問的頁面地址
數據結構
場景一:運營數據分析
本場景用於計算每小時總投保人數和總保費。
用戶投保會生成一份訂單,訂單內容包括用戶id、用戶姓名、訂單號等。flink實時讀取訂單信息,用where過濾出大於當前小時時間段的數據(數據過濾),然後根據用戶id做分組用last_value函數獲取每個用戶最終生成的訂單信息(訂單去重),最後按照小時維度聚合統計當前小時的總保費和總投保人數。
輸入表
CREATE TABLE user_order
(
id bigint comment '用戶id'
,order_no varchar comment '訂單號'
,order_type bigint comment '訂單類型'
,pay_time bigint comment '支付時間'
,order_price double comment '訂單價格'
,customer_name varchar comment '用戶姓名'
,customer_tel varchar comment '用戶電話'
,certificate_no varchar comment '證件號碼'
,gmt_created bigint comment '創建時間'
,gmt_modified bigint comment '修改時間'
,account_payble double comment '應付金額'
) WITH (
type='datahub',
topic='user_order'
...
)
輸出表
CREATE TABLE hs_order (
biz_date varchar COMMENT 'yyyymmddhh'
,total_premium DOUBLE COMMENT '總保費'
,policy_cnt BIGINT COMMENT '保單數'
,policy_holder_cnt BIGINT COMMENT '投保人數'
,PRIMARY KEY (biz_date)
) WITH
(
type='rds',
tableName='adm_pfm_zy_order_gmv_msx_hs'
...
)
;
業務代碼
1.數據清洗
create view last_order
as
select
id as id
,last_value(order_no) as order_no
,last_value(customer_tel) as customer_tel
,last_value(gmt_modified) as gmt_modified
,last_value(account_payble) as account_payble
from user_order
where gmt_modified >= cast(UNIX_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd'), 'yyyy-MM-dd')*1000 as bigint)
group by id
;
2.數據彙總
insert into hs_order
select
biz_date
,cast (total_premium as double) as total_premium
,cast (policy_cnt as bigint) as policy_cnt
,cast (policy_holder_cnt as bigint) as policy_holder_cnt
from (
select
from_unixtime(cast(gmt_modified/1000 as bigint),'yyyyMMddHH') as biz_date
,sum(coalesce(account_payble,0)) as total_premium
,count(distinct order_no) as policy_cnt
,count(distinct customer_tel) as policy_holder_cnt
from last_order a
group by
from_unixtime(cast(gmt_modified/1000 as bigint),'yyyyMMddHH')
)a
;
場景二:用戶行為監控
本場景對投保金額變動較大(總保額變動大於15%)的重點用戶進行監控。
Flink實時讀取用戶新建訂單數據,新建訂單包括用戶的id和現投保金額,通過where過濾沒有保存成功的訂單。維表中存儲了業務經理關注的重點用戶數據(如原投保金額),通過新建訂單中的用戶id與維表進行關聯查詢,如果維表中存在此用戶且總保額下降15%以上,則將該用戶的詳細信息推送給業務經理,並且在維表中更新該用戶投保金額及投保信息。
輸入表
create table update_info
(
id bigint comment '用戶id'
,channel varchar comment '渠道編號'
,open_id varchar comment '訂單id'
,event varchar comment '事件類型'
,now_premium varchar comment '現投保金額'
,creator varchar comment '創建人'
,modifier varchar comment '最後修改人'
,gmt_modified bigint comment '修改時間'
,now_info varchar comment '現投保信息'
) with (
type = 'datahub',
topic = 'dh_prd_dm_account_wechat_trace'
...
);
維表
create table raw_info
(
id bigint comment '用戶id'
,raw_premium varchar comment '原投保金額'
,raw_info varchar comment '原投保信息'
,PRIMARY KEY(id)
,PERIOD FOR SYSTEM_TIME
) WITH (
type='ots',
tableName='adm_zy_acct_sub_wechat_list'
...
);
輸出表
create table update_info
(
id bigint comment '用戶id'
,raw_info varchar comment '原投保信息'
,now_info varchar comment '現投保信息'
,raw_premium varchar comment '原投保金額'
,now_premium varchar comment '現投保金額'
,PRIMARY KEY(id)
) WITH (
type='rds',
tableName='wechat_activity_user'
...
);
業務代碼
create view info_join as
select
t1.id as id
,t2.raw_info as raw_info
,t1.now_info as now_info
,t2.raw_premium as raw_premium
,t1.now_premium as now_premium
from update_info t1
inner join raw_info FOR SYSTEM_TIME AS OF PROCTIME() as t2
on t1.id = t2.id ;
insert into update_info
select
id as id
,raw_info as raw_info
,now_info as now_info
,raw_premium as raw_premium
,now_premium as now_premium
from info_join where now_premium<raw_premium*0.85
;
insert into raw_info
select
id as id
,now_premium as raw_premium
,now_info as raw_info
from info_join
;
場景三:用戶行為分析
本場景記錄用戶最後訪問的頁面名稱和類型,作為用戶畫像的特徵值。
Flink讀取用戶瀏覽APP頁面的日誌信息,如用戶id、頁面名稱和頁面類型等信息,根據用戶id和設備id進行分組,通過last_value函數獲取用戶最後一次訪問頁面的名稱和類型,輸出到RDS作為推薦系統的輸入數據,在下次用戶登錄的時候為其推送相關廣告信息,提升用戶廣告點擊率和下單的成功率。
輸入表
create table user_log
(
log_time bigint comment '日誌採集日期(Linux時間)'
,device_id varchar comment '設備id'
,account_id varchar comment '賬戶id'
,bury_point_type varchar comment '頁面類型或埋點類型'
,page_name varchar comment '頁面名稱或埋點時一級目錄'
) WITH (
type = 'datahub',
topic = 'edw_zy_evt_bury_point_log'
...
);
輸出表
create table user_last_log
(
account_id varchar comment 'account_id'
,device_id varchar comment '設備id'
,bury_point_type varchar comment '頁面類型'
,page_name varchar comment '頁面名稱'
,primary key(account_id)
) WITH (
type='rds',
tableName='adm_zy_moblie_charge_exchg_rs'
...
);
業務代碼
insert into user_last_log
select
account_id
,device_id
,last_value(bury_point_type) as bury_point_type
,last_value(page_name) as page_name
from user_log
where account_id is not null
group by account_id,device_id