作者:俏巴
概述
在使用阿里雲官方IoT JAVA Device SDK連接雲端測試的時候,發現日誌總是會打印一些莫名其妙Topic消息的訂閱和發佈,但是用戶並沒有操作這些Topic,這是因為SDK底層默認做了很多系統Topic的訂閱和發佈設置,且無法關閉,導致很多測試不能滿足預期的測試期望。如果不希望一些系統Topic的默認訂閱和發佈,建議可以使用開源MQTT Client進行Topic消息的訂閱和發佈。
操作步驟
1、創建產品和設備
參考:阿里雲物聯網平臺Qucik Start 創建產品和設備部分。
2、pom.xml
<dependencies>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>23.0</version>
</dependency>
</dependencies>
3、工具類 AliyunIoTSignUtil
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.util.Arrays;
import java.util.Map;
/**
* AliyunIoTSignUtil
*/
public class AliyunIoTSignUtil {
public static String sign(Map<String, String> params, String deviceSecret, String signMethod) {
//將參數Key按字典順序排序
String[] sortedKeys = params.keySet().toArray(new String[] {});
Arrays.sort(sortedKeys);
//生成規範化請求字符串
StringBuilder canonicalizedQueryString = new StringBuilder();
for (String key : sortedKeys) {
if ("sign".equalsIgnoreCase(key)) {
continue;
}
canonicalizedQueryString.append(key).append(params.get(key));
}
try {
String key = deviceSecret;
return encryptHMAC(signMethod,canonicalizedQueryString.toString(), key);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* HMACSHA1加密
*
*/
public static String encryptHMAC(String signMethod,String content, String key) throws Exception {
SecretKey secretKey = new SecretKeySpec(key.getBytes("utf-8"), signMethod);
Mac mac = Mac.getInstance(secretKey.getAlgorithm());
mac.init(secretKey);
byte[] data = mac.doFinal(content.getBytes("utf-8"));
return bytesToHexString(data);
}
public static final String bytesToHexString(byte[] bArray) {
StringBuffer sb = new StringBuffer(bArray.length);
String sTemp;
for (int i = 0; i < bArray.length; i++) {
sTemp = Integer.toHexString(0xFF & bArray[i]);
if (sTemp.length() < 2) {
sb.append(0);
}
sb.append(sTemp.toUpperCase());
}
return sb.toString();
}
}
4、main方法
import com.alibaba.taro.AliyunIoTSignUtil;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;
public class IoTDemoPubSubDemo {
public static String productKey = "********";
public static String deviceName = "OpenMQTTDevice";
public static String deviceSecret = "********";
public static String regionId = "cn-shanghai";
// 物模型-屬性上報topic
private static String pubTopic = "/sys/" + productKey + "/" + deviceName + "/thing/event/property/post";
// 自定義topic,在產品Topic列表位置定義
private static String subTopic = "/"+productKey + "/" + deviceName+"/user/newdatademo";
private static MqttClient mqttClient;
public static void main(String [] args){
initAliyunIoTClient();
// ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1,
// new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build());
//
// scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS);
// 彙報屬性
postDeviceProperties();
try {
mqttClient.subscribe(subTopic); // 訂閱Topic
} catch (MqttException e) {
System.out.println("error:" + e.getMessage());
e.printStackTrace();
}
// 設置訂閱監聽
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("connection Lost");
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println("Sub message");
System.out.println("Topic : " + s);
System.out.println(new String(mqttMessage.getPayload())); //打印輸出消息payLoad
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
}
/**
* 初始化 Client 對象
*/
private static void initAliyunIoTClient() {
try {
// 構造連接需要的參數
String clientId = "java" + System.currentTimeMillis();
Map<String, String> params = new HashMap<>(16);
params.put("productKey", productKey);
params.put("deviceName", deviceName);
params.put("clientId", clientId);
String timestamp = String.valueOf(System.currentTimeMillis());
params.put("timestamp", timestamp);
// cn-shanghai
String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883";
String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|";
String mqttUsername = deviceName + "&" + productKey;
String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1");
connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);
} catch (Exception e) {
System.out.println("initAliyunIoTClient error " + e.getMessage());
}
}
public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception {
MemoryPersistence persistence = new MemoryPersistence();
mqttClient = new MqttClient(url, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
// MQTT 3.1.1
connOpts.setMqttVersion(4);
connOpts.setAutomaticReconnect(false);
connOpts.setCleanSession(true);
connOpts.setUserName(mqttUsername);
connOpts.setPassword(mqttPassword.toCharArray());
connOpts.setKeepAliveInterval(60);
mqttClient.connect(connOpts);
}
/**
* 彙報屬性
*/
private static void postDeviceProperties() {
try {
//上報數據
//高級版 物模型-屬性上報payload
System.out.println("上報屬性值");
String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}";
MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8"));
message.setQos(1);
mqttClient.publish(pubTopic, message);
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
5、運行測試情況