開發與維運

【最佳實踐】實時計算 Flink 版在金融行業的最佳實踐

行業背景

  • 行業現狀:
    金融是現代經濟的核心。我國金融業在市場化改革和對外開放中不斷髮展,金融總量大幅增長。金融穩定直接關係到國家經濟發展的前途和命運,金融業是國民經濟發展的晴雨表。對我國金融業發展現狀進行客觀分析,對金融業發展趨勢進行探索,有助於消除金融隱患,使金融業朝著健康、有序方向發展。
  • 大數據在其行業中的作用:
  1. 金融服務和產品創新:藉助社交網絡等平臺產生的海量用戶和數據記錄著用戶群體的興趣和偏好情緒等信息, 對客戶行為模式進行分析,可以帶來更貼近客戶需求的產品創新。
  2. 增強用戶體驗:通過大數據分析對客戶進行畫像,結合客戶畫像特徵,為用戶提供個性化服務,增強用戶體驗。

業務場景

某保險公司開發了個金融類APP,該公司在APP中會投放保險廣告、發佈優惠活動,用戶通過APP進行投保等操作。
業務的構建涉及到幾個端:

  1. APP:應用程序,用戶訪問入口,用戶通過APP點擊瀏覽保險廣告、優惠活動等,並進行投保下單。
  2. 後臺系統:

    a.運營人員:
    (1)根據用戶提交的訂單統計指定時間段的總投保人數和總投保金額,輔助優化運營方案。
    (2)對用戶的日常行為做出分析,分析出每個用戶比較關注的信息,作為推薦系統的數據來源。
    b.業務經理:
    對重點客戶的投保金額變動進行監控,將投保金額變動較大的重點客戶推送給業務經理,業務經理針對性開展客戶挽留等操作。
    

技術架構

金融行業最佳實踐.png

架構解析

數據採集:該場景中,數倉的數據主要來源於APP等系統的埋點信息,被實時採集至DATAHUB作為Flink的輸入數據。
實時數倉架構:該場景中,整個實時數倉的ETL和BI部分的構建,全部通過Flink完成,Flink實時讀取DATAHUB的數據進行處理,並與維表進行關聯查詢等操作,最終實時統計的結果輸入到下游數據庫RDS中。

業務指標

  • 運營數據分析

     每小時的投保人數
     每小時的總保費
     每小時總保單數
  • 用戶行為監控

     用戶原投保金額
     用戶現投保金額
  • 用戶行為分析

     用戶最後訪問的頁面類型
     用戶最後訪問的頁面地址
    

數據結構

場景一:運營數據分析

本場景用於計算每小時總投保人數和總保費。
用戶投保會生成一份訂單,訂單內容包括用戶id、用戶姓名、訂單號等。flink實時讀取訂單信息,用where過濾出大於當前小時時間段的數據(數據過濾),然後根據用戶id做分組用last_value函數獲取每個用戶最終生成的訂單信息(訂單去重),最後按照小時維度聚合統計當前小時的總保費和總投保人數。

輸入表

CREATE   TABLE  user_order
(
    id                          bigint    comment '用戶id'
    ,order_no                    varchar    comment '訂單號'
    ,order_type                  bigint    comment '訂單類型'
    ,pay_time                    bigint  comment '支付時間'
    ,order_price                 double    comment '訂單價格'
    ,customer_name               varchar    comment '用戶姓名'
    ,customer_tel                varchar    comment '用戶電話'
    ,certificate_no              varchar    comment '證件號碼'
    ,gmt_created                 bigint  comment '創建時間'
    ,gmt_modified                bigint  comment '修改時間'
    ,account_payble             double      comment '應付金額'

) WITH (
     type='datahub',
     topic='user_order'
       ...
)

輸出表

CREATE    TABLE hs_order (
    biz_date              varchar COMMENT 'yyyymmddhh'
    ,total_premium         DOUBLE COMMENT '總保費'
    ,policy_cnt            BIGINT COMMENT '保單數'
    ,policy_holder_cnt     BIGINT COMMENT '投保人數'
    ,PRIMARY KEY (biz_date)
) WITH
 (
   type='rds',
   tableName='adm_pfm_zy_order_gmv_msx_hs'
   ...
 ) 
 ;

業務代碼
1.數據清洗

create view  last_order
as 
select 
     id                                 as id               
    ,last_value(order_no)               as order_no                   
    ,last_value(customer_tel)           as customer_tel     
    ,last_value(gmt_modified)           as gmt_modified                      
    ,last_value(account_payble)         as account_payble   
    from user_order
    where gmt_modified  >= cast(UNIX_TIMESTAMP(FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd'), 'yyyy-MM-dd')*1000 as bigint)
    group by id
;

2.數據彙總

insert into hs_order
select 
biz_date
,cast (total_premium as double) as total_premium
,cast (policy_cnt as bigint) as policy_cnt
,cast (policy_holder_cnt as bigint) as policy_holder_cnt
from (
select 
    from_unixtime(cast(gmt_modified/1000 as bigint),'yyyyMMddHH')      as biz_date
    ,sum(coalesce(account_payble,0))  as total_premium
    ,count(distinct order_no)   as policy_cnt
    ,count(distinct customer_tel)  as policy_holder_cnt
from  last_order a 
group by 
from_unixtime(cast(gmt_modified/1000 as bigint),'yyyyMMddHH')
)a 
;

場景二:用戶行為監控

本場景對投保金額變動較大(總保額變動大於15%)的重點用戶進行監控。
Flink實時讀取用戶新建訂單數據,新建訂單包括用戶的id和現投保金額,通過where過濾沒有保存成功的訂單。維表中存儲了業務經理關注的重點用戶數據(如原投保金額),通過新建訂單中的用戶id與維表進行關聯查詢,如果維表中存在此用戶且總保額下降15%以上,則將該用戶的詳細信息推送給業務經理,並且在維表中更新該用戶投保金額及投保信息。

輸入表

create table update_info
(
 id             bigint      comment '用戶id'
,channel        varchar     comment '渠道編號'
,open_id        varchar     comment '訂單id'
,event          varchar     comment '事件類型'
,now_premium  varchar     comment '現投保金額'
,creator        varchar     comment '創建人'
,modifier       varchar     comment '最後修改人'
,gmt_modified   bigint      comment '修改時間'
,now_info       varchar     comment '現投保信息'
) with (
    type = 'datahub',
    topic = 'dh_prd_dm_account_wechat_trace'
    ...
   
);

維表

 create table raw_info
(
     id                 bigint  comment '用戶id'
    ,raw_premium      varchar  comment '原投保金額'
    ,raw_info           varchar  comment '原投保信息'
    ,PRIMARY KEY(id)
    ,PERIOD FOR SYSTEM_TIME
) WITH (
    type='ots',
    tableName='adm_zy_acct_sub_wechat_list'
    ...
);

輸出表

create table update_info
(
     id               bigint  comment '用戶id'
    ,raw_info         varchar comment '原投保信息'
    ,now_info         varchar comment '現投保信息'
    ,raw_premium      varchar comment '原投保金額'
    ,now_premium      varchar comment '現投保金額'
    ,PRIMARY KEY(id)
) WITH (
    type='rds',
    tableName='wechat_activity_user'
    ...
);

業務代碼

create view info_join as 
select
      t1.id               as  id
    ,t2.raw_info          as  raw_info
    ,t1.now_info          as  now_info  
    ,t2.raw_premium     as raw_premium
    ,t1.now_premium     as now_premium
from update_info t1
inner join raw_info FOR SYSTEM_TIME AS OF PROCTIME() as t2
on t1.id = t2.id ;
insert into update_info
select 
     id                        as id  
    ,raw_info                  as raw_info
    ,now_info                  as now_info
    ,raw_premium               as raw_premium  
    ,now_premium               as now_premium  
from info_join where now_premium<raw_premium*0.85
;
insert into raw_info
select 
     id                        as id  
    ,now_premium               as raw_premium  
    ,now_info                  as raw_info
from info_join
;

場景三:用戶行為分析

本場景記錄用戶最後訪問的頁面名稱和類型,作為用戶畫像的特徵值。
Flink讀取用戶瀏覽APP頁面的日誌信息,如用戶id、頁面名稱和頁面類型等信息,根據用戶id和設備id進行分組,通過last_value函數獲取用戶最後一次訪問頁面的名稱和類型,輸出到RDS作為推薦系統的輸入數據,在下次用戶登錄的時候為其推送相關廣告信息,提升用戶廣告點擊率和下單的成功率。

輸入表


create table user_log
(
 log_time                bigint  comment '日誌採集日期(Linux時間)' 
,device_id               varchar  comment '設備id'
,account_id              varchar  comment '賬戶id'
,bury_point_type         varchar  comment '頁面類型或埋點類型'
,page_name               varchar  comment '頁面名稱或埋點時一級目錄'
) WITH (
    type = 'datahub',
    topic = 'edw_zy_evt_bury_point_log'
    ...
);

輸出表

create table user_last_log
(
     account_id         varchar  comment 'account_id'
    ,device_id          varchar    comment  '設備id'
    ,bury_point_type    varchar  comment '頁面類型'
    ,page_name          varchar  comment '頁面名稱'
    ,primary key(account_id)
) WITH (
    type='rds',
    tableName='adm_zy_moblie_charge_exchg_rs'
    ...
    
);

業務代碼


insert into user_last_log
select
    account_id
    ,device_id
    ,last_value(bury_point_type)  as bury_point_type
    ,last_value(page_name)  as page_name
from user_log
where account_id is not null 
group by account_id,device_id

Leave a Reply

Your email address will not be published. Required fields are marked *