開發與維運

通過流處理平臺Kafka與雲原生數據倉庫PostgreSQL做實時數據交互

一、概述

Apache Kafka是一種開源流數據處理平臺,因為其部署簡單、性能良好的特性得到廣泛應用。本文介紹基於Apache Kafka平臺將按約定格式與雲原生數據倉庫PostgreSQL版做實時數據交互,同步數據至雲原生數據倉庫PostgreSQL版(以下簡稱ADBPG)的鏈路。
本文內容安排如下:第二章“背景知識”會介紹本鏈路中組件的基礎知識以及一些名詞解釋,第三章“原理與架構”會介紹鏈路的基本架構以及工作原理,第四章“開發入門”會介紹實際的搭建部署步驟,第五章“格式約定”會介紹通過本鏈路寫入的數據格式要求,第六章“參數配置”會給出本鏈路涉及的一些參數定義並給出推薦的配置值,第七章介紹本鏈路的可用性和錯誤恢復情況,第八章介紹本鏈路的exactly once保證情況。

二、背景知識

1)Kafka

Kafka是一種開源流處理平臺,是Apache社區的頂級項目之一,在流處理領域得到廣泛應用。本鏈路基於Kafka平臺實現到ADBPG的數據同步,使用時用戶將數據寫入Kafka topic,本鏈路會按配置文件消費對應topic內的數據,並將數據同步至ADBPG。

2)Zookeeper

Zookeeper是Apache社區的一套開源分佈式服務框架,Kafka集群依賴Zookeeper存儲消息隊列的狀態信息。

3)Kafka connect

Kafka connect是kafka社區維護的一套用於數據數據同步的框架(注意是同步,如果做etl更應該使用flink或spark),用於同步像關係數據庫到HDFS這樣的鏈路。Kafka Connect基於Kafka消息隊列存儲狀態信息,並支持故障恢復和task重分佈,並不直接依賴於Zookeeper。

3-1)Connectors

Connector分為Source端和Sink端,通過將"Connectors"拼接可以定義數據從哪裡讀出或者寫入哪裡。一個source connector可以看做是將外部源數據讀入特定kafka topic的接入器,一個sink connector可以看做是將kafka topic數據導出到外部的導出器。
kafka connect還包含kafka instance的概念,一個"connector instance"是負責管理Kafka和其他系統之間的進行數據複製的邏輯概念(更像是其他框架的job概念)。所有這些用於讀取或寫入的實現都被定義為“connector插件”。"connector instance"和"connector插件"都可以被稱為"connectors"。

3-2)Task

Tasks是Connect數據模型中的負責數據拷貝的主要概念。每個Connector instance都是由一系列tasks組成。有了Task概念,kafka connect可以將一個connector拆分成多個task來並行化執行。Task之間無狀態,task執行的中間狀態存儲在特定的topics裡面(config.storage.topic和status.storage.topic)。任何時候有task被start、stop或restart,通過這種方式來提供彈性,可擴展的數據pipeline。
5.png
注意kafka connect可以用分佈式和standalone兩種方式部署。如果是standalone模式,數據被存儲在本地磁盤,反之則存儲在kafka topic內(用於控制已消費的狀態用於rebalance的status.storage.topic,和存儲connecotor配置的status.storage.topic)。

3-3)Worker

Connector和task都是邏輯單位,必須被調度在一個Kafka connect worker進程內。有兩種部署方式,standalone和distributed。因為最終是用於生產環境,我們只討論distributed模式。
和其他分佈式框架一樣,Distributed模式為kafka connect提供了擴展性和錯誤容忍的特性支持。已經啟動的worker使用group.id配置來代表啟動的worker是在同一個集群上。如果向集群中添加了worker或worker掛掉,其他的worker會感知到並且自動發起投票,將task重新分配其他可用的機器上。

三、原理與架構

6.png
基於Kafka將數據同步至ADBPG的鏈路工作流程如上圖。
源端通過各種鏈路將數據寫入某個Kafka Topic後,本鏈路對應啟動Adbpg Sink Connector按配置文件的配置讀取對應的Kafka Topic的數據。SinkTask會消費對應topic內的數據,更新存儲於Kafka內的消費位點記錄(offset),解析對應的數據並轉化為JDBC操作發送到雲原生數據倉庫ADBPG。
本鏈路不會對源端的寫入方式和源端類別進行限制,只對源端寫入Kafka的數據格式進行約定(詳見第五章),用戶可以直接將文件或者單條消息通過管道丟入Kafka Topic,也可以通過開源社區(比如DebeziumConfluent)豐富的source connector資源,從MySQL/PostgreSQL/SQLServer等數據庫實時同步數據。

四、開發入門

0)版本要求

組件 版本要求 推薦版本
Java 8+ 8
Zookeeper 3.4+ 3.4.14
Kafka kafka_2.11-2.1.1+ kafka_2.11-2.1.1
Kakfa connect 一般採用kafka內置組件,與kafka保持一致即可 一般採用kafka內置組件,與kafka保持一致即可
ADBPG 6.0 6.0

1)安裝Java並配置環境變量

首先需要去官網下載jdk安裝包並安裝到對應平臺,並配置好JAVA_HOME環境變量。安裝完畢後可以通過輸入java -version檢查是否已經配置好java環境。
7.png

2)安裝zookeeper

需要去zookeeper官網下載安裝包並對應安裝配置。
安裝完畢後可以執行path_to_zookeeper/bin/zkServer.sh status 檢查zookeeper是否正常運行

3)安裝kafka集群

需要去官網下載kafka安裝包,並對應安裝配置。安裝完畢後,可以執行命令查看那kafka集群當前topic列表。
path_to_zookeeper/bin/kafka-topics.sh --list --zookeeper localhost:2181
這裡提供一個參考安裝包,可用於核對版本以及參考配置
https://adbpg-public.oss-cn-beijing.aliyuncs.com/kafka2_1_1.tar.gz

4)安裝kafka connect jar包

下載jar包,並放置入path_to_kafka/plugin_jars路徑下。

下載地址:https://adbpg-public.oss-cn-beijing.aliyuncs.com/kafka-connect-jdbc-5.3.1-jar-with-dependencies.jar

5)創建ADBPG集群及目標表

5-1)開通ADBPG實例

如果尚未開通ADBPG實例,請參照以下文檔開通6.0版本ADBPG實例(4.3版本不支持),並創建初始賬號:
https://help.aliyun.com/document_detail/50200.html?spm=a2c4g.11186623.6.557.133412bfHEMkSR
為了達到最佳的寫入性能和安全性,建議kafka集群與要寫入的ADB PG實例位於同一region、同一VPC下,通過內網通信。

5-2)設置ADBPG實例白名單

ADBPG實例需要設置白名單保證Kafka集群能夠訪問ADBPG實例:在ADBPG控制檯點擊進入所創建的ADBPG實例,點擊數據安全性-添加白名單分組。
8.png
將對應的網段添加進ADBPG實例白名單,點擊確定。

5-3)連接ADBPG實例並創建要寫入的目標表

執行寫入任務前需要在ADB PG實例中創建要寫入的目標表,這裡給出一個例子:

create table test15(                  
b1 bigint,
b2 smallint,
b3 smallint,
b4 int,
b5 boolean,
b6 real,                      
b7 double precision,          
b8 double precision,          
b9 date,
b10 time with time zone,
b11 timestamp with time zone,
b12 text,
b15 json
);

6)啟動kafka connect service

參考官網文檔配置kafka conenct service並啟動,這裡提供命令參考:
nohup bin/connect-distributed.sh config/connect-distributed.properties >> connect.log
啟動過程中,kafka connect service會不斷打印出現有plugin的加載過程,以及一些全局參數配置:
9.png
Kafka connect啟動時需要提供配置文件,這裡提供一份配置的demo,各個參數的詳細含義見第六章。

vim path_to_kafka/config/connect-distributed.properties
group.id=connect-cluster
# The name of the Kafka topic where connector offsets are stored
offset.storage.topic=connect-offsets
# Replication factor used when creating the offset storage topic
offset.storage.replication.factor=1
# Interval at which to try committing offsets for tasks.
offset.flush.interval.ms=10000
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. 
bootstrap.servers=localhost:9092
# List of paths separated by commas (,) that contain plugins 
plugin.path=path_to_kafka/plugin_jars
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
config.storage.topic=config-storage
status.storage.topic=config-status
offset.storage.replication.factor=1
config.storage.replication.factor=1
status.storage.replication.factor=1

7)啟動adbpg sink connector
配置json格式的adbpg sink connector配置文件,並參考官方文檔向kafka connect提交adbpg sink任務,這裡提供命令參考:
配置文檔實例如下,全部參數配置請參考第六章。

{
     "name":"adb4pg-jdbc-sink",
     "config": {
        "name":"adb4pg-jdbc-sink",
        "topics":"server1.dbo.t1",
        "connector.class":"io.confluent.connect.jdbc.Adb4PgSinkConnector",
        "connection.url":"jdbc:postgresql://yourinstance-url:port/yourdbname",
        "connection.user":"yourname",
        "connection.password":"******",
        "col.names":"a,b,c,d,e,f,g",
        "col.types":"integer,bigint,smallint,real,doublepericision,timestamp,varchar",
        "pk.fields":"a",
        "schemas.enable" : "false",
        "target.tablename":"t1",
        "tasks.max":"1",
        "auto.create":"false",
        "table.name.format":"t1",
        "batch.size":"1",
        "value.converter.schemas.enable":"false",
        "value.converter":"org.apache.kafka.connect.json.JsonConverter"
        }
}

啟動adbpg sink connector:

#查看當前集群kafka connector列表:
curl 127.0.0.1:8083/connectors

#提交配置文件(這裡以adb4pg-sink.json為例),並啟動adbpg sink connector
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @adb4pg-sink.json 

#刪除connectors
curl -s -X DELETE http://localhost:8083/connectors/adb4pg-sink

#查看connector狀態
curl http://localhost:8083/connectors/adb4pg-jdbc-sink/status

10.png

五、格式約定

本鏈路支持對目標庫進行三種操作:insert、delete和update,對於單條操作對應的kafka消息的格式要求為json,json內各個字段的數據格式要求如下。

操作類別 op字段 before字段 after字段 demo
insert c 無限制 要插入記錄列名與取值對應的取值對應的字典 {"before": null,"after": {"id": 1,"name": "44"},"op": "c"}
delete d 要刪除記錄列名與取值對應的取值對應的字典 無限制 {"after": null,"before": {"id": 1,"name": "44"},"op": "d"}
update u 所要更新的原始數據的列名與取值對應的取值對應的字典 更新後記錄列名與取值對應的取值對應的字典 {"before": {"id":2},"after": {"id": 1,"name": "44"},"op": "u"}

對於某個topic內含有如下消息數據:

{"before": null,"after": {"id": 1,"name": "44"},"op": "c","transaction": null}
{"before": {"id":2},"after": {"id": 1,"name": "44"},"op": "u","transaction": null}
{"after": null,"before": {"id": 1,"name": "44"},"op": "d","transaction": null}

對應會產生的同步操作為:本鏈路對應會向目標庫表插入一條id=1,name='44' 的數據;然後按id=2更新數據,將id更新為1,name更新為'44';最後會向目標表刪除id=1,name='44'的數據。

六、參數配置

1)zookeeper參數配置

這裡僅列出本鏈路所必需的參數配置,更多參數配置請參考zookeeper官方文檔

參數名 參數含義 推薦值
dataDir 數據目錄
dataLogDir 日誌目錄
clientPort 端口 2181
server.x 各個節點配置 與實際的機器相符

2) kafka參數配置

這裡僅列出本鏈路所必需的參數配置,更多參數配置請參考kafka官方文檔。

參數名 參數含義 推薦值
broker.id 各個broker的唯一id
log.dirs 日誌路徑
offsets.topic.replication.factor 副本數 3
host.name 當前機器
host 與實際相符
port 端口 9092
zookeeper.connect 所連接的zk地址 與實際的zk地址相符
auto.create.topics.enable 是否允許自動創建topic true
log.retention.hours topic內數據過期時間,單位h 72
log.cleanup.policy topic內過期數據清理策略 delete,即為刪除

3)kafka connect參數配置

這裡僅列出本鏈路所必需的參數配置,更多參數配置請參考kafka connect官方文檔。

參數名 參數含義 推薦值
group.id 集群id,kafka connect集群上所有節點取值必須統一,標誌屬於同一集群
offset.storage.topic 記錄kafka-connect位點的topic名稱
offset.storage.replication.factor kafka connect相關topic副本數 3
offset.flush.interval.ms kafka connect flush間隔,單位ms 10000
bootstrap.servers 連接到的kafka集群地址 與實際集群相符
plugin.path kafka connector jar包路徑

4)adbpg sink connector參數配置

adbpg sink connector支持參數配置如下:

參數名 參數含義 默認值 推薦值
name kafka connector命名
topics 源數據所在topic 與源數據所在topic相符
connector.class connector所對應的實現類 必須寫io.confluent.connect.jdbc.Adb4PgSinkConnector
connector.url 目標ADBPG庫的jdbc地址
connector.user ADBPG用戶名
connector.password ADBPG密碼
col.names 寫入ADBPG目標表列名
col.types 寫入ADBPG目標表列類型
pk.fields 主鍵列表,以逗號分割
tasks.max 啟動線程數 1 1
target.tablename 寫入ADBPG目標表名
writeMode 寫入方式,取值為insert或upsert insert insert

七、可用性和錯誤恢復

1)高可用

組件 是否支持高可用 故障切換方式 依賴情況
zookeeper leader掛掉會根據paxos協議重新選主,單個follower掛掉不影響可用性。 不依賴其他組件
kafka 寫入多副本,單個broker掛掉不影響可用性。 kafka消息隊列會向zk寫入狀態量,zk不可用影響kafka集群可用性;
kafka connect 由kafka集群維護kafka connect的可用性,kafka connect掛掉之後會觸發task rebalance,進行task遷移,單個kafka connect掛掉不影響可用性。 kafka connect會向kafka topic寫入消費到的位點(offset),kafka集群不可用影響kafka connect可用性;kafka connect不會向zk寫入數據。

2)錯誤恢復

kafka/kafka connect/zookeeper三個服務組件均支持高可用,三副本配置下,掛掉任意一個或兩個節點均可自動恢復任務進度。若同組件三個節點同時掛掉,通常為硬件或網絡故障,需要按具體情況排查。

3)Task rebalancing

為了“負載均衡”每個worker上面的task數量和機器負載,kafka connect也有一個rebalance的概念,負責在connecotor啟動時,有worker掛掉時,將task遷移到更空閒的worker上。但是要注意,如果是task fail,不會觸發rebalance,這種時候通常是代碼或數據出現了異常需要人工介入處理。
11.png

八、冪等性/exactly once保證

在目標表配置主鍵,並且配置writeMode='upsert'的場景下,本鏈路能夠保證exactly-once;在其他場景下,本鏈路保證at-least-once.
kafka connector task發生失敗,kafka自身提供的offset機制進行recovery通常只能恢復到最近一次提交的offset,而不是失敗時的狀態。upsert模式可以避免有主鍵的表約束衝突或數據重複的情況。

Leave a Reply

Your email address will not be published. Required fields are marked *