開發與維運

阿里雲Rocket MQ Java Http SDK發送消費消息示例Demo

Step By Step

1、創建實例,登陸阿里雲控制檯
圖片.png
圖片.png

2、實例下面分別創建Topic和Http
Group
圖片.png
圖片.png

3、pom.xml

        <dependency>
            <groupId>com.aliyun.mq</groupId>
            <artifactId>mq-http-sdk</artifactId>
            <version>1.0.2</version>
        </dependency>

4、Producer Code Sample

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.Date;

public class ProducerDemo {

    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                // 設置HTTP接入域名(此處以公共雲生產環境為例)
                "http://18482178********.mqrest.cn-shanghai.aliyuncs.com",
                // AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制檯創建
                "LTAIOZZg********",
                // SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制檯創建
                "v7CjUJCMk7j9aK****************"
        );

        // 所屬的 Topic
        final String topic = "****";
        // Topic所屬實例ID,默認實例為空
        final String instanceId = "MQ_INST_********";

        // 獲取Topic的生產者
        MQProducer producer;
        if (instanceId != null && instanceId != "") {
            producer = mqClient.getProducer(instanceId, topic);
        } else {
            producer = mqClient.getProducer(topic);
        }

        try {
            // 循環發送40條消息
            for (int i = 0; i < 40; i++) {
                TopicMessage pubMsg;
                if (i % 2 == 0) {
                    // 普通消息
                    pubMsg = new TopicMessage(
                            // 消息內容
                            "hello common mq!".getBytes(),
                            // 消息標籤
                            "A"
                    );
                    // 設置屬性
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    // 設置KEY
                    pubMsg.setMessageKey("MessageKey");
                } else {
                    pubMsg = new TopicMessage(
                            // 消息內容
                            "hello delay mq!".getBytes(),
                            // 消息標籤
                            "B"
                    );
                    // 設置屬性
                    pubMsg.getProperties().put("b", String.valueOf(i));
                    // 定時消息, 定時時間為10s後
                    pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
                }
                // 同步發送消息,只要不拋異常就是成功
                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

                // 同步發送消息,只要不拋異常就是成功
                System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
            // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理
            System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
            e.printStackTrace();
        }

        mqClient.close();
    }
}

5、Consumer Code Sample

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.model.Message;
import java.util.*;

public class ConsumerDemo {

    public static void main(String[] args) {

        MQClient mqClient = new MQClient(
                // 設置HTTP接入域名(此處以公共雲生產環境為例)
                "http://18482178********.mqrest.cn-shanghai.aliyuncs.com",
                // AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制檯創建
                "LTAIOZZg********",
                // SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制檯創建
                "v7CjUJCMk7j9aK****************"
        );

        String topicName = "****";
        String consumer = "GID_****"; //Http Consumer Group Name
        String messageTag =""; // Tag,為空表示訂閱全部Tag
        String instanceId = "MQ_INST_******";

        MQConsumer mqConsumer = mqClient.getConsumer(instanceId,topicName, consumer,messageTag);

        while(true) {
            try {

                // 消費消息,輪訓時間設置為3秒,一次至多拉去三條消息
                List<Message> listMessage = mqConsumer.consumeMessage(3, 3);

                if (listMessage == null || listMessage.size() == 0) {
                    System.out.println("Message is not exist!");
                } else {
                    List<String> receiptHandles = new ArrayList<String>();
                    for (Message message : listMessage
                    ) {
                        System.out.println("MessageBody" + message.getMessageBodyString());
                        receiptHandles.add(message.getReceiptHandle());
                    }
                    // 回調刪除
                    mqConsumer.ackMessage(receiptHandles);
                }
            }catch (Exception ex)
            {
                System.out.println("error:" + ex.getMessage());
                mqClient.close();
            }
        }
    }
}

參考鏈接

RocketMQ HTTP 協議 SDK

Leave a Reply

Your email address will not be published. Required fields are marked *