開發與維運

Flink 1.11 新特性之 SQL Hive Streaming 簡單示例

7月7日,Flink 1.11 版本發佈,與 1.10 版本相比,1.11 版本最為顯著的一個改進是 Hive Integration 顯著增強,也就是真正意義上實現了基於 Hive 的流批一體。

本文用簡單的本地示例來體驗 Hive Streaming 的便利性並跟大家分享體驗的過程以及我的心得,希望對大家上手使用有所幫助。

添加相關依賴

測試集群上的 Hive 版本為 1.1.0,Hadoop 版本為 2.6.0,Kafka 版本為 1.0.1。

<properties>
  <scala.bin.version>2.11</scala.bin.version>
  <flink.version>1.11.0</flink.version>
  <flink-shaded-hadoop.version>2.6.5-10.0</flink-shaded-hadoop.version>
  <hive.version>1.1.0</hive.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.bin.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.bin.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.bin.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.bin.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_${scala.bin.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_${scala.bin.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop-2-uber</artifactId>
    <version>${flink-shaded-hadoop.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
  </dependency>

另外,別忘了找到 hdfs-site.xml 和 hive-site.xml,並將其加入項目。

創建執行環境

Flink 1.11 的 Table/SQL API 中,FileSystem Connector 是靠增強版 StreamingFileSink 組件實現,在源碼中名為 StreamingFileWriter。我們知道,只有在 Checkpoint 成功時,StreamingFileSink 寫入的文件才會由 Pending 狀態變成 Finished 狀態,從而能夠安全地被下游讀取。所以,我們一定要打開 Checkpointing,並設定合理的間隔。

1.jpg

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.setParallelism(3)

val tableEnvSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))

註冊 HiveCatalog

val catalogName = "my_catalog"
val catalog = new HiveCatalog(
  catalogName,              // catalog name
  "default",                // default database
  "/Users/lmagic/develop",  // Hive config (hive-site.xml) directory
  "1.1.0"                   // Hive version
)
tableEnv.registerCatalog(catalogName, catalog)
tableEnv.useCatalog(catalogName)

創建 Kafka 流表

Kafka Topic 中存儲的是 JSON 格式的埋點日誌,建表時用計算列生成事件時間與水印。1.11 版本 SQL Kafka Connector 的參數相比 1.10 版本有一定簡化。

tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp")
tableEnv.executeSql("DROP TABLE IF EXISTS stream_tmp.analytics_access_log_kafka")

tableEnv.executeSql(
  """
    |CREATE TABLE stream_tmp.analytics_access_log_kafka (
    |  ts BIGINT,
    |  userId BIGINT,
    |  eventType STRING,
    |  fromType STRING,
    |  columnType STRING,
    |  siteId BIGINT,
    |  grouponId BIGINT,
    |  partnerId BIGINT,
    |  merchandiseId BIGINT,
    |  procTime AS PROCTIME(),
    |  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000,'yyyy-MM-dd HH:mm:ss')),
    |  WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'ods_analytics_access_log',
    |  'properties.bootstrap.servers' = 'kafka110:9092,kafka111:9092,kafka112:9092'
    |  'properties.group.id' = 'flink_hive_integration_exp_1',
    |  'scan.startup.mode' = 'latest-offset',
    |  'format' = 'json',
    |  'json.fail-on-missing-field' = 'false',
    |  'json.ignore-parse-errors' = 'true'
    |)
  """.stripMargin
)

前面已經註冊了 HiveCatalog,故在 Hive 中可以觀察到創建的 Kafka 流表的元數據(注意該表並沒有事實上的列)。

hive> DESCRIBE FORMATTED stream_tmp.analytics_access_log_kafka;
OK
# col_name              data_type               comment


# Detailed Table Information
Database:               stream_tmp
Owner:                  null
CreateTime:             Wed Jul 15 18:25:09 CST 2020
LastAccessTime:         UNKNOWN
Protect Mode:           None
Retention:              0
Location:               hdfs://sht-bdmq-cls/user/hive/warehouse/stream_tmp.db/analytics_access_log_kafka
Table Type:             MANAGED_TABLE
Table Parameters:
    flink.connector         kafka
    flink.format            json
    flink.json.fail-on-missing-field    false
    flink.json.ignore-parse-errors  true
    flink.properties.bootstrap.servers  kafka110:9092,kafka111:9092,kafka112:9092
    flink.properties.group.id   flink_hive_integration_exp_1
    flink.scan.startup.mode latest-offset
    flink.schema.0.data-type    BIGINT
    flink.schema.0.name     ts
    flink.schema.1.data-type    BIGINT
    flink.schema.1.name     userId
    flink.schema.10.data-type   TIMESTAMP(3)
    flink.schema.10.expr    TO_TIMESTAMP(FROM_UNIXTIME(`ts` / 1000, 'yyyy-MM-dd HH:mm:ss'))
    flink.schema.10.name    eventTime
    flink.schema.2.data-type    VARCHAR(2147483647)
    flink.schema.2.name     eventType
    # 略......
    flink.schema.9.data-type    TIMESTAMP(3) NOT NULL
    flink.schema.9.expr     PROCTIME()
    flink.schema.9.name     procTime
    flink.schema.watermark.0.rowtime    eventTime
    flink.schema.watermark.0.strategy.data-type TIMESTAMP(3)
    flink.schema.watermark.0.strategy.expr  `eventTime` - INTERVAL '15' SECOND
    flink.topic             ods_analytics_access_log
    is_generic              true
    transient_lastDdlTime   1594808709

# Storage Information
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:            org.apache.hadoop.mapred.TextInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
    serialization.format    1
Time taken: 1.797 seconds, Fetched: 61 row(s)

創建 Hive 表

Flink SQL 提供了兼容 HiveQL 風格的 DDL,指定 SqlDialect.HIVE 即可( DML 兼容還在開發中)。

為了方便觀察結果,以下的表採用了天/小時/分鐘的三級分區,實際應用中可以不用這樣細的粒度(10分鐘甚至1小時的分區可能更合適)。

tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)

tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS hive_tmp")
tableEnv.executeSql("DROP TABLE IF EXISTS hive_tmp.analytics_access_log_hive")

tableEnv.executeSql(
  """
    |CREATE TABLE hive_tmp.analytics_access_log_hive (
    |  ts BIGINT,
    |  user_id BIGINT,
    |  event_type STRING,
    |  from_type STRING,
    |  column_type STRING,
    |  site_id BIGINT,
    |  groupon_id BIGINT,
    |  partner_id BIGINT,
    |  merchandise_id BIGINT
    |) PARTITIONED BY (
    |  ts_date STRING,
    |  ts_hour STRING,
    |  ts_minute STRING
    |) STORED AS PARQUET
    |TBLPROPERTIES (
    |  'sink.partition-commit.trigger' = 'partition-time',
    |  'sink.partition-commit.delay' = '1 min',
    |  'sink.partition-commit.policy.kind' = 'metastore,success-file',
    |  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
    |)
  """.stripMargin
)

Hive 表的參數複用了 SQL FileSystem Connector 的相關參數,與分區提交(Partition Commit)密切相關。僅就上面出現的4個參數簡單解釋一下。

  • sink.partition-commit.trigger:觸發分區提交的時間特徵。默認為 processing-time,即處理時間,很顯然在有延遲的情況下,可能會造成數據分區錯亂。所以這裡使用 partition-time,即按照分區時間戳(即分區內數據對應的事件時間)來提交。
  • partition.time-extractor.timestamp-pattern:分區時間戳的抽取格式。需要寫成 yyyy-MM-dd HH:mm:ss 的形式,並用 Hive 表中相應的分區字段做佔位符替換。顯然,Hive 表的分區字段值來自流表中定義好的事件時間,後面會看到。
  • sink.partition-commit.delay:觸發分區提交的延遲。在時間特徵設為 partition-time 的情況下,當水印時間戳大於分區創建時間加上此延遲時,分區才會真正提交。此值最好與分區粒度相同,例如若 Hive 表按1小時分區,此參數可設為 1 h,若按 10 分鐘分區,可設為 10 min。
  • sink.partition-commit.policy.kind:分區提交策略,可以理解為使分區對下游可見的附加操作。 metastore 表示更新 Hive Metastore 中的表元數據, success-file 則表示在分區內創建 _SUCCESS 標記文件。

當然,SQL FileSystem Connector 的功能並不限於此,還有很大自定義的空間(如可以自定義分區提交策略以合併小文件等)。具體可參見官方文檔。

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#streaming-sink

流式寫入 Hive

注意將流表中的事件時間轉化為 Hive 的分區。

tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
tableEnv.executeSql(
  """
    |INSERT INTO hive_tmp.analytics_access_log_hive
    |SELECT
    |  ts,userId,eventType,fromType,columnType,siteId,grouponId,partnerId,merchandiseId,
    |  DATE_FORMAT(eventTime,'yyyy-MM-dd'),
    |  DATE_FORMAT(eventTime,'HH'),
    |  DATE_FORMAT(eventTime,'mm')
    |FROM stream_tmp.analytics_access_log_kafka
    |WHERE merchandiseId > 0
  """.stripMargin
)

來觀察一下流式 Sink 的結果吧。

2.jpg

上文設定的 Checkpoint Interval 是 20 秒,可以看到,上圖中的數據文件恰好是以 20 秒的間隔寫入的。由於並行度為 3,所以每次寫入會生成 3 個文件。分區內所有數據寫入完畢後,會同時生成 _SUCCESS 文件。如果是正在寫入的分區,則會看到 .inprogress 文件。

通過 Hive 查詢一下,確定數據的時間無誤。

hive> SELECT from_unixtime(min(cast(ts / 1000 AS BIGINT))),from_unixtime(max(cast(ts / 1000 AS BIGINT)))
    > FROM hive_tmp.analytics_access_log_hive
    > WHERE ts_date = '2020-07-15' AND ts_hour = '23' AND ts_minute = '23';
OK
2020-07-15 23:23:00 2020-07-15 23:23:59
Time taken: 1.115 seconds, Fetched: 1 row(s)

流式讀取 Hive

要將 Hive 表作為流式 Source,需要啟用 Dynamic Table Options,並通過 Table Hints 來指定 Hive 數據流的參數。以下是簡單地通過 Hive 計算商品 PV 的例子。

tableEnv.getConfig.getConfiguration.setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true)

val result = tableEnv.sqlQuery(
  """
     |SELECT merchandise_id,count(1) AS pv
     |FROM hive_tmp.analytics_access_log_hive
     |/*+ OPTIONS(
     |  'streaming-source.enable' = 'true',
     |  'streaming-source.monitor-interval' = '1 min',
     |  'streaming-source.consume-start-offset' = '2020-07-15 23:30:00'
     |) */
     |WHERE event_type = 'shtOpenGoodsDetail'
     |AND ts_date >= '2020-07-15'
     |GROUP BY merchandise_id
     |ORDER BY pv DESC LIMIT 10
   """.stripMargin
)

result.toRetractStream[Row].print().setParallelism(1)
streamEnv.execute()

三個 Table Hint 參數的含義解釋如下。

  • streaming-source.enable:設為 true,表示該 Hive 表可以作為 Source。
  • streaming-source.monitor-interval:感知 Hive 表新增數據的週期,以上設為 1 分鐘。對於分區表而言,則是監控新分區的生成,以增量讀取數據。
  • streaming-source.consume-start-offset:開始消費的時間戳,同樣需要寫成 yyyy-MM-dd HH:mm:ss 的形式。

更加具體的說明仍然可參見官方文檔。

https://links.jianshu.com/go?to=https%3A%2F%2Fci.apache.org%2Fprojects%2Fflink%2Fflink-docs-release-1.11%2Fdev%2Ftable%2Fhive%2Fhive_streaming.html%23streaming-reading

最後,由於 SQL 語句中有 ORDER BY 和 LIMIT 邏輯,所以需要調用 toRetractStream() 方法轉化為回撤流,即可輸出結果。

The End

Flink 1.11 的 Hive Streaming 功能大大提高了 Hive 數倉的實時性,對 ETL 作業非常有利,同時還能夠滿足流式持續查詢的需求,具有一定的靈活性。

感興趣的同學也可以自己上手測試。

Leave a Reply

Your email address will not be published. Required fields are marked *