開發與維運

RocketMQ 核心設計理念

本文由阿里雲釘群直播整理而來。

講師介紹:
丁威:中通科技技術平臺部資深架構師。《RocketMQ技術內幕》作者,社區直播講師。開源愛好者,關注分佈式、雲計算、大數據領域。目前主要負責消息中間件與全鏈路壓測的實施與落地。

本次分享將主要圍繞以下四個方面展開
1、如何學習RocketMQ之我所見。

2、路由註冊、發現、剔除設計模式。

3、消息發送高可用設計。

4、RocketMQ存儲設計。

5、RocketMQ消息消費。

6、RocketMQ HA(主從同步)。

一、如何學習RocketMQ之我所見

image.png

為大家介紹自身學習RocketMQ經驗,給大家學習提供了借鑑的思路。

首先是通讀RocketMQ官方文檔,特別是RocketMQ 3.x版本設計手冊,從全局瞭解RocketMQ的設計理念,需要解決的問題等。從官方文檔設計理念,大家會發現官方文檔中不僅囊括了RocketMQ,還包括了MQ中間件涉及的各個方面,比如MQ通用的角色如Prodcuer消息生產者,Consumer消息消費者,Push Consumner推模式,Pull Consumner拉模式,Producer Group生產者組,Consumner Group消費者組通過這些名詞介紹你就會對RocketMQ有一個整體的瞭解,瞭解RocketMQ要解決那些問題,如何訂閱和發佈的實現機制是什麼,還有RocketMQ存儲特點零拷貝原理,相信大家肯對會這些疑問產生好奇心,提出問題。通過反覆閱讀官方文檔對RocketMQ的整體有大概的認識,同時也會給大家帶來一些思考,如果讓你來實現這些功能你會怎麼做,如果自己不會,是不是可以帶著這個問題看看RocketMQ是怎樣實現的。這時大家會發現學習新東西不是非常困難。

其次下載RocketMQ源碼不要立馬查看源碼,大家可以重點關注example包這是官方提供的使用示例。在閱讀RocketMQ源碼中,首先關注的是example官方示例,通過對官方提供示例的學習可以知曉RocketMQ的使用方式,注意事項,從而達到使用目的。但在大家學會使用之後要想駕馭RocketMQ並且能夠處理工作中遇到得各種問題,分析源碼是最好的方式。首先通過分析源碼的過程中大家通過他的實現細節,瞭解工作原理方便為以後生產實際工作中在出現問題時提供解決問題的思路和方法。另一方面是RocketMQ的代碼質量非常高,RocketMQ擁有高性能。為了實現高性能會涉及到很多方面,比如說RocketMQ在多線程方面的實踐,在高併發編程中基於文件的設計模式,基於Nitty的網絡通信等待這些在分析源碼的過程中能夠提升大家的工作中處理問題的能力,對我們大家自身編程能力的提升是非常有幫助的。在這裡特別提示分析源碼首先要有一定給分佈式的基礎,在大家會發現分析源碼有難度時,介紹了自身如何通過六個月的時間打下堅實的基礎,真正看懂源碼的過程。還介紹到個人博客,大家可以通過逆序查看從而瞭解講師在查看RocketMQ源碼之前做了那些準備工作,比如集合,鎖,Netty基礎,對於學習分佈式系統同學,這些基礎都是需要掌握的。

如果大家對源碼實在看不懂時又想全面的學習RocketMQ的知識體系,可以通過閱讀《RocketMQ技術內幕》一書來進行學習,書中對RocketMQ的原理,設計理念,實踐細節講的非常透徹,相信對大家的學習會起到一定的幫助。

官方地址:http://rocketmq.apache.org/
講師CSDN地址:https://me.csdn.net/prestigeding

二、路由註冊、發現、提出設計模式

首先介紹了業界我們接觸經常到的一些中間件的服務註冊-發現的實現原理。在此這列舉了Dubbo中註冊-發現機制中的實時推送模式。介紹Dubbo是如何提供服務的,首先服務提供者啟動時會向註冊中心發送消息註冊自己也就是告知註冊中心自己可以提供服務了,那如何註冊呢?註冊中心在收到服務提供者發送的消息後會創建節點名稱為的服務提供者的全路徑名例如com.springboot.dubbo.demo.Demoservice文件,同時在對應節點目錄產生四個節點如下圖所示。

image.png

在服務提供者啟動時通過事件機制,發送事件消息的方式推送給註冊中心,註冊中心收到消息後在對應providers目錄增加一條記錄保存該服務。對於服務消費者通過訂閱註冊中心消息知曉那些服務提供者可以提供服務從而服務被消費者遠程調用。如果註冊中心providers下面的節點增加或者是減少時,註冊中心都會通過事件機制發送消息及時的通知到服務消費者。同時為了保證服務高可用,服務提供者每30s向註冊中心發送消息告知自己的狀態正常。這種模式的優點是實時性非常高,只要提供者有變動,消費者都能及時的知道,缺點就是服務註冊實現較複雜,至少每個服務都需要具備消息的發佈-訂閱能力,而且,像zookeeper內部實現複雜,並不適用於RocketMQ,有點大財小用。

RocketMQ核心概念
1)Topic:消息主題,一級消息類型,生產者向其發送消息。

2)生產者:也稱為消息發佈者,負責生產併發送消息至主題Topic。

3)消費者:也稱為消息訂閱者,負責從主題Topic 接收並消費消息。

4)消息:生產者向 主題Topic 發送並最終傳送給消費者的數據和(可選)屬性的組合。

5)消息屬性:生產者可以為消息定義的屬性,包含 Message Key 和 Tag。

6)Group:一類生產者或消費者,這類生產者或消費者通常生產或消費同一類消息,且消息發佈或訂閱的邏輯一致。

7)生產者集群:用來表示發送消息應用,一個生產者集群下包含多個生產者實例,可以是多臺機器,也可以是一臺機器的多個進程,或者一個進程的多個生產者對象。
一個生產者集群可以發送多個主題Topic 消息。發送分佈式事務消息時,如果生產者中途意外宕機,消息存儲者Broker 會主動回調生產者集群的任意一臺機器來確認事務狀態。

8)消費者集群:用來表示消費消息應用,一個消費者集群下包含多個消費者實例,可以是多臺機器,也可以是多個進程,或者是一個進程的多個消費者對象。一個消費者集群下的多個消費者以均攤方式消費消息。
如果設置的是廣播方式,那麼這個消費者集群下的每個實例都消費全量數據。

RocketMQ如何事件服務發現-註冊-拉取模式

image.png

RocketMQ使用拉取模式實現主題Topic路由有什麼缺點呢?
1.主題Topic路由中心(NameServer)Topic是基於最終一致性,極端情況下會出現數據不一致。

2.客戶端無法實時感知路由信息的變化,例如某臺消息存儲Brocker自身進程為關閉,但停止向NameServer發送心跳包,但生產者無法立即感知該Brocker服務器的異常,會對消息發送造成一定的可用性?

RocketMQ並不打算解決上述問題,因為基於上述的設計,RocketMQ NameServer的實現非常簡單高效,至於消息發送的高可用,則有消息發送客戶端自己保證。

RocketMQ的設計遵循的一個設計理念:崇尚“缺陷美”,簡單,高性能。

image.png

如果在知道定時拉取模式的不足時,有哪些方法方式去解決這些問題,帶著這些問題去研究RocketMQ源碼可以獲得更大的收穫,事半功倍。

三、消息發送高可用設計

問題:RocketMQ消息發送如何實現高可用?

答案:RocketMQ消息發送分三步首先是Topic路由尋址,其次是選擇消息隊列,最後是消息發送重試,Broker規避。比如存在主題Topic A有兩條路由存儲消息的Broker A 和存儲消息的Broker B 共8個隊列(Broker A q1, Broker A q2, Broker A q3,Broker A q4, Broker B q1, Broker B q2, Broker B q3,Broker B q4)。在RocketMQ客戶端向RocketMQ集群發送消息的時候,首先要選擇隊列對於多個隊列的選擇系統默認使用輪詢機制。比如發送第一條消息時如果選擇Broker A收到了消息,那麼發送第二條消息則會選擇Broker B,發送第三條消息重新開始一輪選擇Broker A發送,依次不斷輪詢發送,這也是RocketMQ默認的負載均衡機制。

如果RocketMQ客戶端選擇Broker A q1發送一條消息後,Broker A因為一些其他的原因導致Broker A不可用,RocketMQ客戶端嘗試進行重新發送,RocketMQ客戶端第一次選擇Broker A q2發送,第二次RocketMQ客戶端選擇Broker A q2發送,發現第一次和第二次都失敗,RocketMQ客戶端會重試兩次,共發送三次。當Broker A故障導致不可用時,無論對Broker A重試多少次都會失敗,RocketMQ客戶端重試三次失敗,則該消息被告知發送失敗。RocketMQ採用規避機制解決次問題,首先RocketMQ客戶端第一次向Broker A發送消息失敗時在第二次選擇Broker時會規避掉Broker A 的所有隊列(Broker A q1, Broker A q2, Broker A q3,Broker A q4),也就是說Broker A的所有隊列不參加選擇,也就是第二次選擇會選擇Broker B 上的隊列,這樣可以保證第一次消息發送失敗後,第二次可以成功發送消息,從而實現高可用。RocketMQ為了保證高可用提供了另外一種機制設置一個時間在RocketMQ客戶端第一次向Broker A發送消息失敗後在設定時間內RocketMQ客戶端不再向不可用的Broker A發送消息,進一步保證高可用。

建議大家看發送消息的源碼可用重點關注上面提到的高可用的機制,進一步探尋RocketMQ高可用的設計思想重試加規避。

image.png

四、RocketMQ存儲設計設計

RocketMQ的存儲設計是RocketMQ的最重要的部分,採取了一種數據與索引分離的存儲方法。有效降低文件資源、IO資源,內存資源的損耗。即便是阿里這種海量數據,高併發場景也能夠有效降低端到端延遲,並具備較強的橫向擴展能力。作為一款高性能的MQ要具有一下特點。

1.吞度量Tps很高,能夠支持高併發。

2.響應延時要很短。

3.支持海量消息的堆積能力。

以上特點離不開RocketMQ存儲機制。

首先介紹RocketMQ存儲設計整體組織方式

image.png

其次是RocketMQ存儲設計之CommitLog文件,CommitLog是消息存儲文件,所有主題Topic消息到達Broker後按順序存儲在CommitLog文件中的。每個CommitLog文件默認大小為1GB,固定文件大小方便內存映射 。通過對RocketMQ源碼分析,學習RocketMQ如何完成內存映射的實現方式給大家一些借鑑的思想。RocketMQ對CommitLog這樣的定長文件理解為一個邏輯的物理文件,巧妙的構造了文件名,比如第一文件名是00000000000000000000是以物理磁盤上文件的偏移量為文件名,對於第二個文件名0000000000010733741824就是以第一文件的偏移量作為文件名的。RocketMQ對CommitLog這樣設計的優點能夠快速定位到一條消息到達Brocker後落在那個文件中,擁有很高檢索的效率。

image.png

對於一條按順序寫入CommitLog文件的消息,雖然極大的提高了文件的寫入性能,但對於消息讀取消息就會很慢,為了解決讀取速度慢的問題RocketMQ引入ConsumeQueue文件類似於kaffka的隊列文件稱為消息消費隊列文件。ConsumeQueue文件是對於CommitLog文件的基於Topic的索引文件,主要用於消費者根據Topic消息消費時,其組織方式為/topic/queue,同一隊列存在多個文件,ConsumeQueue設計及其巧妙,每個條目使用固定長度(8字節CommitLog物理偏移量,4字節消息長度,8字節tag的 hashCode),這裡不是存儲tag的原始字符串,而是存儲hashCode,目的就是確保每個條目的長度固定,可以使用訪問類似數組下標的方式快速定位條目,極大的提高CommitLog的讀取性能,試想一下,消息消費者根據Topic,消息消費進度(ConsumeQueue邏輯偏移量),即第幾個ConsumeQueue條目,這樣根據消費進度去訪問消息的方法為使用邏輯偏移量logicOffset*20即可找到條目的起始偏移量(ConsumeQueue文件中的偏移量),然後讀取該偏移量後20個字節即得到了一個條目,無需遍歷整個ConsumeQueue文件。

image.png

相信大家在探究源碼的過程中深刻理解上面的設計理念
然後是RocketMQ基於文件的Hash索引。類比mysql的Hash索引方式,基於文件的HashMap方式。提供了通過消息屬性檢索消息的機制,使用定長的方式,可以像使用數組一樣去方便的檢索。比如想要把一個訂單編號的數據,首先要把訂單編號的HashCode放在,通過HashCode在500W個Hash槽中取出一個,再去判斷取出的hash槽中有沒有消息,如果hash槽位為-1則有數據,
37:24IndexFile文件基於物理磁盤文件按實現Hash索引。其文件有40字節的文件,500W個Hash槽組成。每個hash槽為4個字節,最後由2000W的Index條目組成,每個條目由20個字節構成,分別為4字節索引key的HashCode、8字節消息物理偏移量、4字節時間戳、4字節的前一個Index條目(Hash衝突的鏈表結構)。

image.png

1、 內存映射。

2 、基於文件定長設計,應用數組的結構,方便檢索。

3 基於HashCode的設計。

五、RocketMQ消息消費設計

首先介紹RocketMQ消息消費概要。消息消費通常需要考慮消息隊列負載、消費模式、拉去機制、消息過濾、消息消費(處理消息)、消費進度反饋、消息消費限流等方面。

1.消息隊列負載模式:RocketMQ集群內(同一消費組內)的消費者共同承擔主題Topic下所有消息的消費,即一條消息只能被集群中一個消費者消費。

RocketMQ的隊列負載原則是一個消費者可以共同承擔同一主題下的多個消息消費隊列,但同一個消息消費隊列同一時間只允許被分配給一個消費者。

2.消息消費模式:RocketMQ執行集群消費和廣播消費兩種模式。

3.消息拉取模式:RocketMQ消息拉取支持推、拉兩種模式,其本質為拉模式。

4.消息消費:RocketMQ支持順尋消息、併發消息兩種模式,每個消費組使用獨立的線程池來處理拉取到的消息。

5.RocketMQ的消息消費端的限流主要包含兩個維度:

1)消息堆積數量

   如果消息消費處理隊列中的消息條數超過1000條會出發消費端的流控,其具體做法是放棄本次拉取動作,並且延遲50ms後將放入該拉取任務放入到pullRequestQueue中,每1000條流控會打印一次消費端流控日誌。

2)消息堆積大小
如果處理隊列中堆積的消息總內存大小超過100M,同樣觸發一次流控。

image.png

併發消息拉取與消費流程
首先一個消費客戶端有兩個線程(PullMessageService線程和RebalanceService線程)工作,PullMessageService線程負責拉取消息,從阻塞隊列pullRequestQueue中通過take的方式獲取一條拉取消息的任務,如果隊列pullRequestQueue為空時,則PullMessageService線程阻塞。怎麼喚醒隊列,則需要RebalanceService線程每20s進行一次隊列重新負載,獲取主題Topic的所有消息隊列與當前訂閱該主題的所有消費者按照隊列負載算法分配隊列

併發消息拉取與消費的幾個核心要點:

1.PullMessageService線程與RebalanceService線程的交互。

2.每個消費組一個一個線程池,用來異步處理消息。

3.消息進度反饋。

RocketMQ消息消費-消費進度反饋機制
拉取流程:

1.PullMessageService從Brocker服務器拉取一批消息,默認32條。

2.先存儲到本地處理隊列ProcessQueue。

3.提交到消費組線程池,異步執行。

4.提交到線程池後,繼續在從Brocker服務器拉取下一批消息。

思考:由於是併發消息,例如thread-1線程在消費消息msg1,thread-2線程在消費消息msg2,thread-3線程在消費消息msg3,此時如果thread-3線程先消費完消息msg3,但thread-1線程,thread-2線程還沒處理完消息msg1,消息msg2,那thread-1線程是如何向消息存儲者Brocker反饋消息msg3的偏移量?

在這裡提示重複消費的問題是由業務方處理。

RocketMQ主從同步
RocketMQ的主從同步主要是為了讀寫分離並不提供主從服務切換功能,當主節點服務宕機後,RocketMQ只提供讀取服務不提供寫入服務。

實現步驟

1.首先啟動Master服務並在指定端口進行監聽。

2.客戶端啟動,主動連接Master服務,連接TCP連接。

3.客戶端每5s(糾正不是5s只要有消息就會拉取,讀不到消息時休眠5s再次拉取消息)的間隔時間向服務端拉取消息,如果第一次拉取的話,先獲取本地commitLog文件中最大的偏移量,以該偏移量向服務端拉取數據。

4.服務端解析請求,並返回一批數據給客戶端。

5.客戶端收到一批數據後,將消息寫入本地。
commitLog文件中,然後向Master服務彙報拉取進度,並更新下一次待拉取偏移量。

6.然後重複第三步。

問題1:主,從服務器都在運行中,消息消費者是從主節點拉取消息還是從從節點拉取?

答:默認情況下,RocketMQ消息消費者從主服務器拉取,當主服務器積壓的消息超過物理內存的40%,則建議從從服務器拉取,但如果slaveReadEnable為false,表示從服務器不可讀,從服務器也不會接管消息拉取。

問題2:當消息消費者向從服務器拉取消息後,會一直從從服務器拉取?

答:不是的,分如下情況:

1.如果從服務器的slaveReadEnable設置為false,則下次拉取,從主服務器拉取。

2.如果從服務器允許讀取並且從服務器積壓的消息為超過其物理內存的30%,下次拉取使用的Brocker為訂閱組的Brocker指定的Brocker服務器,改制默認為0,代表為主服務器。

3.如果從服務器允許讀取並且從服務器積壓的消息超過了其物理內存的30%,下次拉取使用的Brocker為訂閱組的whichBrockerWhenConsumeSlowly指定的Brocker服務器,該值默認為1,代表從服務器。

問題3:主從服務消息消費進度是如何同步的?

答:消息消費進度的同步是單向的,從服務器開啟一個定時任務,定時從主服務器同步消息消費進度;無論消息消費者是從主服務器拉取的消息還是從從服務器拉取的消息,在向Broker反饋消息消費進度時,優先向主服務器彙報;消息消費者向主服務器拉取消息時,如果消息消費者內存中存在消息消費進度時,主服務器會嘗試更新消息消費進度。

如果主服務器宕機後恢復後,消息消費者是否會重複消費?

當主服務器宕機恢復後,從服務器在同步消費進度時同步到的消息消費進度還是主服務器宕機前的進度,從而造成重複消費。只要消息消費者不重啟的情況下,消息消費進度還是實時的。還是之前的,如果在此期間消息消費者重啟了,那麼重複消費就無法避免。在RocketMQ使用過程中很多地方會引發重複消費
大家可以通過以上講述內容為切入點更深入的瞭解RocketMQ。

直播答疑

問題1:如果發送的是順序消息,Brocker掛了,怎麼做規避策略?還是說就是無法發送?

丁慧:順序消息指的是消息消費方式兩種(順序消費和併發消費)RocketMQ能夠保證進入消費者的消息按順序消費,並不是消息發送者。例如訂單場景下,我們會使用訂單編號作為key,RocketMQ能夠保證同一個單號的所有消息發送到同一個隊列。如果Brocker宕機後隊列數量會減少,消息會重新發送就無法保證發送的順序性。如果要保證發送的順序性,可以使用一個Topic一個隊列,這樣會犧牲你的高可用性。RocketMQ順序消息指的是消費的順序性,而不是發送的順序性。

問題2:那些場景會用到MQ?

丁威: MQ的使用場景1對流量進行削鋒填谷操作,使用他的消息堆積能力。例如雙十一期間,訂單量Tps是平時的幾倍或者幾十倍,如果通過服務來處理,無法抵擋訂單洪峰,可以使用MQ進行降為打擊,比如你的訂單到達系統後先將訂單放入MQ中,然後消費者的數量是有限的,可以平穩的通過異步的方式處理訂單,保證系統高可用,不會造成你的服務在關鍵時刻宕機。使用MQ作為大量消息的擋箭牌,抵擋訂單洪峰。2解耦系統模塊,降低系統複雜性。比如當用戶登陸後,要送優惠券、送積分時,就可以在用戶登錄後發送消息到MQ,你的優惠券系統,積分系統等都可以訂閱MQ接到消息後再去派發優惠券或是積分等其他業務。

問題3:業務端如何解決重複消費的問題?

開發者:可以藉助key+redis去重。

Leave a Reply

Your email address will not be published. Required fields are marked *