Step By Step
1、創建實例,登陸阿里雲控制檯
2、實例下面分別創建Topic和Http
Group
3、pom.xml
<dependency>
<groupId>com.aliyun.mq</groupId>
<artifactId>mq-http-sdk</artifactId>
<version>1.0.2</version>
</dependency>
4、Producer Code Sample
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.Date;
public class ProducerDemo {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// 設置HTTP接入域名(此處以公共雲生產環境為例)
"http://18482178********.mqrest.cn-shanghai.aliyuncs.com",
// AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制檯創建
"LTAIOZZg********",
// SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制檯創建
"v7CjUJCMk7j9aK****************"
);
// 所屬的 Topic
final String topic = "****";
// Topic所屬實例ID,默認實例為空
final String instanceId = "MQ_INST_********";
// 獲取Topic的生產者
MQProducer producer;
if (instanceId != null && instanceId != "") {
producer = mqClient.getProducer(instanceId, topic);
} else {
producer = mqClient.getProducer(topic);
}
try {
// 循環發送40條消息
for (int i = 0; i < 40; i++) {
TopicMessage pubMsg;
if (i % 2 == 0) {
// 普通消息
pubMsg = new TopicMessage(
// 消息內容
"hello common mq!".getBytes(),
// 消息標籤
"A"
);
// 設置屬性
pubMsg.getProperties().put("a", String.valueOf(i));
// 設置KEY
pubMsg.setMessageKey("MessageKey");
} else {
pubMsg = new TopicMessage(
// 消息內容
"hello delay mq!".getBytes(),
// 消息標籤
"B"
);
// 設置屬性
pubMsg.getProperties().put("b", String.valueOf(i));
// 定時消息, 定時時間為10s後
pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
}
// 同步發送消息,只要不拋異常就是成功
TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
// 同步發送消息,只要不拋異常就是成功
System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
+ ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
}
} catch (Throwable e) {
// 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理
System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
e.printStackTrace();
}
mqClient.close();
}
}
5、Consumer Code Sample
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.model.Message;
import java.util.*;
public class ConsumerDemo {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// 設置HTTP接入域名(此處以公共雲生產環境為例)
"http://18482178********.mqrest.cn-shanghai.aliyuncs.com",
// AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制檯創建
"LTAIOZZg********",
// SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制檯創建
"v7CjUJCMk7j9aK****************"
);
String topicName = "****";
String consumer = "GID_****"; //Http Consumer Group Name
String messageTag =""; // Tag,為空表示訂閱全部Tag
String instanceId = "MQ_INST_******";
MQConsumer mqConsumer = mqClient.getConsumer(instanceId,topicName, consumer,messageTag);
while(true) {
try {
// 消費消息,輪訓時間設置為3秒,一次至多拉去三條消息
List<Message> listMessage = mqConsumer.consumeMessage(3, 3);
if (listMessage == null || listMessage.size() == 0) {
System.out.println("Message is not exist!");
} else {
List<String> receiptHandles = new ArrayList<String>();
for (Message message : listMessage
) {
System.out.println("MessageBody" + message.getMessageBodyString());
receiptHandles.add(message.getReceiptHandle());
}
// 回調刪除
mqConsumer.ackMessage(receiptHandles);
}
}catch (Exception ex)
{
System.out.println("error:" + ex.getMessage());
mqClient.close();
}
}
}
}