開發與維運

JAVA應用開發MQ實戰最佳實踐

1. 最佳實踐綜述

本次最佳實踐,將結合JAVA代碼對消息隊列RocketMQ版(簡稱RocketMQ)的使用原理進行分析。
RocketMQ 是企業級互聯網架構的核心產品,具備低延遲、高併發、高可用、高可靠,可支撐萬億級數據洪峰的分佈式消息中間件。可通過RocketMQ控制檯創建RocketMQ實例,無需安裝包,省去繁雜的手續。對RocketMQ消息服務消息可視化可以按Topic、MessageID或Topic不同維度查詢發送的消息、按消息軌跡功能展示發送和消費關係、消息是否成功消費等信息。其中資源報表可以快速的統計RocketMQ在一定時間段內發送和訂閱消息的TPS數。
本次最佳實踐的內容主要包含:
(1)消息同步和異步發送的JAVA示例代碼及原理分析。
(2)針對同步和異步發送的區別選擇適用的消息發送方式滿足需求。
(3)對消息發送可以分Topic,更細粒化標籤tag消息進行歸類。
(4)通過Topic和Tag選擇過濾消費消息。
(5)對消息發送失敗有進行消息重試處理。
(6)結合JAVA代碼對集群和廣播訂閱消息消費原理進行詳述。

2. 最佳實踐代碼設計

2.1 生產者發送消息

本章對生產者發送消息的兩種模式進行代碼的說明。

2.1.1 同步發送消息

同步發送原理:
同步發送是指消息發送方發出一條消息後,會在收到服務端返回響應之後才發下一條消息的通訊方式。

  1. 在pom.xml文件導入依賴包。

    <groupId>****.****.**ces</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.2.Final</version>

  2. 配置文件application.properties連接mq的參數值。
    # POC2專有云MQ配置

mq.accessKey=*1qc
mq.secretKey=*1HaI
mq.onsAddr=http://MQ_INST_1870773007797099_Bb48AjKM.d01.mq.namesrv.cloud.poc2.com:9876
mq.normalTopic=pdsa_topic
mq.producerId=GID_pdsa_mq
mq.consumerId=CID_consumer
mq.sendMsgTimeoutMillis=3000
mq.tag=TagA

  1. 同步發送示例代碼,針對性適配後面MQ性能壓測場景代碼,內容包含發送每條消息數據大小50Kb,Topic和Tag消息更細粒化分類,消息發送失敗進行重試處理。
  2. com.aliware.edas.com.aliware.edas.rocketmq;

import com.aliware.edas.com.aliware.edas.mq.ProducerEntry;
import com.aliyun.openservices.ons.api.*;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.Properties;

/**

  • @author liuhuihui
    */

@Component("simpleMQProduce")
@RefreshScope
public class SimpleMQProduce extends ProducerEntry {

StringBuilder content = new StringBuilder();

public void sendMsg() {
    for (int i = 0; i < 6400; i++) {
        content.append(String.valueOf("A"));
    }
    Properties properties = new Properties();
    properties.setProperty(PropertyKeyConst.GROUP_ID, this.getProcucerId());
    // AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制檯創建
    properties.put(PropertyKeyConst.AccessKey, this.getAccessKey());
    // SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制檯創建
    properties.put(PropertyKeyConst.SecretKey, this.getSecretKey());
    //設置發送超時時間,單位毫秒
    properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.getSendMsgTimeoutMillis());
    // 設置 TCP 接入域名,到控制檯的實例基本信息中查看
    properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr());

    Producer producer = ONSFactory.createProducer(properties);
    // 在發送消息前,必須調用 start 方法來啟動 Producer,只需調用一次即可
    producer.start();

    Message msg = new Message(
            // Message 所屬的 Topic
            this.getTopic(),
            // Message Tag 可理解為 Gmail 中的標籤,對消息進行再歸類,方便 Consumer 指定過濾條件在 MQ 服務器過濾
            this.getTag(),
            // Message Body 可以是任何二進制形式的數據, MQ 不做任何干預,
            // 需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式
            (content.toString()).getBytes());
    // 設置代表消息的業務關鍵屬性,請儘可能全局唯一。
    // 以方便您在無法正常收到消息情況下,可通過阿里雲服務器管理控制檯查詢消息並補發
    // 注意:不設置也不會影響消息正常收發
    msg.setKey("ORDERID_" + 1);

    try {
        SendResult sendResult = producer.send(msg);
        // 同步發送消息,只要不拋異常就是成功
        if (sendResult != null) {
            System.out.println(new Date() + "消息長度:" + content.length() + "--發送消息內容:" + content.substring(0, 50) + "****" + content.substring(content.length()-50, content.length()));
        }
    }
    catch (Exception e) {
        // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理
        System.out.println(new Date() + "消息長度:" + content.length() + "--發送消息內容:" + content.substring(0, 50) + "****" + content.substring(content.length()-50, content.length()));
        e.printStackTrace();
    }

    // 在應用退出前,銷燬 Producer 對象
    // 注意:如果不銷燬也沒有問題
    producer.shutdown();
}

}

2.1.2 同步發送應用場景

此種方式應用場景非常廣泛,例如重要通知郵件、報名短信通知、營銷短信系統等。

2.1.3 異步發送消息

異步發送原理:
異步發送是指發送方發出一條消息後,不等服務端返回響應,接著發送下一條消息的通訊方式。消息隊列RocketMQ版的異步發送,需要用戶實現異步發送回調接口。

  1. 在pom.xml文件導入依賴包。

     <groupId>*************ces</groupId>
     <artifactId>ons-client</artifactId>
     <version>1.8.2.Final</version>

2.配置文件application.properties連接mq的參數值。
#POC2專有云MQ配置
mq.accessKey=*qc
mq.secretKey=*1HaI
mq.onsAddr=http://MQ_INST_1870773007797099_Bb48AjKM.*d01.mq.namesrv.cloud.poc2.com:9876
mq.normalTopic=pdsa_topic
mq.producerId=GID_pdsa_mq
mq.consumerId=CID_consumer
mq.sendMsgTimeoutMillis=3000
mq.tag=TagA

  1. 異步發送示例代碼,針對性適配後面MQ性能壓測場景代碼,內容包含發送每條消息數據大小50Kb ,Topic和Tag消息更細粒化分類,消息發送失敗進行重試處理。
  2. com.aliware.edas.com.aliware.edas.rocketmq;

import com.aliware.edas.com.aliware.edas.mq.ProducerEntry;
import com.aliyun.openservices.ons.api.*;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.Properties;

/**

  • @author liuhuihui
    */

@Component("asyncSimpleMQProduce")
@RefreshScope
public class AsyncSimpleMQProduce extends ProducerEntry {

StringBuilder content = new StringBuilder();

public void sendMsg() {
    for (int i = 0; i < 6400; i++) {
        content.append(String.valueOf("A"));
    }
    Properties properties = new Properties();
    properties.setProperty(PropertyKeyConst.GROUP_ID, this.getProcucerId());
    // AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制檯創建
    properties.put(PropertyKeyConst.AccessKey, this.getAccessKey());
    // SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制檯創建
    properties.put(PropertyKeyConst.SecretKey, this.getSecretKey());
    //設置發送超時時間,單位毫秒
    properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.getSendMsgTimeoutMillis());
    // 設置 TCP 接入域名,到控制檯的實例基本信息中查看
    properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr());

    Producer producer = ONSFactory.createProducer(properties);
    // 在發送消息前,必須調用 start 方法來啟動 Producer,只需調用一次即可
    producer.start();

    Message msg = new Message(
            // Message 所屬的 Topic
            this.getTopic(),
            // Message Tag 可理解為 Gmail 中的標籤,對消息進行再歸類,方便 Consumer 指定過濾條件在 MQ 服務器過濾
            this.getTag(),
            // Message Body 可以是任何二進制形式的數據, MQ 不做任何干預,
            // 需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式
            (content.toString()).getBytes());
    // 設置代表消息的業務關鍵屬性,請儘可能全局唯一。
    // 以方便您在無法正常收到消息情況下,可通過阿里雲服務器管理控制檯查詢消息並補發
    // 注意:不設置也不會影響消息正常收發
    msg.setKey("ORDERID_" + 1);

    while (true) {
        producer.sendAsync(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(new Date() + "消息長度:" + content.length() + "--發送消息內容:" + content.substring(0, 50) + "****" + content.substring(content.length()-50, content.length()));
            }

            @Override
            public void onException(OnExceptionContext context) {
                System.out.println("發送失敗!");
            }
        });
    }

    // 在應用退出前,銷燬 Producer 對象
    // 注意:如果不銷燬也沒有問題

// producer.shutdown();

}

}

2.1.4 異步發送應用場景

異步發送一般用於鏈路耗時較長,對響應時間較為敏感的業務場景,例如用戶視頻上傳後通知啟動轉碼服務,轉碼完成後通知推送轉碼結果等。

2.2 消費者訂閱消息

本章對消費者訂閱消息的兩種模式進行代碼的說明。

2.2.1 集群訂閱

集群訂閱原理:
同一個Group ID所標識的所有Consumer平均分攤消費消息。例如某個Topic有9條消息,一個Group ID有3個Consumer實例,那麼在集群消費模式下每個實例平均分攤,只消費其中的3條消息。

  1. 在pom.xml文件導入依賴包。

    <groupId>**************ices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.2.Final</version>

  2. 配置文件application.properties連接mq的參數值。
  3. 集群訂閱示例代碼,適配後面MQ性能壓測場景代碼。
    @Component("simpleMQConsumer")

@RefreshScope
public classSimpleMQConsumerextendsProducerEntry{
public void receive(){

Properties properties=new Properties();
//您在控制檯創建的GroupID
properties.put(PropertyKeyConst.GROUP_ID,this.getProcucerId());
//AccessKey阿里雲身份驗證,在阿里雲服務器管理控制檯創建
properties.put(PropertyKeyConst.AccessKey,this.getAccessKey());
//SecretKey阿里雲身份驗證,在阿里雲服務器管理控制檯創建
properties.put(PropertyKeyConst.SecretKey,this.getSecretKey());
//設置TCP接入域名,到控制檯的實例基本信息中查看
properties.put(PropertyKeyConst.NAMESRV_ADDR,this.getOnsAddr());
//集群訂閱方式(默認)
properties.put(PropertyKeyConst.MessageModel,PropertyValueConst.CLUSTERING);
Consumer consumer=ONSFactory.createConsumer(properties);
//訂閱另外一個Topic
consumer.subscribe(this.getTopic(),"*",new MessageListener(){//訂閱全部Tag 
    @Override
    public Action consume(Message message,ConsumeContext context){
    System.out.println("Receive:"+message);
    return Action.CommitMessage;
    }
});
consumer.start();
System.out.println("Consumer Started");
}

}

2.2.2 廣播訂閱

廣播訂閱原理:
同一個Group ID所標識的所有Consumer都會各自消費某條消息一次。例如某個Topic有9條消息,一個Group ID有3個Consumer實例,那麼在廣播消費模式下每個實例都會各自消費9條消息。

  1. 在pom.xml文件導入依賴包.

    <groupId>**************ces</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.2.Final</version>

  2. 廣播消費示例代碼。
    @Component("simpleMQConsumer")

@RefreshScope
public classSimpleMQConsumerextendsProducerEntry{
public void receive(){

Properties properties=new Properties();
//您在控制檯創建的GroupID
properties.put(PropertyKeyConst.GROUP_ID,this.getProcucerId());
//AccessKey阿里雲身份驗證,在阿里雲服務器管理控制檯創建
properties.put(PropertyKeyConst.AccessKey,this.getAccessKey());
//SecretKey阿里雲身份驗證,在阿里雲服務器管理控制檯創建
properties.put(PropertyKeyConst.SecretKey,this.getSecretKey());
//設置TCP接入域名,到控制檯的實例基本信息中查看
properties.put(PropertyKeyConst.NAMESRV_ADDR,this.getOnsAddr());
//廣播訂閱方式(默認)
properties.put(PropertyKeyConst.MessageModel,PropertyValueConst.CLUSTERING);
Consumer consumer=ONSFactory.createConsumer(properties);
consumer.subscribe(this.getTopic(),"*",new MessageListener(){//訂閱全部Tag 
    @Override
    public Action consume(Message message,ConsumeContext context){
    System.out.println("Receive:"+message);
    return Action.CommitMessage;
    }
});
consumer.start();
System.out.println("Consumer Started");
}

}

3. 消息隊列RocketMQ性能測試案例

3.1 RocketMQ測試分析

客戶場景,信息共享交換平臺:

  1. 交換平臺需支持每秒萬級別數據傳輸
  2. 實現跨路段、跨部門、跨行業、跨區域信息即時共享,做到高可靠、低延遲

    1.png

客戶現場展示場景設計思路:

  1. 針對性的編寫一套JAVA代碼來支撐本次MQ性能POC驗證。
  2. 選擇合適規格的單臺ECS,在單個TOPIC下運行多線程代碼實現和MQ的訂閱發送,統計1分鐘內訂閱和發送的數據交換TPS情況。
  3. 考慮POC也要符合客戶實際生產場景中MQ使用邏輯,ECS應運行4個獨立的JAR包,兩對JAR包交叉經過MQ進行數據交換。
  4. 消息體內的內容應打印到屏幕,通過消息軌跡驗證消息的被消費情況。
  5. 驗證結果:客戶指定場景下8線程異步,1分鐘TPS在10K以上。

3.2 創建資源

本章節詳細描述如何創建消息隊列 RocketMQ 版的資源。

3.2.1 創建RocketMQ實例
  1. 登錄Apsara Stack控制檯。
  2. 在左側導航欄中單擊中間件產品 > 消息隊列訪問管理控制檯界面。

    2.png

  3. 在消息隊列頁面,選擇區域與部門後,單擊MQ,進入MQ控制檯。

    3.png

  4. 單擊左側導航欄概覽後,在概覽頁面中,單擊創建實例。
  5. 在創建實例對話框,選擇實例類型,並輸入實例名和描述,然後單擊確認。

    4.png

說明: 創建完實例後,單擊左側導航欄實例詳情,可以查看創建實例的Topic數上限、消息發送TPS上限、消息訂閱TPS上限和TCP協議接入地址等。

3.2.2 創建 Topic

Topic 是消息隊列 RocketMQ 版裡對消息的一級歸類,例如可以創建 Topic_Trade 這一 Topic 來識別交易類消息,消息生產者將消息發送到 Topic_Trade,而消息消費者則通過訂閱該 Topic 來獲取和消費消息。
創建Topic要注意一下幾點:

  • Topic 不能跨實例使用,例如在實例 A 中創建的 Topic A 不能在實例 B 中使用。
  • Topic 名稱必須在同一實例中是唯一的。
  • 您可創建不同的 Topic 來發送不同類型的消息,例如用 Topic A 發送普通消息,Topic B 發送事務消息,Topic C 發送定時/延時消息。
  1. 在控制檯左側導航欄,單擊 Topic 管理。
  2. 在 Topic 管理頁面上方選擇剛創建的實例,單擊創建 Topic 按鈕。

    5.png

  3. 在創建 Topic 對話框中的 Topic 一欄,輸入 Topic 名稱,選擇該 Topic 對應的消息類型,輸入該 Topic 的備註內容,然後單擊確定。
3.2.3 創建 Group ID

創建完實例和 Topic 後,您需要為消息的消費者(或生產者)創建客戶端 ID ,即 Group ID 作為標識。

  • Group ID 必須在同一實例中是唯一的。
  • Group ID 和 Topic 的關係是 N:N,即一個消費者可以訂閱多個 Topic,同一個 Topic 也可以被多個消費者訂閱;一個生產者可以向多個 Topic 發送消息,同一個 Topic 也可以接收來自多個生產者的消息。
  1. 在控制檯左側導航欄,單擊 Group 管理。
  2. 在 Group 管理頁面上方選擇剛創建的實例,然後選擇TCP協議 > 創建Group ID 。本文以 TCP 協議為例。

    image.png

  3. 在創建 Group ID 對話框中,輸入 Group ID 和描述,然後單擊確認。

3.3 場景落地

  1. 準備p1,c1,p2,c2雙發送雙訂閱應用小程序,p1、p2小程序參考“第二章節”生產者異步發送消息代碼,c1、c2小程序參考“第二章節”消費者集群訂閱消息代碼
    p1-8081.jar

c1-8083.jar
p2-8082.jar
c2-8084.jar

  1. 通過Xshell連接到專有云ops1環境,把準備的4個jar包上傳至一臺16c32gECS服務器上

    7.png

  2. 同時在該目錄下編寫啟動4個jar包的start.sh腳本:
    #!/bin/bash

nohup java -jar p1-8081.jar &
nohup java -jar p2-8081.jar &
nohup java -jar c1-8081.jar &
nohup java -jar c2-8081.jar &

  1. 編寫停用4個jar包的stop.sh腳本:
    #!/bin/bash

process=jar
PID=$(ps -ef|grep $process |grep -v grep|awk '{print $2}')
#echo $PID
if [ ! -n "${PID[0]}" ];then

    echo "\"$process\" process not find"
    ps -ef|grep $process

else

    kill -9 $PID
    echo "kill $process success"
    ps -ef|grep $process

fi

  1. 編寫請求消息隊列2個發送和2個訂閱接口的curl腳本:
    #!/bin/bash

curl http://192.168.0.150:8081/echo-sync-final-send
curl http://192.168.0.150:8082/echo-sync-final-send
curl http://192.168.0.150:8083/echo-final-mq
curl http://192.168.0.150:8084/echo-final-mq

  1. ./start.sh執行啟動4個jar包,通過tail -f nohup.out查看啟動日誌,每個jar包啟動完成日誌如下:

    8.png

  2. 通過ps -ef | grep jar確認4個jar是否在ECS服務器運行,有如下圖則表示4個應用小程序運行正常:
    9.png
  3. ./curl.sh執行請求2個消息發送和2個消息訂閱接口,請求之後我們查看一下程序後臺日誌發現代碼打印的消息發送和消息訂閱的日誌不停的在刷,截取單條消息發送日誌如下:

    10.png

截取單條消息訂閱日誌如下:

11.png

3.4 消息查詢

如遇消息消費有問題,則可通過查詢具體發送的消息內容來排查問題。消息隊列 RocketMQ 版提供了三種消息查詢的方式,分別是按 Message ID、Message Key 以及 Topic 查詢。

3.4.1 查詢方式說明

三種查詢方式的特點和對比如下表所述。
表1:查詢方式對比

表2.png

3.4.2 推薦查詢過程

推薦按照以下流程查詢消息。

12.png

3.4.3 查詢步驟
  1. 登錄MQ控制檯。
  2. 在左側導航欄,單擊消息軌跡。
  3. 在消息軌跡頁面,您可單擊以下任一頁籤,然後按頁面提示輸入相應信息,再單擊搜索按鈕來查詢消息。
  • 按 Message ID 查詢

按 Message ID 查詢消息屬於精確查詢,您輸入 Topic 和 Message ID 即可精確查詢到任意一條消息。因此,為了儘可能精確地查詢,建議在發送消息成功後將 Message ID 信息打印到日誌中,方便問題排查。

13.png

  • 按 Message Key 查詢

消息隊列 RocketMQ 版根據您設置的 Message Key 建立消息的索引信息,當您輸入 Key 進行查詢時,消息隊列 RocketMQ 版根據該索引即可匹配相關的消息返回。

14.png

  • 按 Topic 查詢

按 Topic 查詢一般用在 Message ID 和 Message Key 都無法獲得的情況下,根據 Topic 和消息的發送時間範圍,批量獲取該時間範圍內的所有消息,然後再找到關心的數據。

15.png

  1. 在操作欄中單擊消息詳情,可查看到軌跡的簡要信息,主要是消息本身的屬性以及接收狀態的信息。

    16.png

  2. 在展開的區域中,單擊查看軌跡即可查看完整的鏈路圖。下圖示例為在 TCP 協議下,按 Message ID 查詢普通消息的軌跡。

    17.png

對於 Message Key 和 Topic 查詢方式,如果匹配到多條軌跡,可以進行上下翻頁,查看比對軌跡數據。

3.4.4 查詢結果說明

您可以在控制檯的消息查詢頁面看到查詢到的消息。直接顯示的信息包含 Message ID、Tag、Key 和存儲時間。此外,您還可以在每一行消息操作列下載消息內容、查詢信息軌跡以及查看消息詳情。
投遞狀態是消息隊列 RocketMQ 版根據各個 Group ID 的消費進度計算出的結果,投遞狀態的信息如下表所示。
表 2: 消息投遞狀態

消息投遞狀態.png

3.4.5 消費驗證

消息隊列 RocketMQ 版提供了消費驗證功能,該功能可以將指定消息推送給指定的在線客戶端,以檢測客戶端消費該消息的邏輯和結果是否符合預期。
說明: 消費驗證功能僅僅是用於驗證客戶端的消費邏輯是否正常,並不會影響正常的收消息流程,因此消息的消費狀態等信息在消費驗證後並不會改變。

3.5 查看消息生產數據

可供查看的消息生產數據是某個Topic在一個時間段內從Broker接收的消息的總量或者TPS。

  1. 登錄MQ控制檯。
  2. 在左側導航欄,單擊資源報表。
  3. 在資源報表頁面,單擊消息生產頁籤。
  4. 在Topic一欄,選擇Topic,並指定其他信息,然後單擊搜索。
    字段說明:
  • 採集類型:分為總量和TPS。總量提供該週期內Topic接收的消息總量;TPS提供每個週期內該Topic接收消息的平均TPS。
  • 採集週期:包括1分鐘、10分鐘後、30分鐘、1小時。採集週期決定了數據採集的時間間隔,週期越短,採集點越密集,消息消費數據越詳細。
  • 時間範圍:RocketMQ最多可以提供最近三天之內的消息的生產查詢。

查詢結果以圖表的形式顯示

18.png

19.png

3.6 查看消息消費數據

可供查看的消息消費數據是某個Topic在一個時間段內投遞給某個Group ID的消息的總量或TPS。
具體操作步驟如下:

  1. 登錄MQ控制檯。
  2. 在左側導航欄,單擊資源報表。
  3. 在資源報表頁面,單擊消息消費頁籤。
  4. 在Group ID和Topic欄,分別選擇您要查詢的Group ID和Topic。
  5. 指定其他信息,然後單擊搜索。
    字段說明:
  • 採集類型:分為總量和TPS。總量提供每個週期內該Topic投遞給該Group ID的消息總量,TPS提供每個週期內該Topic投遞給該Group ID消息的平均TPS。
  • 採集週期:包括1分鐘、10分鐘後、30分鐘、1小時。採集週期決定了數據採集的時間間隔,週期越短,採集點越密集,消息消費數據越詳細。
  • 時間範圍:RocketMQ最多可以提供最近三天之內的消息的消費查詢。

查詢結果以圖表的形式顯示

20.png

21.png

我們是阿里雲智能全球技術服務-SRE團隊,我們致力成為一個以技術為基礎、面向服務、保障業務系統高可用的工程師團隊;提供專業、體系化的SRE服務,幫助廣大客戶更好地使用雲、基於雲構建更加穩定可靠的業務系統,提升業務穩定性。我們期望能夠分享更多幫助企業客戶上雲、用好雲,讓客戶雲上業務運行更加穩定可靠的技術,您可用釘釘掃描下方二維碼,加入阿里雲SRE技術學院釘釘圈子,和更多雲上人交流關於雲平臺的那些事。

image.png

Leave a Reply

Your email address will not be published. Required fields are marked *