1500字范文,内容丰富有趣,写作好帮手!
1500字范文 > 阿里云物联网平台云端采用AMQP方式接入同时订阅与发布消息

阿里云物联网平台云端采用AMQP方式接入同时订阅与发布消息

时间:2023-11-01 22:04:13

相关推荐

阿里云物联网平台云端采用AMQP方式接入同时订阅与发布消息

工业互联网需要采集设备底层数据进行实时数据展示与状态预警,由于底层硬件设备无法采用http等重量级请求发送数据,工业上常常采用MQTT协议进行数据传输,本次基于阿里云物联网平台进行数据采集,本次主要云端收集信息与数据下发,云端采用AMQP方式接入,该方式使得云端服务同时具体发布与订阅功能;设备端采用阿里云IOT平台MQTT.fx模拟接入

环境:JDK1.8+maven+springboot

GitHub源码地址:/hou296498161/amqp

1.添加以下依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.aliyun</groupId><artifactId>aliyun-java-sdk-core</artifactId><version>3.7.1</version></dependency><dependency><groupId>com.aliyun</groupId><artifactId>aliyun-java-sdk-iot</artifactId><version>6.11.0</version></dependency><dependency><groupId>org.msgpack</groupId><artifactId>msgpack</artifactId><version>0.6.12</version></dependency><!-- util for base64--><dependency><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId><version>1.10</version></dependency><dependency><groupId>org.apache.qpid</groupId><artifactId>qpid-jms-client</artifactId><version>0.47.0</version></dependency>

1)、阿里云创建产品,定义自定义topic,类型为发布订阅

2)、创建产品、

3)、创建服务端订阅

2、服务端订阅消息

package com.ali.amqp.subscribe;import mons.codec.binary.Base64;import org.apache.qpid.jms.JmsConnection;import org.apache.qpid.jms.JmsConnectionListener;import org.apache.qpid.jms.message.JmsInboundMessageDispatch;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import javax.crypto.Mac;import javax.crypto.spec.SecretKeySpec;import javax.jms.*;import javax.naming.Context;import javax.naming.InitialContext;import .URI;import java.util.Hashtable;import java.util.UUID;import java.util.concurrent.ExecutorService;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;/*** 云端订阅消息*/public class AmqpJavaClient {private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClient.class);//业务处理异步线程池,线程池参数可以根据您的业务特点调整;或者您也可以用其他异步方式处理接收到的消息private final static ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(50000));public static void subscribe() throws Exception {//参数说明参见阿里云文档:AMQP客户端接入说明。String accessKey = "你的阿里云accessKey";String accessSecret = "你的阿里云 accessSecret";//消费组IDString consumerGroupId = "DEFAULT_GROUP";long timeStamp = System.currentTimeMillis();//签名方法:支持hmacmd5,hmacsha1和hmacsha256String signMethod = "hmacsha1";//控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。String clientId = UUID.randomUUID().toString().replaceAll("-","");//UserName组装方法,请参见阿里云文档:AMQP客户端接入说明。String userName = clientId + "|authMode=aksign"+ ",signMethod=" + signMethod+ ",timestamp=" + timeStamp+ ",authId=" + accessKey+ ",consumerGroupId=" + consumerGroupId+ "|";//password组装方法,请参见上一篇文档:AMQP客户端接入说明。String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;String password = doSign(signContent,accessSecret, signMethod);//按照qpid-jms的规范,组装连接URL。String connectionUrl = "failover:(amqps://1090243284576461.iot--:5671?amqp.idleTimeout=80000)"+ "?failover.reconnectDelay=30";Hashtable<String, String> hashtable = new Hashtable<>();hashtable.put("connectionfactory.SBCF",connectionUrl);hashtable.put("queue.QUEUE", "default");hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");Context context = new InitialContext(hashtable);ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");Destination queue = (Destination)context.lookup("QUEUE");// Create ConnectionConnection connection = cf.createConnection(userName, password);((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);// Create Session// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);connection.start();// Create Receiver LinkMessageConsumer consumer = session.createConsumer(queue);consumer.setMessageListener(messageListener);}private static MessageListener messageListener = new MessageListener() {@Overridepublic void onMessage(Message message) {try {if (message.getBody(byte[].class)==null){logger.error("收到一条空的消息,topic->{}",message.getStringProperty("topic"));}else {logger.info("收到消息:topic->{}:{}",message.getStringProperty("topic"),new String(message.getBody(byte[].class)));}//1.收到消息之后一定要ACK// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。// message.acknowledge();//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。executorService.submit(() ->System.out.println(""));} catch (Exception e) {logger.error("submit task occurs exception ", e);}}};private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {/*** 连接成功建立*/@Overridepublic void onConnectionEstablished(URI remoteURI) {logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);}/*** 尝试过最大重试次数之后,最终连接失败。*/@Overridepublic void onConnectionFailure(Throwable error) {logger.error("onConnectionFailure, {}", error.getMessage());}/*** 连接中断。*/@Overridepublic void onConnectionInterrupted(URI remoteURI) {logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);}/*** 连接中断后又自动重连上。*/@Overridepublic void onConnectionRestored(URI remoteURI) {logger.info("onConnectionRestored, remoteUri:{}", remoteURI);}@Overridepublic void onInboundMessage(JmsInboundMessageDispatch envelope) {}@Overridepublic void onSessionClosed(Session session, Throwable cause) {}@Overridepublic void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}@Overridepublic void onProducerClosed(MessageProducer producer, Throwable cause) {}};/*** password签名计算方法,请参见阿里云文档:AMQP客户端接入说明。*/private static String doSign(String toSignString, String secret, String signMethod) throws Exception {SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);Mac mac = Mac.getInstance(signMethod);mac.init(signingKey);byte[] rawHmac = mac.doFinal(toSignString.getBytes());return Base64.encodeBase64String(rawHmac);}}

3服务端发布

package com.ali.amqp.publish;import com.ali.amqp.subscribe.AmqpJavaClient;import com.aliyuncs.DefaultAcsClient;import com.aliyuncs.IAcsClient;import com.aliyuncs.iot.model.v0120.PubRequest;import com.aliyuncs.iot.model.v0120.PubResponse;import com.aliyuncs.profile.DefaultProfile;import mons.codec.binary.Base64;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.*;import .Socket;import java.nio.charset.StandardCharsets;import java.text.SimpleDateFormat;/*** 云端下发消息*/public class PopPubServer {private final static Logger logger = LoggerFactory.getLogger(PopPubServer.class);public static boolean sendToTopic(String topic) throws UnsupportedEncodingException {String regionId = "cn-shanghai";String accessKey = "你的accessKey";String accessSecret = "你的accessSecret";final String productKey = "设备productKey";//设置client的参数DefaultProfile profile = DefaultProfile.getProfile(regionId, accessKey, accessSecret);IAcsClient autoClient = new DefaultAcsClient(profile);PubRequest request = new PubRequest();request.setQos(0);//设置发布消息的topicrequest.setTopicFullName("/a14UPaWxJCF/test_device/user/test");request.setProductKey(productKey);//设置消息的内容,一定要用base64编码,否则乱码Base64 base64 = new Base64();request.setMessageContent(base64.encodeToString("hello".getBytes("UTF-8")));try {PubResponse response = autoClient.getAcsResponse(request);Boolean success = response.getSuccess();return success;} catch (Exception e) {logger.warn("阿里云消息发送异常,topic:{},异常信息:{}",topic,e.getMessage());return false;}}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。