大數據

JAVA應用開發MQ實戰最佳實踐——Series1:RocketMQ綜述及代碼設計

25F5EB9D-77DF-4c9c-BA3D-7674EBD136A0.png

1. 最佳實踐綜述

本次最佳實踐,將結合JAVA代碼對消息隊列RocketMQ版(簡稱RocketMQ)的使用原理進行分析。
RocketMQ 是企業級互聯網架構的核心產品,具備低延遲、高併發、高可用、高可靠,可支撐萬億級數據洪峰的分佈式消息中間件。可通過RocketMQ控制檯創建RocketMQ實例,無需安裝包,省去繁雜的手續。對RocketMQ消息服務消息可視化可以按Topic、MessageID或Topic不同維度查詢發送的消息、按消息軌跡功能展示發送和消費關係、消息是否成功消費等信息。其中資源報表可以快速的統計RocketMQ在一定時間段內發送和訂閱消息的TPS數。
本次最佳實踐的內容主要包含:
(1)消息同步和異步發送的JAVA示例代碼及原理分析。
(2)針對同步和異步發送的區別選擇適用的消息發送方式滿足需求。
(3)對消息發送可以分Topic,更細粒化標籤tag消息進行歸類。
(4)通過Topic和Tag選擇過濾消費消息。
(5)對消息發送失敗有進行消息重試處理。
(6)結合JAVA代碼對集群和廣播訂閱消息消費原理進行詳述。

2. 最佳實踐代碼設計

2.1 生產者發送消息

本章對生產者發送消息的兩種模式進行代碼的說明。

2.1.1 同步發送消息

同步發送原理:
同步發送是指消息發送方發出一條消息後,會在收到服務端返回響應之後才發下一條消息的通訊方式。
1.在pom.xml文件導入依賴包。

<dependency>
    <groupId>************ces</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.2.Final</version>
</dependency>

2.配置文件application.properties連接mq的參數值。
# POC2專有云MQ配置
mq.accessKey=lB2eniMz61o2Z1qc
mq.secretKey=cweVrg5sKGHMIrE99Btmq109yR1HaI
mq.onsAddr=http://MQ_INST_1870773007797099_Bb48AjKM.*************-d01.mq.namesrv.cloud.poc2.com:9876
mq.normalTopic=pdsa_topic
mq.producerId=GID_pdsa_mq
mq.consumerId=CID_consumer
mq.sendMsgTimeoutMillis=3000
mq.tag=TagA
3.同步發送示例代碼,針對性適配後面MQ性能壓測場景代碼,內容包含發送每條消息數據大小50Kb,Topic和Tag消息更細粒化分類,消息發送失敗進行重試處理。
(1)1.1生產者發送消息-同步發送消息-3同步發送示例代碼.png

package com.aliware.edas.com.aliware.edas.rocketmq;
import com.aliware.edas.com.aliware.edas.mq.ProducerEntry;
import com.aliyun.openservices.ons.api.*;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.Properties;
/**
* @author liuhuihui
*/
@Component("simpleMQProduce")
@ RefreshScope
public class SimpleMQProduce extends ProducerEntry
{
StringBuilder content = new StringBuilder();
public void sendMsg()
{
for(int i = 0; i < 6400; i++)
{
content.append(String.valueOf("A"));
}
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.GROUP_ID, this.getProcucerId());
// AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制檯創建
properties.put(PropertyKeyConst.AccessKey, this.getAccessKey());
// SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制檯創建
properties.put(PropertyKeyConst.SecretKey, this.getSecretKey());
//設置發送超時時間,單位毫秒
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.getSendMsgTimeoutMillis());
// 設置 TCP 接入域名,到控制檯的實例基本信息中查看
properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr());
Producer producer = ONSFactory.createProducer(properties);
// 在發送消息前,必須調用 start 方法來啟動 Producer,只需調用一次即可
producer.start();
Message msg = new Message(
// Message 所屬的 Topic
this.getTopic(),
// Message Tag 可理解為 Gmail 中的標籤,對消息進行再歸類,方便 Consumer 指定過濾條件在 MQ 服務器過濾
this.getTag(),
// Message Body 可以是任何二進制形式的數據, MQ 不做任何干預,
// 需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式
(content.toString()).getBytes());
// 設置代表消息的業務關鍵屬性,請儘可能全局唯一。
// 以方便您在無法正常收到消息情況下,可通過阿里雲服務器管理控制檯查詢消息並補發
// 注意:不設置也不會影響消息正常收發
msg.setKey("ORDERID_" + 1);
try
{
SendResult sendResult = producer.send(msg);
// 同步發送消息,只要不拋異常就是成功
if(sendResult != null)
{
System.out.println(new Date() + "消息長度:" + content.length() + "--發送消息內容:" + content.substring(0, 50) + "****" + content.substring(content.length() - 50, content.length()));
}
}
catch(Exception e)
{
// 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理
System.out.println(new Date() + "消息長度:" + content.length() + "--發送消息內容:" + content.substring(0, 50) + "****" + content.substring(content.length() - 50, content.length()));
e.printStackTrace();
}
// 在應用退出前,銷燬 Producer 對象
// 注意:如果不銷燬也沒有問題
producer.shutdown();
}
}

2.1.2 同步發送應用場景

此種方式應用場景非常廣泛,例如重要通知郵件、報名短信通知、營銷短信系統等。

2.1.3 異步發送消息

異步發送原理:
異步發送是指發送方發出一條消息後,不等服務端返回響應,接著發送下一條消息的通訊方式。消息隊列RocketMQ版的異步發送,需要用戶實現異步發送回調接口。
1.在pom.xml文件導入依賴包。

<dependency>
    <groupId>c***********ces</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.2.Final</version>
</dependency>

2.配置文件application.properties連接mq的參數值。
# POC2專有云MQ配置
mq.accessKey=lB2eniMz61o2Z1qc
mq.secretKey=cweVrg5sKGHMIrE99Btmq109yR1HaI
mq.onsAddr=http://MQ_INST_1870773007797099_Bb48AjKM.**********-d01.mq.namesrv.cloud.poc2.com:9876
mq.normalTopic=pdsa_topic
mq.producerId=GID_pdsa_mq
mq.consumerId=CID_consumer
mq.sendMsgTimeoutMillis=3000
mq.tag=TagA

3.異步發送示例代碼,針對性適配後面MQ性能壓測場景代碼,內容包含發送每條消息數據大小50Kb ,Topic和Tag消息更細粒化分類,消息發送失敗進行重試處理。
(2)1.1生產者發送消息-異步發送消息-3異步發送示例代碼.png

package com.aliware.edas.com.aliware.edas.rocketmq;
import com.aliware.edas.com.aliware.edas.mq.ProducerEntry;
import com.aliyun.openservices.ons.api.*;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.Properties;
/**
* @author liuhuihui
*/
@Component("asyncSimpleMQProduce")
@ RefreshScope
public class AsyncSimpleMQProduce extends ProducerEntry
{
StringBuilder content = new StringBuilder();
public void sendMsg()
{
for(int i = 0; i < 6400; i++)
{
content.append(String.valueOf("A"));
}
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.GROUP_ID, this.getProcucerId());
// AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制檯創建
properties.put(PropertyKeyConst.AccessKey, this.getAccessKey());
// SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制檯創建
properties.put(PropertyKeyConst.SecretKey, this.getSecretKey());
//設置發送超時時間,單位毫秒
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.getSendMsgTimeoutMillis());
// 設置 TCP 接入域名,到控制檯的實例基本信息中查看
properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr());
Producer producer = ONSFactory.createProducer(properties);
// 在發送消息前,必須調用 start 方法來啟動 Producer,只需調用一次即可
producer.start();
Message msg = new Message(
// Message 所屬的 Topic
this.getTopic(),
// Message Tag 可理解為 Gmail 中的標籤,對消息進行再歸類,方便 Consumer 指定過濾條件在 MQ 服務器過濾
this.getTag(),
// Message Body 可以是任何二進制形式的數據, MQ 不做任何干預,
// 需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式
(content.toString()).getBytes());
// 設置代表消息的業務關鍵屬性,請儘可能全局唯一。
// 以方便您在無法正常收到消息情況下,可通過阿里雲服務器管理控制檯查詢消息並補發
// 注意:不設置也不會影響消息正常收發
msg.setKey("ORDERID_" + 1);
while(true)
{
producer.sendAsync(msg, new SendCallback()
{
@Override
public void onSuccess(SendResult sendResult)
{
System.out.println(new Date() + "消息長度:" + content.length() + "--發送消息內容:" + content.substring(0, 50) + "****" + content.substring(content.length() - 50, content.length()));
}
@Override
public void onException(OnExceptionContext context)
{
System.out.println("發送失敗!");
}
});
}
// 在應用退出前,銷燬 Producer 對象
// 注意:如果不銷燬也沒有問題
// producer.shutdown();
}
}

2.1.4 異步發送應用場景

異步發送一般用於鏈路耗時較長,對響應時間較為敏感的業務場景,例如用戶視頻上傳後通知啟動轉碼服務,轉碼完成後通知推送轉碼結果等。

2.2 消費者訂閱消息

本章對消費者訂閱消息的兩種模式進行代碼的說明。

2.2.1 集群訂閱

集群訂閱原理:
同一個Group ID所標識的所有Consumer平均分攤消費消息。例如某個Topic有9條消息,一個Group ID有3個Consumer實例,那麼在集群消費模式下每個實例平均分攤,只消費其中的3條消息。
1.在pom.xml文件導入依賴包。

<dependency>
    <groupId>**************ices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.2.Final</version>
</dependency>

2.配置文件application.properties連接mq的參數值。
3.集群訂閱示例代碼,適配後面MQ性能壓測場景代碼。
(3)1.2消費者訂閱消息-集群訂閱-3集群訂閱示例代碼.png

@Component("simpleMQConsumer")
@ RefreshScope
public classSimpleMQConsumerextendsProducerEntry
{
public void receive()
{
Properties properties = new Properties();
//您在控制檯創建的GroupID
properties.put(PropertyKeyConst.GROUP_ID, this.getProcucerId());
//AccessKey阿里雲身份驗證,在阿里雲服務器管理控制檯創建
properties.put(PropertyKeyConst.AccessKey, this.getAccessKey());
//SecretKey阿里雲身份驗證,在阿里雲服務器管理控制檯創建
properties.put(PropertyKeyConst.SecretKey, this.getSecretKey());
//設置TCP接入域名,到控制檯的實例基本信息中查看
properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr());
//集群訂閱方式(默認)
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
Consumer consumer = ONSFactory.createConsumer(properties);
//訂閱另外一個Topic
consumer.subscribe(this.getTopic(), "*", new MessageListener()
{ //訂閱全部Tag
@Override
public Action consume(Message message, ConsumeContext context)
{
System.out.println("Receive:" + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}

2.2.2 廣播訂閱

廣播訂閱原理:
同一個Group ID所標識的所有Consumer都會各自消費某條消息一次。例如某個Topic有9條消息,一個Group ID有3個Consumer實例,那麼在廣播消費模式下每個實例都會各自消費9條消息。
1.在pom.xml文件導入依賴包.

<dependency>
    <groupId>**************ces</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.2.Final</version>
</dependency>

2.廣播消費示例代碼。
(4)1.2消費者訂閱消息-廣播訂閱-2廣播消費示例代碼.png
@Component("simpleMQConsumer")
@ RefreshScope
public classSimpleMQConsumerextendsProducerEntry
{
public void receive()
{
Properties properties = new Properties();
//您在控制檯創建的GroupID
properties.put(PropertyKeyConst.GROUP_ID, this.getProcucerId());
//AccessKey阿里雲身份驗證,在阿里雲服務器管理控制檯創建
properties.put(PropertyKeyConst.AccessKey, this.getAccessKey());
//SecretKey阿里雲身份驗證,在阿里雲服務器管理控制檯創建
properties.put(PropertyKeyConst.SecretKey, this.getSecretKey());
//設置TCP接入域名,到控制檯的實例基本信息中查看
properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr());
//廣播訂閱方式(默認)
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe(this.getTopic(), "*", new MessageListener()
{ //訂閱全部Tag
@Override
public Action consume(Message message, ConsumeContext context)
{
System.out.println("Receive:" + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}

後續內容

JAVA應用開發MQ實戰最佳實踐——Series2:消息隊列RocketMQ性能測試案例

我們是阿里雲智能全球技術服務-SRE團隊,我們致力成為一個以技術為基礎、面向服務、保障業務系統高可用的工程師團隊;提供專業、體系化的SRE服務,幫助廣大客戶更好地使用雲、基於雲構建更加穩定可靠的業務系統,提升業務穩定性。我們期望能夠分享更多幫助企業客戶上雲、用好雲,讓客戶雲上業務運行更加穩定可靠的技術,您可用釘釘掃描下方二維碼,加入阿里雲SRE技術學院釘釘圈子,和更多雲上人交流關於雲平臺的那些事。

image.png

Leave a Reply

Your email address will not be published. Required fields are marked *