通过 mqtt 协议上报传感器数据到 jetlinks 平台
由于本人在做物联网方面的数据采集与展示工作,需要用到 jetlinks 平台,之前的数据采集与传输一直使用的是 http 协议与 websocket 协议,而 jetlinks 社区版不支持 http 与 websocket 协议,于是将之前的 nanopc 数据发送协议修改为 mqtt 协议。通过修改与调试,现已能成功通过程序自动上报相关数据,特此记录。同时使用 jetlinks 平台自带的通知模板实现了超阈值自动报警等功能。但 mqtt 协议与 jetlinks 平台使用尚不熟练,部分功能暂无实现,希望后期能进一步了解其原理。
使用 javascript
和 python
将属性值发送到 jetlinks
物联网平台
1. 使用javascript
实现与平台通信
var mqtt = require('mqtt')
var md5 = require('md5-node')
// 用户设置的账号与密码
var secureId= 'ahau'
var secureKey='ahau'
var now = new Date().getTime(); //当前时间戳
// 转换账号与密码
var username_1 = secureId+"|"+now; // 拼接用户密码
var password_1 = md5(username_1+"|"+secureKey); //使用md5生成摘要
const options = {
clean: true, // 保留回话
connectTimeout: 4000, // 超时时间
// 认证信息
clientId: 'ZN_001_001',
username: username_1,
password: password_1,
}
}
// http://doc.jetlinks.cn/basics-guide/mqtt-auth-generator.html
// 设置链接的mqtt服务器地址
// const connectUrl = 'mqtt://1.15.120.107:1883';
const connectUrl = 'mqtt://172.17.14.191:1883';
const client = mqtt.connect(connectUrl, options);
wendu=30
shidu=40
client.on('connect', function () {
console.log(111);
setInterval(function(){
// publish(topic,msg) 上报属性数据 msg为str格式
client.publish("/ZN_001/ ZN_001_001/properties/read/reply",' {"properties": {"wendu":'+ wendu+', "shidu":'+shidu+'}, "deviceId": "ZN_001_001","success": "true"}');
wendu+=1
shidu+=1
},3000)
console.log(222);
})
client.on('message', (topic, payload, packet) => {
console.log('packet:', packet)
console.log('Received Message:', topic, payload.toString())
})
2. 使用python
实现与平台通信
# python 3.6
import random
import time
import hashlib
from paho.mqtt import client as mqtt_client
import time
t = time.time()
def str2md5(str):
m = hashlib.md5() # 创建md5对象
str_en = str.encode(encoding='utf-8') # str必须先encode
m.update(str_en) # 传入字符串并加密
str_md5 = m.hexdigest() # 将MD5 hash值转换为16进制数字字符串
return str_md5
# 13位 时间戳 (毫秒)
int(round(t * 1000))
# 设置用户名密码
secureId= 'ahau'
secureKey='ahau'
now = int(round(t * 1000)); #//当前时间戳
username_1 = secureId+"|"+str(now); #// 拼接用户密码
password_1 = str2md5(username_1+"|"+secureKey); #//使用md5生成摘要
print("username_1="+username_1)
print("password_1="+password_1)
# 设置mqtt服务器地址与端口
broker = 'iot.gitnote.cn'
port = 1883
topic = "/ZN_001/ZN_001_001/properties/report"
# generate client ID with pub prefix randomly
client_id = "ZN_001_001"
rc=0
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.username_pw_set(username_1,password_1)
client.on_connect = on_connect
client.connect(broker, port,60)
return client
# 模拟假数据
wendu=33.3
shidu=50
guangzhao=300
co2=150
NH3=14
# 发送的属性数据需要以字符串格式发送
msg=str({"properties": {"wendu": wendu,"shidu":shidu,"guangzhao":guangzhao,"co2":co2,"NH3":NH3},"deviceId": "ZN_001_001","success": 'true'})
def publish(client):
msg_count = 0
while True:
time.sleep(3)
result = client.publish(topic, msg)
global wendu
global shidu
wendu+=1.0
shidu+=1
print("wendu=",wendu,"shidu=",shidu)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
3. 在nanopc开发板上运行传感器采集与发送程序
import serial
import time
# 打开串口COM4,进行通信,四合一传感器,光照、湿度、二氧化碳、温度
ser = serial.Serial("/dev/ttyUSB1", 9600)
# 打开串口COM5,进行通信,氨气参数
serN = serial.Serial("/dev/ttyUSB0", 9600)
i=1
# python 3.6
import random
import time
import hashlib
from paho.mqtt import client as mqtt_client
import time
t = time.time()
def str2md5(str):
'''使用MD5对字符串进行加密
Args:
str (str): 需要加密的字符串
Returns:
[str]: 32位字符串
'''
m = hashlib.md5() # 创建md5对象
str_en = str.encode(encoding='utf-8') # str必须先encode
m.update(str_en) # 传入字符串并加密
str_md5 = m.hexdigest() # 将MD5 hash值转换为16进制数字字符串
return str_md5
# 13位 时间戳 (毫秒)
int(round(t * 1000))
secureId= 'ahau'
secureKey='ahau'
now = int(round(t * 1000)); #//当前时间戳
username_1 = secureId+"|"+str(now); #// 拼接用户密码
password_1 = str2md5(username_1+"|"+secureKey); #//使用md5生成摘要
print("username_1="+username_1)
print("password_1="+password_1)
# broker = 'iot.gitnote.cn'
broker = '172.17.14.191'
port = 1883
topic = "/ZN_001/ZN_001_001/properties/report"
# generate client ID with pub prefix randomly
client_id = "ZN_001_001"
rc=0
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.username_pw_set(username_1,password_1)
client.on_connect = on_connect
client.connect(broker, port,60)
return client
# wendu=33.3
# shidu=50
# guangzhao=300
# co2=150
# NH3=14
# 传感器数据采集函数
def getData():
if ser.is_open and serN.is_open:
# 四合一传感器的控制命令
send_data = bytes.fromhex('01 03 00 00 00 04 44 09')
# 氨气传感器的控制命令
send_dataN = bytes.fromhex('01 03 00 00 00 01 84 0A')
# 向四合一传感器发送命令
ser.write(send_data)
# 向氨气传感器发送命令
serN.write(send_dataN)
# sleep中的单位为秒,传感器在300ms内响应
time.sleep(1)
# 四合一传感器读到的长度
len_return_data = ser.inWaiting()
print(len_return_data)
# 氨气传感器读到的长度
len_return_dataN = serN.inWaiting()
print(len_return_dataN)
if len_return_data and len_return_dataN:
# 四合一传感器读到的数据
return_data = ser.read(len_return_data)
# 氨气传感器读到的数据
return_dataN = serN.read(len_return_dataN)
# 四合一传感器转为十六进制
str_return_data = str(return_data.hex())
# 氨气传感器转为十六进制
str_return_dataN = str(return_dataN.hex())
serData={}
# 数据转化部分
# CO2data
global i
print(i)
i+=1
CO2data = (int(str_return_data[6:8], 16)) * 256 + int(str_return_data[8:10], 16)
serData['co2']=CO2data
# print('二氧化碳:%s ppm' % CO2data)
# a1.value=CO2data
#a1[0]=CO2data
# wendata
if int(str_return_data[10:12], 16) < 127:
wendata = (int(str_return_data[10:12], 16) * 256 + int(str_return_data[12:14], 16)) / 100
# print('温度:%s 度' % wendata)
else:
wendata = (((int(str_return_data[10:12], 16) & 0x7F)) * 256 + int(str_return_data[12:14], 16)) / 100
# print('温度:%s' % wendata)
serData['wendu']=wendata
#a1[1]=wendata
# a2.value=wendata
# 湿度
shidudata = ((int(str_return_data[14:16], 16)) * 256 + int(str_return_data[16:18], 16)) / 100
# print('湿度:%s' % shidudata + '%RH')
serData['shidu']=shidudata
# a3.value=shidudata
# 光照
guangzhaodata = ((int(str_return_data[18:20], 16)) * 256 + int(str_return_data[20:22], 16)) * 4
# print('光照:%s lx' % guangzhaodata)
serData['guangzhao']=guangzhaodata
# a4.value=guangzhaodata
# 氨气
NH3data = int(str_return_dataN[6:8], 16) + int(str_return_dataN[8:10], 16)
# print('氨气:%s ppm' % NH3data)
serData['NH3']=NH3data
# a5.value=NH3data
return serData
# msg=str({"properties": {"wendu": wendu,"shidu":shidu,"guangzhao":guangzhao,"co2":co2,"NH3":NH3},"deviceId": "ZN_001_001","success": 'true'})
# 将采集到的数据转换为字符串格式
msg=str({"properties": getData()});
def publish(client):
msg_count = 0
while True:
time.sleep(3)
# mqtt 发布
result = client.publish(topic, str({"properties": getData()}))
# global wendu
# global shidu
# wendu+=1.0
# shidu+=1
# print("wendu=",wendu,"shidu=",shidu)
print(msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()