由于本人在做物联网方面的数据采集与展示工作,需要用到 jetlinks 平台,之前的数据采集与传输一直使用的是 http 协议与 websocket 协议,而 jetlinks 社区版不支持 http 与 websocket 协议,于是将之前的 nanopc 数据发送协议修改为 mqtt 协议。通过修改与调试,现已能成功通过程序自动上报相关数据,特此记录。同时使用 jetlinks 平台自带的通知模板实现了超阈值自动报警等功能。但 mqtt 协议与 jetlinks 平台使用尚不熟练,部分功能暂无实现,希望后期能进一步了解其原理。

1. 使用javascript实现与平台通信

var mqtt = require('mqtt')
var md5 = require('md5-node')
// 用户设置的账号与密码
var secureId= 'ahau'
var secureKey='ahau'
var now = new Date().getTime(); //当前时间戳
// 转换账号与密码
var username_1 = secureId+"|"+now; // 拼接用户密码
var password_1 = md5(username_1+"|"+secureKey); //使用md5生成摘要
const options = {
    clean: true, // 保留回话
    connectTimeout: 4000, // 超时时间
    // 认证信息
    clientId: 'ZN_001_001',
    username: username_1,
    password: password_1,
    }
  }
//   http://doc.jetlinks.cn/basics-guide/mqtt-auth-generator.html

// 设置链接的mqtt服务器地址
// const connectUrl = 'mqtt://1.15.120.107:1883';
const connectUrl = 'mqtt://172.17.14.191:1883';
const client = mqtt.connect(connectUrl, options);
wendu=30
shidu=40

client.on('connect', function () {
    console.log(111);
    setInterval(function(){
        // publish(topic,msg) 上报属性数据  msg为str格式
        client.publish("/ZN_001/ ZN_001_001/properties/read/reply",'    {"properties":    {"wendu":'+ wendu+',  "shidu":'+shidu+'},    "deviceId":    "ZN_001_001","success": "true"}');
        wendu+=1
        shidu+=1
    },3000)
    console.log(222);
})

client.on('message', (topic, payload, packet) => {
    console.log('packet:', packet)
    console.log('Received Message:', topic, payload.toString())
  })

2. 使用python实现与平台通信

# python 3.6

import random
import time
import hashlib
from paho.mqtt import client as mqtt_client

import time
 
t = time.time()

def str2md5(str):
    
    m = hashlib.md5()  # 创建md5对象
    
    str_en = str.encode(encoding='utf-8')  # str必须先encode
    m.update(str_en)  # 传入字符串并加密
    str_md5 = m.hexdigest()  # 将MD5 hash值转换为16进制数字字符串
    return str_md5

# 13位 时间戳 (毫秒)
int(round(t * 1000))
# 设置用户名密码
secureId= 'ahau'
secureKey='ahau'
now = int(round(t * 1000)); #//当前时间戳
username_1 = secureId+"|"+str(now); #// 拼接用户密码
password_1 = str2md5(username_1+"|"+secureKey); #//使用md5生成摘要

print("username_1="+username_1)
print("password_1="+password_1)

# 设置mqtt服务器地址与端口
broker = 'iot.gitnote.cn'
port = 1883
topic = "/ZN_001/ZN_001_001/properties/report"
# generate client ID with pub prefix randomly
client_id = "ZN_001_001"
rc=0

def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.username_pw_set(username_1,password_1)
    client.on_connect = on_connect
    client.connect(broker, port,60)
    return client

# 模拟假数据
wendu=33.3
shidu=50
guangzhao=300
co2=150
NH3=14

# 发送的属性数据需要以字符串格式发送
msg=str({"properties": {"wendu": wendu,"shidu":shidu,"guangzhao":guangzhao,"co2":co2,"NH3":NH3},"deviceId": "ZN_001_001","success": 'true'})
def publish(client):
    msg_count = 0
    while True:
        time.sleep(3)
        result = client.publish(topic, msg)
        global wendu
        global shidu
        wendu+=1.0
        shidu+=1
        print("wendu=",wendu,"shidu=",shidu)
        # result: [0, 1]
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")
        msg_count += 1

def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)

if __name__ == '__main__':
    run()

3. 在nanopc开发板上运行传感器采集与发送程序

import serial
import time

# 打开串口COM4,进行通信,四合一传感器,光照、湿度、二氧化碳、温度
ser = serial.Serial("/dev/ttyUSB1", 9600)
# 打开串口COM5,进行通信,氨气参数
serN = serial.Serial("/dev/ttyUSB0", 9600)
i=1

# python 3.6

import random
import time
import hashlib
from paho.mqtt import client as mqtt_client

import time
 
t = time.time()

def str2md5(str):
    '''使用MD5对字符串进行加密

    Args:
        str (str): 需要加密的字符串

    Returns:
        [str]: 32位字符串
    ''' 
    m = hashlib.md5()  # 创建md5对象
    
    str_en = str.encode(encoding='utf-8')  # str必须先encode
    m.update(str_en)  # 传入字符串并加密
    str_md5 = m.hexdigest()  # 将MD5 hash值转换为16进制数字字符串
    return str_md5

 
# 13位 时间戳 (毫秒)
int(round(t * 1000))

secureId= 'ahau'
secureKey='ahau'
now = int(round(t * 1000)); #//当前时间戳
username_1 = secureId+"|"+str(now); #// 拼接用户密码
password_1 = str2md5(username_1+"|"+secureKey); #//使用md5生成摘要

print("username_1="+username_1)
print("password_1="+password_1)

# broker = 'iot.gitnote.cn'
broker = '172.17.14.191'
port = 1883
topic = "/ZN_001/ZN_001_001/properties/report"
# generate client ID with pub prefix randomly
client_id = "ZN_001_001"
rc=0

def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    
    client = mqtt_client.Client(client_id)
    client.username_pw_set(username_1,password_1)
    client.on_connect = on_connect
    client.connect(broker, port,60)
    return client

# wendu=33.3
# shidu=50
# guangzhao=300
# co2=150
# NH3=14

# 传感器数据采集函数
def getData():
    if ser.is_open and serN.is_open:
        # 四合一传感器的控制命令
        send_data = bytes.fromhex('01 03 00 00 00 04 44 09')
        # 氨气传感器的控制命令
        send_dataN = bytes.fromhex('01 03 00 00 00 01 84 0A')
        # 向四合一传感器发送命令
        ser.write(send_data)
        # 向氨气传感器发送命令
        serN.write(send_dataN)
        # sleep中的单位为秒,传感器在300ms内响应
        time.sleep(1)
        # 四合一传感器读到的长度
        len_return_data = ser.inWaiting()
        print(len_return_data)
        # 氨气传感器读到的长度
        len_return_dataN = serN.inWaiting()
        print(len_return_dataN)
        if len_return_data and len_return_dataN:
            # 四合一传感器读到的数据
            return_data = ser.read(len_return_data)
            # 氨气传感器读到的数据
            return_dataN = serN.read(len_return_dataN)
            # 四合一传感器转为十六进制
            str_return_data = str(return_data.hex())
            # 氨气传感器转为十六进制
            str_return_dataN = str(return_dataN.hex())

            serData={}
            # 数据转化部分
            # CO2data
            global i
            print(i)
            i+=1
            CO2data = (int(str_return_data[6:8], 16)) * 256 + int(str_return_data[8:10], 16)
            serData['co2']=CO2data
            # print('二氧化碳:%s ppm' % CO2data)
            # a1.value=CO2data
            #a1[0]=CO2data
            # wendata
            if int(str_return_data[10:12], 16) < 127:
                wendata = (int(str_return_data[10:12], 16) * 256 + int(str_return_data[12:14], 16)) / 100
                # print('温度:%s 度' % wendata)
            else:
                wendata = (((int(str_return_data[10:12], 16) & 0x7F)) * 256 + int(str_return_data[12:14], 16)) / 100
                # print('温度:%s' % wendata)
            serData['wendu']=wendata
            #a1[1]=wendata
            # a2.value=wendata
            # 湿度
            shidudata = ((int(str_return_data[14:16], 16)) * 256 + int(str_return_data[16:18], 16)) / 100
            # print('湿度:%s' % shidudata + '%RH')
            serData['shidu']=shidudata
            # a3.value=shidudata
            # 光照
            guangzhaodata = ((int(str_return_data[18:20], 16)) * 256 + int(str_return_data[20:22], 16)) * 4
            # print('光照:%s lx' % guangzhaodata)
            serData['guangzhao']=guangzhaodata
            # a4.value=guangzhaodata
            # 氨气
            NH3data = int(str_return_dataN[6:8], 16) + int(str_return_dataN[8:10], 16)
            # print('氨气:%s ppm' % NH3data)
            serData['NH3']=NH3data
            # a5.value=NH3data
            return serData


# msg=str({"properties": {"wendu": wendu,"shidu":shidu,"guangzhao":guangzhao,"co2":co2,"NH3":NH3},"deviceId": "ZN_001_001","success": 'true'})

# 将采集到的数据转换为字符串格式
msg=str({"properties": getData()});

def publish(client):
    msg_count = 0
    while True:
        time.sleep(3)
        # mqtt 发布
        result = client.publish(topic, str({"properties": getData()}))
        # global wendu
        # global shidu
        # wendu+=1.0
        # shidu+=1
        # print("wendu=",wendu,"shidu=",shidu)
        print(msg)
        # result: [0, 1]
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")
        msg_count += 1


def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)


if __name__ == '__main__':
    run()
nanopc2jetlinks4mqtt-1645671049382-2022224105049.png