開發與維運

Flink Ecosystems

作者:李銳

本文主要介紹了Flink SQL連接外部系統的原因和原理,介紹了常用的Flink SQL Connector,包括Kafka Connector、Elasticsearch Connector、FileSystem Connector、Hive Connector等等。本文主要分為2個部分:

  1. Flink SQL連接外部系統的實現原理
  2. Flink SQL常用的Connector

Flink SQL連接外部系統的實現原理

在講原理之前,我們先回答為什麼要使用Flink SQL?SQL是一個標準化的數據查詢語言,而在Flink SQL中,我們可以通過Catalog與各種系統集成,同時我們也開發了很豐富的內置操作符和函數,而且Flink SQL還可以同時處理批數據和流數據,能極大地提高數據分析的工作效率。

那麼Flink SQL為什麼又要對接外部系統呢?Flink SQL本身是一個流計算的引擎,它本身不維護任何數據,所以對Flink SQL而言,所有的數據都存儲在外部系統當中,也就是所有的表都是在外部系統中,我們只有對接這些外部系統,才能夠對數據進行實際的讀寫。

image.png

在講解Flink SQL如何與外部系統對接之前,我們先看一下Flink內部DataStream和Table是如何做轉換的?假設已經有一個DataStream程序了,那麼我們可以把它轉換成Table的方式來使用,用Flink SQL的一些強大功能對它進行查詢,可以通過下列例子理解,類似於Flink SQL內部的對接。

image.png

Connector

對於Flink SQL而言,對接外部系統的組件被稱作Connector。下面這張表裡列出了Flink SQL所支持的幾個比較常用的Connector,比如Filesystem對接的是文件系統,JDBC對接的是外部的關係型數據庫等等。每一個Connector主要負責實現一個source和一個sink, source負責從外部系統中讀數據,sink負責把數據寫入到外部系統中。

image.png

Format

Format指定了數據在外部系統中的格式,比如一個Kafka的表,它裡面的數據可能是CSV格式存儲的,也有可能是JSON格式存儲的,所以我們在指定一個Connector連接外部表的時候,通常也需要指定Format是什麼,這樣Flink 才能正確地去讀寫這個數據。

image.png

Catalog

Catalog可以連接外部系統的元數據,然後把元數據信息提供給Flink,這樣Flink可以直接去訪問外部系統中已經創建好的表或者database等等。比如Hive的元數據是存儲在Hive Metastore中的,那麼Flink如果想訪問Hive表的話,就有一個HiveCatalog來對接元數據。除此之外,它還可以幫助Flink 來持久化它自身的元數據。比如說HiveCatalog既可以幫Flink 來訪問Hive,也可以幫Flink來存儲一些Flink所創建的表的信息,這樣就不需要每次啟動Session的時候重新建表了,直接去讀取Hive Metastore中建好的表就可以了。

image.png

如何創建一張表來指定外部的 connector?下面的例子是通過DDL來創建的一張表,這是一個比較標準的Create Table語句,其中所有跟Connector相關的參數都在with語句當中指定,比如這裡的Connector等於Kafka等等。

image.png

當通過DDL創建了一張表後,這個表是如何在Flink當中被使用的?這裡有一個很關鍵的概念就是Table Factory。在這個黃色的框裡面,我們可以通過DDL建表,或者可以通過Catalog從外部系統中拿到,然後被轉化成Catalog Table對象。當我們在SQL語句中引用Catalog Table時,Flink會為這張表創建對應的source或者是sink,創建source和sink的這個模塊兒就叫做Table Factory。

獲取Table Factory的方式有兩種,一個是Catalog本身綁定了一個Table Factory,另一種是通過Java的SPI來確定Table Factory,但是它查找的時候要正好有一個配對才不會報錯。

image.png

Flink SQL常用的Connector

Kafka Connector

Kafka Connector是用得最多的,因為Flink是一個流計算的引擎,而Kafka又是最流行的消息隊列,所以用Flink的用戶大部分也都在用Kafka。如果我們要創建Kafka的表,就需要指定一些特定的參數,比如將Connector字段指定成Kafka,還有Kafka對應的topic等,我們可以在下圖看到這些參數及其所代表的的含義。

image.png

要使用Kafka Connector,就需要添加Kafka一些依賴的Jar包,根據所使用的Kafka版本不一樣,添加的Jar包也不太一樣,這些Jar包都可以在官網上下載到。

image.png

Elasticsearch Connector

Elasticsearch Connector只實現了Sink,所以只能往ES裡去寫,而不能從裡面讀。它的Connector類型可以指定成ES6或者ES7;Hosts就是指定的ES的各個節點,通過域名加端口號的形式;Index是指定寫ES的index,類似於傳統數據庫當中的一張表;Document Type類似於傳統數據庫的表裡面的某一行,不過在ES7裡不需要指定。

image.png

ES的Sink支持append和upsert兩種模式,如果這張ES表在定義的時候指定了PK,那麼Sink就會以upsert模式工作,如果沒有指定PK,就以append模式來工作,但是像ROW和MAP等類型是不能作為PK的。

image.png

同樣,使用ES也需要指定額外的依賴,針對不同的ES版本添加ES Connector。

image.png

FileSystem Connector

這個Connector對接的是一個文件系統,它讀寫的是這個文件系統上的文件。這裡所說的FileSystem指的是Flink的FileSystem抽象,它支持很多種不同的實現,比如支持本地文件系統、Hadoop、S3、OSS等不同的實現。同時它還支持分區,採取與Hive相似的分區目錄結構,但分區信息不需要註冊到Catalog中。

image.png

Hive Connector

Hive應該是最早的SQL引擎,在批處理場景中大部分用戶都在使用。Hive Connector可以分為兩個層面,首先在元數據上,我們通過HiveCatalog來對接Hive元數據,同時我們提供HiveTableSource、HiveTableSink來讀寫 Hive的表數據。

image.png

使用Hive Connector需要指定Hive Catalog,這裡是一個例子,展示如何指定Hive Catalog。

image.png

使用Hive Connector也需要添加一些額外的依賴,大家可以根據所使用的Hive版本來選擇對應的Jar包。

image.png

除了連接外部系統外,我們也有內置的Connector,它們一方面是幫助新的用戶能夠儘快地上手,體驗Flink SQL強大的功能,另一方面也能幫助Flink的開發人員做一些代碼的調試。

DataGen Connector

DataGen Connector是一個數據生成器。比如這裡創建了一個DataGen的表,指定了幾個字段。把Connector的類型指定成DataGen,這個時候去讀這張表,Connector會負責生成數據,也就是說數據是生成出來的,並不是事先要存儲在某個地方。然後用戶可以對DataGen Connector做一些比較細粒度的控制,比如可以指定每秒鐘生成多少行數據,然後某個字段可以指定它通過sequence也就是從小到大來創建,也可以指定通過random的方式來創建等等。

image.png

Print Connector

Print Connector提供Sink功能,負責把所有的數據打印到標準輸出或者標準錯誤輸出上,打印的格式是前面會帶一個row kind。創建 print的表的時候只需要把Connector類型指定成print就可以了。

image.png

BlackHole Connector

BlackHole Connector也是一個Sink,它會丟棄掉所有的數據,也就是說數據寫過來它什麼都不做就丟掉了,主要是可以用來做性能的測試。創建BlackHole你只需要把Connector類型指定成BlackHole就可以了。

image.png

Demo可以參考https://github.com/flink-china/sql-training/wiki/%E7%94%9F%E6%80%81%E4%B8%8E%E5%86%99%E5%85%A5%E5%A4%96%E9%83%A8%E8%A1%A8

社區二維碼.jpg

Leave a Reply

Your email address will not be published. Required fields are marked *