大數據

【最佳實踐】實時計算Flink在廣告行業的實時數倉建設實踐

行業背景

  • 行業現狀: 

    • 廣告仍然是互聯網公司的主要變現手段,2019年,中國廣告市場總體規模達到8674.28億元,較2018年增長了8.54%,據統計全球互聯網市值前十的公司廣告收入佔比高達40%,可見其重要性。AI、大數據、智能投放等創新技術的普及應用,不僅創生了一批獨角獸營銷平臺,而且大幅拉低了廣告投放門檻,拓寬了廣告市場空間。
  • 大數據在其行業中的作用:

    • 大數據技術的應用在改變我們生活及工作的同時,為我們尋找數據背後的客觀規律提供了一種有效途徑。對潛在消費群體進行深入分析,並進行定製營銷基礎上的現代廣告營銷,對數據的規模及精準度有著極高的要求,而大數據的出現無疑為其落地提供了強有力的支撐。

業務場景

類似媒體,新聞類等APP,上面有各種廣告位提供給廣告主。廣告主投放廣告,用戶點擊廣告將實時的產生操作日誌數據,對這些日誌數據進行實時分析,通過每個廣告位上不同廣告的投放地區、廣告ID、設備唯一編碼等信息,可以統計點擊次數、投放次數等指標,可用於制定更高效的廣告投放策略,降低投放成本,提高廣告收益。

技術架構

image.png

架構解析:
數據採集:該場景中,APP、Web、Server等服務上會產生大量的廣告投放、用戶廣告點擊等操作日誌數據,這些日誌數據被實時採集至日誌服務系統(SLS),作為Flink的數據源。
實時數倉架構:該場景中,整個實時數倉構建,全部通過 Flink完成。Flink讀取SLS中的原始日誌數據,經過數據清洗、數據處理等操作寫出到DataHub,Flink進一步讀取DataHub的數據進行實時統計分析,最終輸出對應的指標結果到RDS,供業務系統使用。

業務指標

  • 實時數據中間層,對原始日誌進行實時數據清洗

    • 獲取投放主題及維度打寬
    • 獲取點擊主題及維度打寬
  • 統計投放指標

    • 某個廣告在某個省的當天投放量
    • 某個廣告在某個市的當天投放量
    • 某個廣告在某個投放終端的當天投放量
  • 統計點擊指標

    • 某個廣告在某個省的當天點擊量
    • 某個廣告在某個市的當天點擊量
    • 某個廣告在某個投放終端的當天點擊量
  • 熱門廣告排行榜    

業務代碼

場景一:對原始日誌進行實時數據清洗

投放主題

根據業務主題分成投放主題和點擊主題,當release_status=1時為投放主題。

輸入表

create table ods_release(
  `sid` varchar,           --投放請求ID
  exts varchar,                       --擴展信息
  device_type varchar,     --1 android| 2 ios | 9 其他
  release_status varchar,  --投放狀態 1 or 2
  device_num varchar,      --設備唯一編碼
  release_session varchar, --投放會話ID
  `date` date              --創建時間
) with (
  type ='sls',
...
);

輸出表

create table dw_release_exposure(
  release_session varchar, -- comment '投放會話id'
  release_status varchar,  -- comment '投放狀態'
  device_num varchar,      -- comment '設備唯一編碼'
  device_type varchar,     -- comment '1 android| 2 ios | 9 其他'
  area_code varchar,       -- comment '地區'
  aid varchar,             -- comment '廣告id'
  ct date                  -- comment '創建時間'
)with(
type='datahub',
...
);

業務代碼

insert into dw_release_exposure
select
  release_session,
  release_status,
  device_num,
  device_type,
  json_value(exts,'$.area_code'),
  json_value(exts,'$.aid'),
  `date` as ct
from
ods_release
where release_status='1'
;

投放主題關聯維度表

投放主題與地區維度表、設備維度表進行聚合,得出寬表

輸入表

create table dw_release_exposure(
  release_session varchar, -- comment '投放會話id'
  release_status varchar,  -- comment '投放狀態'
  device_num varchar,      -- comment '設備唯一編碼'
  device_type varchar,     -- comment '1 android| 2 ios | 9 其他'
  area_code varchar,       -- comment '地區'
  aid varchar,             -- comment '廣告id'
  ct date                  -- comment '創建時間'
)with(
type='datahub',
...
);

--dim維度表
--(地區,省市,唯一地區編碼,編碼和city_id是一一對應的)
create table dim_province(
  area_code varchar,
  province_id bigint,
  province_name varchar,
  city_id bigint,
  city_name varchar,
  region_id bigint,
  region_name varchar,
 PRIMARY KEY (area_code),
 PERIOD FOR SYSTEM_TIME--定義維表的變化週期。
 )with(
    type= 'rds',
...
);

--(用戶設備維度表)
create table dim_device(
  device_type varchar comment '1 android| 2 ios | 9 其他',
  device_name varchar comment '設備名字',
 PRIMARY KEY (device_type),
 PERIOD FOR SYSTEM_TIME--定義維表的變化週期。
)with(
type= 'rds',
...
);

輸出表

create table dm_release_exposure(
  aid varchar,
  aid_count bigint,
  device_name varchar,
  area_code varchar,
  province_id bigint,
  province_name varchar,
  city_id bigint,
  city_name varchar,
  ct date
)with(
type='datahub',
...
);

業務代碼

insert into dm_release_exposure
select
  a.aid,
  count(a.aid) aid_count,
  c.device_name,
  a.area_code,
  b.province_id,
  b.province_name,
  b.city_id,
  b.city_name,
  a.ct
from
dw_release_exposure a
join
dim_province  FOR SYSTEM_TIME AS OF PROCTIME() as b on a.area_code=b.area_code
join
dim_device  FOR SYSTEM_TIME AS OF PROCTIME() as c on a.device_type=c.device_type
group by
a.aid,
a.area_code,
a.ct
;

點擊主題

根據業務主題分成投放主題和點擊主題,當release_status=2時為點擊主題。

輸入表

create table ods_release(
  `sid` varchar,           --投放請求ID
  exts varchar,                       --擴展信息
  device_type varchar,     --1 android| 2 ios | 9 其他
  release_status varchar,  --投放狀態 1 or 2
  device_num varchar,      --設備唯一編碼
  release_session varchar, --投放會話ID
  `date` date              --創建時間
  ) with (
  type ='sls',
...
);

輸出表

create table dw_release_click(
  release_session varchar,  -- comment '投放會話id'
  release_status varchar,   -- comment '投放狀態'
  device_num varchar,       -- comment '設備唯一編碼' 
  device_type varchar,      -- comment '1 android| 2 ios | 9 其他'
  `user_id` varchar,          -- comment '用戶id'
  area_code varchar,        -- comment '地區'
  aid varchar,              -- comment '廣告id'
  ct date                   -- comment '創建時間'
)with(
type='datahub',
...
);

業務代碼

insert into dw_release_click
select
  release_session,
  release_status,
  device_num,
  device_type,
  json_value(exts,'$.user_id') as `user_id`,
  json_value(exts,'$.area_code') as area_code,
  json_value(exts,'$.aid') as aid,
  `date` as ct
from
ods_release
where release_status='2'
;

點擊主題關聯維度表

點擊主題與地區維度表進行聚合,得出寬表

輸入表

create table dw_release_click(
  release_session varchar,  -- comment '投放會話id'
  release_status varchar,   -- comment '投放狀態'
  device_num varchar,       -- comment '設備唯一編碼' 
  device_type varchar,      -- comment '1 android| 2 ios | 9 其他'
  area_code varchar,        -- comment '地區'
  aid varchar,              -- comment '廣告id'
  user_id varchar,          -- comment '用戶id'
  ct date                   -- comment '創建時間'
)with(
type='datahub',
...
);

--dim維度表
--(地區,省市,唯一地區編碼,編碼和city_id是一一對應的)
create table dim_province(
  area_code varchar,
  province_id bigint,
  province_name varchar,
  city_id bigint,
  city_name varchar,
  region_id bigint,
  region_name varchar,
 PRIMARY KEY (area_code),
 PERIOD FOR SYSTEM_TIME--定義維表的變化週期。
 )with(
    type= 'rds',
...
);

--(用戶設備維度表)
create table dim_device(
device_type varchar comment '1 android| 2 ios | 9 其他',
device_name varchar comment '設備名字',
 PRIMARY KEY (device_type),
 PERIOD FOR SYSTEM_TIME--定義維表的變化週期。
)with(
type= 'rds',
...
);

輸出表

create table dm_release_click(
  aid varchar,
  aid_count bigint,
  device_name varchar,
  area_code varchar,
  province_id bigint,
  province_name varchar,
  city_id bigint,
  city_name varchar,
  ct date
)with(
type='datahub',
...
);

業務代碼

insert into dm_release_click
select
  a.aid,
  count(a.aid) aid_count,
  c.device_name,
  a.area_code,
  b.province_id,
  b.province_name,
  b.city_id,
  b.city_name,
  a.ct
from
dw_release_click a
join
dim_province  FOR SYSTEM_TIME AS OF PROCTIME() as b
on a.area_code=b.area_code
join
dim_device  FOR SYSTEM_TIME AS OF PROCTIME() as c on
a.device_type=c.device_type
group by
a.aid,
a.area_code,
a.ct
;

場景二:統計投放指標

某個廣告在某個省的當天投放量

以aid和province_name分組,統計某個廣告在某個省的當天投放量

輸入表

create table dm_release_exposure(
  aid varchar,
  aid_count bigint,
  device_name varchar,
  area_code varchar,
  province_id bigint,
  province_name varchar,
  city_id bigint,
  city_name varchar,
  ct date
)with(
type='datahub',
...
);

輸出表

--某個廣告在某個省的當天投放量
CREATE TABLE ads_release_exposure_pro (
    aid                       VARCHAR,
    aid_count                 BIGINT,
    province_name             VARCHAR,
  ct                        DATE,
    primary key(aid,province_name,ct)
) WITH (
    type= 'rds',
...
);

業務代碼

insert into ads_release_exposure_pro
select 
  aid,
  sum(aid_count) as aid_count,
  province_name,
  ct
from
dm_release_exposure
group by
aid,
province_name,
ct
;

某個廣告在某個市的當天投放量

以aid和city_name分組,統計某個廣告在某個市的當天投放量

輸入表

create table dm_release_exposure(
  aid varchar,
  aid_count bigint,
  device_name varchar,
  area_code varchar,
  province_id bigint,
  province_name varchar,
  city_id bigint,
  city_name varchar,
  ct date
)with(
type='datahub',
...
);

輸出表

CREATE TABLE ads_release_exposure_city (
    aid                   VARCHAR,
    aid_count             BIGINT,
    city_name             VARCHAR,
  ct                    DATE,
    primary key(aid,city_name,ct)
) WITH (
    type= 'rds',
...
);

業務代碼

insert into ads_release_exposure_city
select 
  aid,
  sum(aid_count) as aid_count,
  city_name,
  ct
from
dm_release_exposure
group by
aid,
city_name,
ct
;

某個廣告在某個投放終端的當天投放量

以aid和device_name分組,統計某個廣告在某個用戶客戶端上的當天投放量

輸入表

create table dm_release_exposure(
  aid varchar,
  aid_count bigint,
  device_name varchar,
  area_code varchar,
  province_id bigint,
  province_name varchar,
  city_id bigint,
  city_name varchar,
  ct date
)with(
type='datahub',
...
);

輸出表

CREATE TABLE ads_release_exposure_device (
    aid                     VARCHAR,
    aid_count               BIGINT,
    device_name             VARCHAR,
  ct                      DATE,
    primary key(aid,device_name,ct)
) WITH (
    type= 'rds',
...
);

業務代碼

insert into ads_release_exposure_device
select
  aid,
  sum(aid_count),
  device_name,
  ct
from
dm_release_exposure
group by 
aid,
device_name,
ct
;

場景三:統計點擊指標

某個廣告在某個省的當天點擊量

以ct和aid、provice_name分組,統計某個廣告在某個省的當天點擊量

輸入表

create table dm_release_click(
  aid varchar,
  aid_count bigint,
  device_name varchar,
  area_code varchar,
  province_id bigint,
  province_name varchar,
  city_id bigint,
  city_name varchar,
  ct date
)with(
type='datahub',
...
);

輸出表

CREATE TABLE ads_release_click_pro (
  aid                  VARCHAR,
  aid_count            BIGINT,
  province_name        VARCHAR,
  ct                   DATE,
  primary key(aid,province_name,ct)
) WITH (
  type= 'rds',
...
);

業務代碼

insert into ads_release_click_pro
select
  aid,
  count(aid) as aid_count,
  province_name,
  ct
from
dm_release_click
group by
aid,
province_name,
ct
;

某個廣告在某個市的當天點擊量

以ct和aid、city_name分組,統計某個廣告在某個市的當天點擊量

輸入表

create table dm_release_click(
aid varchar,
aid_count bigint,
device_name varchar,
area_code varchar,
province_id bigint,
province_name varchar,
city_id bigint,
city_name varchar,
ct date
)with(
type='datahub',
...
);

輸出表

CREATE TABLE ads_release_click_city (
  aid                  VARCHAR,
  aid_count            BIGINT,
  city_name            VARCHAR,
  ct                   DATE,
  primary key(aid,city_name,ct)
) WITH (
  type= 'rds',
...
);

業務代碼

insert into ads_release_click_city
select
aid,
count(aid) as aid_count,
city_name,
ct
from
dm_release_click
group by
aid,
city_name,
ct
;

某個廣告在某個投放終端的當天投放量

以aid和device_name分組,統計某個廣告在某個用戶客戶端上的當天投放量

輸入表

create table dm_release_click(
  aid varchar,
  aid_count bigint,
  device_name varchar,
  area_code varchar,
  province_id bigint,
  province_name varchar,
  city_id bigint,
  city_name varchar,
  ct date
)with(
type='datahub',
...
);

輸出表

CREATE TABLE ads_release_click_device (
  aid                     VARCHAR,
  aid_count               BIGINT,
  device_name             VARCHAR,
  ct                      DATE,
    primary key(aid,device_name,ct)
) WITH (
  type= 'rds',
...
);

業務代碼

insert into ads_release_click_device
select
  aid,
  sum(aid_count),
  device_name,
  ct
from
dm_release_exposure
group by
aid,
device_name,
ct
;

場景四:熱門廣告排行榜

以ct和aid分組,計算當天每個廣告的總點擊量,對廣告ID進行topn排序,得到點擊次數最多的三個廣告作為最熱門廣告。根據按天維度的時間字段(ct)和廣告ID(aid)分組,計算每天每個廣告的總點擊量,根據廣告ID對點擊量進行topn排序,統計得到每天點擊次數最多的三個廣告,用於數據大屏中的熱門廣告排行榜。

輸入表

create table dm_release_click(
aid varchar,
aid_count bigint,
area_code varchar,
province_id bigint,
province_name varchar,
city_id bigint,
city_name varchar,
ct date
)with(
type='datahub',
...
);

輸出表

CREATE TABLE ads_release_click_dtclick (
  Ranking              BIGINT,
    aid                  VARCHAR,
    ct                   DATE,
  aid_count            BIGINT,
  primary key(aid,ct)
) WITH (
    type= 'rds',
...
);

業務代碼

INSERT INTO ads_release_click_dtclick
SELECT 
Ranking,
aid,
ct,
aid_count
FROM (
  SELECT *,
     ROW_NUMBER() OVER (PARTITION BY `ct` ORDER BY aid_count desc) AS Ranking
  FROM (
        SELECT 
       `ct` AS `ct`,
        COUNT(aid) AS aid_count,
        aid
        FROM  dm_release_click
        GROUP BY `ct`,aid
    )a
) 
WHERE Ranking <= 3 

Leave a Reply

Your email address will not be published. Required fields are marked *