1500字范文,内容丰富有趣,写作好帮手!
1500字范文 > 阿里云 IOT 物联网平台简单使用【随笔】

阿里云 IOT 物联网平台简单使用【随笔】

时间:2020-05-01 08:59:42

相关推荐

阿里云 IOT 物联网平台简单使用【随笔】

Aliyun IOT 使用

服务端订阅

官网:/document_detail/142376.html?spm=a2c4g.11186623.6.622.46b92cf0vmZSwV

准备工作

一、首先开通并进入阿里云物联网平台,创建一个公共实例/企业版实例【公共实例可用于测试,生产最好用企业版实例】

二、点击进入实例,创建产品,参数根据实际情况输入。

三、产品创建完成后,添加设备

四、若想测试发布订阅消息,还需要添加自定义的 topic 主题【当然也可以用阿里官方提供端】以及 创建服务端订阅

服务端订阅编码实现【Java】

工程下导入 pom 依赖

<!--aliyun core--><dependency><groupId>com.aliyun</groupId><artifactId>aliyun-java-sdk-core</artifactId><version>4.5.6</version></dependency><!--aliyun Iot--><!-- /artifact/com.aliyun/aliyun-java-sdk-iot --><dependency><groupId>com.aliyun</groupId><artifactId>aliyun-java-sdk-iot</artifactId><version>7.16.0</version></dependency><!-- IOT用于监听阿里平台消息 --><dependency><groupId>com.aliyun.openservices</groupId><artifactId>iot-client-message</artifactId><version>1.1.5</version></dependency><!-- amqp 1.0 qpid client --><dependency><groupId>org.apache.qpid</groupId><artifactId>qpid-jms-client</artifactId><version>0.47.0</version></dependency><!-- util for base64--><dependency><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId><version>1.10</version></dependency>

在项目目录下创建一个config.properties配置文件,具体参数如下:

## 阿里云密钥IDiot.accessKeyID=*************# 阿里云密钥iot.accessKeySecret=*************iot.uid=**********# 要访问的iot的regionId 目前支持的 cn-shanghai(华东2)、ap-southeast-1(新加坡) 、us-west-1(美西)iot.regionId=cn-shenzhen#iot套件对应的产品code 保持不变即可iot.productCode=Iot#Iot api的服务地址 跟regionId对应 这是华东2的iot.domain=iot.${iot.regionId}.#Iot api 的版本iot.version=-01-20# 消费组IDiot.consumerGroupId=DEFAULT_GROUP# 企业版iot实例ID,公共版没有iot.iotInstanceId=*************# 签名方法:支持hmacmd5、hmacsha1和hmacsha256。iot.signMethod=hmacsha1# iot公网终端节点url,企业版iot.amqp.connectionUrl=*************

参数介绍:

【accessKeyID 、accessKeySecret 、uid】 为阿里云用户信息;建议自己创建一个只具有 IOT 权限的用户步骤:创建用户,设置权限,创建 accessKeySecret

【regionId、productCode、domain、version】regionId 为区域ID,productCode 固定为 iot,domain 为区域对应的访问地址,version 为版本号;这些参数都很好获取就不做介绍了。

【consumerGroupId、iotInstanceId、signMethod,connectionUrl】consumerGroupId 为分组id,在上面创建订阅时右边的就是;iotInstanceId:为企业版实例的ID,公共实例不需要;signMethod 为加密方式,固定为上面给出的即可;connectionUrl:为企业版实例 AMQP 接入方式的 url

创建 IOT 连接工具类

import com.mon.util.aliyun.iot.util.LogUtil;import com.aliyun.openservices.iot.api.Profile;import com.aliyun.openservices.iot.api.message.MessageClientFactory;import com.aliyun.openservices.iot.api.message.api.MessageClient;import com.aliyuncs.DefaultAcsClient;import com.aliyuncs.profile.DefaultProfile;import com.aliyuncs.profile.IClientProfile;import mons.codec.binary.Base64;import javax.crypto.Mac;import javax.crypto.spec.SecretKeySpec;import javax.jms.*;import javax.naming.Context;import javax.naming.InitialContext;import java.io.IOException;import java.util.Hashtable;import java.util.Properties;/*** iot相关配置信息,以及client的生产* @author akieay*/public class IotConnectionUtil {/*** 阿里云*/private static String accessKeyID;private static String accessKeySecret;private static String uid;private static String regionId;private static String product;private static String domain;private static String version;private static String consumerGroupId;private static String iotInstanceId;private static String signMethod;private static String amqpConnectionUrl;private static String CONNECTION_NAME= "SBCF";static {Properties prop = new Properties();try {prop.load(Object.class.getResourceAsStream("/config.properties"));accessKeyID = prop.getProperty("iot.accessKeyID");accessKeySecret = prop.getProperty("iot.accessKeySecret");uid = prop.getProperty("iot.uid");regionId = prop.getProperty("iot.regionId");product = prop.getProperty("iot.productCode");domain = prop.getProperty("iot.domain");version = prop.getProperty("iot.version");consumerGroupId = prop.getProperty("iot.consumerGroupId");iotInstanceId = prop.getProperty("iot.iotInstanceId");signMethod = prop.getProperty("iot.signMethod");amqpConnectionUrl = prop.getProperty("iot.amqp.connectionUrl");} catch (IOException e) {e.printStackTrace();}}public static String getAmqpConnectionUrl() {//企业版实例// return "failover:(amqps://"+amqpConnectionUrl+":5671?amqp.idleTimeout=80000)?failover.reconnectDelay=30";//公共实例return "failover:(amqps://"+uid+".iot-amqp."+regionId+".:5671?amqp.idleTimeout=80000)?failover.reconnectDelay=30";}public static Connection getConnectionByEnterpriseInstance( Hashtable<String, String> hashtable ) throws Exception {Connection connection = null;try {long timeStamp = System.currentTimeMillis();String userName = uid + "|authMode=aksign"+ ",signMethod=" + signMethod+ ",timestamp=" + timeStamp+ ",authId=" + accessKeyID// //公共版不需要该参数,企业版必须填写该参数//+ ",iotInstanceId=" + iotInstanceId+ ",consumerGroupId=" + consumerGroupId+ "|";String signContent = "authId=" + accessKeyID + "&timestamp=" + timeStamp;String password = doSign(signContent,accessKeySecret, signMethod);Context context = new InitialContext(hashtable);ConnectionFactory cf = (ConnectionFactory)context.lookup(CONNECTION_NAME);connection = cf.createConnection(userName, password);} catch (Exception e) {LogUtil.print("初始化messageClient失败!exception:" + e.getMessage());}return connection;}public static MessageClient getMessageClientByPublicInstance() {MessageClient messageClient = null;try {String endPoint = "https://" + uid + ".iot-as-http2." + regionId + ".";// 连接配置Profile profile = Profile.getAccessKeyProfile(endPoint, regionId, accessKeyID, accessKeySecret);// 构造客户端messageClient = MessageClientFactory.messageClient(profile);} catch (Exception e) {LogUtil.print("初始化messageClient失败!exception:" + e.getMessage());}return messageClient;}/*** 计算签名,password组装方法,请参见AMQP客户端接入说明文档。*/private static String doSign(String toSignString, String secret, String signMethod) throws Exception {SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);Mac mac = Mac.getInstance(signMethod);mac.init(signingKey);byte[] rawHmac = mac.doFinal(toSignString.getBytes());return Base64.encodeBase64String(rawHmac);}public static DefaultAcsClient getClient() {DefaultAcsClient client = null;try {IClientProfile profile = DefaultProfile.getProfile(regionId, accessKeyID, accessKeySecret);DefaultProfile.addEndpoint(regionId, product, domain);// 初始化clientclient = new DefaultAcsClient(profile);} catch (Exception e) {LogUtil.print("初始化client失败!exception:" + e.getMessage());}return client;}public static String getRegionId() {return regionId;}public static void setRegionId(String regionId) {IotConnectionUtil.regionId = regionId;}public static String getDomain() {return domain;}public static void setDomain(String domain) {IotConnectionUtil.domain = domain;}public static String getVersion() {return version;}public static void setVersion(String version) {IotConnectionUtil.version = version;}}

创建服务端订阅业务类

import com.mon.util.aliyun.iot.connection.IotConnectionUtil;import lombok.extern.slf4j.Slf4j;import org.apache.qpid.jms.JmsConnection;import org.apache.qpid.jms.JmsConnectionListener;import org.apache.qpid.jms.message.JmsInboundMessageDispatch;import javax.jms.*;import javax.naming.Context;import javax.naming.InitialContext;import .URI;import java.util.Hashtable;import java.util.concurrent.ExecutorService;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;/*** @author akieay* @Date: /11/18 11:09*/@Slf4jpublic class ServerSideSubscription {/*** 业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。*/private final static ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(50000));public static void main(String[] args) throws Exception {Hashtable<String, String> hashtable = new Hashtable<>();String connectionUrl = IotConnectionUtil.getAmqpConnectionUrl();hashtable.put("connectionfactory.SBCF",connectionUrl);hashtable.put("queue.QUEUE", "default");hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");Connection connection = IotConnectionUtil.getConnectionByEnterpriseInstance(hashtable);Context context = new InitialContext(hashtable);Destination queue = (Destination)context.lookup("QUEUE");((JmsConnection) connection).addConnectionListener(jmsConnectionListener);// 创建会话。// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);connection.start();// 创建Receiver连接。MessageConsumer consumer = session.createConsumer(queue);consumer.setMessageListener(messageListener);}/*** 消息监听器*/private static MessageListener messageListener = new MessageListener() {@Overridepublic void onMessage(Message message) {try {//1.收到消息之后一定要ACK。// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。// message.acknowledge();//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。executorService.submit(() -> processMessage(message));} catch (Exception e) {log.error("submit task occurs exception ", e);}}};/*** 在这里处理您收到消息后的具体业务逻辑。*/private static void processMessage(Message message) {try {String topic = message.getStringProperty("topic");String messageId = message.getStringProperty("messageId");byte[] body = message.getBody(byte[].class);String content = null;if (null != body) {content = new String(body);}System.out.println("receive message"+ ", topic = " + topic+ ", messageId = " + messageId+ ", content = " + content);} catch (Exception e) {log.error("processMessage occurs error ", e);}}/*** 连接状态监听器*/private static JmsConnectionListener jmsConnectionListener = new JmsConnectionListener() {/*** 连接成功建立。*/@Overridepublic void onConnectionEstablished(URI remoteURI) {log.info("onConnectionEstablished, remoteUri:{}", remoteURI);}/*** 尝试过最大重试次数之后,最终连接失败。*/@Overridepublic void onConnectionFailure(Throwable error) {log.error("onConnectionFailure, {}", error.getMessage());}/*** 连接中断。*/@Overridepublic void onConnectionInterrupted(URI remoteURI) {log.info("onConnectionInterrupted, remoteUri:{}", remoteURI);}/*** 连接中断后又自动重连上。*/@Overridepublic void onConnectionRestored(URI remoteURI) {log.info("onConnectionRestored, remoteUri:{}", remoteURI);}@Overridepublic void onInboundMessage(JmsInboundMessageDispatch envelope) {}@Overridepublic void onSessionClosed(Session session, Throwable cause) {}@Overridepublic void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}@Overridepublic void onProducerClosed(MessageProducer producer, Throwable cause) {}};}

设备端接入

以下介绍的设备端接入为 java 版,主要用于配合服务端订阅的调试

官网:/document_detail/97331.html?spm=a2c4g.11186623.6.675.7636277c2CMzcp

设备端接入编码实现

导入 pom 依赖

<!--aliyun client--><dependency><groupId>com.aliyun.alink.linksdk</groupId><artifactId>iot-linkkit-java</artifactId><version>1.2.0.1</version><scope>compile</scope></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.1</version><scope>compile</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.25</version><scope>compile</scope></dependency>

客户端接入业务类

import com.mon.util.aliyun.iot.constant.TopicConstant;import com.aliyun.alink.dm.api.DeviceInfo;import com.aliyun.alink.dm.api.InitResult;import com.aliyun.alink.linkkit.api.ILinkKitConnectListener;import com.aliyun.alink.linkkit.api.IoTMqttClientConfig;import com.aliyun.alink.linkkit.api.LinkKit;import com.aliyun.alink.linkkit.api.LinkKitInitParams;import com.aliyun.alink.linksdk.cmp.connect.channel.MqttSubscribeRequest;import com.aliyun.alink.linksdk.cmp.core.base.AMessage;import com.aliyun.alink.linksdk.cmp.core.base.ConnectState;import com.aliyun.alink.linksdk.cmp.core.listener.IConnectNotifyListener;import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSubscribeListener;import com.aliyun.alink.linksdk.tools.AError;import lombok.extern.slf4j.Slf4j;/*** @author akieay* @Date: /11/18 16:59*/@Slf4jpublic class ClientSideSubscription {public static void main(String[] args) {/*** 主要注意参数 channelHost,公共实例格式为:{productKey}+".iot-as-mqtt."+{regionId}+".:1883"* 企业版实例格式为:实现详情下的公网终端节点(Endpoint)中的 AMQP 的路径 + ":1883"*/new ClientSideSubscription().init("产品Key", "设备deviceName","设备deviceSecret", "你的channelHost");}public void init(String productKey, String deviceName, String deviceSecret, String channelHost){LinkKitInitParams params = new LinkKitInitParams();/*** 设置 Mqtt 初始化参数*/IoTMqttClientConfig config = new IoTMqttClientConfig();config.productKey = productKey;config.deviceName = deviceName;config.deviceSecret = deviceSecret;config.channelHost = channelHost;/*** 是否接受离线消息* 对应 mqtt 的 cleanSession 字段*/config.receiveOfflineMsg = false;params.mqttClientConfig = config;/*** 设置初始化三元组信息,用户传入*/DeviceInfo deviceInfo = new DeviceInfo();deviceInfo.productKey = productKey;deviceInfo.deviceName = deviceName;deviceInfo.deviceSecret = deviceSecret;params.deviceInfo = deviceInfo;//初始化连接LinkKit.getInstance().init(params, new ILinkKitConnectListener() {@Overridepublic void onError(AError aError) {log.info("Mqtt connect fail");}@Overridepublic void onInitDone(InitResult initResult) {log.info("Mqtt connect success");// 订阅MqttSubscribeRequest request = new MqttSubscribeRequest();// topic 用户根据实际场景填写request.topic = "/" + productKey + "/" + deviceName + TopicConstant.TEST_TOPIC;request.isSubscribe = true;LinkKit.getInstance().subscribe(request, new IConnectSubscribeListener() {@Overridepublic void onSuccess() {// 订阅成功log.info("subscribe topic " + request.topic + " success");// 注册下行监听,包括长连接的状态和云端下行的数据LinkKit.getInstance().registerOnNotifyListener(new IConnectNotifyListener() {@Overridepublic void onNotify(String connectId, String topic, AMessage aMessage) {// 云端下行数据回调// connectId 连接类型 topic 下行 topic; aMessage 下行数据String data = new String((byte[]) aMessage.data);log.info("topic: " + topic + " \t data: " + data);// pushData 示例 {"method":"thing.service.test_service","id":"123374967","params":{"vv":60},"version":"1.0.0"}// method 服务类型; params 下推数据内容}@Overridepublic boolean shouldHandle(String connectId, String topic) {// 选择是否不处理某个 topic 的下行数据// 如果不处理某个topic,则onNotify不会收到对应topic的下行数据return true; //TODO 根基实际情况设置}@Overridepublic void onConnectStateChange(String connectId, ConnectState connectState) {log.info(connectId, connectState);// 对应连接类型的连接状态变化回调,具体连接状态参考 SDK ConnectState}});}@Overridepublic void onFailure(AError aError) {// 订阅失败log.info("onFailure " + (aError==null?"":(aError.getCode()+aError.getMsg())));}});}});}}

演示工程:/download/qq_39668819/13124188

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。