大數據

【最佳實踐】實時計算Flink在IoT行業的實時數倉建設實踐

行業背景

  • 行業現狀: 

    • 物聯網(Internet of Things,以下簡寫為 IoT)是互聯網、傳統電信網等資訊的承載體,讓所有具備獨立功能的普通物體實現互聯互通的網絡。物聯網廣泛應用於運輸和物流、健康醫療、智慧環境(家庭、辦公、工廠)等領域, 具有十分廣闊的市場和應用前景。物聯網將智能感知、識別技術、網絡通信與普適計算等技術融合起來,被認為是繼計算機、互聯網、智能手機之後世界信息產業發展的下一個風口。
    • 據 IDC 估計,到 2020 年物聯網將在全球範圍內產生 1.46 萬億美元的價值。據預測,屆時中國的物聯網市場規模將超過 1.8 萬億人民幣。得益於龐大的人口基數和低廉的芯片製造成本,中國將成為物聯網行業的主要參與者,並在推動全球物聯網市場增長上發揮重要的作用。
  • 大數據在其行業中的作用:

    • 數以百萬計的物聯網設備連接到物聯網,產生了大量的數據,通過大規模分析這些數據瞭解影響業務的上下文關係和模式,從而做出更加實時決策,因此,可以說大數據和物聯網密切相關。
    • 物聯網大數據分析可以幫助人們更好地理解數據,從而做出更有效、明智的決定。大數據分析使數據挖掘者和科學家能夠利用傳統工具分析非結構化數據。此外,大數據分析的目的是利用數據挖掘技術,提取知識信息,這些信息有助於進行預測,識別趨勢,發現隱藏的信息,並做出決定。

業務場景

某公司開發了一套針對商場的人流量管理和分析系統。當顧客進入商場後,固定部署的WIFI探針將實時探測並採集顧客手機或PC終端的WIFI信息。這些顧客WIFI信息經過Flink實時統計和分析後獲得商場中客流量、客流量的高峰期、顧客喜歡哪些商店、新客與常客的比例、商場的成交額、顧客的實時位置等等指標。客流量指標可用於數據大屏,顧客的實時位置指標可用於展示顧客實時密度圖,顧客喜歡的商店指標可用於個性化的廣告推送應用等。

技術架構

04673C60-F95A-4633-8382-53A6A3871778.png
架構解析
數據採集:該場景中,數倉的數據來源於WIFI探針探測到的顧客手機和PC終端的WIFI信息,實時採集至DataHub作為Flink的輸入數據。
實時數倉架構:該場景中,整個實時數倉的ETL和BI部分的構建,全部通過Flink完成,Flink實時讀取DataHub的數據進行處理,並與維表進行關聯查詢等操作,統計和分析結果最終輸出到RDS和TableStore等存儲系統,以供業務系統使用。

業務指標

  • 場景一:客流量統計

    • 每日商場客流量
    • 每日商店客流量
    • 顧客的實時位置
    • 每日商場新增顧客人數
    • 每日商場中顧客數量前5的商店
    • 每日商場中顧客最多的時間段及顧客數量
  • 場景二:顧客喜好分析

    • 單日顧客最喜歡的商店
    • 單日顧客進入超過一次的商店

業務代碼

場景一:客流量統計

每日商場客流量

對顧客WIFI信息按天維度進行分組,使用count distinct語法去重統計得到商場中顧客人數。

輸入表

--顧客WIFI信息
CREATE TABLE user_wifidata (
    id                            varchar,
    shop_code                     varchar, --商店編碼
    ap_ip                         varchar, --WIFI的ip地址
    occur_time                    varchar, --記錄時間
    user_device_mac               varchar, --終端mac地址
    rssi                          varchar, --接收信號強度
    ap_mac                        varchar, --WIFI的MAC地址
    pt                            varchar, --pt
    ts_occur_time AS TO_TIMESTAMP(cast(cast(occur_time as bigint)*1000 as bigint)),
    WATERMARK FOR ts_occur_time AS withOffset(ts_occur_time, 60000)
) WITH (
  type = 'datahub',...
);

輸出表

--每日商場客流量
CREATE TABLE trading_area_daily_people (
    count_date                      varchar, --日期
  people_sum                                            bigint,     --客流量
    primary key (count_date)
) WITH (
    type='rds',...
);

業務代碼

--每日商場可流量統計
INSERT into trading_area_daily_people
select 
    DATE_FORMAT(data.ts_occur_time,'yyyy-MM-dd') count_date,
    count(distinct data.user_device_mac)
from
user_wifidata data
group by DATE_FORMAT(data.ts_occur_time,'yyyy-MM-dd')
;

每日商店的客流量

顧客WIFI信息通過關聯商場中的商店維表獲得商店名稱,在此基礎上按照商店名稱和按天維度時間字段進行分組,使用count distinct語法去重統計每日商店的顧客人數。

輸入表

--顧客WIFI信息:使用場景一“每日商場客流量”的輸入表:user_wifidata

輸出表

--每日商店的客流量
CREATE TABLE shop_area_daily_people (
    count_date                      varchar,  --日期
  shop_name                                                varchar,    --商店名
  people_sum                                            bigint,      --客流量
    primary key (count_date)
) WITH (
    type='rds',...
);

維表

--商店維表
CREATE TABLE area_shop (
    id                                      bigint,
    shop_code             varchar, --商店編碼
  shop_name                      varchar, --商店名
    PRIMARY KEY (shop_code),
    PERIOD FOR SYSTEM_TIME
) WITH (
    type='rds',...
);

業務代碼

--每日商店的客流量統計
INSERT into shop_area_daily_people
select 
    DATE_FORMAT(data.ts_occur_time,'yyyy-MM-dd') count_date,
      mas.shop_name,
    count(distinct data.user_device_mac)
from
user_wifidata data
left outer join area_shop FOR SYSTEM_TIME AS OF PROCTIME() mas 
on data.shop_code=mas.shop_code 
group by DATE_FORMAT(data.ts_occur_time,'yyyy-MM-dd'),mas.shop_name
;

顧客的實時位置

顧客WIFI信息通過關聯探測顧客位置維表(place_info),獲取到顧客的實時位置信息,通過1秒鐘的滾動窗口實現每秒鐘上報一次位置信息,實時投放到數據大屏上,從而實現顧客實時密度圖功能。

輸入表

--顧客WIFI信息:使用場景一“每日商場客流量”的輸入表:user_wifidata

輸出表

--顧客來店數據表
CREATE TABLE area_customer_info_allday (
    user_device_mac        varchar, --終端mac地址
    come_time              varchar, --到來的時間
  ap_mac                               varchar, --WIFI的MAC地址
    occur_time                       varchar, --記錄時間
    used_x                                bigint,  --顧客的經度
    used_y                                bigint,  --顧客的維度
    first_come_flag                 varchar, --是否是第一次
    rssi                               varchar, --接收信號強度
    save_time                         varchar, --時間
    pt                                   varchar, --pt
  PRIMARY KEY (user_device_mac)
) WITH (
  type='ots'
);

維表

--探測設備可以獲取到顧客的位置
CREATE TABLE place_info (
    user_device_mac               varchar,  --終端mac地址
    used_x                        bigint,        --顧客的經度
    used_y                        bigint,        --顧客的維度
    min_rssi                      bigint,     --最小的接收信號強度
    PRIMARY KEY (user_device_mac),
    PERIOD FOR SYSTEM_TIME
) WITH (
    type='rds',...
);           

業務代碼

--通過1秒鐘的滾動窗口實現每秒鐘上報一次位置信息
CREATE view record_sec
(
  user_device_mac          ,
    come_time                ,
    ap_mac                             ,
    occur_time                    ,
    user_loca_x                    ,
    user_loca_y                    ,
    first_come_flag            ,
    rssi                                ,
    save_time                        ,
    shop_code                        ,
    pt
) as 
select data.user_device_mac ,
DATE_FORMAT(TUMBLE_START(data.ts_occur_time, INTERVAL '1' SECOND),'yyyy-MM-dd HH:mm:ss') ,
data.ap_mac,
data.occur_time,
'',
'',
'',
min(data.rssi),
FROM_UNIXTIME(unix_timestamp()),
grd.shop_code,
max(data.pt) 
from  
user_wifidata data 
GROUP BY TUMBLE(ts_occur_time, INTERVAL '1' SECOND),ap_mac,user_device_mac,shop_code,occur_time
;

CREATE view view1 as
select
a0.user_device_mac,
a0.come_time,
a0.ap_mac,
a0.occur_time,
mai.used_x ,
mai.used_y ,
a0.first_come_flag,
a0.rssi,
a0.save_time,
a0.pt
from  
record_sec a0 
left OUTER  join place_info FOR SYSTEM_TIME AS OF PROCTIME() mai 
on mai.user_device_mac=a0.user_device_mac;

INSERT into area_customer_info_allday
select *                    
 from view1 ;

每日商場新增顧客人數

顧客WIFI信息通過關聯探測顧客位置維表(place_info),獲取到顧客的實時位置信息,然後通過關聯顧客來店記錄維表,獲取到顧客是否是新顧客信息,統計新顧客的人數。同時將新顧客信息更新到顧客來店記錄表中標註成常客。此場景下,顧客來店記錄表在同一個作業中即作為結果表又作為維表。

輸入表

--顧客WIFI信息:使用場景一“每日商場客流量”的輸入表:user_wifidata

輸出表

--每日商場新增顧客人數
CREATE TABLE customer_info_people_sum (
  people_sum                       bigint,      --新增顧客人數
  come_time                            varchar,    --日期
PRIMARY KEY (come_time)
) WITH (
    type='rds',...
);

--顧客來店記錄表
CREATE TABLE shop_user_record_update (
    user_device_mac     varchar,          --終端mac地址
  first_come_flag          varchar ,            --是否是第一次來
  first_coming_time     varchar,          --進店時間
  last_coming_time      varchar ,            --離開時間
    PRIMARY KEY (user_device_mac),
    PERIOD FOR SYSTEM_TIME
) WITH (
    type='ots',
  tableName='shop_user_record'...
);

維表

--使用場景一“商店中顧客的實時位置”的維表:place_info
CREATE TABLE place_info (
    user_device_mac               varchar, --終端mac地址
    used_x                        bigint,     --顧客的經度
    used_y                        bigint,  --顧客的維度
    min_rssi                      bigint,  --最小的接收信號強度
    PRIMARY KEY (user_device_mac),
    PERIOD FOR SYSTEM_TIME
) WITH (
    type='rds',...
);  

--顧客來店記錄
CREATE TABLE shop_user_record (
    user_device_mac     varchar,  --終端mac地址
  first_come_flag     varchar,    --是否是第一次來
  first_coming_time     varchar,  --進店時間
    last_coming_time       varchar,    --離開時間
    PRIMARY KEY (user_device_mac),
    PERIOD FOR SYSTEM_TIME
) WITH (
    type='ots',
  tableName='shop_user_record'...
);

業務代碼

--通過1秒鐘的滾動窗口實現每秒鐘上報一次位置信息
CREATE view record_sec
(
  user_device_mac,
    come_time             ,
    ap_mac                  ,
    occur_time         ,
    user_loca_x         ,
    user_loca_y         ,
    first_come_flag,
    rssi                     ,
    save_time             ,
    shop_code             ,
    pt
) as 
select data.user_device_mac ,
DATE_FORMAT(TUMBLE_START(data.ts_occur_time, INTERVAL '1' SECOND),'yyyy-MM-dd HH:mm:ss') ,
data.ap_mac,
data.occur_time,
'',
'',
'',
min(data.rssi),
FROM_UNIXTIME(unix_timestamp()),
grd.shop_code,
max(data.pt) 
from  
user_wifidata data
GROUP BY TUMBLE(ts_occur_time, INTERVAL '1' SECOND),ap_mac,user_device_mac,shop_code,occur_time
;

CREATE view view1 as
select
a0.user_device_mac,
a0.come_time,
a0.ap_mac,
a0.occur_time,
mai.used_x ,
mai.used_y ,
a0.first_come_flag,
a0.rssi,
a0.save_time,
a0.pt
from  
record_sec a0 
left OUTER  join place_infomst_buynoplace_infow_ap_info FOR SYSTEM_TIME AS OF PROCTIME() mai 
on mai.user_device_mac=a0.user_device_mac;

CREATE view view2 as
select
a1.user_device_mac,
a1.come_time,
a1.ap_mac,
a1.occur_time,
cast(a1.used_x as varchar) as used_x,
cast(a1.used_y as varchar) as used_y,
mci.first_come_flag as first_come_flag,
a1.rssi,
a1.save_time,
a1.pt,
mci.first_coming_time,
mci.last_coming_time
from  
view1 a1
left OUTER  join shop_user_record FOR SYSTEM_TIME AS OF PROCTIME() mci 
on mci.user_device_mac = a1.user_device_mac ;


CREATE view record_out as
select
a2.user_device_mac as user_device_mac,
a2.come_time  as come_time,
a2.occur_time as occur_time,
a2.ap_mac as ap_mac,
a2.used_x as used_x,
a2.used_y as used_y,
if(a2.first_come_flag is null ,'1','0') as first_come_flag,--first_come_flag,
a2.rssi as rssi,
a2.save_time as save_time,
a2.pt as pt,
a2.first_coming_time as first_coming_time,
a2.last_coming_time as last_coming_time
from 
view2 a2 ;

--更新shop_user_record維表的顧客來店記錄
INSERT into shop_user_record_update  
select user_device_mac , first_come_flag,first_coming_time,  last_coming_time           
 from record_out ;

--統計每日商店新顧客數
insert into customer_info_people_sum
select t1.people_sum,t1.come_time
from (select  count(distinct dt1.user_device_mac) as people_sum, DATE_FORMAT(dt1.come_time,'yyyy-MM-dd') as come_time
            from record_out dt1 where  dt1.first_come_flag='1' 
      group by DATE_FORMAT(dt1.come_time,'yyyy-MM-dd')) t1 

每日商場中顧客數量前5的商店

顧客WIFI信息按照天維度和商店分組後,統計每日商店顧客數量,通過topn語句獲取到每日顧客數量前5的商店信息,再通過關聯商場店鋪表得到商店名。

輸入表

--顧客WIFI信息:使用場景一“每日商場客流量”的輸入表:user_wifidata

輸出表

--每日商場中顧客數量前5的商店
CREATE TABLE top5_shop (
        shop_code   varchar, --商店編碼
    shop_name     varchar, --商店名
    date_time     varchar, --記錄顧客進入商店的時間(天維度)
    people_sum  bigint,  --顧客數量
    PRIMARY KEY (shop_code)
) WITH (
  type='rds'...
);

維表

--商場店鋪表:使用場景一“每日商店的客流量”的維表:area_shop
CREATE TABLE area_shop (
    id                              bigint,
    shop_code         varchar, --商店編碼
  shop_name                  varchar, --商店名
    PRIMARY KEY (shop_code),
    PERIOD FOR SYSTEM_TIME
) WITH (
    type='rds',...
);

業務代碼

--按天、商店分組統計商店的顧客數
CREATE VIEW Window1 AS
SELECT shop_code,
       DATE_FORMAT(data.ts_occur_time,'yyyy-MM-dd') AS date_time,
       count(distinct user_device_mac) AS people_sum
FROM user_wifidata data
GROUP BY shop_code, DATE_FORMAT(data.ts_occur_time,'yyyy-MM-dd');

-- 統計每天top5客流量的商店
CREATE VIEW top5_view AS
SELECT shop_code, date_time, people_sum, rownum FROM
(
   SELECT
   shop_code, date_time, people_sum,
   ROW_NUMBER() OVER (PARTITION BY shop_code,date_time ORDER BY people_sum DESC) as rownum
   FROM
   Window1
)
WHERE rownum <= 5;

INSERT into top5_shop
select 
    top5.shop_code ,
    mas.shop_name ,
    top5.date_time ,
    top5.people_sum
from
top5_view top5
inner join area_shop FOR SYSTEM_TIME AS OF PROCTIME() mas 
on top5.shop_code=mas.shop_code

每日商場中顧客最多的時間段及顧客數量

顧客WIFI信息按照1小時滾動窗口統計商場顧客數量,使用topn語句獲取到每日商場中顧客最多的時間段。

輸入表

--顧客WIFI信息:使用場景一“每日商場客流量”的輸入表:user_wifidata

輸出表

--商場中顧客最多的時間段及顧客數量
CREATE TABLE top1_time (
    start_time timestamp, --時間段
    people_sum bigint ,   --顧客數量
    PRIMARY KEY (start_time)
) WITH (
  type='rds',...
);

業務代碼

--1小時滾動窗口統計每小時商場的顧客數
CREATE VIEW Window1 AS
SELECT shop_code,
      TUMBLE_START(ts_occur_time, INTERVAL '1' hour) AS start_time,
      count(distinct user_device_mac) AS people_sum
FROM user_wifidata
GROUP BY shop_code, TUMBLE(ts_occur_time, INTERVAL '1' hour);

INSERT into top1_time 
SELECT start_time, people_sum FROM
(
   SELECT
   start_time, people_sum,
   ROW_NUMBER() OVER (PARTITION BY start_time ORDER BY people_sum DESC) as rownum
   FROM
   Window1
)
WHERE rownum <= 1;

場景二:顧客喜好分析

每日單個顧客最喜歡的商店

顧客WIFI信息按照商店、顧客、天維度進行分組得到每日顧客進入商店的次數,使用topn語句獲取到每日顧客去過最多的商店,通過關聯商店名維表獲取商店名,從而統計每日單個顧客最喜歡的商店。

輸入表

--顧客WIFI信息:使用場景一“每日商場客流量”的輸入表:user_wifidata

輸出表

--單個顧客最喜歡的商店輸出表
CREATE TABLE favorite_shop (
      user_device_mac varchar,        --終端mac地址
    date_time             varchar,         --記錄顧客進入商店的時間(天維度)
    shop_code             varchar,       --商店編碼
    shop_name             varchar,        --商店名
    shop_number       bigint ,        --進店次數
PRIMARY KEY (shop_code)
) WITH (
  type='rds'...
);

維表

--商場店鋪表:使用場景一“每日商店的客流量”的維表:area_shop
CREATE TABLE area_shop (
    id                              bigint,
    shop_code         varchar, --商店編碼
  shop_name                  varchar, --商店名
    PRIMARY KEY (shop_code),
    PERIOD FOR SYSTEM_TIME
) WITH (
    type='rds',...
);

業務代碼

--按天、顧客和商店分組統計單個客戶當天進入單個商店的次數
CREATE VIEW Window1 AS
SELECT user_device_mac,
            shop_code,
      DATE_FORMAT(data.ts_occur_time,'yyyy-MM-dd') AS date_time,
      count(shop_code) AS shop_number
FROM user_wifidata data
GROUP BY user_device_mac,shop_code, DATE_FORMAT(data.ts_occur_time,'yyyy-MM-dd');

-- 統計一天中顧客進入次數最多的商店,並輸出
CREATE VIEW top1_view AS
SELECT user_device_mac,shop_code, date_time, shop_number, rownum FROM
(
   SELECT
   user_device_mac,shop_code, date_time, shop_number,
   ROW_NUMBER() OVER (PARTITION BY user_device_mac,shop_code,date_time ORDER BY shop_number DESC) as rownum
   FROM
   Window1
)
WHERE rownum <= 1;


--關聯商店名維表並輸出
INSERT into favorite_shop
select 
        top1.user_device_mac,
    top1.date_time ,
    top1.shop_code ,
    mas.shop_name ,
    top1.shop_number
from
top1_view top1
inner join area_shop FOR SYSTEM_TIME AS OF PROCTIME() mas 
on top1.shop_code=mas.shop_code

單日單個顧客進入超過一次的商店

顧客WIFI信息按照商店、顧客、天維度進行分組獲取到每日顧客進入商店的次數,通過where語句獲取到每日顧客進入超過一次的商店,再通過關聯商店名維表得到商店名,從而得到單日單個顧客進入超過一次的商店信息。

輸入表

--顧客WIFI信息:使用場景一“每日商場客流量”的輸入表:user_wifidata

輸出表

--單日單個顧客進入超過一次的商店
CREATE TABLE morethanonce_shop (
      user_device_mac varchar,                --終端mac地址
    date_time             varchar,                 --記錄顧客進入商店的時間(天維度)
    shop_code             varchar,               --商店編碼
    shop_name             varchar,                --商店名
    shop_number       bigint,                 --進店次數
  PRIMARY KEY (user_device_mac)
) WITH (
  type='rds',...
);

維表

--商場店鋪表:使用場景一“每日商店的客流量”的維表:area_shop
CREATE TABLE area_shop (
    id                              bigint,
    shop_code         varchar, --商店編碼
  shop_name                  varchar, --商店名
    PRIMARY KEY (shop_code),
    PERIOD FOR SYSTEM_TIME
) WITH (
    type='rds',...
);

業務代碼

--按天、顧客和商店分組統計單個客戶當天進入單個商店的次數
CREATE VIEW Window1 AS
SELECT user_device_mac,
            shop_code,
      DATE_FORMAT(data.ts_occur_time,'yyyy-MM-dd')AS date_time,
      count(shop_code) AS shop_number
FROM user_wifidata data
GROUP BY user_device_mac,shop_code, DATE_FORMAT(data.ts_occur_time,'yyyy-MM-dd');

-- 1天中顧客進入超過兩次的商店
CREATE VIEW exceed2_view AS
SELECT user_device_mac,shop_code, date_time, shop_number FROM
Window1
WHERE shop_number >= 2;

INSERT into morethanonce_shop
select 
        exceed2.user_device_mac,
    exceed2.date_time ,
    exceed2.shop_code  ,
    mas.shop_name          ,
    exceed2.shop_number
from
exceed2_view exceed2
inner join area_shop FOR SYSTEM_TIME AS OF PROCTIME() mas 
on exceed2.shop_code=mas.shop_code

Leave a Reply

Your email address will not be published. Required fields are marked *