開發與維運

MQ奪命連環11問

你們為什麼使用mq?具體的使用場景是什麼?

mq的作用很簡單,削峰填谷。以電商交易下單的場景來說,正向交易的過程可能涉及到創建訂單、扣減庫存、扣減活動預算、扣減積分等等。每個接口的耗時如果是100ms,那麼理論上整個下單的鏈路就需要耗費400ms,這個時間顯然是太長了。

如果這些操作全部同步處理的話,首先調用鏈路太長影響接口性能,其次分佈式事務的問題很難處理,這時候像扣減預算和積分這種對實時一致性要求沒有那麼高的請求,完全就可以通過mq異步的方式去處理了。同時,考慮到異步帶來的不一致的問題,我們可以通過job去重試保證接口調用成功,而且一般公司都會有核對的平臺,比如下單成功但是未扣減積分的這種問題可以通過核對作為兜底的處理方案。

使用mq之後我們的鏈路變簡單了,同時異步發送消息我們的整個系統的抗壓能力也上升了。

那你們使用什麼mq?基於什麼做的選型?

我們主要調研了幾個主流的mq,kafka、rabbitmq、rocketmq、activemq,選型我們主要基於以下幾個點去考慮:

  1. 由於我們系統的qps壓力比較大,所以性能是首要考慮的要素。
  2. 開發語言,由於我們的開發語言是java,主要是為了方便二次開發。
  3. 對於高併發的業務場景是必須的,所以需要支持分佈式架構的設計。
  4. 功能全面,由於不同的業務場景,可能會用到順序消息、事務消息等。

基於以上幾個考慮,我們最終選擇了RocketMQ。

你上面提到異步發送,那消息可靠性怎麼保證?

消息丟失可能發生在生產者發送消息、MQ本身丟失消息、消費者丟失消息3個方面。

生產者丟失

生產者丟失消息的可能點在於程序發送失敗拋異常了沒有重試處理,或者發送的過程成功但是過程中網絡閃斷MQ沒收到,消息就丟失了。

由於同步發送的一般不會出現這樣使用方式,所以我們就不考慮同步發送的問題,我們基於異步發送的場景來說。

異步發送分為兩個方式:異步有回調和異步無回調,無回調的方式,生產者發送完後不管結果可能就會造成消息丟失,而通過異步發送+回調通知+本地消息表的形式我們就可以做出一個解決方案。以下單的場景舉例。

  1. 下單後先保存本地數據和MQ消息表,這時候消息的狀態是發送中,如果本地事務失敗,那麼下單失敗,事務回滾。
  2. 下單成功,直接返回客戶端成功,異步發送MQ消息
  3. MQ回調通知消息發送結果,對應更新數據庫MQ發送狀態
  4. JOB輪詢超過一定時間(時間根據業務配置)還未發送成功的消息去重試
  5. 在監控平臺配置或者JOB程序處理超過一定次數一直髮送不成功的消息,告警,人工介入。


一般而言,對於大部分場景來說異步回調的形式就可以了,只有那種需要完全保證不能丟失消息的場景我們做一套完整的解決方案。

MQ丟失

如果生產者保證消息發送到MQ,而MQ收到消息後還在內存中,這時候宕機了又沒來得及同步給從節點,就有可能導致消息丟失。

比如RocketMQ:

RocketMQ分為同步刷盤和異步刷盤兩種方式,默認的是異步刷盤,就有可能導致消息還未刷到硬盤上就丟失了,可以通過設置為同步刷盤的方式來保證消息可靠性,這樣即使MQ掛了,恢復的時候也可以從磁盤中去恢復消息。

比如Kafka也可以通過配置做到:

acks=all 只有參與複製的所有節點全部收到消息,才返回生產者成功。這樣的話除非所有的節點都掛了,消息才會丟失。
replication.factor=N,設置大於1的數,這會要求每個partion至少有2個副本
min.insync.replicas=N,設置大於1的數,這會要求leader至少感知到一個follower還保持著連接
retries=N,設置一個非常大的值,讓生產者發送失敗一直重試

雖然我們可以通過配置的方式來達到MQ本身高可用的目的,但是都對性能有損耗,怎樣配置需要根據業務做出權衡。

消費者丟失

消費者丟失消息的場景:消費者剛收到消息,此時服務器宕機,MQ認為消費者已經消費,不會重複發送消息,消息丟失。

RocketMQ默認是需要消費者回復ack確認,而kafka需要手動開啟配置關閉自動offset。

消費方不返回ack確認,重發的機制根據MQ類型的不同發送時間間隔、次數都不盡相同,如果重試超過次數之後會進入死信隊列,需要手工來處理了。(Kafka沒有這些)

你說到消費者消費失敗的問題,那麼如果一直消費失敗導致消息積壓怎麼處理?

因為考慮到時消費者消費一直出錯的問題,那麼我們可以從以下幾個角度來考慮:

  1. 消費者出錯,肯定是程序或者其他問題導致的,如果容易修復,先把問題修復,讓consumer恢復正常消費
  2. 如果時間來不及處理很麻煩,做轉發處理,寫一個臨時的consumer消費方案,先把消息消費,然後再轉發到一個新的topic和MQ資源,這個新的topic的機器資源單獨申請,要能承載住當前積壓的消息
  3. 處理完積壓數據後,修復consumer,去消費新的MQ和現有的MQ數據,新MQ消費完成後恢復原狀

那如果消息積壓達到磁盤上限,消息被刪除了怎麼辦?

這。。。他媽都刪除了我有啥辦法啊。。。冷靜,再想想。。有了。


最初,我們發送的消息記錄是落庫保存了的,而轉發發送的數據也保存了,那麼我們就可以通過這部分數據來找到丟失的那部分數據,再單獨跑個腳本重發就可以了。如果轉發的程序沒有落庫,那就和消費方的記錄去做對比,只是過程會更艱難一點。

說了這麼多,那你說說RocketMQ實現原理吧?

RocketMQ由NameServer註冊中心集群、Producer生產者集群、Consumer消費者集群和若干Broker(RocketMQ進程)組成,它的架構原理是這樣的:

  1. Broker在啟動的時候去向所有的NameServer註冊,並保持長連接,每30s發送一次心跳
  2. Producer在發送消息的時候從NameServer獲取Broker服務器地址,根據負載均衡算法選擇一臺服務器來發送消息
  3. Conusmer消費消息的時候同樣從NameServer獲取Broker地址,然後主動拉取消息來消費

為什麼RocketMQ不使用Zookeeper作為註冊中心呢?

我認為有以下幾個點是不使用zookeeper的原因:

  1. 根據CAP理論,同時最多隻能滿足兩個點,而zookeeper滿足的是CP,也就是說zookeeper並不能保證服務的可用性,zookeeper在進行選舉的時候,整個選舉的時間太長,期間整個集群都處於不可用的狀態,而這對於一個註冊中心來說肯定是不能接受的,作為服務發現來說就應該是為可用性而設計。
  2. 基於性能的考慮,NameServer本身的實現非常輕量,而且可以通過增加機器的方式水平擴展,增加集群的抗壓能力,而zookeeper的寫是不可擴展的,而zookeeper要解決這個問題只能通過劃分領域,劃分多個zookeeper集群來解決,首先操作起來太複雜,其次這樣還是又違反了CAP中的A的設計,導致服務之間是不連通的。
  3. 持久化的機制來帶的問題,ZooKeeper 的 ZAB 協議對每一個寫請求,會在每個 ZooKeeper 節點上保持寫一個事務日誌,同時再加上定期的將內存數據鏡像(Snapshot)到磁盤來保證數據的一致性和持久性,而對於一個簡單的服務發現的場景來說,這其實沒有太大的必要,這個實現方案太重了。而且本身存儲的數據應該是高度定製化的。
  4. 消息發送應該弱依賴註冊中心,而RocketMQ的設計理念也正是基於此,生產者在第一次發送消息的時候從NameServer獲取到Broker地址後緩存到本地,如果NameServer整個集群不可用,短時間內對於生產者和消費者並不會產生太大影響。

那Broker是怎麼保存數據的呢?

RocketMQ主要的存儲文件包括commitlog文件、consumequeue文件、indexfile文件。

Broker在收到消息之後,會把消息保存到commitlog的文件當中,而同時在分佈式的存儲當中,每個broker都會保存一部分topic的數據,同時,每個topic對應的messagequeue下都會生成consumequeue文件用於保存commitlog的物理位置偏移量offset,indexfile中會保存key和offset的對應關係。


CommitLog文件保存於${Rocket_Home}/store/commitlog目錄中,從圖中我們可以明顯看出來文件名的偏移量,每個文件默認1G,寫滿後自動生成一個新的文件。


由於同一個topic的消息並不是連續的存儲在commitlog中,消費者如果直接從commitlog獲取消息效率非常低,所以通過consumequeue保存commitlog中消息的偏移量的物理地址,這樣消費者在消費的時候先從consumequeue中根據偏移量定位到具體的commitlog物理文件,然後根據一定的規則(offset和文件大小取模)在commitlog中快速定位。

Master和Slave之間是怎麼同步數據的呢?

而消息在master和slave之間的同步是根據raft協議來進行的:

  1. 在broker收到消息後,會被標記為uncommitted狀態
  2. 然後會把消息發送給所有的slave
  3. slave在收到消息之後返回ack響應給master
  4. master在收到超過半數的ack之後,把消息標記為committed
  5. 發送committed消息給所有slave,slave也修改狀態為committed

你知道RocketMQ為什麼速度快嗎?

是因為使用了順序存儲、Page Cache和異步刷盤。

  1. 我們在寫入commitlog的時候是順序寫入的,這樣比隨機寫入的性能就會提高很多
  2. 寫入commitlog的時候並不是直接寫入磁盤,而是先寫入操作系統的PageCache
  3. 最後由操作系統異步將緩存中的數據刷到磁盤

什麼是事務、半事務消息?怎麼實現的?

事務消息就是MQ提供的類似XA的分佈式事務能力,通過事務消息可以達到分佈式事務的最終一致性。

半事務消息就是MQ收到了生產者的消息,但是沒有收到二次確認,不能投遞的消息。

實現原理如下:

  1. 生產者先發送一條半事務消息到MQ
  2. MQ收到消息後返回ack確認
  3. 生產者開始執行本地事務
  4. 如果事務執行成功發送commit到MQ,失敗發送rollback
  5. 如果MQ長時間未收到生產者的二次確認commit或者rollback,MQ對生產者發起消息回查
  6. 生產者查詢事務執行最終狀態
  7. 根據查詢事務狀態再次提交二次確認

最終,如果MQ收到二次確認commit,就可以把消息投遞給消費者,反之如果是rollback,消息會保存下來並且在3天后被刪除。

Leave a Reply

Your email address will not be published. Required fields are marked *