開發與維運

ECS上如何部署Kafka

一、kafka介紹及原理

kafka是由Apache軟件基金會發布的一個開源流處理平臺,由Scala和Java編寫。它是一種高吞吐量的分佈式發佈的訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。

這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。 對於像Hadoop一樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行加載機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。

> 1、kafka的特性

kafka是一種高吞吐量的分佈式發佈訂閱消息系統,具有以下特性:

通過磁盤數據結構提供消息的持久化,這種結構對於即使數以TB的消息存儲也能夠保持長時間的穩定性能;
持久性:使用文件性存儲,日誌文件存儲消息,需要寫入硬盤,採用達到一定閾值才寫入硬盤,從而減少磁盤I/O,如果kafka突然宕機,數據會丟失一部分;
高吞吐量:即使是非常普通的硬件kafka也可以支持每秒數百萬的消息;
支持通過kafka服務器和消費機集群來分區消息;
支持Hadoop並行數據加載。

> 2、kafka相關術語

Broker:消息中間件處理節點,一個Kafka節點就是一個broker,一個或者多個Broker可以組成一個Kafka集群;
Topic:Kafka根據topic對消息進行歸類,發佈到Kafka集群的每條消息都需要指定一個topic;
Producer:消息生產者,向Broker發送消息的客戶端;
Consumer:消息消費者,從Broker讀取消息的客戶端;
ConsumerGroup:每個Consumer屬於一個特定的Consumer Group,一條消息可以發送到多個不同的Consumer Group,但是一個Consumer Group中只能有一個Consumer能夠消費該消息;
Partition:物理上的概念,一個topic可以分為多個partition,每個partition內部是有序的。

> 3、Topic和Partition的區別

一個topic可以認為一個一類消息,每個topic將被分成多個partition,每個partition在存儲層面是append log文件。任何發佈到此partition的消息都會被追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為long型的數字,它唯一標記一條消息。每條消息都被append到partition中,是順序寫磁盤,因此效率非常高(順序寫磁盤比隨機寫內存的速度還要高,這是kafka高吞吐率的一個很重要的保證)。
image.png

每一條消息被髮送到broker中,會根據partition規則選擇被存儲到哪一個partition(默認採用輪詢的方式進行寫入數據)。如果partition規則設置合理,所有消息可以均勻分佈到不同的partition裡,這樣就實現了水平擴展。(如果一個topic對應一個文件,那這個文件所在的機器I/O將會成為這個topic的性能瓶頸,而partition解決了這個問題),如果消息被消費則保留append.log兩天

> 4、kafka的架構

image.png
如上圖所示,一個典型的kafka體系架構包括若干Producer(可以是服務器日誌,業務數據,頁面前端產生的page view等),若干個broker(kafka支持水平擴展,一般broker數量越多,集群吞吐率越高),若干Consumer(Group),以及一個Zookeeper集群。kafka通過Zookeeper管理集群配置,選舉出leader,以及在consumer group發生變化時進行重新調整。Producer使用push(推)模式將消息發佈到broker,consumer使用pull(拉)模式從broker訂閱並消費消息。

zookeeper群集中有兩個角色:leader和follower,leader對外提供服務,follower負責leader裡面所產生內容同步消息寫入生成時產生replicas(副本);
kafka的高可靠性的保證來源於其健壯的副本(replicas)策略。通過調節其副本相關參數,可以使得kafka在性能和可靠性之間運轉之間的遊刃有餘。kafka從0.8.x版本開始提供partition級別的複製的。
5、kafka的文件存儲機制
kafka中消息是以topic進行分類的,生產者通過topic向kafka broker發送消息,消費者通過topic讀取數據。然而topic在物理層面又能以partition為分組,一個topic可以分為若干個partition,partition還可以細分為segment,一個partition物理上由多個segment組成。

為了便於說明問題,假設這裡只有一個kafka集群,且這個集群只有一個kafka broker,也就是隻有一臺物理機。在這個kafka broker的server.properties配置文件中定義kafka的日誌文件存放路徑以此來設置kafka消息文件存儲目錄,與此同時創建一個topic:test,partition的數量為4,啟動kafka就可以在日誌存放路徑中看到生成4個目錄,在kafka文件存儲中,同一個topic下有多個不同的partition,每個partition為一個目錄,partition的名稱規則為:topic名稱+有序序號,第一個序號從0開始。

segment是什麼?

如果就以partition為最小存儲單位,我們可以想象當Kafka producer不斷髮送消息,必然會引起partition文件的無限擴張,這樣對於消息文件的維護以及已經被消費的消息的清理帶來嚴重的影響,所以這裡以segment為單位又將partition細分。每個partition(目錄)相當於一個巨型文件被平均分配到多個大小相等的segment(段)數據文件中(每個segment 文件中消息數量不一定相等)這種特性也方便old segment的刪除,即方便已被消費的消息的清理,提高磁盤的利用率。每個partition只需要支持順序讀寫就行。

segment文件由兩部分組成,分別為“.index”文件和“.log”文件,分別表示為segment索引文件和數據文件。這兩個文件的命令規則為:partition全局的第一個segment從0開始,後續每個segment文件名為上一個segment文件最後一條消息的offset值(偏移量),數值大小為64位,20位數字字符長度,沒有數字用0填充。

> 6、數據的可靠性和持久性保證

當producer向leader發送數據時,可以通request.required.acks參數來設置數據可靠性的級別:

1(默認):producer的leader已成功收到數據並得到確認。如果leader宕機了,則會丟失數據;
0 :producer無需等待來自broker的確認而繼續發送下一批消息。這種情況下數據傳輸效率最高,但是數據可靠性確是最低的;
-1:producer需要等待所有follower都確認接收到數據後才算一次發送完成,可靠性最高。

> 7、leader選舉

一條消息只有被所有follower都從leader複製過去才會被認為已提交。這樣就避免了部分數據被寫進了leader,還沒來得及被任何follower複製就宕機了,而造成數據丟失。而對於producer而言,它可以選擇是否等待消息commit。

一種非常常用的選舉leader的方式是“少數服從多數”,在進行數據的複製過程中,存在多個follower,並且每個follower的數據速度都不相同,當leader宕機後,當前的follower上誰的數據最多誰就是leader。

> 二、部署單機kafka

1、部署kafka

kafka服務依賴於JAVA環境,如果沒有,自行安裝
[root@kafka ~]# wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.11-2.2.1.tgz
[root@kafka ~]# tar zxf kafka_2.11-2.2.1.tgz
[root@kafka ~]# mv kafka_2.11-2.2.1/ /usr/local/kafka
[root@kafka ~]# cd /usr/local/kafka/bin/

啟動zookeeper

[root@kafka bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties &

啟動kafka

[root@kafka bin]# ./kafka-server-start.sh ../config/server.properties &

查看端口是否在監聽

[root@kafka bin]# netstat -anput | grep 9092
tcp6 0 0 :::9092 :::* LISTEN 43326/java
tcp6 0 0 192.168.171.134:44388 192.168.171.134:9092 ESTABLISHED 43326/java
tcp6 0 0 192.168.171.134:9092 192.168.171.134:44388 ESTABLISHED 43326/java

由於kafka是通過zookeeper來調度的,所以,即使是單機kafka也需要啟動zookeeper服務,kafka的安裝目錄下是默認集成了zookeeper的,直接啟動即可。
2、測試kafka

在本機創建kafka,副本數量為1,分區數量為1

[root@kafka bin]# ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

查看本機的topic

[root@kafka bin]# ./kafka-topics.sh --list --bootstrap-server localhost:9092
test

發送消息到test

[root@kafka bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic test

aaa
bbb
ccc

開啟新的終端,進行讀取消息測試,“--from-beginning”表示從開頭讀取

[root@kafka bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
aaa
bbb
ccc

Leave a Reply

Your email address will not be published. Required fields are marked *