雲計算

什麼是消息隊列Rocket MQ| 《Rocket MQ 使用排查指南》第一章

下一章:發送問題排查 | 《Rocket MQ 使用排查指南》第二章>>>

點擊免費下載
《Rocket MQ 使用排查指南》>>>

test

也可以PC端點擊https://developer.aliyun.com/topic/download?id=820下載

什麼是消息隊列Rocket MQ

核心概念

消息隊列 RocketMQ 版是阿里雲基於 Apache RocketMQ 構建的低延遲、高 併發、高可用、高可靠的分佈式消息中間件。消息隊列 RocketMQ 版既可為分佈式 應用系統提供異步解耦和削峰填谷的能力,同時也具備互聯網應用所需的海量消息堆 積、高吞吐、可靠重試等特性。

產品功能與特性

消息隊列 RocketMQ 版在阿里雲多個地域(Region)提供了高可用消息雲服務。 單個地域內採用多機房部署,可用性極高,即使整個機房都不可用,仍然可以為應用 提供消息發佈服務。

消息隊列 RocketMQ 版提供 TCP 和 HTTP 協議的多語言接入方式,方便不 同編程語言開發的應用快速接入消息隊列 RocketMQ 版消息雲服務。您可以將應 用部署在阿里雲 ECS、企業自建雲,或者嵌入到移動端、物聯網設備中與消息隊列 RocketMQ 版建立連接進行消息收發;同時,本地開發者也可以通過公網接入消息隊 列 RocketMQ 版服務進行消息收發。

image.png

系統部署架構

系統部署架構如下圖所示。

image.png

圖中所涉及到的概念如下所述:

● Name Server:是一個幾乎無狀態節點,可集群部署,在消息隊列 RocketMQ 版中提供命名服務,更新和發現 Broker 服務。

● Broker:消息中轉角色,負責存儲消息,轉發消息。分為 Master Broker 和 Slave Broker,一個 Master Broker 可以對應多個 Slave Broker,但是一個 Slave Broker 只能對應一個 Master Broker。Broker 啟動後需要完成一次將 自己註冊至 Name Server 的操作;隨後每隔 30s 定期向 Name Server 上報 Topic 路由信息。

● 生產者:與 Name Server 集群中的其中一個節點(隨機)建立長鏈接(Keepalive),定期從 Name Server 讀取 Topic 路由信息,並向提供 Topic 服務的 Master Broker 建立長鏈接,且定時向 Master Broker 發送心跳。

● 消費者:與 Name Server 集群中的其中一個節點(隨機)建立長連接,定期從 Name Server 拉取 Topic 路由信息,並向提供 Topic 服務的 Master Broker、 Slave Broker 建立長連接,且定時向 Master Broker、Slave Broker 發送心 跳。Consumer 既可以從 Master Broker 訂閱消息,也可以從 Slave Broker 訂閱消息,訂閱規則由 Broker 配置決定。 應用場景

應用場景

削峰填谷

流量削峰也是消息隊列 RocketMQ 版的常用場景,一般在秒殺或團隊搶購活動 中使用廣泛。

在秒殺或團隊搶購活動中,由於用戶請求量較大,導致流量暴增,秒殺的應用在 處理如此大量的訪問流量後,下游的通知系統無法承載海量的調用量,甚至會導致系 統崩潰等問題而發生漏通知的情況。為解決這些問題,可在應用和下游通知系統之間 加入消息隊列 RocketMQ 版。

image.png

秒殺處理流程如下所述:

  1. 用戶發起海量秒殺請求到秒殺業務處理系統。
  2. 秒殺處理系統按照秒殺處理邏輯將滿足秒殺條件的請求發送至消息隊列 RocketMQ 版。
  3. 下游的通知系統訂閱消息隊列 RocketMQ 版的秒殺相關消息,再將秒殺成 功的消息發送到相應用戶。
  4. 用戶收到秒殺成功的通知。 異步解耦 傳統處理

異步解耦

傳統處理

最常見的一個場景是用戶註冊後,需要發送註冊郵件和短信通知,以告知用戶注 冊成功。傳統的做法有以下兩種:

(1)串行方式

串行方式下的註冊流程如下圖所示。

image.png

數據流動如下所述:

● 用戶在註冊頁面填寫賬號和密碼並提交註冊信息,這些註冊信息首先會被寫入 註冊系統成功。

● 註冊信息寫入註冊系統成功後,再發送請求至郵件通知系統。郵件通知系統收 到請求後向用戶發送郵件通知。

● 郵件通知系統接收註冊系統請求後再向下游的短信通知系統發送請求。短信通 知系統收到請求後向用戶發送短信通知。

以上三個任務全部完成後,才返回註冊結果到客戶端,用戶才能使用賬號登錄。

假設每個任務耗時分別為 50 ms,則用戶需要在註冊頁面等待總共需要 150 ms 才能登錄。

(2)並行方式

並行方式下的註冊流程如下圖所示。

image.png

數據流動如下所述:

● 用戶在註冊頁面填寫賬號和密碼並提交註冊信息,這些註冊信息首先會被寫入 註冊系統成功。

● 註冊信息寫入註冊系統成功後,再同時發送請求至郵件和短信通知系統。郵件 和短信通知系統收到請求後分別向用戶發送郵件和短信通知。

以上兩個任務全部完成後,才返回註冊結果到客戶端,用戶才能使用賬號登錄。

假設每個任務耗時分別為 50 ms,其中,郵件和短信通知並行完成,則用戶需 要在註冊頁面等待總共需要 100 ms 才能登錄。

以下就註冊場景中使用了消息隊列 RocketMQ 版的效果進行說明。

異步解耦

對於用戶來說,註冊功能實際只需要註冊系統存儲用戶的賬戶信息後,該用戶便 可以登錄,後續的註冊短信和郵件不是即時需要關注的步驟。

對於註冊系統而言,發送註冊成功的短信和郵件通知並不一定要綁定在一起同步 完成,所以實際當數據寫入註冊系統後,註冊系統就可以把其他的操作放入對應的消 息隊列 RocketMQ 版中然後馬上返回用戶結果,由消息隊列 RocketMQ 版異步地 進行這些操作。

image.png

數據流動如下所述:

● 用戶在註冊頁面填寫賬號和密碼並提交註冊信息,這些註冊信息首先會被寫入 註冊系統成功。

● 註冊信息寫入註冊系統成功後,再發送消息至消息隊列 RocketMQ 版。消息 隊列 RocketMQ 版會馬上返回響應給註冊系統,註冊完成。用戶可立即登錄。

● 下游的郵件和短信通知系統訂閱消息隊列 RocketMQ 版的此類註冊請求消息, 即可向用戶發送郵件和短信通知,完成所有的註冊流程。

用戶只需在註冊頁面等待註冊數據寫入註冊系統和消息隊列 RocketMQ 版的時 間,即等待 55 ms 即可登錄。

異步解耦是消息隊列 RocketMQ 版的主要特點,主要目的是減少請求響應時間和 解耦。主要的適用場景就是將比較耗時而且不需要即時(同步)返回結果的操作作為消 息放入消息隊列。同時,由於使用了消息隊列 RocketMQ 版,只要保證消息格式不 變,消息的發送方和接收方並不需要彼此聯繫,也不需要受對方的影響,即解耦和。

順序收發

消息隊列 RocketMQ 版順序消息分為兩種情況:

全局順序:對於指定的一個 Topic,所有消息將按照嚴格的先入先出(FIFO)的 順序,進行順序發佈和順序消費;

分區順序:對於指定的一個 Topic,所有消息根據 Sharding Key 進行區塊分 區,同一個分區內的消息將按照嚴格的 FIFO 的順序,進行順發布和順序消費,可以 保證一個消息被一個進程消費。

在註冊場景中,可使用用戶 ID 作為 Sharding Key 來進行分區,同一個分區下 的新建、更新或刪除註冊信息的消息必須按照 FIFO 的順序發佈和消費。

分佈式事務一致性

註冊系統註冊的流程中,用戶入口在網頁註冊系統,通知系統在郵件系統,兩個系統之間的數據需要保持最終一致。

普通消息處理

如上所述,註冊系統和郵件通知系統之間通過消息隊列進行異步處理。註冊系統 將註冊信息寫入註冊系統之後,發送一條註冊成功的消息到消息隊列 RocketMQ 版, 郵件通知系統訂閱消息隊列 RocketMQ 版的註冊消息,做相應的業務處理,發送注 冊成功或者失敗的郵件。

image.png

流程說明如下:

  1. 註冊系統發起註冊。
  2. 註冊系統向消息隊列 RocketMQ 版發送註冊消息成功與否的消息。

2.1 消息發送成功,進入 3。

2.2 消息發送失敗,導致郵件通知系統未收到消息隊列 RocketMQ 版發送 的註冊成功與否的消息,而無法發送郵件,最終郵件通知系統和註冊 系統之間的狀態數據不一致。

  1. 郵件通知系統收到消息隊列 RocketMQ 版的註冊成功消息。
  2. 郵件通知系統發送註冊成功郵件給用戶。

在這樣的情況下,雖然實現了系統間的解藕,上游系統不需要關心下游系統的業 務處理結果;但是數據一致性不好處理,如何保證郵件通知系統狀態與註冊系統狀態 的最終一致。

image.png

流程說明如下:

  1. 註冊系統向消息隊列 RocketMQ 版發送半事務消息。 1.1 半事務消息發送成功,進入 2。

1.2 半事務消息發送失敗,註冊系統不進行註冊,流程結束。(最終註冊系 統與郵件通知系統數據一致)

  1. 註冊系統開始註冊。

2.1 註冊成功,進入 3.1。

2.2 註冊失敗,進行 3.2。

  1. 註冊系統向消息隊列 RocketMQ 版發送半消息狀態。

3.1 提交半事務消息,產生註冊成功消息,進入 4。

3.2 回滾半事務消息,未產生註冊成功消息,流程結束。(最終註冊系統與 郵件通知系統數據一致)

  1. 郵件通知系統接收消息隊列 RocketMQ 版的註冊成功消息。
  2. 郵件通知系統發送註冊成功郵件。(最終註冊系統與郵件通知系統數據一致)

大規模機器的緩存同步

雙十一大促時,各個分會場會有玲琅滿目的商品,每件商品的價格都會實時變 化。使用緩存技術也無法滿足對商品價格的訪問需求,緩存服務器網卡滿載。訪問較 多次商品價格查詢影響會場頁面的打開速度。

此時需要提供一種廣播機制,一條消息本來只可以被集群的一臺機器消費,如果 使用消息隊列 RocketMQ 版的廣播消費模式,那麼這條消息會被所有節點消費一次, 相當於把價格信息同步到需要的每臺機器上,取代緩存的作用。

消息類型

普通消息

普通消息是指消息隊列 RocketMQ 版中無特性的消息,即發送到服務端會立馬 被消費的消息,且消息是無序消費,不會按照發送的順序一次順序消費。

定時消息

Producer 將消息發送到消息隊列 RocketMQ 版服務端,但並不期望立馬投遞 這條消息,而是推遲到在當前時間點之後的某一個時間投遞到 Consumer 進行消費, 該消息即定時消息。

延時消息

Producer 將消息發送到消息隊列 RocketMQ 版服務端,但並不期望立馬投遞 這條消息,而是延遲一定時間後才投遞到 Consumer 進行消費,該消息即延時消息。 定時消息與延時消息在代碼配置上存在一些差異,但是最終達到的效果相同:消 息在發送到消息隊列 RocketMQ 版服務端後並不會立馬投遞,而是根據消息中的屬 性延遲固定時間後才投遞給消費者。

全局順序消息

對於指定的一個 Topic,所有消息按照嚴格的先入先出(FIFO)的順序進行發佈 和消費。

分區順序消息

對於指定的一個 Topic,所有消息根據 Sharding Key 進行區塊分區。同一個分 區內的消息按照嚴格的 FIFO 順序進行發佈和消費。Sharding Key 是順序消息中用 來區分不同分區的關鍵字段,和普通消息的 Message Key 是完全不同的概念。

事務消息

消息隊列 RocketMQ 版提供類似 X/Open XA 的分佈事務功能,通過消息隊列

RocketMQ 版的事務消息能達到分佈式事務的最終一致。

SDK 支持語言及協議

RocketMQ 支持 tcp 協議以及 http 協議的接入。

其中推薦使用阿里推出的 tcp 協議下的三大 sdk:java,C/C++,.NET。

除了阿里推出的 sdk,我們還支持開源的多語言 sdk 接入阿里雲 RocketMQ: java,go,python,C++。

如 果 您 想 使 用 多 語 言 的 sdk, 推 薦 使 用 http 協 議 接 入:java,PHP,go, Python,Nodejs,C#,C++。

具體協議以及 sdk 的獲取參考鏈接: https://help.aliyun.com/document_detail/124693.html?spm=a2c4g.11186 623.6.582.104e425cW5Tbm2

RocketMQ 快速入門教程

如果您使用的是阿里雲主賬號,則可以通過本文來體驗從開通服務、創建資源、 到使用 SDK 收發消息的完整流程,快速上手消息隊列 RocketMQ 版。 無論您使用的是消息隊列 RocketMQ 版支持的何種協議、何種語言,前三個步 驟都一致,只是在控制檯上具體填寫的信息會略有不同,請以控制檯說明為準。但在 調用 SDK 時,不同協議和語言的示例代碼有所不同,本文以 TCP 協議下的 Java SDK 為例進行說明。

步驟一:開通服務

  1. 在消息隊列 RocketMQ 版產品頁,單擊立即開通。
  2. 在確認訂單頁面,選擇我已閱讀並同意《消息隊列 MQ 服務協議》,再單擊 立即開通即可完成開通。

步驟二:創建資源

在使用消息隊列 RocketMQ 版時,請注意以下網絡訪問限制:

● Topic 和 Group ID 需創建在同一個地域(Region)下的同一個實例中才能互通。 例如,當某 Topic 創建在華東 1(杭州)下的實例 A 中,那麼該 Topic 只能被在 華東 1(杭州)下的實例 A 中創建的 Group ID 對應的生產端和消費端訪問。

● 如果只是測試,或者需要在本地(非阿里雲 ECS 服務器)使用消息隊列 RocketMQ 版的服務,請將 Topic 和 Group ID 都創建在公網地域下的實例 中。生產端和消費端可以部署在本地或者部署在任意地域的 ECS 上,前提是 本地服務器或者相應的 ECS 需要能夠訪問公網。

創建實例

實例是用於消息隊列 RocketMQ 版服務的虛擬機資源,會存儲消息主題 (Topic)和客戶端 ID(Group ID)信息。

  1. 登錄消息隊列 RocketMQ 版控制檯。在頁面頂部導航欄,選擇地域,如公 網地域。
  2. 在左側導航欄,單擊實例詳情。
  3. 在實例詳情頁面右上角,單擊創建實例按鈕。
  4. 在創建實例對話框,選擇實例類型,並輸入實例名和描述,然後單擊確認。

創建 Topic

Topic 是消息隊列 RocketMQ 版裡對消息的一級歸類,例如可以創建 Topic_ Trade 這一 Topic 來識別交易類消息,消息生產者將消息發送到 Topic_Trade,而 消息消費者則通過訂閱該 Topic 來獲取和消費消息。

● Topic 不能跨實例使用,例如在實例 A 中創建的 Topic A 不能在實例 B 中 使用。

● Topic 名稱必須在同一實例中是唯一的。

● 您可創建不同的 Topic 來發送不同類型的消息,例如用 Topic A 發送普通消 息,Topic B 發送事務消息,Topic C 發送定時 / 延時消息。

  1. 在控制檯左側導航欄,單擊 Topic 管理。
  2. 在 Topic 管理頁面上方選擇剛創建的實例,單擊創建 Topic 按鈕。
  3. 在創建 Topic 對話框中的 Topic 一欄,輸入 Topic 名稱,選擇該 Topic 對 應的消息類型,輸入該 Topic 的備註內容,然後單擊確定。

您創建的 Topic 將出現在 Topic 列表中。

創建 Group ID

創建完實例和 Topic 後,您需要為消息的消費者(或生產者)創建客戶端 ID , 即 Group ID 作為標識。

● Group ID 必須在同一實例中是唯一的。

● Group ID 和 Topic 的關係是 N:N,即一個消費者可以訂閱多個 Topic,同 一個 Topic 也可以被多個消費者訂閱;一個生產者可以向多個 Topic 發送消 息,同一個 Topic 也可以接收來自多個生產者的消息。 說明 :消費者必須有對應的 Group ID,生產者不做強制要求。

  1. 在控制檯左側導航欄,單擊 Group 管理。
  2. 在 Group 管理頁面上方選擇剛創建的實例,然後選擇 TCP 協議 > 創建 Group ID。本文以 TCP 協議為例。 說明 :TCP 和 HTTP 協議下的 Group ID 不可以共用,因此需分別創建。
  3. 在創建 Group ID 對話框中,輸入 Group ID 和描述,然後單擊確認。

創建阿里雲 AccessKey

阿里雲 AccessKey 用於收發消息時進行賬戶鑑權。

在調用 SDK 發送和訂閱消息的時候,除了需要指定創建的 Topic 和 Group ID 以外,還需輸入您在 RAM 控制檯創建的身份驗證信息,即 AccessKey。AccessKey 的信息包含 AccessKeyId 和 AcessKeySecret。

步驟三:獲取接入點

在控制檯創建好資源後,您需通過控制檯獲取實例的接入點。在收發消息時,您需要為生產端和消費端配置該接入點,以此接入某個具體實例或地域的服務。

  1. 在控制檯左側導航欄,單擊實例詳情。
  2. 在實例詳情頁面上方選擇剛創建的實例。
  3. 在默認顯示的實例信息頁籤的獲取接入點信息區域,您可以分別看到新創 建實例的 TCP 和 HTTP 協議接入點。接入點性質因協議而異,具體說明 如下:

● TCP 協議:您在控制檯看到的 TCP 協議接入點是地域下某個具體實例的接入 點。同一地域下的不同實例的接入點各不相同。

● HTTP 協議:您在控制檯看到的 HTTP 協議接入點是某個地域的接入點,跟 具體實例無關。您在收發消息時還需另外設置實例 ID。

  1. 在 TCP 協議的接入點區域,單擊複製。 對於 TCP 協議的接入點,您還可以單擊示例代碼,查看在各種開發語言的程序 中如何設置接入點。

完成以上準備工作後,您就可以運行示例代碼,用消息隊列 RocketMQ 版進行 消息發送和訂閱了。

步驟四:發送消息

您可以通過以下方式發送消息: 控制檯發送消息:用於快速驗證 Topic 資源的可用性,主要用作測試。

  1. 在控制檯左側導航欄,單擊 Topic 管理。
  2. 在 Topic 管理頁面,找到您剛剛創建的 Topic,單擊右側操作列的發送。
  3. 在發送消息對話框中的 Message Body 一欄,輸入消息的具體內容,單擊 確定。

控制檯會返回消息發送成功通知以及相應的 Message ID。

調用 SDK 發送消息:用於生產環境下使用消息隊列 RocketMQ 版。

下文以調用 TCP Java SDK 為例進行說明。

調用 TCP Java SDK 發送消息

  1. 通過以下任一方式引入依賴:

● Maven 方式引入依賴:

<dependency>
 <groupId>com.aliyun.openservices</groupId>
 <artifactId>ons-client</artifactId>
 <version>"XXX"</version>
// 設置為 Java SDK 的最新版本號
</dependency>

● 下載依賴 JAR 包:

  1. 根據以下說明設置相關參數,運行示例代碼:
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 您在控制檯創建的 Group ID
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// 鑑權用 AccessKeyId,在阿里雲服務器管理控制檯創建
properties.put(PropertyKeyConst.AccessKey,"XXX");
// 鑑權用 AccessKeySecret,在阿里雲服務器管理控制檯創建
properties.put(PropertyKeyConst.SecretKey, "XXX");
// 設置 TCP 接入域名,進入控制檯的實例詳情頁面,在頁面上方選擇實例後,在實例信息中的“獲取
接入點信息”區域查看
properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
Producer producer = ONSFactory.createProducer(properties);
// 在發送消息前,必須調用 start 方法來啟動 Producer,只需調用一次即可
producer.start();
// 循環發送消息
while(true){
Message msg = new Message( //
// 在控制檯創建的 Topic,即該消息所屬的 Topic 名稱
"TopicTestMQ",
// Message Tag,
// 可理解為 Gmail 中的標籤,對消息進行再歸類,方便 Consumer 指定過濾條件在消息隊列
RocketMQ 版服務器過濾
"TagA",
// Message Body
// 任何二進制形式的數據,消息隊列 RocketMQ 版不做任何干預,
// 需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式
"Hello MQ".getBytes());
// 設置代表消息的業務關鍵屬性,請儘可能全局唯一,以方便您在無法正常收到消息情況下,可通過控
制臺查詢消息並補發
// 注意:不設置也不會影響消息正常收發
msg.setKey("ORDERID_100");
// 發送消息,只要不拋異常就是成功
// 打印 Message ID,以便用於消息發送狀態查詢
SendResult sendResult = producer.send(msg);
System.out.println("Send Message success. Message ID is: " + sendResult.
getMessageId());
}
// 在應用退出前,可以銷燬 Producer 對象
// 注意:如果不銷燬也沒有問題
producer.shutdown();
}
}

查看消息是否發送成功 消息發送後,您可以在控制檯查看消息發送狀態,步驟如下:

  1. 在控制檯左側導航欄,選擇消息查詢 > 按 Message ID 查詢。
  2. 在搜索框中輸入發送消息後返回的 Message ID,單擊搜索查詢消息發送 狀態。 儲存時間表示消息隊列 RocketMQ 版服務端存儲這條消息的時間。如果查詢到此消息,表示消息已經成功發送到服務端。

步驟五:調用 SDK 訂閱消息

消息發送成功後,需要啟動消費者來訂閱消息。下文以調用 TCP Java SDK 為 例說明如何訂閱消息。

  1. 調用 TCP Java SDK 訂閱消息。

您可以運行以下示例代碼來啟動消費者,並測試訂閱消息的功能。請按照說明正 確設置相關參數。

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 您在控制檯創建的 Group ID
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// 鑑權用 AccessKeyId,在阿里雲服務器管理控制檯創建
properties.put(PropertyKeyConst.AccessKey, "XXX");
// 鑑權用 AccessKeySecret,在阿里雲服務器管理控制檯創建
properties.put(PropertyKeyConst.SecretKey, "XXX");
// 設置 TCP 接入域名,進入控制檯的實例詳情頁面,在頁面上方選擇實例後,在實例信息中的“獲取
接入點信息”區域查看
properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicTestMQ", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
  1. 查看消息訂閱是否成功。

完成上述步驟後,您可以在控制檯查看消費者是否啟動成功,即消息訂閱是否 成功。

  1. 在控制檯左側導航欄,單擊 Group 管理。
  2. 找到要查看的 Group ID,單擊該 Group ID 所在行操作列的訂閱關係。

如果是否在線顯示為是,且訂閱關係一致,則說明訂閱成功。否則說明訂閱 失敗。

Leave a Reply

Your email address will not be published. Required fields are marked *