物模型價值
物聯網元年
關鍵詞:探索、快速
2016年阿里雲物聯網平臺(前稱:物聯網套件)上線,為客戶設備上雲提供了通道能力,包括MQTT連接、消息流轉等核心功能。
第一批客戶大多基於該模式使用物聯網平臺能力,當時整個行業處於物聯網雲平臺起步期,包括AWS,Azure起步階段同樣只是提供通道能力。
基於通道能力,客戶使用物聯網平臺接入方式詳見文檔 https://developer.aliyun.com/article/746536。
這個階段的客戶大多是硬件廠商,軟硬一體開發,嘗試物聯網轉型提升設備價值,對物聯網平臺的訴求比較簡單,希望自己更多參與,對新模式有更多把控力,所以都會採用自定義協議上雲。
物聯網繁榮
關鍵詞:生態、擴展、數字化
近兩年物聯網設備、解決方案如雨後春筍般湧出,不少用戶希望趕上物聯網這波浪潮。這個階段的客戶不僅僅關注設備連雲,也開始關注圍繞設備產生的解決方案。因此客戶角色從硬件廠商,快速擴展到集成商、軟件提供商等。由於大量角色的進入,對軟硬開發解耦、易擴展的能力提出了訴求。同時我們也發現第一批使用通道能力的平臺客戶隨著自己業務發展、設備擴展,原來的架構已無法支撐,對物聯網平臺也提出了新的要求。
舉兩個典型場景:
- 老客戶升級:某個共享設備提供商,原來僅提供大學校園共享洗衣機服務,利用物聯網平臺通道能力上雲,隨著公司業務發展,從共享洗衣機業務擴展到校園淋浴、飲水機、充電樁等多類設備,原來自定義協議和API無法支撐多品類設備,難擴展。需要有一套接入標準和規範,方便快速擴展設備類型。
- 新生態客戶:某個充電樁平臺客戶,提供充電樁管理平臺,作為甲方要求大量樁企(乙方)按照平臺規範接入,典型的軟硬件分離場景。需要有一套接入標準和規範,方便快速擴展樁企規模。
這一階段平臺在通道能力之上,提供了物模型能力,物模型可以屏蔽底層設備差異,讓軟件開發者基於平臺提供的標準API開發;硬件開發者基於平臺提供的標準協議開發;從而達到軟硬開發解耦的目的。
物聯網賦能
關鍵詞:場景化、智能
物聯網終極目標一定是基於設備採集數據賦能業務,實現數字業務化。例如金融、物流、家居、餐飲、商場、醫療、交通等不同領域通過物聯網數字化後,結合數據分析智能化決策、互聯互通、場景規則、數字孿生等能力實現縱深領域場景化、智能化。
這一階段平臺在通道能力、物模型能力之上,還進一步提供設備智能運維、數據分析、可視化、數字孿生等高價值服務,幫助客戶數字化後產生真正的業務價值。
基於以上分析,物聯網已經過了最初的“元年”階段,也邁入了“繁榮”階段,正逐步朝“問物聯網要賦能”的階段演進。物模型是物聯網生態化、高擴展、數字化、智能化非常重要的基礎,強烈建議客戶使用。
物模型接入實踐
自定義接入模式
以一個老客戶為例,原來僅使用物聯網平臺通道能力,下圖中1~8流程都需要自定義開發,當客戶設備類型足夠簡單時,該模式複雜度通常不會成為客戶痛點。
面臨的挑戰
隨著客戶接入設備種類越來越多,面臨的擴展性問題也越來越嚴峻。
使用物模型後的模式
物模型模式下,設備與雲交互協議、雲平臺設備API都基於物模型標準化了,即使設備不斷擴展,客戶業務服務器和設備端邏輯都不需要進行調整,保證了擴展性。
物模型接入流程詳細介紹
流程圖
以下是客戶詳細接入流程,主要分為:雲端配置、設備開發、服務端開發、設備運行時管理四大部分。平臺會提供一些工具,使各部分流程更高效。接下來進行詳細介紹。
本文試圖手把手介紹從0到1接入物模型,還會配套介紹一些接入過程中有幫助的平臺能力,所以文章篇幅比較長,事實上客戶接入流程還是非常簡單的,真正開發只需要涉及到圖中紅色三個模塊。
如果您希望快速接入,可以直接關注P0部分,其它部分都可以跳過。
1 雲端配置
1.1 創建產品(P0)
1.登錄物聯網平臺。
2.創建產品。
說明:
• 所屬品類:標準品類庫提供了一些供參考的模板,選擇後可以修改,建議使用。
• 節點類型:根據實際選擇即可。
• 數據格式:“ICA標準數據格式(Alink JSON)”表示設備使用標準Alink JSON格式上報數據;“透傳/自定義”表示設備可以使用自定義格式,通過Alink特定Topic上報物聯網平臺,該模式客戶需要寫腳本進行轉換,透傳模式在此不做展開,後面單獨起文章介紹。
1.2 物模型建模(P0)
1.模型查看。
已有的模型是繼承自創建產品時選擇的“充電樁”品類模板。
2.編輯模型。
通過“編輯草稿”,進行修改和添加,最後需要對物模型“發佈上線”。
說明:
• 定義物模型非常重要,物模型通過屬性、事件、服務三要素描述了設備所有能力,設備和雲交互、客戶服務器訪問設備通過物模型都可以實現協議標準化。如果客戶定義的物模型如果足夠通用和專業,阿里可以幫助作為ICA行業標準進行推廣。
• 服務的調用方式有:同步調用、異步調用兩種模式。客戶雲端開發調用下行控制API,同步調用和異步調用獲取返回結果方式不一樣,在後文“3.3”章節詳細介紹。
物模型概念介紹
物模型介紹文檔請參見這裡。
瞭解物模型概念,能夠幫助您更好對設備建模。
1.3 物模型配置
當前默認是物模型強校驗模式,即設備上報數據在IoT平臺會進行物模型數據規範強校驗,如果不符合規範會報錯。
另外物模型弱校驗、免校驗、去重等規則也會在近期陸續開放,後期進行文檔補充。
配置之後,會在設備運行時生效。
關聯閱讀:4.2 物模型擴展規則校驗。
1.4 註冊三元組(P0)
1.註冊設備。
說明:
• 添加設備:測試階段使用較多,單個添加。
• 批量添加:量產階段使用,有兩種模式,“自動生成”表示設備標識符(deviceName)由平臺按照一定的規則隨機頒發;“批量上傳”支持客戶自定義設備標識符(deviceName)。
2.查看設備列表。
可以通過“設備列表”、“批次管理”兩種方式查看創建的設備列表。
通過“批次管理”查看這一批次設備詳情,並且支持下載三元組列表。
注意:此處設備標識符(deviceName)非常重要,與productKey, deviceSecret一起稱為設備的“三元組”,作為設備的唯一身份,大部分情況需要燒錄到設備上。
2 設備開發
2.1 使用設備SDK開發(P0)
設備接入SDK文檔請參見這裡。
根據需要選擇合適的語言版本。C SDK 建議使用“4.x”版本。
本文選擇 Java SDK進行演示。
環境準備:https://help.aliyun.com/document_detail/97331.html
物模型開發:https://help.aliyun.com/document_detail/97333.html
1.開發之前需要先準備如下好兩份數據:
- 設備證書信息(productKey、deviceName、deviceSecret)
- 設備物模型
為了方便查看物模型詳細數據規範,通過導出“物模型TSL”查看詳細物模型定義,其中包括物模型屬性、事件、服務標識符、參數、數據規範。抽取部分內容,針對以下屬性、事件、服務在DEMO中進行開發演示。
"schema":"https://iotx-tsl.oss-ap-southeast-1.aliyuncs.com/schema.json",
"profile":{
"productKey":"a1nhbEV****"
},
"properties":[
{
"identifier":"acOutMeterIty",
"name":"交流輸出電錶底值監測屬性",
"accessMode":"rw",
"required":false,
"dataType":{
"type":"int",
"specs":{
"min":"0",
"max":"200",
"step":"1"
}
}
}
],
"events":[
{
"identifier":"post",
"name":"post",
"type":"info",
"required":true,
"desc":"屬性上報",
"method":"thing.event.property.post",
"outputData":[
{
"identifier":"acOutMeterIty",
"name":"交流輸出電錶底值監測屬性",
"dataType":{
"type":"int",
"specs":{
"min":"0",
"max":"200",
"step":"1"
}
}
}
]
},
{
"identifier":"startChaResEvt",
"name":"啟動充電結果事件",
"type":"info",
"required":false,
"method":"thing.event.startChaResEvt.post",
"outputData":[
{
"identifier":"gunNum",
"name":"充電槍編號",
"dataType":{
"type":"int",
"specs":{
"min":"0",
"max":"100",
"step":"2"
}
}
}
]
}
],
"services":[
{
"identifier":"set",
"name":"set",
"required":true,
"callType":"async",
"desc":"屬性設置",
"method":"thing.service.property.set",
"inputData":[
{
"identifier":"acOutMeterIty",
"name":"交流輸出電錶底值監測屬性",
"dataType":{
"type":"int",
"specs":{
"min":"0",
"max":"200",
"step":"1"
}
}
}
],
"outputData":[
]
},
{
"identifier":"get",
"name":"get",
"required":true,
"callType":"async",
"desc":"屬性獲取",
"method":"thing.service.property.get",
"inputData":[
"acOutMeterIty"
],
"outputData":[
{
"identifier":"acOutMeterIty",
"name":"交流輸出電錶底值監測屬性",
"dataType":{
"type":"int",
"specs":{
"min":"0",
"max":"200",
"step":"1"
}
}
}
]
},
{
"identifier":"startChaResService",
"name":"開啟充電",
"required":false,
"callType":"async",
"method":"thing.service.startChaResService",
"inputData":[
{
"identifier":"charm",
"name":"電量",
"dataType":{
"type":"int",
"specs":{
"min":"1",
"max":"100",
"step":"2"
}
}
}
],
"outputData":[
{
"identifier":"realcharm",
"name":"realcharm",
"dataType":{
"type":"int",
"specs":{
"min":"0",
"max":"100",
"step":"2"
}
}
}
]
}
]
}
2.開發代碼。
如下示例中只需要將三元組,和屬性、事件、服務參數替換成您的設備信息。其它代碼可以直接運行。
關於免訂閱能力介紹:
有些設備最資源比較敏感,為了避免初始化訂閱大量Alink協議中系統Topic帶來的性能開銷,平臺提供了免訂閱能力,即平臺幫設備進行Topic訂閱。
SDK只有3.1.0及以後版本支持免訂閱能力,並且默認打開該能力。
如果3.1.0及以後版本SDK您希望取消免訂閱,依舊按需訂閱Topic,可以設置SDK配置項關閉該能力,在make.settings中設置“FEATURE_MQTT_AUTO_SUBSCRIBE=n”。
public class Demo {
public static void main(String[] args) throws Exception {
String pk = "a1nhbEVCP**";
String dn = "7mBP6Dd6IT27Rt***";
String ds = "*****";
/**
* 連接 & 認證
*/
LinkKitInitParams params = new LinkKitInitParams();
// 設置 Mqtt 初始化參數
IoTMqttClientConfig config = new IoTMqttClientConfig();
config.productKey = pk;
config.deviceName = dn;
config.deviceSecret = ds;
config.receiveOfflineMsg = false;
params.mqttClientConfig = config;
// 設置初始化三元組信息,用戶傳入
DeviceInfo deviceInfo = new DeviceInfo();
deviceInfo.productKey = pk;
deviceInfo.deviceName = dn;
deviceInfo.deviceSecret = ds;
params.deviceInfo = deviceInfo;
LinkKit.getInstance().init(params, new ILinkKitConnectListener() {
public void onError(AError aError) {
System.out.println("===============FAILURE===============");
ALog.e(TAG, "Init Error error=" + aError);
System.out.println("===============FAILURE===============");
}
public void onInitDone(InitResult initResult) {
System.out.println("===============SUCCESS===============");
ALog.i(TAG, "onInitDone result=" + initResult);
System.out.println("===============SUCCESS===============");
}
});
//此處sleep 5S,由於上面init是異步流程
Thread.sleep(5000);
/**
* 物模型開發
*/
/**
* 上報屬性
*/
Map<String, ValueWrapper> properties = new HashMap<>();
// key為物模型中屬性標識符"acOutMeterIty",value需要遵循屬性值規範:int類型,取值範圍在0~200之間;
properties.put("acOutMeterIty", new ValueWrapper(10));
LinkKit.getInstance().getDeviceThing().thingPropertyPost(properties, new IPublishResourceListener() {
@Override
public void onSuccess(String s, Object o) {
System.out.println("=====thingPropertyPost success=======");
System.out.println(s);
System.out.println(JSON.toJSONString(o));
}
@Override
public void onError(String s, AError aError) {
System.out.println("=====thingPropertyPost failure=======");
}
});
// 上報屬性之後,雲端會返回響應結果,此處是監聽雲端返回的屬性reply
LinkKit.getInstance().registerOnNotifyListener(new IConnectNotifyListener() {
@Override
public void onNotify(String s, String s1, AMessage aMessage) {
System.out.println("===PROPERTY REPLY===");
System.out.println("TOPIC:" + s1);
System.out.println("Payload:" + JSON.toJSONString(aMessage));
}
@Override
public boolean shouldHandle(String s, String s1) {
return false;
}
@Override
public void onConnectStateChange(String s, ConnectState connectState) {
}
});
/**
* 上報事件
*/
HashMap<String, ValueWrapper> eventMap = new HashMap<>();
// key為物模型中事件參數的標識符"gunNum", value為事件參數值需要遵循數值規範:int類型,取值範圍0~100之間;
eventMap.put("gunNum", new ValueWrapper.IntValueWrapper(50));
OutputParams eventOutput = new OutputParams(eventMap);
// 參數identity為"startChaResEvt"屬於物模型事件標識符。
LinkKit.getInstance().getDeviceThing().thingEventPost("startChaResEvt", eventOutput, new IPublishResourceListener() {
public void onSuccess(String resId, Object o) {
System.out.println("=====thingEventPost success=======");
System.out.println(resId);
System.out.println(JSON.toJSONString(o));
}
public void onError(String resId, AError aError) {
System.out.println("=====thingEventPost failure=======");
}
});
/**
* 監聽並執行下行服務
*/
// 獲取設備支持的所有服務
LinkKit.getInstance().getDeviceThing().getServices();
// 用戶可以根據實際情況註冊自己需要的服務的監聽器
List<Service> srviceList = LinkKit.getInstance().getDeviceThing().getServices();
for (int i = 0; srviceList != null && i < srviceList.size(); i++) {
Service service = srviceList.get(i);
LinkKit.getInstance().getDeviceThing().setServiceHandler(service.getIdentifier(), new ITResRequestHandler() {
public void onProcess(String identify, Object result, ITResResponseCallback itResResponseCallback) {
System.out.println("onProcess() called with: s = [" + identify + "], o = [" + result + "], itResResponseCallback = [" + itResResponseCallback + "]");
System.out.println("收到雲端異步服務調用 " + identify);
try {
/**
* 設置屬性(property)的模式
*/
// "set"為設置屬性默認的標識符
if ("set".equals(identify)) {
// TODO 用戶需要設置真實設備的的屬性
/**
* 向雲端同步設置好的屬性值
*/
Map<String, ValueWrapper> desiredProperty = (Map<String, ValueWrapper>) ((InputParams) result).getData();
LinkKit.getInstance().getDeviceThing().thingPropertyPost(desiredProperty, new IPublishResourceListener() {
@Override
public void onSuccess(String s, Object o) {
if (result instanceof InputParams) {
Map<String, ValueWrapper> data = (Map<String, ValueWrapper>) ((InputParams) result).getData();
// data.get()
ALog.d(TAG, "收到異步下行數據 " + data);
// 響應雲端 接收數據成功
itResResponseCallback.onComplete(identify, null, null);
} else {
itResResponseCallback.onComplete(identify, null, null);
}
}
@Override
public void onError(String s, AError aError) {
AError error = new AError();
error.setCode(100);
error.setMsg("setPropertyFailed.");
itResResponseCallback.onComplete(identify, new ErrorInfo(error), null);
}
});
/**
* 服務(service)的模式
*/
// "startChaResService"為服務的標識符
} else if ("startChaResService".equals(identify)) {
Map<String, ValueWrapper> inputParams = (Map<String, ValueWrapper>) ((InputParams) result).getData();
// TODO 根據服務入參inputParams執行設備邏輯,比如啟動充電
// 充電完成後,向雲端返回輸出參數
OutputParams outputParams = new OutputParams();
// key為"charm"屬於物模型中"startChaResService"服務出參標識符,value為出參值遵循數據規範:int類型,數據範圍1~100之間;
outputParams.put("charm", new ValueWrapper.IntValueWrapper(20));
itResResponseCallback.onComplete(identify, null, outputParams);
} else {
// 根據不同的服務做不同的處理,跟具體的服務有關係
OutputParams outputParams = new OutputParams();
// 根據特定服務,按照服務規範返回服務的出參。
itResResponseCallback.onComplete(identify, null, outputParams);
}
} catch (Exception e) {
e.printStackTrace();
ALog.d(TAG, "雲端返回數據格式異常");
}
}
public void onSuccess(Object o, OutputParams outputParams) {
ALog.d(TAG, "onSuccess() called with: o = [" + o + "], outputParams = [" + outputParams + "]");
ALog.d(TAG, "註冊服務成功");
}
public void onFail(Object o, ErrorInfo errorInfo) {
ALog.d(TAG, "onFail() called with: o = [" + o + "], errorInfo = [" + errorInfo + "]");
ALog.d(TAG, "註冊服務失敗");
}
});
}
}
}
說明:
• 上報屬性成功,雲端會返回REPLY,有以下日誌說明設備到雲,雲到設備的鏈路全部走通。
• 設備收到屬性設置指令,在完成物理設備屬性修改後,建議將最新屬性同步上報雲端。
2.2 不使用SDK開發
1.協議準備。
“2.1 使用設備SDK開發”介紹了使用阿里雲提供的SDK進行設備開發,當然您也可以選擇不使用SDK,完全基於Alink協議(設備和雲交互協議)開發。
Alink協議文檔:https://help.aliyun.com/document_detail/90459.html
重點關注物模型協議部分:https://help.aliyun.com/document_detail/89301.html 。裡面包含了物模型相關所有Topic介紹(物模型Topic列表在控制檯也可以查看,如下圖)。
文檔詳細介紹了設備端如何向雲端上報“屬性”、“事件”,如何訂閱雲端向下發送的“服務”指令。
Topic和Payload都基於客戶定義的物模型進行標準化和規範化,從而使得客戶設備與雲交互方式不會隨著設備類型變化而改變,滿足擴展性要求。
2.環境準備。
根據自己選型選擇合適的MQTT客戶端,本文選擇eclipse paho。
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.1</version>//可以選擇您需要的版本
</dependency>
3.開發。
物模型複用“2.1 使用設備SDK開發”中“開發前準備”給出的。
關於免訂閱能力介紹:
有些設備最資源比較敏感,為了避免初始化訂閱大量Alink協議中系統Topic帶來的性能開銷,平臺提供了免訂閱能力,即平臺幫設備進行Topic訂閱。
SDK只有3.1.0及以後版本支持免訂閱能力,並且默認打開該能力。
如果不使用SDK開發,可以通過設備端在MQTT的連接報文中的clientId部分, 新增_ss=1表示開啟自動訂閱, 建連成功後服務端會自動訂閱上以下表格中的topic, 若傳遞 _ss=0 或者不傳遞該字段, 則不會發生服務端自動訂閱動作。
4.上報屬性。
String productKey = "a1nhbEV****";
String deviceName = "7mBP6Dd6IT2*****";
String deviceSecret = "****";
// MQTT連接
MqttTestClient client;
client = new MqttTestClient(productKey, deviceName, deviceSecret);
client.connect();
String setTopic = "/thing/event/property/post";
String setTopicReply = "/thing/event/property/post_reply";
// 上報屬性,雲端會返回REPLY,進行訂閱。(為了節省端側訂閱開銷,可以開通免訂閱)
// 此處client進行了封裝,您根據自己的業務進行封裝即可,也可以直接使用MQTT Client subscribe
client.sysTopic(setTopicReply).subscribe();
// 封裝Alink協議系統參數
Map<String, Object> payload = new HashMap<String, Object>();
Map<String, Object> params = new HashMap<String, Object>();
payload.put("id", 11);//id需要保證設備端一段時間內唯一
payload.put("params", params);
payload.put("method", "thing.event.property.post");
// 組裝屬性payload
String propKey = "acOutMeterIty";
int statusValue = 30;
Map<String, Object> proValue = new HashMap<>();
proValue.put("value", statusValue);
proValue.put("time", System.currentTimeMillis());
params.put(propKey, proValue);
// 上報(client進行了封裝,您根據自己的業務進行封裝即可,也可以直接使用MQTT Client publish消息)
client.sysTopic(setTopic).publish(JSON.toJSONString(payload));
// 打印雲端返回的Reply(client進行了封裝,您根據自己的業務進行封裝即可,也可以直接使用MQTT Client監聽訂閱消息)
client.sysTopic(setTopicReply).readTopic(10000);
client.disconnect();
日誌打印的設備請求和響應。
5.上報事件。
String productKey = "a1nhbEV****";
String deviceName = "7mBP6Dd6IT27*****";
String deviceSecret = "***";
// MQTT連接
MqttTestClient client;
client = new MqttTestClient(productKey, deviceName, deviceSecret);
client.connect();
// topic中為"startChaResEvt"屬於物模型事件標識符。
String setTopic = "/thing/event/startChaResEvt/post";
String setTopicReply = "/thing/event/startChaResEvt/post_reply";
// 報事件,雲端會返回REPLY,進行訂閱。(為了節省端側訂閱開銷,可以開通免訂閱)
client.sysTopic(setTopicReply).subscribe();
// 封裝Alink協議系統參數
Map<String, Object> payload = new HashMap<String, Object>();
Map<String, Object> params = new HashMap<String, Object>();
payload.put("id", 11);//id需要保證設備端一段時間內唯一
payload.put("params", params);
payload.put("method", "thing.event.startChaResEvt.post");
// 組裝屬性payload
Map<String, Object> dataValue = new HashMap<>();
// key為物模型中事件參數的標識符"gunNum", value為事件參數值需要遵循數值規範:int類型,取值範圍0~100之間;
dataValue.put("gunNum", 59);
params.put("value", dataValue);
params.put("time", System.currentTimeMillis());
// 上報(client進行了封裝,您根據自己的業務進行封裝即可,也可以直接使用MQTT Client publish消息)
client.sysTopic(setTopic).publish(JSON.toJSONString(payload));
// 打印雲端返回的Reply(client進行了封裝,您根據自己的業務進行封裝即可,也可以直接使用MQTT Client監聽訂閱消息)
client.sysTopic(setTopicReply).readTopic(10000);
client.disconnect();
6.服務調用。
此處為一段偽代碼。可以在MQTT建連的時候通過callback監聽雲端下發的控制指令或消息。
前提:已經對下行的TOPIC進行訂閱過,免訂閱能力參考上面介紹。
mqttClient = new MqttClient(url, clientId, persistence);
final MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setMqttVersion(4);
connOpts.setAutomaticReconnect(true);
connOpts.setCleanSession(false);
connOpts.setUserName(mqttUsername);
connOpts.setPassword(mqttPassword.toCharArray());
connOpts.setKeepAliveInterval(65);
LogUtil.log(clientId + "進行連接, 目的地: " + url);
// 此處訂閱雲端下發的消息
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
LogUtil.log("connection lost, cause:" + cause);
cause.printStackTrace();
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
TopicChannel topicChannel = getTopic(topic);
LogUtil.log("receive message, channel:" + topicChannel
+ ",topic:" + topic
+ ", payload:" + new String(message.getPayload(), "UTF-8") + "");
topicChannel.put(message);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//如果是qos 0消息 token.resp是沒有回覆的
LogUtil.log("sent, " + ((token == null || token.getResponse() == null) ? "null"
: token.getResponse().getKey()));
}
});
mqttClient.connect(connOpts);
重點說明:
• 所有被訂閱的下行Topic都會被監聽到。物模型相關的主要包括:屬性上報Reply、屬性下行設置、服務下行控制。
• 設置設備屬性(https://help.aliyun.com/document_detail/89301.html#title-wmh-y2e-18r),默認異步方式返回結果。
• 訂閱的Topic為Alink協議標準Topic:“/sys/{productKey}/{deviceName}/thing/service/property/set”
• 服務控制(https://help.aliyun.com/document_detail/89301.html#title-3pt-nfy-jys),同異步方式取決於物模型中service配置的調用模式。
• 服務異步方式訂閱的Topic為Alink協議標準Topic:“/sys/{productKey}/{deviceName}/thing/service/{tsl.service.identifier}”
• 服務同步方式訂閱的Topic需要遵循RRPC Topic模式:詳見文檔https://help.aliyun.com/document_detail/90568.html
注意:僅設備側需要感知RRPC特殊TOPIC,設備上雲後,數據流轉、開放API面向的還是Alink協議編程。
2.3 在線調試
設備開發後之後,如何快速模擬業務服務器給設備下發指令,調試設備能力?IoT平臺提供了“在線調試”的功能,可以模擬設備或模擬應用端到端調試。
此處使用“在線調試”裡面“調試真實設備”能力。通過控制檯下發設備控制指令,分兩類:1)屬性設置;2)服務調用。
1.服務調用調試。
雲端下發後,可以到設備端查看控制Log是否打印,以判斷指令達到端側。
從圖中可見設備收到startChaResService服務,同時向雲端返回了輸出參數。
2.屬性設置調試。
說明:
• “獲取”:暫不支持到設備,只能從雲端獲取設備最新屬性。
• “設置”:指令直接到設備端,設備修改本地屬性之後,上報雲端最新屬性;到設備上的設置指令為"set"。
• “設置期望值”:如果設備在線,會直接下發設備,如果設備離線,指令在雲端進行緩存,待上線後下發設備端,下發之後,設備修改本地屬性之後,同樣上報雲端最新屬性;到設備上的設置指令同樣為"set"。如果您希望使用物模型期望值能力,可點擊查看最佳實踐。
雲端下發後,可以到設備端查看控制Log是否打印,以判斷指令達到端側。
從圖中可見設備收到set指令,返回了服務響應,同時向雲端上報了最新屬性。
說明:服務結果還可以通過“2.4 查看物模型數據”章節中獲取。
2.4 查看物模型數據
DEMO運行之後,可以看到設備已經“在線”狀態。
“運行狀態”展示設備上報的屬性值;
“事件管理”展示設備上報的事件;
“服務調用”展示雲端下發設備的控制服務;
上報屬性結構化展示。
上報事件,包括事件參數展示。
屬性設置、服務調用兩類服務的雲端下發入參、設備響應出參都有展示,如上證明設備收到雲端指令,並且正常返回響應。
2.5 查看日誌服務
設備在運行過程,可能會出現一些異常,比如連接失敗、認證失敗、數據異常等等,為了便於排查,可以查看日誌服務。舉例設備上報數據可能會不符合物模型規範,比如事件參數"gunNum"對應值的數據範圍為0~100之間,而真實上報了50000。日誌服務會展示設備錯誤詳情。
可以看到日誌內容為“{"Reason":"tsl parse: int value is bigger than max 100 -> gunNum"}”,說明gunNum對應值超過物模型規範最大值100的限制。物模型規範詳情到“物模型TSL”查看。
同時可以通過“日誌轉儲”中“日誌報表”進一步查看設備大盤,包括設備上下線次數、設備上線IP區域分佈、設備消息量、設備消息量Top列表、物模型錯誤分佈、雲端API錯誤分佈等多維度指標。
日誌服務介紹文檔請參見這裡。
3 服務端開發
設備連接到阿里雲IoT平臺,設備數據會保存在IoT平臺時序數據庫。同時IoT平臺提供兩種方式供客戶獲取設備數據:方式1)通過服務端訂閱或者規則引擎實時流轉到客戶服務器;2)通過開放API供客戶調用獲取。
3.1 服務端調用API開發(P0)
1.環境準備。
SDK下載文檔:https://help.aliyun.com/document_detail/30581.html
API接口列表:https://help.aliyun.com/document_detail/69579.html
重點關注物模型使用相關API
2.以下示例為設置設備屬性API,設備異步返回結果,客戶需要通過“數據流轉”方式獲取。
String accessKey = "***";
String accessSecret = "***";
try {
DefaultProfile.addEndpoint("cn-shanghai", "cn-shanghai", "Iot", "iot.cn-shanghai.aliyuncs.com");
} catch (Exception e) {
System.out.println("DefaultProfile exception");
}
IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", accessKey, accessSecret);
DefaultAcsClient defaultAcsClient = new DefaultAcsClient(profile);
SetDevicePropertyRequest setDevicePropertyRequest = new SetDevicePropertyRequest();
// 如果使用實例,此處傳入真實實例id;如果公共實例,不需要設置。
//createProductRequest.setIotInstanceId("iothub-test-xxx");
setDevicePropertyRequest.setProductKey(pk);
setDevicePropertyRequest.setDeviceName(dn);
Map<String, Integer> properties = new HashMap<>();
// key為物模型中屬性標識符"acOutMeterIty",value需要遵循屬性值規範:int類型,取值範圍在0~200之間;
properties.put("acOutMeterIty", 98);
setDevicePropertyRequest.setItems(JSON.toJSONString(properties));
SetDevicePropertyResponse response = null;
try {
response = defaultAcsClient.getAcsResponse(setDevicePropertyRequest);
} catch (Exception e) {
Log.error("執行失敗:e:" + e.getMessage());
}
System.out.println("===============");
System.out.println("setDeviceProperty request : " + JSON.toJSONString(setDevicePropertyRequest));
System.out.println("setDeviceProperty response : " + JSON.toJSONString(response.getData()));
System.out.println("setDeviceProperty requestId : " + response.getRequestId());
System.out.println("===============");
重點說明:
下行控制如果為異步服務,需要通過訂閱數據流轉獲取設備返回結果,訂閱方式和數據結構詳見“3.2 數據流轉”章節介紹。
關聯介紹:“3.2.1 服務端訂閱”中“重點說明”。
3.2 數據流轉
平臺提供兩種數據流轉方式:方式1)服務端訂閱;方式2)規則引擎;
3.2.1服務端訂閱(P0)
服務端訂閱配置
“推送消息類型”選擇“設備上報消息”,包括物模型屬性上報、事件上報、設備下行指令結果(包括屬性設置響應、服務控制響應)等消息。
消息格式詳見文檔:https://help.aliyun.com/document_detail/73736.html
服務端訂閱DEMO
接入說明:https://help.aliyun.com/document_detail/143601.html
/**
* AMQP服務端訂閱
*/
//參數說明,請參見AMQP客戶端接入說明文檔。
String accessKey = "***";
String accessSecret = "***";
String consumerGroupId = "***";
//iotInstanceId:購買的實例請填寫實例ID,公共實例請填空字符串""。
String iotInstanceId = "";
long timeStamp = System.currentTimeMillis();
//簽名方法:支持hmacmd5、hmacsha1和hmacsha256。
String signMethod = "hmacsha1";
//控制檯服務端訂閱中消費組狀態頁客戶端ID一欄將顯示clientId參數。
//建議使用機器UUID、MAC地址、IP等唯一標識等作為clientId。便於您區分識別不同的客戶端。
String clientId = "TESTClientID";
//userName組裝方法,請參見AMQP客戶端接入說明文檔。
String userName = clientId + "|authMode=aksign"
+ ",signMethod=" + signMethod
+ ",timestamp=" + timeStamp
+ ",authId=" + accessKey
+ ",iotInstanceId=" + iotInstanceId
+ ",consumerGroupId=" + consumerGroupId
+ "|";
//計算簽名,password組裝方法,請參見AMQP客戶端接入說明文檔。
String signContent = "authId=" + accessKey + "×tamp=" + timeStamp;
String password = doSign(signContent,accessSecret, signMethod);
//接入域名,請參見AMQP客戶端接入說明文檔。
String connectionUrl = "amqps://${uid}.iot-amqp.${regionId}.aliyuncs.com:5671?amqp.idleTimeout=80000";
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF",connectionUrl);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
Destination queue = (Destination)context.lookup("QUEUE");
// Create Connection
Connection connection = cf.createConnection(userName, password);
((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
// Create Session
// Session.CLIENT_ACKNOWLEDGE: 收到消息後,需要手動調用message.acknowledge()。
// Session.AUTO_ACKNOWLEDGE: SDK自動ACK(推薦)。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// Create Receiver Link
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(messageListener);
}
private static MessageListener messageListener = new MessageListener() {
@Override
public void onMessage(Message message) {
try {
//1.收到消息之後一定要ACK。
// 推薦做法:創建Session選擇Session.AUTO_ACKNOWLEDGE,這裡會自動ACK。
// 其他做法:創建Session選擇Session.CLIENT_ACKNOWLEDGE,這裡一定要調message.acknowledge()來ACK。
// message.acknowledge();
//2.建議異步處理收到的消息,確保onMessage函數裡沒有耗時邏輯。
// 如果業務處理耗時過程過長阻塞住線程,可能會影響SDK收到消息後的正常回調。
executorService.submit(() -> processMessage(message));
} catch (Exception e) {
logger.error("submit task occurs exception ", e);
}
}
};
/**
* 在這裡處理您收到消息後的具體業務邏輯。
*/
private static void processMessage(Message message) {
try {
byte[] body = message.getBody(byte[].class);
String content = new String(body);
String topic = message.getStringProperty("topic");
String messageId = message.getStringProperty("messageId");
System.out.println("AMQP receive message"
+ ", topic = " + topic
+ ", messageId = " + messageId
+ ", content = " + content);
} catch (Exception e) {
logger.error("processMessage occurs error ", e);
}
}
private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
/**
* 連接成功建立。
*/
@Override
public void onConnectionEstablished(URI remoteURI) {
logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
}
/**
* 嘗試過最大重試次數之後,最終連接失敗。
*/
@Override
public void onConnectionFailure(Throwable error) {
logger.error("onConnectionFailure, {}", error.getMessage());
}
/**
* 連接中斷。
*/
@Override
public void onConnectionInterrupted(URI remoteURI) {
logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
}
/**
* 連接中斷後又自動重連上。
*/
@Override
public void onConnectionRestored(URI remoteURI) {
logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
}
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {}
@Override
public void onSessionClosed(Session session, Throwable cause) {}
@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}
@Override
public void onProducerClosed(MessageProducer producer, Throwable cause) {}
};
/**
* 計算簽名,password組裝方法,請參見AMQP客戶端接入說明文檔。
*/
private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
Mac mac = Mac.getInstance(signMethod);
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(toSignString.getBytes());
return Base64.encodeBase64String(rawHmac);
}
日誌打印出訂閱到的流轉消息如下,符合預期。
重點說明:
下行控制如果為異步服務,需要通過訂閱數據流轉獲取設備返回結果。訂閱Topic為"/sys/{productKey}/{deviceName}/thing/downlink/reply/message",需要根據"requestId"關聯請求和響應。
關聯介紹:“3.1 服務端調用API開發”中“重點說明”。
3.2.2 規則引擎數據訂閱。
配置SQL
SQL介紹文檔這裡。
調試SQL
Payload數據格式文檔這裡。
可以查看“調試結果”
符合配置的SQL結果。
轉發數據
可以轉發到客戶以下多種雲產品中,本文選擇AMQP作為示例驗證。
創建完成後,需要到規則列表頁“啟動”改規則。
訂閱數據
服務端訂閱代碼可以複用上面“3.1”服務端訂閱代碼。差別就是服務端訂閱,訂閱的是Topic對應的完整Payload;而規則引擎流轉AMQP,在消息流轉過程可以對Payload做一些規則過濾或簡單計算。
以下日誌精簡報文是通過規則引擎過濾後獲取的數據。
說明:同一組數據不要同時開通規則引擎和服務端訂閱兩種訂閱模式,避免消息干擾。
4 設備運行時
設備量產之後,到達消費者手上,會開始激活上線進入到設備運行時。由於不屬於開發態流程,本章節僅做簡單介紹,目的是能讓開發者知道開發態的配置在運行態如何產生作用,對設備接上阿里雲IoT平臺後的流程有個簡單的認識。
本文通過物模型接入流程,介紹了平臺設備連接、物模型規範校驗、物模型數據、規則引擎、服務端訂閱、開放API六大基礎能力。
設備全生命週期過程中,還有不少設備管理能力供客戶選擇,其中包括設備標籤、設備分組、設備檢索、OTA、設備運維、設備分發、文件上傳、遠程配置等,歡迎使用。
4.1 連接
設備連接過程,雲端會對設備進行身份認證。
4.2 物模型規範校驗
由於目前物模型配置僅提供強校驗模式,物模型規範校驗主要對設備上報的報文進行Alink協議解析、物模型數據規範校驗。平臺後續會陸續開放弱校驗、免校驗、數據去重能力。
關聯閱讀:1.3 物模型配置
4.3 設備管理能力
4.3.1 設備標籤
介紹文檔:https://help.aliyun.com/document_detail/73733.html
4.3.2 設備分組
介紹文檔:https://help.aliyun.com/document_detail/90386.html
4.3.3 OTA
介紹文檔:https://help.aliyun.com/document_detail/85700.html
4.3.4 設備分發
介紹文檔:https://help.aliyun.com/document_detail/143450.html