開發與維運

Kafka 消息丟失與消費精確一次性

消息丟失的場景
如果Kafka Producer使用“發後即忘”的方式發送消息,即調用producer.send(msg)方法來發送消息,方法會立即返回,但此時並不能說明消息已經發送成功。消息發送方式詳見初次邂逅Kafka生產者。
如果在消息過程中發生了網絡抖動,那麼消息就會丟失;或發送的消息本身不符合要求,如大小超過Broker端的承受能力等(消息太大的情況在生產中實際遇到過,最後通過在發送前將消息分包,再依次發送,解決了該問題)。
解決該問題的方法就是:Producer要使用帶回調通知的方法發送消息,即producer.send(msg, callback)。回調方法callback可以告訴我們消息是否真的提交成功了,一旦出現消息發送失敗的情況,可以使用代碼進行容錯及補救。
例如:網絡抖動導致的消息丟失,可以使Producer重試;消息不合格,則將消息格式進行調整,再發送。Producer使用帶回調的消息發送API,可以及時發現消息是否發送失敗並作相應處理。
消費者丟失數據
Consumer端丟失數據主要體現在:拉取了消息,並提交了消費位移,但是在消息處理結束之前突然發生了宕機等故障。消費者重生後,會從之前已提交的位移的下一個位置重新開始消費,之前未處理完成的消息不會再次處理,即相當於消費者丟失了消息。
解決Consumer端丟失消息的方法也很簡單:將位移提交的時機改為消息處理完成後,確認消費完成了一批消息再提交相應的位移。這樣做,即使處理消息的過程中發生了異常,由於沒有提交位移,下次消費時還會從上次的位移處重新拉取消息,不會發生消息丟失的情況。
具體的實現方法為,Consumer在消費消息時,關閉自動提交位移,由應用程序手動提交位移。
Broker端丟失數據
Broker端丟失數據主要有以下幾種情況:
原來的Broker宕機了,卻選舉了一個落後Leader太多的Broker成為新的Leader,那麼落後的這些消息就都丟失了,可以禁止這些“unclean”的Broker競選成為Leader;
Kafka使用頁緩存機制,將消息寫入頁緩存而非直接持久化至磁盤,將刷盤工作交由操作系統來調度,以此來保證高效率和高吞吐量。如果某一部分消息還在內存頁中,未持久化至磁盤,此時Broker宕機,重啟後則這部分消息丟失,使用多副本機制可以避免Broker端丟失消息;
避免消息丟失的最佳實踐
不使用producer.send(msg),而使用帶回調的producer.send(msg, callback)方法;
設置acks = all。acks參數是Producer的一個參數,代表了對消息“已提交”的定義。如果設置成all,則表示所有的Broker副本都要接收到消息,才算消息“已提交”,是最高等級的“已提交”標準;
設置retries為一個較大的值,retries表示Producer發送消息失敗後的重試次數,如果發生了網絡抖動等瞬時故障,可以通過重試機制重新發送消息,避免消息丟失;
設置unclean.leader.election.enable = false。這是一個Broker端參數,表示哪些Broker有資格競選為分區的Leader。如果一個落後Leader太多的Follower所在Broker成為了新的Leader,則必然會導致消息的丟失,故將該參數設置為false,即不允許這種情況的發生;
設置replication.factor >= 3。Broker端參數,表示每個分區的副本數大於等於3,使用冗餘的機制來防止消息丟失;
設置min.insync.replicas > 1。Broker端參數,控制的是消息至少被寫入多少個副本蔡栓是“已提交”,將該參數設置成大於1可以提升消息持久性;
確保replication.factor > min.insync.replicas。若兩者相等,則如果有一個副本掛了,整個分區就無法正常工作了。推薦設置為:replication.factor = min.insync.replicas + 1;
確保消息消費完再提交位移,將Consumer端參數enable.auto.commit設置為fasle,關閉位移自動提交,使用手動提交位移的形式。
精確一次消費
目前Kafka默認提供的消息可靠機制是“至少一次”,即消息不會丟失。上一節中我們知道,Producer如果發送消息失敗,則可以通過重試解決,若Broker端的應答未成功發送給Producer(如網絡抖動),Producer此時也會進行重試,再次發送原來的消息。這就是Kafka默認提供消息至少一次性的原因,不過這可能會導致消息重複發送。
如果需要保證消息消費的“最多一次”,那麼禁止Producer的重試即可。但是寫入失敗的消息如果不重試則會永遠丟失。是否有其他方法來保證消息的發送既不丟失,也不重複消費?或者說即使Producer重複發送了某些消息,Broker端也能夠自動去重。
Kafka實際上通過兩種機制來確保消息消費的精確一次:
冪等性(Idempotence)
事務(Transaction)
冪等性
所謂的冪等,簡單說就是對接口的多次調用所產生的結果和調用一次是一致的。在Kafka中,Producer默認不是冪等性的,Kafka於0.11.0.0版本引入該特性。設置參數enable.idempotence為true即可指定Producer的冪等性。開啟冪等生產者後,Kafka會自動進行消息的去重發送。為了實現生產者的冪等性,Kafka引入了producer id(後簡稱PID)和序列號(sequence number)兩個概念。
生產者實例在被創建的時候,會分配一個PID,這個PID對用戶完全透明。對於每個PID,消息發送到的每一個分區都有對應的序列號,這些序列號從0開始單調遞增。生產者每發送一條消息就會將對應的序列號值加1。
Broker端在內存中為每一對維護一個序列號SN_old。針對生產者發送來的每一條消息,對其序列號SN_new進行判斷,並作相應處理。
只有SN_new比SN_old大1時,即SN_new = SN_old + 1時,broker才會接受這條消息;
SN_new < SN_old + 1,說明消息被重複寫入,broker直接丟棄該條消息;
SN_new > SN_old + 1,說明中間有數據尚未寫入,出現了消息亂序,可能存在消息丟失的現象,對應的生產者會拋出OutOfOrderSequenceException。
注意:序列號針對,這意味著冪等生產者只能保證單個主題的單一分區內消息不重複;其次,它只能實現單會話上的冪等性,不能實現跨會話的冪等性,這裡的會話即可以理解為:Producer進程的一次運行。當重啟了Producer進程之後,則冪等性保證就失效了。
事務
冪等性並不能跨多個分區運作,而Kafka事務則可以彌補這個缺陷。Kafka從0.11版本開始提供了對事務的支持,主要在read committed隔離級別。它能保證多條消息原子性地寫入到目標分區,同時也能寶恆Consumer只能看到事務成功提交的消息。
Producer端配置
事務型Producer能保證消息原子性地寫入多個分區。批量的消息要麼全部寫入成功,要麼全部失敗。並且,事務型Producer在重啟後,Kafka依然保證它們發送消息的精確一次處理。開啟事務型Producer的配置如下:
和冪等性Producer一樣,開啟enable.idempotence = true。
設置Producer端參數transcational.id。最好為其設置一個有意義的名字。
設置了事務型的Producer可以調用一些事務API,如下:initTransaction、beginTransaction、commitTransaction和abortTransaction,分別對應事務的初始化、事務開啟、事務提交和事務終止。
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
}
catch (KafkaExecption e) {
producer.abortTransaction();
}
上述代碼中,事務型Producer可以保證record1和record2要麼全部提交成功,要麼全部寫入失敗。實際上,即使寫入失敗,Kafka也會將它們寫入到底層的日誌中,也就是說Consumer還是會看到這些消息,具體Consumer端讀取事務型Producer發送的消息需要另行配置。
Consumer端配置

讀取事務型Producer發送的消息時,Consumer端的isolation.level參數表徵著事務的隔離級別,即決定了Consumer以怎樣的級別去讀取消息。該參數有以下兩個取值:
read_uncommitted:默認值,表面Consumer能夠讀到Kafka寫入的任何消息,不論事務型Producer是否正常提交了事務。顯然,如果啟用了事務型的Producer,則Consumer端參數就不要使用該值,否則事務是無效的。
read_committed:表面Consumer只會讀取事務型Producer成功提交的事務中寫入的消息,同時,非事務型Producer寫入的所有消息對Consumer也是可見的。
總結

Kafka所提供的消息精確一次消費的手段有兩個:冪等性Producer和事務型Producer。
冪等性Producer只能保證單會話、單分區上的消息冪等性;
事務型Producer可以保證跨分區、跨會話間的冪等性;
事務型Producer功能更為強大,但是同時,其效率也會比較低下。
本文來源於:
奈學開發者社區

Leave a Reply

Your email address will not be published. Required fields are marked *