1500字范文,内容丰富有趣,写作好帮手!
1500字范文 > 技能梳理1@树莓派3B+bh1750+震动+dht11+onenet+本地保存

技能梳理1@树莓派3B+bh1750+震动+dht11+onenet+本地保存

时间:2020-09-20 18:06:19

相关推荐

技能梳理1@树莓派3B+bh1750+震动+dht11+onenet+本地保存

树莓派采集温湿度、震动传感器检测的声音、光照强度,并发送到云平台OneNET进行曲线绘制,保存传感器数据到树莓派本地

1、项目简介

2、实现逻辑

#通过i2c协议读取光照传感器bh1750检测的光照数据

#通过单总线协议获取温度传感器dht11检测的温湿度数据

#设置输入引脚,检测震动传感器由于震动导致的引脚跳变

#设置好onenet线上环境及ui页面,代码中设置好密钥等信息,按照固定格式将传感器数据发送到onenet

#设置保存间隔,打开本地的txt文件,将光照,温湿度等传感器数据带着保存那刻的时间戳存入本地路径

#各线程进行初始化配置及开始运行,做相应的线程卡死处理及最终硬件连接

3、应用场景

#远程室内环境检测

#开关门及开关灯检测(震动*光照)

#火灾及漏水检测

#绿植土壤干燥度检测

4、核心代码梳理

# 硬件连接 传感器 信号引脚 信号引脚电源正电源负# BH1750光照采集相关程序__DEV_ADDR=0x23 #BH1750地址#BH1750控制字 具体含义和手册对照__CMD_PWR_OFF=0x00 #关机__CMD_PWR_ON=0x01 #开机__CMD_RESET=0x07 #重置__CMD_CHRES=0x10 #持续高分辨率检测__CMD_CHRES2=0x11 #持续高分辨率模式2检测__CMD_CLHRES=0x13 #持续低分辨率检测__CMD_THRES=0x20 #一次高分辨率__CMD_THRES2=0x21 #一次高分辨率模式2__CMD_TLRES=0x23 #一次分辨率__CMD_SEN100H=0x42 #灵敏度100%,高位__CMD_SEN100L=0X65 #灵敏度100%,低位__CMD_SEN50H=0x44 #50%__CMD_SEN50L=0x6A #50%__CMD_SEN200H=0x41 #200%__CMD_SEN200L=0x73 #200%bus = smbus.SMBus(1) #使用树莓派iic总线1bus.write_byte(__DEV_ADDR, __CMD_PWR_ON) #向bh1750下发各种指令bus.write_byte(__DEV_ADDR, __CMD_RESET)bus.write_byte(__DEV_ADDR, __CMD_SEN100H)bus.write_byte(__DEV_ADDR, __CMD_SEN100L)bus.write_byte(__DEV_ADDR, __CMD_PWR_OFF)def getIlluminance(): # 获取光照强度bus.write_byte(__DEV_ADDR, __CMD_PWR_ON)bus.write_byte(__DEV_ADDR, __CMD_THRES2)time.sleep(0.2) # 稍微稳定一下res = bus.read_word_data(__DEV_ADDR,0) # 读取光照强度数据# read_word_datares=((res>>8)&0xff)|(res<<8)&0xff00 # 数据处理res=round(res/(2*1.2),2) # bh1750原始数据换算成物理光照强度 公式看手册# result="光照强度: "+str(res)+"lx"return resdef threadSOUND(): # 线程启动和保护GPIO.setmode(GPIO.BOARD)time.sleep(1)# 设置声音校测引脚为输入GPIO.setup(xx, GPIO.IN)threadCOLECT = threading.Thread(target=colectSOUND)threadCOLECT.start()while True:if not threadCOLECT.is_alive(): # 如果声音采集线程卡死 隔60s再次开启threadCOLECT = threading.Thread(target=colectSOUND)threadCOLECT.start()time.sleep(60)def colectSOUND(): # 硬件采集声音global sound_status while True:if(sound_status == 0): # 没有声音/上一次检测结果发送完毕才进行下一次检测while GPIO.input(xx) == GPIO.HIGH: # 如果是高电平就一直死循环continuesound_status = 1 # 物理低电平代表有声音# time.sleep(1)# print("colectSOUND")# 发送和保存传感器数据相关程序 #def threadDATAPRO(): # 线程数据处理threaddatapro = threading.Thread(target=dataCheck)threaddatapro.start()while True:if not threaddatapro.is_alive(): # 如果发送和保存线程卡死 隔60s再次开启threaddatapro = threading.Thread(target=dataCheck)threaddatapro.start()time.sleep(60)def dataCheck(): # 检测global sound_statusglobal temglobal humglobal luxsound_status = 0while True:hum_temp, tem_temp = Adafruit_DHT.read_retry(xx, xx) # 参数里面 xx是指DHT11 xx 指gpioxx 在树莓派上是pinxxif ((hum_temp < 100) and (tem_temp < 100)):tem_temp = '{:0.1f}'.format(tem_temp) # 保留小数点后1位hum_temp = '{:0.1f}'.format(hum_temp)tem = float(tem_temp) # 把字符串转换为float类型hum = float(hum_temp) # 把字符串转换为float类型lux = getIlluminance()time.sleep(1)#print("SendSave")def dataSave(): # 保存global sound_statusglobal temglobal humglobal lux# 数据保存CurTime = datetime.datetime.now() # 获取当前时间file_txt = open('/home/pi/xx.txt', 'a', encoding='utf-8') # 'a’是接续写入,在不删除之前文本内容的前提下,写入数据file_txt.write('\ntime:' + str(CurTime) + '\ttem:' + str(tem) + '\thum:' + str(hum) + '\tsound:' + str(sound_status) + '\tlux:' + str(lux) + '\n') # 写入操作file_txt.close() # 关闭文件def dataSend(): # 发送global sound_statusglobal temglobal humglobal lux# 数据发送resp = http_put()sound_status = 0 # 开始下一周期声音检测print("OneNET result:\n %s" % resp)# onenet云平台通信相关程序 #def http_put():global sound_statusglobal temglobal humglobal luxssl._create_default_https_context = ssl._create_unverified_context# 当使用urllib.urlopen一个 https 的时候会验证一次 SSL证书 通过导入ssl模块把证书验证改成不用验证url = '/devices/xxxxxxxx/datapoints'print("the tem is: %d" %tem)print("the hum is: %d" %hum)print("the sound is: %d" %sound_status)print("the lux is: %d" %lux)values = {"datastreams":[{"id":"TEM","datapoints":[{"value":tem}]},{"id":"HUM","datapoints":[{"value":hum}]},{"id":"SOUND","datapoints":[{"value":sound_status}]},{"id":"LUX","datapoints":[{"value":lux}]}]}jdata = json.dumps(values).encode("utf-8")#print(jdata)request = urllib.request.Request(url, jdata)request.add_header('api-key', APIKEY)request.get_method = lambda: 'POST'request = urllib.request.urlopen(request)return request.read()if __name__ == '__main__':threadCollectSound = threading.Thread(target=threadSOUND) # 创建线程threadCollectSound.setDaemon(True) # 守护线程threadCollectSound.start() # 启动线程threadDataPro = threading.Thread(target=threadDATAPRO)threadDataPro.setDaemon(True) # 守护线程threadDataPro.start()schedule.every(1).seconds.do(dataSave) # 每几秒保存一次数据 注意:发送和保存的时间尽量不要一样,否则容易冲突schedule.every(xx).seconds.do(dataSend) # 每几秒发送一次while True:schedule.run_pending() #定时器,在while True死循环中,schedule.run_pending()是保持schedule一直运行,# 去查询上面那一堆的任务,在任务中,就可以设置不同的时间去运行time.sleep(1)

5、部分参考资料

#dht11参考

/hilary0614/p/dht11.html

#bh1750参考

/sirius-swu/p/6682746.html

注意事项

#出现 urllib2.URLError: urlopen error [Errno -3] Temporary failure in name resolution解决方法参考:

/info-detail-2549054.html(设置静态ip和dns)

#出现 http.client remotedisconnected:remote end closed connection without respone原因:太频繁和onenet通信,被onenet认为是爬虫,被关闭远端连接,重新运行即可

#文件保存在同路径 dataSave.txt 里面

#注意上电刚开始可能定时不准确,过几分钟一般会好些

完整可运行项目地址

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。