開發與維運

​實戰:Flink 1.12 維表 Join Hive 最新分區功能體驗

我們生產常有將實時數據流與 Hive 維表 join 來豐富數據的需求,其中 Hive 表是分區表,業務上需要關聯上 Hive 最新分區的數據。上週 Flink 1.12 發佈了,剛好支撐了這種業務場景,我也將 1.12 版本部署後做了一個線上需求並上線。對比之前生產環境中實現方案,最新分區直接作為時態表提升了很多開發效率,在這裡做一些小的分享。

● Flink 1.12 前關聯 Hive 最新分區方案
● Flink 1.12 關聯 Hive 最新分區表
● 關聯Hive最新分區 Demo
● Flink SQL 開發小技巧

Flink 1.12 前關聯 Hive 最新分區方案

在分區時態表出來之前,為了定期關聯出最新的分區數據,通常要寫 DataStream 程序,在 map 算子中實現關聯 Hive 最新分區表的邏輯,得到關聯打寬後的 DataStream 對象,通過將該 DataStream 對象轉換成 Table 對象後,再進行後續的 SQL 業務邏輯加工。

StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env, streamSettings);  
DataStream<Tuple2<MasterBean, HiveDayIndexBean>> indexBeanStream = masterDataStream.map(new IndexOrderJoin());

map 算子中的主要邏輯: 將 T+2 的維度數據與實時數據關聯,返回 Tuple2 數據,因為離線數倉出數一般在凌晨 3 點,有時候由於集群資源不穩定導致數據產出慢,業務對實時性要求也不高,所以這裡用的是 T+2 的數據。

public class IndexOrderJoin extends RichMapFunction<MasterBean, Tuple2<MasterBean, HiveDimBean>> {    
    private Map<Integer, Map<String, HiveDimBean>> map = null;  
    Logger logger;  
  
    @Override  
    public void open(Configuration parameters) throws Exception {  
        logger = LoggerFactory.getLogger(Class.forName("com.hll.util.IndexOrderJoin"));  
        map = new HashMap<>(); 
    }  
  
public Tuple2<MasterBean, HiveDayIndexBean> map(MasterBean masterBean) {    
    if (map.get(masterBean.getReportDate() - 2) == null) {    
        //如果map裡沒有T+2的維表數據則查詢一次Hive,並將結果存入線程級別map,所以保證Task維表數據是全的    
        logger.info("initial hive data : {}", masterBean.getReportDate());    
        map.put(masterBean.getReportDate() - 2, getHiveDayIndex(masterBean.getReportDate() - 2));    
    }    
    //將的kafka數據與hive join後返回打寬數據    
    return new Tuple2<>(masterBean, map.get(masterBean.getReportDate() - 2).get(masterBean.getGroupID()));    
}

基於關聯打寬後的 DataStream 創建視圖,然後再做後續的 SQL 業務邏輯查詢。

tblEnv.createTemporaryView("index_order_master", indexBeanStream); tblEnv.sqlUpdate("select group_id, group_name, sum(amt) from index_order_master  group by group_id, group_name"); 
tblEnv.execute("rt_aggr_master_flink");

可以看出,在沒有支持 Hive 最新分區做時態表的時候,簡單的一個 join 便涉及到DataStream、map 算子,程序的代碼量和維護成本會是比較大的。

Flink 1.12 關聯 Hive 最新分區表

Flink 1.12 支持了 Hive 最新的分區作為時態表的功能,可以通過 SQL 的方式直接關聯 Hive 分區表的最新分區,並且會自動監聽最新的 Hive 分區,當監控到新的分區後,會自動地做維表數據的全量替換。通過這種方式,用戶無需編寫 DataStream 程序即可完成 Kafka 流實時關聯最新的 Hive 分區實現數據打寬。

image.png

圖片出自徐榜江(雪盡)在 FFA 2020 上的分享

參數解釋

■ streaming-source.enable 開啟流式讀取 Hive 數據。

■ streaming-source.partition.include

1.latest 屬性: 只讀取最新分區數據。
2.all: 讀取全量分區數據 ,默認值為 all,表示讀所有分區,latest 只能用在 temporal join 中,用於讀取最新分區作為維表,不能直接讀取最新分區數據。

■ streaming-source.monitor-interval 監聽新分區生成的時間、不宜過短 、最短是1 個小時,因為目前的實現是每個 task 都會查詢 metastore,高頻的查可能會對metastore 產生過大的壓力。需要注意的是,1.12.1 放開了這個限制,但仍建議按照實際業務不要配個太短的 interval。

■ streaming-source.partition-order 分區策略

主要有以下 3 種,其中最為推薦的是 partition-name:

1.partition-name 使用默認分區名稱順序加載最新分區
2.create-time 使用分區文件創建時間順序
3.partition-time 使用分區時間順序

具體配置

使用 Hive 最新分區作為 Tempmoral table 之前,需要設置必要的兩個參數:

'streaming-source.enable' = 'true',  
'streaming-source.partition.include' = 'latest'

我們可以再創建一張基於 Hive 表的新表,在 DDL 的 properties 裡指定這兩個參數,也可以使用 SQL Hint 功能,在使用時通過 SQL Hint 指定 query 中表的參數。以使用 SQL Hint 為例,我們需要用 / option / 指定表的屬性參數,例如:

SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true',
'streaming-source.partition.include' = 'latest') */;

我們需要顯示地開啟 SQL Hint 功能, 在 SQL Client 中可以用 set 命令設置:

set table.dynamic-table-options.enabled= true;

在程序代碼中,可以通過 TableConfig 配置:

tblEnv.getConfig().getConfiguration().setString("table.dynamic-table-options.enabled", 
"true");

Flink 官網也給出了一個詳細的例子,這裡也簡單說明下。

--將方言設置為hive以使用hive語法  
SET table.sql-dialect=hive;      
CREATE TABLE dimension_table (      
  product_id STRING,      
  product_name STRING,      
  unit_price DECIMAL(10, 4),      
  pv_count BIGINT,      
  like_count BIGINT,      
  comment_count BIGINT,      
  update_time TIMESTAMP(3),      
  update_user STRING,      
  ...      
) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (      
  -- 在創建hive時態表時指定屬性      
  'streaming-source.enable' = 'true',      
  'streaming-source.partition.include' = 'latest',      
  'streaming-source.monitor-interval' = '12 h',      
  'streaming-source.partition-order' = 'partition-name',  -- 監聽partition-name最新分區數據    
);      

--將方言設置為default以使用flink語法  
SET table.sql-dialect=default;      
CREATE TABLE orders_table (      
  order_id STRING,      
  order_amount DOUBLE,      
  product_id STRING,      
  log_ts TIMESTAMP(3),      
  proctime as PROCTIME()      
) WITH (...);      


--將流表與hive最新分區數據關聯  
SELECT * FROM orders_table AS order     
JOIN dimension_table FOR SYSTEM_TIME AS OF o.proctime AS dim    
ON order.product_id = dim.product_id;

關聯 Hive 最新分區 Demo

工程依賴

將 Demo 工程中使用到的 connector 和 format 依賴貼到這裡,方便大家本地測試時參考。

<dependencies>  
    
  
    <dependency>  
        <groupId>mysql</groupId>  
        <artifactId>mysql-connector-java</artifactId>  
        <version>${mysql.version}</version>  
        <!--<scope>provided</scope>-->  
    </dependency>  
  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-connector-jdbc_2.12</artifactId>  
        <version>${flink.version}</version>  
    </dependency>  
 
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-sql-connector-kafka_2.11</artifactId>  
        <version>${flink.version}</version>  
        <scope>provided</scope>  
    </dependency>  
  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-json</artifactId>  
        <version>${flink.version}</version>  
        <scope>provided</scope>  
    </dependency>  
  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>  
        <version>${flink.version}</version>  
        <scope>provided</scope>  
    </dependency>  
  
    <dependency>  
        <groupId>org.apache.hive</groupId>  
        <artifactId>hive-exec</artifactId>  
        <version>3.1.0</version>  
    </dependency>  
  
</dependencies>  

在 Sql Client 中註冊 HiveCatalog:

vim conf/sql-client-defaults.yaml 
catalogs: 
  - name: hive_catalog 
    type: hive 
    hive-conf-dir: /disk0/soft/hive-conf/ #該目錄需要包hive-site.xml文件

創建 Kafka 表

CREATE TABLE hive_catalog.flink_db.kfk_fact_bill_master_12 (  
    master Row<reportDate String, groupID int, shopID int, shopName String, action int, orderStatus int, orderKey String, actionTime bigint, areaName String, paidAmount double, foodAmount double, startTime String, person double, orderSubType int, checkoutTime String>,  
proctime as PROCTIME()  -- PROCTIME用來和Hive時態表關聯  
) WITH (  
 'connector' = 'kafka',  
 'topic' = 'topic_name',  
 'format' = 'json',  
 'properties.bootstrap.servers' = 'host:9092',  
 'properties.group.id' = 'flinkTestGroup',  
 'scan.startup.mode' = 'timestamp',  
 'scan.startup.timestamp-millis' = '1607844694000'  
);

Flink 事實表與 Hive 最新分區數據關聯

dim_extend_shop_info 是 Hive 中已存在的表,所以我們下面用 table hint 動態地開啟維表參數。

CREATE VIEW IF NOT EXISTS hive_catalog.flink_db.view_fact_bill_master as  
SELECT * FROM  
 (select t1.*, t2.group_id, t2.shop_id, t2.group_name, t2.shop_name, t2.brand_id,   
     ROW_NUMBER() OVER (PARTITION BY groupID, shopID, orderKey ORDER BY actionTime desc) rn  
    from hive_catalog.flink_db.kfk_fact_bill_master_12 t1  
       JOIN hive_catalog.flink_db.dim_extend_shop_info   
     /*+ OPTIONS('streaming-source.enable'='true',             'streaming-source.partition.include' = 'latest',  
    'streaming-source.monitor-interval' = '1 h',
    'streaming-source.partition-order' = 'partition-name') */ FOR SYSTEM_TIME AS OF t1.proctime AS t2 --時態表  
ON t1.groupID = t2.group_id and t1.shopID = t2.shop_id  
    where groupID in (202042)) t  where t.rn = 1

結果數據 Sink 到 MySQL

CREATE TABLE hive_catalog.flink_db_sink.rt_aggr_bill_food_unit_rollup_flk (  
      report_date String,  
      group_id int,  
      group_name String,  
      shop_id int,  
      shop_name String,  
      brand_id BIGINT,  
      brand_name String,  
      province_name String,  
      city_name String,  
      foodcategory_name String,  
      food_name String,  
      food_code String,  
      unit String,  
      rt_food_unit_cnt double,  
      rt_food_unit_amt double,  
      rt_food_unit_real_amt double,  
    PRIMARY KEY (report_date, group_id, shop_id, brand_id, foodcategory_name, food_name, food_code, unit) NOT ENFORCED) WITH (  
    'connector' = 'jdbc',   
    'url' = 'jdbc:mysql://host:4400/db_name?autoReconnect=true&useSSL=false',  
    'table-name' = 'table-name',   
    'username' = 'username',   
    'password' = 'password'  
)  

insert into hive_catalog.flink_db_sink.rt_aggr_bill_food_unit_rollup_flk  
select reportDate, group_id, group_name, shop_id, shop_name, brand_id, brand_name, province_name, city_name 
   , SUM(foodNumber)  rt_food_cnt  
   , sum(paidAmount)  rt_food_amt  
   , sum(foodAmount)  rt_food_real_amt  
   from  hive_catalog.flink_db.view_fact_bill_master  
   group by reportDate, group_id, group_name, shop_id, shop_name, brand_id, brand_name, province_name, city_name;

ORC format 的 BUG

在讀取 ORC format 的表時,無法讀取數據,我也向社區提了一個 Jira: https://issues.apache.org/jira/browse/FLINK-20576,讀取其他 format 的表不存在問題,本地測試了讀取 parquet 和 csv 都是正常的。

總結下上面的代碼,只需通過 Flink SQL 便能實現 Kafka 實時數據流關聯最新的 Hive 分區。同時我們結合了 HiveCatalog,可以複用 hive 的表和已經創建過的 kafka source 表,MySql sink 表,使得程序只需要關心具體的業務邏輯,無需關注 source/sink 表的創建,提高了代碼的複用性以及可讀性。對比之前的方案,純 SQL 的開發顯然降低了開發維護成本和用戶門檻。

Flink SQL 開發小技巧

結合 Hive catalog,持久化 source 與 sink 表,減少重複建表,使得代碼只需關注邏輯 SQL。
結合 Flink 視圖,組織好業務加工邏輯,提高 SQL 的可讀性。
利用 SQL Client 調試 SQL,程序沒問題後再打包上線,而不是直接提交到集群做測試。

Leave a Reply

Your email address will not be published. Required fields are marked *