
1. 最佳實踐綜述
本次最佳實踐,將結合JAVA代碼對消息隊列RocketMQ版(簡稱RocketMQ)的使用原理進行分析。
RocketMQ 是企業級互聯網架構的核心產品,具備低延遲、高併發、高可用、高可靠,可支撐萬億級數據洪峰的分佈式消息中間件。可通過RocketMQ控制檯創建RocketMQ實例,無需安裝包,省去繁雜的手續。對RocketMQ消息服務消息可視化可以按Topic、MessageID或Topic不同維度查詢發送的消息、按消息軌跡功能展示發送和消費關係、消息是否成功消費等信息。其中資源報表可以快速的統計RocketMQ在一定時間段內發送和訂閱消息的TPS數。
本次最佳實踐的內容主要包含:
(1)消息同步和異步發送的JAVA示例代碼及原理分析。
(2)針對同步和異步發送的區別選擇適用的消息發送方式滿足需求。
(3)對消息發送可以分Topic,更細粒化標籤tag消息進行歸類。
(4)通過Topic和Tag選擇過濾消費消息。
(5)對消息發送失敗有進行消息重試處理。
(6)結合JAVA代碼對集群和廣播訂閱消息消費原理進行詳述。
2. 最佳實踐代碼設計
2.1 生產者發送消息
本章對生產者發送消息的兩種模式進行代碼的說明。
2.1.1 同步發送消息
同步發送原理:
同步發送是指消息發送方發出一條消息後,會在收到服務端返回響應之後才發下一條消息的通訊方式。
1.在pom.xml文件導入依賴包。
<dependency>
<groupId>************ces</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.2.Final</version>
</dependency>
2.配置文件application.properties連接mq的參數值。# POC2專有云MQ配置mq.accessKey=lB2eniMz61o2Z1qcmq.secretKey=cweVrg5sKGHMIrE99Btmq109yR1HaImq.onsAddr=http://MQ_INST_1870773007797099_Bb48AjKM.*************-d01.mq.namesrv.cloud.poc2.com:9876mq.normalTopic=pdsa_topicmq.producerId=GID_pdsa_mqmq.consumerId=CID_consumermq.sendMsgTimeoutMillis=3000mq.tag=TagA
3.同步發送示例代碼,針對性適配後面MQ性能壓測場景代碼,內容包含發送每條消息數據大小50Kb,Topic和Tag消息更細粒化分類,消息發送失敗進行重試處理。
package com.aliware.edas.com.aliware.edas.rocketmq;import com.aliware.edas.com.aliware.edas.mq.ProducerEntry;import com.aliyun.openservices.ons.api.*;import org.springframework.cloud.context.config.annotation.RefreshScope;import org.springframework.stereotype.Component;import java.util.Date;import java.util.Properties;/** * @author liuhuihui */@Component("simpleMQProduce")@ RefreshScopepublic class SimpleMQProduce extends ProducerEntry{ StringBuilder content = new StringBuilder(); public void sendMsg() { for(int i = 0; i < 6400; i++) { content.append(String.valueOf("A")); } Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.GROUP_ID, this.getProcucerId()); // AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制檯創建 properties.put(PropertyKeyConst.AccessKey, this.getAccessKey()); // SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制檯創建 properties.put(PropertyKeyConst.SecretKey, this.getSecretKey()); //設置發送超時時間,單位毫秒 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.getSendMsgTimeoutMillis()); // 設置 TCP 接入域名,到控制檯的實例基本信息中查看 properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr()); Producer producer = ONSFactory.createProducer(properties); // 在發送消息前,必須調用 start 方法來啟動 Producer,只需調用一次即可 producer.start(); Message msg = new Message( // Message 所屬的 Topic this.getTopic(), // Message Tag 可理解為 Gmail 中的標籤,對消息進行再歸類,方便 Consumer 指定過濾條件在 MQ 服務器過濾 this.getTag(), // Message Body 可以是任何二進制形式的數據, MQ 不做任何干預, // 需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式 (content.toString()).getBytes()); // 設置代表消息的業務關鍵屬性,請儘可能全局唯一。 // 以方便您在無法正常收到消息情況下,可通過阿里雲服務器管理控制檯查詢消息並補發 // 注意:不設置也不會影響消息正常收發 msg.setKey("ORDERID_" + 1); try { SendResult sendResult = producer.send(msg); // 同步發送消息,只要不拋異常就是成功 if(sendResult != null) { System.out.println(new Date() + "消息長度:" + content.length() + "--發送消息內容:" + content.substring(0, 50) + "****" + content.substring(content.length() - 50, content.length())); } } catch(Exception e) { // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理 System.out.println(new Date() + "消息長度:" + content.length() + "--發送消息內容:" + content.substring(0, 50) + "****" + content.substring(content.length() - 50, content.length())); e.printStackTrace(); } // 在應用退出前,銷燬 Producer 對象 // 注意:如果不銷燬也沒有問題 producer.shutdown(); }}
2.1.2 同步發送應用場景
此種方式應用場景非常廣泛,例如重要通知郵件、報名短信通知、營銷短信系統等。
2.1.3 異步發送消息
異步發送原理:
異步發送是指發送方發出一條消息後,不等服務端返回響應,接著發送下一條消息的通訊方式。消息隊列RocketMQ版的異步發送,需要用戶實現異步發送回調接口。
1.在pom.xml文件導入依賴包。
<dependency>
<groupId>c***********ces</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.2.Final</version>
</dependency>
2.配置文件application.properties連接mq的參數值。# POC2專有云MQ配置mq.accessKey=lB2eniMz61o2Z1qcmq.secretKey=cweVrg5sKGHMIrE99Btmq109yR1HaImq.onsAddr=http://MQ_INST_1870773007797099_Bb48AjKM.**********-d01.mq.namesrv.cloud.poc2.com:9876mq.normalTopic=pdsa_topicmq.producerId=GID_pdsa_mqmq.consumerId=CID_consumermq.sendMsgTimeoutMillis=3000mq.tag=TagA
3.異步發送示例代碼,針對性適配後面MQ性能壓測場景代碼,內容包含發送每條消息數據大小50Kb ,Topic和Tag消息更細粒化分類,消息發送失敗進行重試處理。
package com.aliware.edas.com.aliware.edas.rocketmq;import com.aliware.edas.com.aliware.edas.mq.ProducerEntry;import com.aliyun.openservices.ons.api.*;import org.springframework.cloud.context.config.annotation.RefreshScope;import org.springframework.stereotype.Component;import java.util.Date;import java.util.Properties;/** * @author liuhuihui */@Component("asyncSimpleMQProduce")@ RefreshScopepublic class AsyncSimpleMQProduce extends ProducerEntry{ StringBuilder content = new StringBuilder(); public void sendMsg() { for(int i = 0; i < 6400; i++) { content.append(String.valueOf("A")); } Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.GROUP_ID, this.getProcucerId()); // AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制檯創建 properties.put(PropertyKeyConst.AccessKey, this.getAccessKey()); // SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制檯創建 properties.put(PropertyKeyConst.SecretKey, this.getSecretKey()); //設置發送超時時間,單位毫秒 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.getSendMsgTimeoutMillis()); // 設置 TCP 接入域名,到控制檯的實例基本信息中查看 properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr()); Producer producer = ONSFactory.createProducer(properties); // 在發送消息前,必須調用 start 方法來啟動 Producer,只需調用一次即可 producer.start(); Message msg = new Message( // Message 所屬的 Topic this.getTopic(), // Message Tag 可理解為 Gmail 中的標籤,對消息進行再歸類,方便 Consumer 指定過濾條件在 MQ 服務器過濾 this.getTag(), // Message Body 可以是任何二進制形式的數據, MQ 不做任何干預, // 需要 Producer 與 Consumer 協商好一致的序列化和反序列化方式 (content.toString()).getBytes()); // 設置代表消息的業務關鍵屬性,請儘可能全局唯一。 // 以方便您在無法正常收到消息情況下,可通過阿里雲服務器管理控制檯查詢消息並補發 // 注意:不設置也不會影響消息正常收發 msg.setKey("ORDERID_" + 1); while(true) { producer.sendAsync(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(new Date() + "消息長度:" + content.length() + "--發送消息內容:" + content.substring(0, 50) + "****" + content.substring(content.length() - 50, content.length())); } @Override public void onException(OnExceptionContext context) { System.out.println("發送失敗!"); } }); } // 在應用退出前,銷燬 Producer 對象 // 注意:如果不銷燬也沒有問題 // producer.shutdown(); }}
2.1.4 異步發送應用場景
異步發送一般用於鏈路耗時較長,對響應時間較為敏感的業務場景,例如用戶視頻上傳後通知啟動轉碼服務,轉碼完成後通知推送轉碼結果等。
2.2 消費者訂閱消息
本章對消費者訂閱消息的兩種模式進行代碼的說明。
2.2.1 集群訂閱
集群訂閱原理:
同一個Group ID所標識的所有Consumer平均分攤消費消息。例如某個Topic有9條消息,一個Group ID有3個Consumer實例,那麼在集群消費模式下每個實例平均分攤,只消費其中的3條消息。
1.在pom.xml文件導入依賴包。
<dependency>
<groupId>**************ices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.2.Final</version>
</dependency>
2.配置文件application.properties連接mq的參數值。
3.集群訂閱示例代碼,適配後面MQ性能壓測場景代碼。
@Component("simpleMQConsumer")@ RefreshScopepublic classSimpleMQConsumerextendsProducerEntry{ public void receive() { Properties properties = new Properties(); //您在控制檯創建的GroupID properties.put(PropertyKeyConst.GROUP_ID, this.getProcucerId()); //AccessKey阿里雲身份驗證,在阿里雲服務器管理控制檯創建 properties.put(PropertyKeyConst.AccessKey, this.getAccessKey()); //SecretKey阿里雲身份驗證,在阿里雲服務器管理控制檯創建 properties.put(PropertyKeyConst.SecretKey, this.getSecretKey()); //設置TCP接入域名,到控制檯的實例基本信息中查看 properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr()); //集群訂閱方式(默認) properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING); Consumer consumer = ONSFactory.createConsumer(properties); //訂閱另外一個Topic consumer.subscribe(this.getTopic(), "*", new MessageListener() { //訂閱全部Tag @Override public Action consume(Message message, ConsumeContext context) { System.out.println("Receive:" + message); return Action.CommitMessage; } }); consumer.start(); System.out.println("Consumer Started"); }}
2.2.2 廣播訂閱
廣播訂閱原理:
同一個Group ID所標識的所有Consumer都會各自消費某條消息一次。例如某個Topic有9條消息,一個Group ID有3個Consumer實例,那麼在廣播消費模式下每個實例都會各自消費9條消息。
1.在pom.xml文件導入依賴包.
<dependency>
<groupId>**************ces</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.2.Final</version>
</dependency>
2.廣播消費示例代碼。
@Component("simpleMQConsumer")@ RefreshScopepublic classSimpleMQConsumerextendsProducerEntry{ public void receive() { Properties properties = new Properties(); //您在控制檯創建的GroupID properties.put(PropertyKeyConst.GROUP_ID, this.getProcucerId()); //AccessKey阿里雲身份驗證,在阿里雲服務器管理控制檯創建 properties.put(PropertyKeyConst.AccessKey, this.getAccessKey()); //SecretKey阿里雲身份驗證,在阿里雲服務器管理控制檯創建 properties.put(PropertyKeyConst.SecretKey, this.getSecretKey()); //設置TCP接入域名,到控制檯的實例基本信息中查看 properties.put(PropertyKeyConst.NAMESRV_ADDR, this.getOnsAddr()); //廣播訂閱方式(默認) properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe(this.getTopic(), "*", new MessageListener() { //訂閱全部Tag @Override public Action consume(Message message, ConsumeContext context) { System.out.println("Receive:" + message); return Action.CommitMessage; } }); consumer.start(); System.out.println("Consumer Started"); }}
後續內容
JAVA應用開發MQ實戰最佳實踐——Series2:消息隊列RocketMQ性能測試案例
我們是阿里雲智能全球技術服務-SRE團隊,我們致力成為一個以技術為基礎、面向服務、保障業務系統高可用的工程師團隊;提供專業、體系化的SRE服務,幫助廣大客戶更好地使用雲、基於雲構建更加穩定可靠的業務系統,提升業務穩定性。我們期望能夠分享更多幫助企業客戶上雲、用好雲,讓客戶雲上業務運行更加穩定可靠的技術,您可用釘釘掃描下方二維碼,加入阿里雲SRE技術學院釘釘圈子,和更多雲上人交流關於雲平臺的那些事。
