資安

阿里雲微服務消息隊列(MQTT For IoT)使用Demo

Step By Step

服務使用大圖

圖片.png

  • 1、不同設備通過SDK和平臺側建立連接,實現設備與平臺側的交互通信;
  • 2、通過規則流轉功能,將設備上報的消息流轉到MQ Topic,也可以通過MQ Topic向MQTT Topic下發消息;
  • 3、基於Server端管控API,實現消息的直接下發、設備在線狀態查詢以及Group的創建。
一、MQTT Server創建(公網區域)

1、創建實例
圖片.png

圖片.png

2、Topic和Group創建
圖片.png

圖片.png

圖片.png

二、設備端Java Code Sample

1、pom.xml

 <dependencies>
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.10</version>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.2</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.48</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
                <version>1.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun</groupId>
                <artifactId>aliyun-java-sdk-core</artifactId>
                <version>4.5.0</version>
            </dependency>
        </dependencies>

2、Device Code Sample

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {

    public static void main(String[] args) throws Exception {
        /**
         * MQ4IOT 實例 ID,購買後控制檯獲取
         */
        String instanceId = "post-cn-n6w*********";
        /**
         * 接入點地址,購買 MQ4IOT 實例,且配置完成後即可獲取,接入點地址必須填寫分配的域名,不得使用 IP 地址直接連接,否則可能會導致客戶端異常。
         */
        String endPoint = "post-cn-n6w********.mqtt.aliyuncs.com";
        /**
         * 賬號 accesskey,從賬號系統控制檯獲取
         */
        String accessKey = "LTAIOZZg********";
        /**
         * 賬號 secretKey,從賬號系統控制檯獲取,僅在Signature鑑權模式下需要設置
         */
        String secretKey = "v7CjUJCMk7j9aK****************";
        /**
         * MQ4IOT clientId,由業務系統分配,需要保證每個 tcp 連接都不一樣,保證全局唯一,如果不同的客戶端對象(tcp 連接)使用了相同的 clientId 會導致連接異常斷開。
         * clientId 由兩部分組成,格式為 GroupID@@@DeviceId,其中 groupId 在 MQ4IOT 控制檯申請,DeviceId 由業務方自己設置,clientId 總長度不得超過64個字符。
         */
        String clientId = "GID_MQTT_Client1@@@device1";
        /**
         * MQ4IOT 消息的一級 topic,需要在控制檯申請才能使用。
         * 如果使用了沒有申請或者沒有被授權的 topic 會導致鑑權失敗,服務端會斷開客戶端連接。
         */
        final String parentTopic = "MQTT_Topic";
        /**
         * MQ4IOT支持子級 topic,用來做自定義的過濾,此處為示意,可以填寫任何字符串,具體參考https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
         * 需要注意的是,完整的 topic 長度不得超過128個字符。
         */
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
        /**
         * QoS參數代表傳輸質量,可選0,1,2,根據實際需求合理設置,具體參考 https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * 客戶端使用的協議和端口必須匹配,具體參考文檔 https://help.aliyun.com/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB
         * 如果是 SSL 加密則設置ssl://endpoint:8883
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * 客戶端設置好發送超時時間,防止無限阻塞
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * 客戶端連接成功後就需要儘快訂閱需要的 topic
                 */
                System.out.println("connect success");
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            final String topicFilter[] = {mq4IotTopic};
                            final int[] qos = {qosLevel};
                            mqttClient.subscribe(topicFilter, qos);
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                /**
                 * 消費消息的回調接口,需要確保該接口不拋異常,該接口運行返回即代表消息消費成功。
                 * 消費消息需要保證在規定時間內完成,如果消費耗時超過服務端約定的超時時間,對於可靠傳輸的模式,服務端可能會重試推送,業務需要做好冪等去重處理。超時時間約定參考限制
                 * https://help.aliyun.com/document_detail/63620.html?spm=a2c4g.11186623.6.546.229f1f6ago55Fj
                 */
                System.out.println(
                        "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 1; i++) {
            MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
            message.setQos(qosLevel);
            /**
             *  發送普通消息時,topic 必須和接收方訂閱的 topic 一致,或者符合通配符匹配規則
             */
            mqttClient.publish(mq4IotTopic, message);
            /**
             * MQ4IoT支持點對點消息,即如果發送方明確知道該消息只需要給特定的一個設備接收,且知道對端的 clientId,則可以直接發送點對點消息。
             * 點對點消息不需要經過訂閱關係匹配,可以簡化訂閱方的邏輯。點對點消息的 topic 格式規範是  {{parentTopic}}/p2p/{{targetClientId}}
             */
            final String p2pSendTopic = parentTopic + "/p2p/" + clientId;
            message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
            message.setQos(qosLevel);
            mqttClient.publish(p2pSendTopic, message);
        }
        Thread.sleep(Long.MAX_VALUE);
    }
}

工具類:util

3、測試效果
圖片.png

4、消息流轉軌跡查詢
圖片.png

三、規則流轉測試

1、MQ創建三個不同類型的Topic
圖片.png

2、創建Group,用於MQ側消費消息
圖片.png

3、MQTT側配置流轉規則
圖片.png

圖片.png

圖片.png

圖片.png

4、MQ側代碼測試

4.1 pom.xml

        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.7.1.Final</version>
        </dependency>

4.2 Code Sample

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;

public class ConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // 您在控制檯創建的 Group ID。
        properties.put(PropertyKeyConst.GROUP_ID, "GID_MessageConsumer");
        // AccessKey ID 阿里雲身份驗證,在阿里雲 RAM 控制檯創建。
        properties.put(PropertyKeyConst.AccessKey, "LTAIOZZg********");
        // Accesskey Secret 阿里雲身份驗證,在阿里雲服 RAM 控制檯創建。
        properties.put(PropertyKeyConst.SecretKey, "v7CjUJCMk7j9aK****************");
        // 設置 TCP 接入域名,進入控制檯的實例詳情頁面的 TCP 協議客戶端接入點區域查看。
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://MQ_INST_***************_BcLPQ2p0.mq-internet-access.mq-internet.aliyuncs.com:80");
        // 集群訂閱方式 (默認)。
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        // 廣播訂閱方式。
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

        Consumer consumer = ONSFactory.createConsumer(properties);
        //1、訂閱設備上行消息
        consumer.subscribe("MessageFromMQTT", "*", new MessageListener() { //訂閱多個 Tag。
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });

        //2、訂閱設備上下線消息
        consumer.subscribe("DevcieOnlineAndOffline", "*", new MessageListener() { //訂閱全部 Tag。
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });

        consumer.start();
        System.out.println("Consumer Started");
    }
}

4.3 測試效果(先啟動消費端,然後設備端上行消息)

圖片.png

4.4 通過MQ發送消息到MQTT

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Date;
import java.util.Properties;

public class SendMQMessageToMQTT {

    public static void main(String[] args) {

            Properties properties = new Properties();
            // AccessKeyId 阿里雲身份驗證,在阿里雲用戶信息管理控制檯獲取。
            properties.put(PropertyKeyConst.AccessKey,"LTAIOZZg**********");
            // AccessKeySecret 阿里雲身份驗證,在阿里雲用戶信息管理控制檯獲取。
            properties.put(PropertyKeyConst.SecretKey, "v7CjUJCMk7j9aK****************");
            //設置發送超時時間,單位毫秒。
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
            // 設置 TCP 接入域名,進入控制檯的實例詳情頁面的 TCP 協議客戶端接入點區域查看。
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://MQ_INST_********_BcLPQ2p0.mq-internet-access.mq-internet.aliyuncs.com:80");
            Producer producer = ONSFactory.createProducer(properties);

            // mqttSecondTopic:https://help.aliyun.com/document_detail/112971.html?spm=a2c4g.11186623.6.579.403242ca4pOcpC
            properties.put("mqttSecondTopic","testMq4Iot");

            // 在發送消息前,必須調用 start 方法來啟動 Producer,只需調用一次即可。
            producer.start();

            //循環發送消息。
            for (int i = 0; i < 1; i++){
                Message msg = new Message("MessageToMQTT","","MQ Message To MQTT".getBytes());
                msg.setKey("ORDERID_" + i);
                msg.setUserProperties(properties);

                try {
                    SendResult sendResult = producer.send(msg);
                    // 同步發送消息,只要不拋異常就是成功。
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                    }
                }
                catch (Exception e) {
                    // 消息發送失敗,需要進行重試處理,可重新發送這條消息或持久化這條數據進行補償處理。
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                    e.printStackTrace();
                }
            }

            // 在應用退出前,銷燬 Producer 對象。
            // 注意:如果不銷燬也沒有問題。
            producer.shutdown();
        }

    }

4.5 The Result

圖片.png

圖片.png

4.6 消息軌跡查詢

圖片.png

四、MQTT雲端API測試

1、pom.xml

        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.5.6</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
            <version>1.0.4</version>
        </dependency>

2、發送消息Code Sample

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import com.aliyuncs.onsmqtt.model.v20200420.*;

public class SendMessage {

    public static void main(String[] args) {
        DefaultProfile profile = DefaultProfile.getProfile("mq-internet-access", "LTAIOZZg********", "v7CjUJCMk7j9aK****************");
        IAcsClient client = new DefaultAcsClient(profile);

        SendMessageRequest request = new SendMessageRequest();
        request.setRegionId("mq-internet-access");
        request.setInstanceId("post-cn-n6w********");
        request.setPayload("message from manager api!");
        request.setMqttTopic("MQTT_Topic/testMq4Iot");

        try {
            SendMessageResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }
    }
}

3、測試效果
圖片.png
圖片.png

4、查詢設備狀態Code Sample

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.profile.DefaultProfile;
import com.google.gson.Gson;
import com.aliyuncs.onsmqtt.model.v20200420.*;

public class QuerySessionByClientId {

    public static void main(String[] args) {
        DefaultProfile profile = DefaultProfile.getProfile("mq-internet-access", "LTAIOZZg********", "v7CjUJCMk7j9aK****************");
        IAcsClient client = new DefaultAcsClient(profile);

        QuerySessionByClientIdRequest request = new QuerySessionByClientIdRequest();
        request.setRegionId("mq-internet-access");
        request.setInstanceId("post-cn-n6w********");
        request.setClientId("GID_MQTT_Client1@@@device1");

        try {
            QuerySessionByClientIdResponse response = client.getAcsResponse(request);
            System.out.println(new Gson().toJson(response));
        } catch (ServerException e) {
            e.printStackTrace();
        } catch (ClientException e) {
            System.out.println("ErrCode:" + e.getErrCode());
            System.out.println("ErrMsg:" + e.getErrMsg());
            System.out.println("RequestId:" + e.getRequestId());
        }

    }
}

5、測試效果
圖片.png

更多參考

QuerySessionByClientId
快速使用 MQTT 的 Java SDK 收發消息(跨產品數據流入)
MQ發送普通消息
MQ訂閱消息
阿里雲微服務消息隊列Token C# 設備端示例Demo
阿里雲微服務消息隊列Token Java Code Sample
阿里雲微服務消息隊列Token C# Code Sample

Leave a Reply

Your email address will not be published. Required fields are marked *