5.1 综合案例

编程入门 行业动态 更新时间:2024-10-19 13:19:14

5.1 综合<a href=https://www.elefans.com/category/jswz/34/1770649.html style=案例"/>

5.1 综合案例

综合案例-将温湿度数据发送到云端

  • 1.测试代码
  • 2.测试结果
  • 3.实现步骤

1.测试代码

  • main.py
# coding=utf-8
from driver import GPIO
import network
import ujson
import utime as time
import modem
from  linksdk import Device
from driver import KV
from ahtx0 import AHT10
import otaglobal  g_connect_status,net,device,deviceSecret,deviceName,productKey,productSecret,device_dyn_resigter_succed,module_name,default_ver,info
g_connect_status= False
net= None
device = None
deviceSecret = None
deviceName = None
productKey = "a1dSGk3qrOO"
productSecret = "W5MuFOqE5YShGi24"
device_dyn_resigter_succed = False
module_name = 'default'
default_ver = '1.0.0'# 定义升级包的下载和安装路径,其中url,hash_type和hash 会通过服务端推送被保存下来info = {'url': '','store_path': '/data/pyamp/app.zip','install_path': '/data/pyamp/','length': 0,'hash_type': '','hash': ''}# ota 消息推送的接受函数
def on_trigger(data):global info# 保存服务端推送的ota信息info['url'] = data['url']info['length'] = data['length']info['module_name'] = data['module_name']info['version'] = data['version']info['hash'] = data['hash']info['hash_type'] = data['hash_type']# 开始ota 包下载dl_data = {}dl_data['url'] = info['url']dl_data['store_path'] = info['store_path']ota.download(dl_data)# ota 升级包下载结果回调函数
def on_download(data):global infoif data >= 0:print('Ota download succeed')# 开始ota包校验param = {}param['length'] = info['length']param['store_path'] = info['store_path']param['hash_type'] = info['hash_type']param['hash'] = info['hash']ota.verify(param)# ota 升级包校验结果回调函数
def on_verify(data):global infoprint(data)if data >= 0 :print('Ota verify succeed')print('Start Upgrade')# 开始ota升级param = {}param['length'] = info['length']param['store_path'] = info['store_path']param['install_path'] = info['install_path']ota.upgrade(param)# ota 升级包结果回调函数
def on_upgrade(data):if data >= 0 :print('Ota succeed')#ota升完级后 重启设备reboot()#当iot设备连接到物联网平台的时候触发'connect' 事件
def on_connect(data):global module_name,default_ver,productKey,deviceName,deviceSecret,on_trigger,on_download,on_verify,on_upgradeprint('***** connect lp succeed****')data_handle = {}data_handle['device_handle'] = data['handle']# 初始化ota服务ota.init(data_handle)# ota 回调函数注册ota.on(1,on_trigger)ota.on(2,on_download)ota.on(3,on_verify)ota.on(4,on_upgrade)report_info = {"device_handle": data['handle'] ,"product_key": productKey ,"device_name": deviceName ,"module_name": module_name ,"version": default_ver}
# 上报本机ota相关信息,上报版本信息返回以后程序返回,知道后台推送ota升级包,才会调用on_trigger函数ota.report(report_info)#当iot云端下发属性设置时,触发'props'事件
def on_props(request):print('clound req data is {}'.format(request))#当iot云端调用设备service时,触发'service'事件
def on_service(id,request):print('clound req id  is {} , req is {}'.format(id,request))#当设备跟iot平台通信过程中遇到错误时,触发'error'事件
def on_error(err):print('err msg is {} '.format(err))#网络连接的回调函数
def on_4g_cb(args):global g_connect_statuspdp = args[0]netwk_sta = args[1]if netwk_sta == 1:g_connect_status = Trueelse:g_connect_status = False#网络连接
def connect_network():global net,on_4g_cb,g_connect_status#NetWorkClient该类是一个单例类,实现网络管理相关的功能,包括初始化,联网,状态信息等 = network.NetWorkClient()g_register_network = Falseif net._stagecode is not None and net._stagecode == 3 and net._subcode == 1:g_register_network = Trueelse:g_register_network = Falseif g_register_network:#注册网络连接的回调函数on(self,id,func);  1代表连接,func 回调函数  ;return 0 成功net.on(1,on_4g_cb)net.connect(None)else:print('网络注册失败')while True:if g_connect_status:print('网络连接成功')breaktime.sleep_ms(20)#动态注册回调函数
def on_dynreg_cb(data):global deviceSecret,device_dyn_resigter_succeddeviceSecret = datadevice_dyn_resigter_succed = True# 连接物联网平台
def dyn_register_device(productKey,productSecret,deviceName):global on_dynreg_cb,device,deviceSecretkv = KV()key = '_amp_customer_devicesecret'deviceSecretdict = kv.getStorageSync(key)if deviceSecretdict is not None:deviceSecret = deviceSecretdict[key]if deviceSecretdict is None or deviceSecret is None:key_info = {'productKey': productKey  ,'productSecret': productSecret ,'deviceName': deviceName}# 动态注册一个设备,获取设备的deviceSecretdevice.register(key_info,on_dynreg_cb)def main():global g_connect_status,net,device,deviceSecret,deviceName,productKey,productSecret,device_dyn_resigter_succed# 定义需要升级的模块和版本号#连接网络connect_network()#获取设备的IMEI 作为deviceName 进行动态注册deviceName = modem.getDevImei()#初始化物联网平台Device类,获取device实例device = Device()if deviceName is not None and len(deviceName) > 0 :#动态注册一个设备dyn_register_device(productKey,productSecret,deviceName)else:print("获取设备IMEI失败,无法进行动态注册")while deviceSecret is None:time.sleep(0.2)print('动态注册成功:' + deviceSecret)key_info = {'region' : 'cn-shanghai' ,'productKey': productKey ,'deviceName': deviceName ,'deviceSecret': deviceSecret ,'keepaliveSec': 60}#打印设备信息print(key_info)#device.ON_CONNECT 是事件,on_connect是事件处理函数/回调函数device.on(device.ON_CONNECT,on_connect)           device.on(device.ON_PROPS,on_props)  device.on(device.ON_SERVICE,on_service)device.on(device.ON_ERROR,on_error)device.connect(key_info)  imei_iccid={}imei_iccid["IMEI"]=deviceNameimei_iccid["ICCID"]=modem.sim.getIccid()imei_iccid_str=ujson.dumps(imei_iccid)data0={"param":imei_iccid_str}time.sleep(1)device.postProps(data0)#creat an instancea=AHT10()#set mode(℃ or F)a.set_mode(0)tem_hum={}while True:time.sleep(1)#get datas of temperature and humiditytem,hum=a.get_data()print("temperature:{},humidity:{}".format(tem,hum))tem_hum["temperature"]=temtem_hum["humidity"]=humtem_hum_str=ujson.dumps(tem_hum)data={'param':tem_hum_str}device.postProps(data)print('当前App版本号:' + default_ver)print('等待Ota升级包.....')print('--------------------------------------------------------------------')if __name__=="__main__":main()
  • board.json
{"name": "haas506","version": "1.0.0","io": {"ADC0": {"type": "ADC","port": 0,"sampling": 12000000},"ADC1": {"type": "ADC","port": 1,"sampling": 12000000},"OLED": {"type": "I2C","port": 1,"addrWidth": 7,"freq": 400000,"mode": "master","devAddr": 60},"aht10": {"type": "I2C","port": 1,"addrWidth": 7,"freq": 400000,"mode": "master","devAddr": 56},      "KEY1": {"type": "GPIO","port": 44,"dir": "irq","pull": "pullup","intMode": "rising"},      "led1": {"type": "GPIO","port": 7,"dir": "output","pull": "pulldown"},"led_g": {"type": "GPIO","port": 32,"dir": "output","pull": "pulldown"},            "SPI0": {"type": "SPI","port": 0,"mode": "master","freq": 2000000},"serial1": {"type": "UART","port": 0,"dataWidth": 8,"baudRate": 115200,"stopBits": 1,"flowControl": "disable","parity": "none","timeout": 1000},"serial2": {"type": "UART","port": 1,"dataWidth": 8,"baudRate": 9600,"stopBits": 1,"flowControl": "disable","parity": "none","timeout": 1000},"serial3": {"type": "UART","port": 2,"dataWidth": 8,"baudRate": 115200,"stopBits": 1,"flowControl": "disable","parity": "none","timeout": 1000}},"debugLevel": "ERROR","repl": "enable","replPort": 0
}
  • ahtx0.py
import utime as time
from driver import I2C#CONSTANTS
AHT10_ADDRESS = 0x38     # 0111000 (7bit address)
AHT10_READ_DELAY_MS = 75 # Time it takes for AHT to collect data
AHT_TEMPERATURE_CONST = 200
AHT_TEMPERATURE_OFFSET = 50
KILOBYTE_CONST = 1048576
CMD_INITIALIZE = bytearray([0xE1, 0x08, 0x00])
CMD_MEASURE = bytearray([0xAC, 0x33, 0x00])
FARENHEIT_MULTIPLIER = 9/5
FARENHEIT_OFFSET = 32class AHT10:def __init__(self,  mode=0, address=AHT10_ADDRESS):self.i2c = I2C()self.i2c.open('aht10')self.address = addresswriteBuf=bytearray(4)writeBuf[0]=self.addresswriteBuf[1]=CMD_INITIALIZE[0]writeBuf[2]=CMD_INITIALIZE[1]writeBuf[3]=CMD_INITIALIZE[2]self.i2c.write(writeBuf,4)self.readings_raw = bytearray(8)self.results_parsed = [0, 0]self.mode = mode      # 0 for Celsius(摄氏度), 1 for Farenheit(华氏度)def read_raw(self):writeBuf=bytearray(4)writeBuf[0]=self.addresswriteBuf[1]=CMD_MEASURE[0]writeBuf[2]=CMD_MEASURE[1]writeBuf[3]=CMD_MEASURE[2]self.i2c.write(writeBuf,4)time.sleep_ms(AHT10_READ_DELAY_MS)readBuf=bytearray(6)readBuf[0]=AHT10_ADDRESSself.i2c.read(readBuf,6)self.readings_raw[0]=readBuf[0]self.readings_raw[1]=readBuf[1]self.readings_raw[2]=readBuf[2]self.readings_raw[3]=readBuf[3]self.readings_raw[4]=readBuf[4]self.readings_raw[5]=readBuf[5]self.results_parsed[0] = self.readings_raw[1] << 12 | self.readings_raw[2] << 4 | self.readings_raw[3] >> 4self.results_parsed[1] = (self.readings_raw[3] & 0x0F) << 16 | self.readings_raw[4] << 8 | self.readings_raw[5]def humidity(self):self.read_raw()return (self.results_parsed[0] / KILOBYTE_CONST) * 100 def temperature(self):self.read_raw()if self.mode is 0:return (self.results_parsed[1] / KILOBYTE_CONST) * AHT_TEMPERATURE_CONST - AHT_TEMPERATURE_OFFSETelse:return ((self.results_parsed[1] / KILOBYTE_CONST) * AHT_TEMPERATURE_CONST - AHT_TEMPERATURE_OFFSET) * FARENHEIT_MULTIPLIER + FARENHEIT_OFFSETdef set_mode(self, mode):if mode==0 or mode==1:self.mode = modeelse:    raise ValueError('Mode must be either 0 for Celsius or 1 Farenheit')def get_data(self):# print("Temperature: " + str(self.temperature()) + ("C","F")[self.mode] + ", Humidity: " + str(self.humidity()))# return str(self.temperature),str(self.humidity())return self.temperature(),self.humidity()
  • 物模型数据内容,model.json
{"schema": ".json","profile": {"version": "1.0","productKey": "a1nz0jhv9uS"},"properties": [{"identifier": "IMEI","name": "IMEI","accessMode": "r","required": false,"dataType": {"type": "text","specs": {"length": "64"}}},{"identifier": "ICCID","name": "ICCID","accessMode": "r","required": false,"dataType": {"type": "text","specs": {"length": "64"}}},{"identifier": "temperature","name": "temperature","accessMode": "r","required": false,"dataType": {"type": "double","specs": {"min": "-10","max": "100","unit": "°C","unitName": "摄氏度","step": "0.00000001"}}},{"identifier": "humidity","name": "humidity","accessMode": "r","required": false,"dataType": {"type": "double","specs": {"min": "0","max": "100","unit": "%RH","unitName": "相对湿度","step": "0.0000001"}}}],"events": [{"identifier": "post","name": "post","type": "info","required": true,"desc": "属性上报","method": "thing.event.property.post","outputData": [{"identifier": "IMEI","name": "IMEI","dataType": {"type": "text","specs": {"length": "64"}}},{"identifier": "ICCID","name": "ICCID","dataType": {"type": "text","specs": {"length": "64"}}},{"identifier": "temperature","name": "temperature","dataType": {"type": "double","specs": {"min": "-10","max": "100","unit": "°C","unitName": "摄氏度","step": "0.00000001"}}},{"identifier": "humidity","name": "humidity","dataType": {"type": "double","specs": {"min": "0","max": "100","unit": "%RH","unitName": "相对湿度","step": "0.0000001"}}}]}],"services": [{"identifier": "set","name": "set","required": true,"callType": "async","desc": "属性设置","method": "thing.service.property.set","inputData": [],"outputData": []},{"identifier": "get","name": "get","required": true,"callType": "async","desc": "属性获取","method": "thing.service.property.get","inputData": ["IMEI","ICCID","temperature","humidity"],"outputData": [{"identifier": "IMEI","name": "IMEI","dataType": {"type": "text","specs": {"length": "64"}}},{"identifier": "ICCID","name": "ICCID","dataType": {"type": "text","specs": {"length": "64"}}},{"identifier": "temperature","name": "temperature","dataType": {"type": "double","specs": {"min": "-10","max": "100","unit": "°C","unitName": "摄氏度","step": "0.00000001"}}},{"identifier": "humidity","name": "humidity","dataType": {"type": "double","specs": {"min": "0","max": "100","unit": "%RH","unitName": "相对湿度","step": "0.0000001"}}}]}]
}

2.测试结果

3.实现步骤

(1)复制main.py、board.json、ahtx0.py 这三个文件夹的代码复制到vscode中
(2)在阿里云物联网平台新建一个产品、设置功能定义
功能定义 可以一个一个设置,也可以上传物模型数据,如下所示:

物模型数据可以删除json文件,也可以上传zip压缩包

(3)新建一个设备
(4)将设备的productKey和productSecret复制到main.py代码中的相应位置
(5)烧录
(6)上报数据到云端

更多推荐

5.1 综合案例

本文发布于:2024-03-12 11:42:24,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1731427.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:案例

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!