大數據

阿里雲物聯網平臺數據轉發到函數計算示例

作者:俏巴

概述

使用物聯網平臺規則引擎的數據流轉功能,可將Topic中的數據消息轉發至其他Topic或其他阿里雲產品進行存儲或處理。本文主要演示通過規則引擎將設備上行消息流轉到函數計算,並通過函數計算髮送消息到釘釘機器人。

Step By Step

產品及設備準備


1、創建產品
_

2、定義物模型
_

3、添加設備
_

_

4、使用SDK 上行消息,參考鏈接:基於開源JAVA MQTT Client連接阿里雲IoT

import com.alibaba.taro.AliyunIoTSignUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class IoTDemoPubSubDemo {

<span class="hljs-comment">// 設備三元組信息</span>
public <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> productKey = <span class="hljs-string">"a16MX********"</span>;
public <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> deviceName = <span class="hljs-string">"device1"</span>;
public <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> deviceSecret = <span class="hljs-string">"YGLHxUr40E1JaWhk3IVAm0uk********"</span>;
public <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> regionId = <span class="hljs-string">"cn-shanghai"</span>;

<span class="hljs-comment">// 物模型-屬性上報topic</span>
private <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> pubTopic = <span class="hljs-string">"/sys/"</span> + productKey + <span class="hljs-string">"/"</span> + deviceName + <span class="hljs-string">"/thing/event/property/post"</span>;
<span class="hljs-comment">// 自定義topic,在產品Topic列表位置定義</span>
private <span class="hljs-keyword">static</span> <span class="hljs-built_in">String</span> subTopic = <span class="hljs-string">"/sys/"</span> + productKey + <span class="hljs-string">"/"</span> + deviceName + <span class="hljs-string">"/thing/event/property/post_reply"</span>;

private <span class="hljs-keyword">static</span> MqttClient mqttClient;

public <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> main(<span class="hljs-built_in">String</span> [] args){

    initAliyunIoTClient();
    ScheduledExecutorService scheduledThreadPool = <span class="hljs-keyword">new</span> ScheduledThreadPoolExecutor(<span class="hljs-number">1</span>,
            <span class="hljs-keyword">new</span> ThreadFactoryBuilder().setNameFormat(<span class="hljs-string">"thread-runner-%d"</span>).build());

    scheduledThreadPool.scheduleAtFixedRate(()-&gt;postDeviceProperties(), <span class="hljs-number">10</span>,<span class="hljs-number">5</span>, TimeUnit.SECONDS);

    <span class="hljs-keyword">try</span> {
        mqttClient.subscribe(subTopic); <span class="hljs-comment">// 訂閱Topic</span>
    } <span class="hljs-keyword">catch</span> (MqttException e) {
        System.out.println(<span class="hljs-string">"error:"</span> + e.getMessage());
        e.printStackTrace();
    }

    <span class="hljs-comment">// 設置訂閱監聽</span>
    mqttClient.setCallback(<span class="hljs-keyword">new</span> MqttCallback() {
        @Override
        public <span class="hljs-keyword">void</span> connectionLost(Throwable throwable) {
            System.out.println(<span class="hljs-string">"connection Lost"</span>);

        }

        @Override
        public <span class="hljs-keyword">void</span> messageArrived(<span class="hljs-built_in">String</span> s, MqttMessage mqttMessage) throws Exception {
            System.out.println(<span class="hljs-string">"Sub message"</span>);
            System.out.println(<span class="hljs-string">"Topic : "</span> + s);
            System.out.println(<span class="hljs-keyword">new</span> <span class="hljs-built_in">String</span>(mqttMessage.getPayload())); <span class="hljs-comment">//打印輸出消息payLoad</span>
        }

        @Override
        public <span class="hljs-keyword">void</span> deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

        }
    });

}

<span class="hljs-comment">/**
 * 初始化 Client 對象
 */</span>
private <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> initAliyunIoTClient() {

    <span class="hljs-keyword">try</span> {
        <span class="hljs-comment">// 構造連接需要的參數</span>
        <span class="hljs-built_in">String</span> clientId = <span class="hljs-string">"java"</span> + System.currentTimeMillis();
        <span class="hljs-built_in">Map</span>&lt;<span class="hljs-built_in">String</span>, <span class="hljs-built_in">String</span>&gt; params = <span class="hljs-keyword">new</span> HashMap&lt;&gt;(<span class="hljs-number">16</span>);
        params.put(<span class="hljs-string">"productKey"</span>, productKey);
        params.put(<span class="hljs-string">"deviceName"</span>, deviceName);
        params.put(<span class="hljs-string">"clientId"</span>, clientId);
        <span class="hljs-built_in">String</span> timestamp = <span class="hljs-built_in">String</span>.valueOf(System.currentTimeMillis());
        params.put(<span class="hljs-string">"timestamp"</span>, timestamp);
        <span class="hljs-comment">// cn-shanghai</span>
        <span class="hljs-built_in">String</span> targetServer = <span class="hljs-string">"tcp://"</span> + productKey + <span class="hljs-string">".iot-as-mqtt."</span>+regionId+<span class="hljs-string">".aliyuncs.com:1883"</span>;

        <span class="hljs-built_in">String</span> mqttclientId = clientId + <span class="hljs-string">"|securemode=3,signmethod=hmacsha1,timestamp="</span> + timestamp + <span class="hljs-string">"|"</span>;
        <span class="hljs-built_in">String</span> mqttUsername = deviceName + <span class="hljs-string">"&amp;"</span> + productKey;
        <span class="hljs-built_in">String</span> mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, <span class="hljs-string">"hmacsha1"</span>);

        connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword);

    } <span class="hljs-keyword">catch</span> (Exception e) {
        System.out.println(<span class="hljs-string">"initAliyunIoTClient error "</span> + e.getMessage());
    }
}

public <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> connectMqtt(<span class="hljs-built_in">String</span> url, <span class="hljs-built_in">String</span> clientId, <span class="hljs-built_in">String</span> mqttUsername, <span class="hljs-built_in">String</span> mqttPassword) throws Exception {

    MemoryPersistence persistence = <span class="hljs-keyword">new</span> MemoryPersistence();
    mqttClient = <span class="hljs-keyword">new</span> MqttClient(url, clientId, persistence);
    MqttConnectOptions connOpts = <span class="hljs-keyword">new</span> MqttConnectOptions();
    <span class="hljs-comment">// MQTT 3.1.1</span>
    connOpts.setMqttVersion(<span class="hljs-number">4</span>);
    connOpts.setAutomaticReconnect(<span class="hljs-literal">false</span>);

// connOpts.setCleanSession(true);

    connOpts.setCleanSession(<span class="hljs-literal">false</span>);

    connOpts.setUserName(mqttUsername);
    connOpts.setPassword(mqttPassword.toCharArray());
    connOpts.setKeepAliveInterval(<span class="hljs-number">60</span>);

    mqttClient.connect(connOpts);
}

<span class="hljs-comment">/**
 * 彙報屬性
 */</span>
private <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> postDeviceProperties() {

    <span class="hljs-keyword">try</span> {
        <span class="hljs-comment">//上報數據</span>
        <span class="hljs-comment">//高級版 物模型-屬性上報payload</span>
        System.out.println(<span class="hljs-string">"上報屬性值"</span>);
        <span class="hljs-built_in">String</span> payloadJson = <span class="hljs-string">"{\"params\":{\"CurrentTemperature\":13,\"Humidity\":10}}"</span>;
        MqttMessage message = <span class="hljs-keyword">new</span> MqttMessage(payloadJson.getBytes(<span class="hljs-string">"utf-8"</span>));
        message.setQos(<span class="hljs-number">1</span>);
        mqttClient.publish(pubTopic, message);
    } <span class="hljs-keyword">catch</span> (Exception e) {
        System.out.println(e.getMessage());
    }
}

}

5、運行狀態查看
_


函數計算創建與配置

1、創建應用
_

_

2、應用下面添加函數

_

_

3、編輯腳本

const https = require('https');

const accessToken = '填寫accessToken,即釘釘機器人webhook的accessToken';
module.exports.handler = function(event, context, callback) {
var eventJson = JSON.parse(event.toString());
console.log(event.toString());
//釘釘消息格式
const postData = JSON.stringify({
"msgtype": "markdown",
"markdown": {
"title": "設備溫溼度傳感器",
"text": "#### 溫溼度傳感器上報n" +
"> 設備名稱:" + eventJson.deviceName+ "nn" +
"> 實時溫度:" + eventJson.Temperature + "℃nn" +
"> 相對溼度:" + eventJson.Humidity + "%nn" +
"> ###### " + eventJson.time + " 發佈 by 物聯網平臺 n"
},
"at": {
"isAtAll": false
}
});
const options = {
hostname: 'oapi.dingtalk.com',
port: 443,
path: '/robot/send?access_token=' + accessToken,
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(postData)
}
};
const req = https.request(options, (res) => {
res.setEncoding('utf8');
res.on('data', (chunk) => {});
res.on('end', () => {
callback(null, 'success');
});
});
// 異常返回
req.on('error', (e) => {
callback(e);
});
// 寫入數據
req.write(postData);
req.end();
};

釘釘機器人webhook的accessToken獲取參考鏈接:阿里雲IoT Studio服務開發定時關燈功能示例Demo: 2.3 釘釘機器人Webhook獲取 部分。

4、快速測試
_

_


規則引擎配置

1、創建規則引擎
_

2、配置處理數據

_

SQL字段

deviceName() as deviceName, items.Humidity.value as Humidity, items.CurrentTemperature.value as Temperature, timestamp('yyyy-MM-dd HH:mm:ss') as time

3、配置轉發數據

_

4、啟動設備端SDK,週期性上行消息,釘釘群查看通知

_

5、上行日誌查看

_

參考鏈接

溫溼度計上報數據到釘釘群機器人

Leave a Reply

Your email address will not be published. Required fields are marked *