1500字范文,内容丰富有趣,写作好帮手!
1500字范文 > Python模拟智能开关设备MQTT接入阿里云物联网平台 - PyCharm paho.mqtt

Python模拟智能开关设备MQTT接入阿里云物联网平台 - PyCharm paho.mqtt

时间:2022-01-25 04:03:03

相关推荐

Python模拟智能开关设备MQTT接入阿里云物联网平台 - PyCharm paho.mqtt

概要

Python 使用paho.mqtt库,利用阿里云物联网平台的设备证书:productKey、deviceName、deviceSecret,自动合成userName、passWord。以MQTT通信协议接入阿里云物联网平台,并模拟智能开关设备上报开关消息。

非常适合作为MQTT物联网设备的客户端模拟。在此Demo基础上可非常方便进行二次开发。

MQTT.fx做客户端固然方便,但如果想对流程或任务进行定制、让其模拟物联网设备的功能、或者多开自动化脚本,应该没什么比Python更方便了吧。

Python脚本使用说明

MQTT接入阿里云物联网平台Demo,使用一机一密的方式。我的代码运行环境为PyCharm,运行时,需安装 paho.mqtt。在 PyCharm 的 File - Settings - Projectxxx - Python Interpreter 中,搜索并安装 paho.mqtt。Demo中需要根据个人设备进行改动的仅5项:productKeydeviceNamedeviceSecretregionIdmodelName。代码运行后,会使用设备证书的信息,自动连接阿里云物联网平台。并以5s为间隔,自动上报开关的状态消息。在阿里云在线调试界面,下发消息,可在客户端收到对应的json报文。

阿里云设备注册 的过程,请参照链接:阿里云MQTT物联网设备注册

Demo源码(IDE推荐用 PyCharm)

Demo中需要根据个人设备进行改动的仅5项:productKeydeviceNamedeviceSecretregionIdmodelName。一定要保证这5项与个人注册的设备相匹配。

import hmacfrom hashlib import sha1import timefrom paho.mqtt.client import MQTT_LOG_INFO, MQTT_LOG_NOTICE, MQTT_LOG_WARNING, MQTT_LOG_ERR, MQTT_LOG_DEBUGfrom paho.mqtt import client as mqttimport jsonimport randomimport threading'''# 原文链接 - 我的博客,更多内容可查看我的主页。# MQTT接入阿里云物联网平台Demo,使用一机一密的方式# 运行时,需安装 paho.mqtt# 在PyCharm 的 File - Settings - Projectxxx - Python Interpreter 中,搜索并安装 paho.mqtt# 需要根据个人设备进行改动的仅5项:productKey、deviceName、deviceSecret、regionId、modelName'''# 设备证书(ProductKey、DeviceName和DeviceSecret),三元组productKey = 'a1wFylTxYeD'deviceName = 'co_0001'deviceSecret = '7ab0c4b3532b5783df5fdc58a2895d7a'# ClientId Username和 Password 签名模式下的设置方法,参考文档 /document_detail/73742.html?spm=a2c4g.11186623.6.614.c92e3d45d80aqG# MQTT - 合成connect报文中使用的 ClientID、Username、PasswordmqttClientId = deviceName + '|securemode=3,signmethod=hmacsha1|'mqttUsername = deviceName + '&' + productKeycontent = 'clientId' + deviceName + 'deviceName' + deviceName + 'productKey' + productKeymqttPassword = hmac.new(deviceSecret.encode(), content.encode(), sha1).hexdigest()# 接入的服务器地址regionId = 'cn-shanghai'# MQTT 接入点域名brokerUrl = productKey + '.iot-as-mqtt.' + regionId + '.'# Topic,post,客户端向服务器上报消息topic_post = '/sys/' + productKey + '/' + deviceName + '/thing/event/property/post'# Topic,set,服务器向客户端下发消息topic_set = '/sys/' + productKey + '/' + deviceName + '/thing/service/property/set'# 物模型名称的前缀(去除后缀的数字)modelName = 'PowerSwitch_'# 下发的设置报文示例:{"method":"thing.service.property.set","id":"1227667605","params":{"PowerSwitch_1":1},"version":"1.0.0"}# json合成上报开关状态的报文def json_switch_set(num, status):switch_info = {}switch_data = json.loads(json.dumps(switch_info))switch_data['method'] = '/thing/event/property/post'switch_data['id'] = random.randint(100000000,999999999) # 随机数即可,用于让服务器区分开报文switch_status = {modelName + num : status}switch_data['params'] = switch_statusreturn json.dumps(switch_data, ensure_ascii=False)# 开关的状态,0/1onoff = 0# 建立mqtt连接对象client = mqtt.Client(mqttClientId, protocol=mqtt.MQTTv311, clean_session=True)def on_log(client, userdata, level, buf):if level == MQTT_LOG_INFO:head = 'INFO'elif level == MQTT_LOG_NOTICE:head = 'NOTICE'elif level == MQTT_LOG_WARNING:head = 'WARN'elif level == MQTT_LOG_ERR:head = 'ERR'elif level == MQTT_LOG_DEBUG:head = 'DEBUG'else:head = levelprint('%s: %s' % (head, buf))# MQTT成功连接到服务器的回调处理函数def on_connect(client, userdata, flags, rc):print('Connected with result code ' + str(rc))# 与MQTT服务器连接成功,之后订阅主题client.subscribe(topic_post, qos=0)client.subscribe(topic_set, qos=0)# 向服务器发布测试消息client.publish(topic_post, payload='test msg', qos=0)# MQTT接收到服务器消息的回调处理函数def on_message(client, userdata, msg):print('recv:', msg.topic + ' ' + str(msg.payload))def on_disconnect(client, userdata, rc):if rc != 0:print('Unexpected disconnection %s' % rc)def mqtt_connect_aliyun_iot_platform():client.on_log = on_logclient.on_connect = on_connectclient.on_message = on_messageclient.on_disconnect = on_disconnectclient.username_pw_set(mqttUsername, mqttPassword)print('clientId:', mqttClientId)print('userName:', mqttUsername)print('password:', mqttPassword)print('brokerUrl:', brokerUrl)# ssl设置,并且port=8883# client.tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS, ciphers=None)try:client.connect(brokerUrl, 1883, 60)except:print('阿里云物联网平台MQTT服务器连接错误,请检查设备证书三元组、及接入点的域名!')client.loop_forever()def publish_loop():while 1:time.sleep(5)global onoffonoff = 1-onoffswitchPost = json_switch_set('1', onoff)client.publish(topic_post, payload=switchPost, qos=0)if __name__ == '__main__':# 建立线程t1:mqtt连接阿里云物联网平台# 建立线程t2:定时向阿里云发布消息:5s为间隔,变化开关状态t1 = threading.Thread(target=mqtt_connect_aliyun_iot_platform, )t2 = threading.Thread(target=publish_loop, )t1.start()t2.start()

运行现象(IDE使用 PyCharm)

代码运行后,会在调试窗口输出提示内容。

已经连接阿里云物联网平台,并定时上发开关状态的报文。

阿里云物联网平台,设备后台,可以看到设备上线。

查看客户端上发的开关状态:物模型-打开实时刷新

点击查看数据,可以图标形式查看历史数据。

利用在线调试,模拟向客户端下发控制指令。

客户端收到服务器的控制指令。

异常处理

代码已验证无误,如有错误,只能是设备证书填写有误。Demo中需要根据个人设备进行改动的仅5项:productKeydeviceNamedeviceSecretregionIdmodelName。一定要保证这5项与个人注册的设备相匹配。脚本运行时,会调试输出clientId、userName、passWordbrokerUrl等关键信息。可自行对照。

productKeyregionId填写有误,会导致brokerUrl合成错误,进而域名解析失败,MQTT不能正常连接阿里云服务器,引发异常。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。