发布时间:2025-06-24 17:23:52 作者:北方职教升学中心 阅读量:316
系统中需要对接第三方的硬件比如摄像头报警系统,触发报警时会通过mqtt协议发送json格式的报警数据。service层代码等可自行根据表使用代码生成工具等生成。
然后使用一个对应json字段的实体对象接收,后面模仿业务层面的处理,转换为需要入库的DTO,
分别进行存储mysql数据库和reids操作。用户名、密码等需要配置的信息。
Mqtt的Broker的搭建使用如下方式:
Windows上Mqtt的Broker/服务端的搭建-使用mosquitto:
Windows上Mqtt的Broker/服务端的搭建-使用mosquitto-CSDN博客
注:
博客:
霸道流氓气质-CSDN博客
实现
新建SpringBoot项目并添加项目依赖
mqtt所需的依赖
<!-- mqtt --> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.4</version> </dependency>
线程池使用guava依赖
<!-- guava --> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>32.1.2-jre</version> <scope>provided</scope> </dependency>
当然也可使用Java自带的依赖,关于guava的使用参考如下
Java工具库Guava并发相关工具类的使用示例:
Java工具库Guava并发相关工具类的使用示例_moreexecutors.listeningdecorator-CSDN博客
Json解析相关的依赖
<!-- 阿里JSON解析器 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.75</version> </dependency>
Lombok工具集依赖
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency>
其它连接mysql、回调方法实现
完整项目目录参考如下,新建三个mqtt相关的三个类
第一个类MqttReceiver,用来连接代理服务器,订阅主题
package com.badao.demo.mqtt;import com.badao.demo.config.MqttConfig;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import org.springframework.context.annotation.DependsOn;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.websocket.ClientEndpoint;import java.util.Random;/** * @ClassName: MqttReceiver * @Description: mqtt连接代理服务器,订阅主题 */@Component@ClientEndpoint@DependsOn("MqttConfig")public class MqttReceiver { // mqtt配置 public String host; public String port; public String user; public String password; public static MqttClient client; public static MqttService videoMqttService; public static String clientId = "badao_" + new Random().nextInt(999999999); public void start() { try { // broker为代理端,clientId即的客户端id,MemoryPersistence设置clientId的保存形式,默认为以内存保存 String broker = "tcp://"+this.host+":"+this.port; client = new MqttClient(broker, clientId, new MemoryPersistence()); videoMqttService = new MqttService(); // MQTT连接设置 MqttConnectOptions connOpts = new MqttConnectOptions(); // 修复过多发布bug connOpts.setMaxInflight(1000); // 设置用户名和密码 connOpts.setUserName(this.user); connOpts.setPassword(this.password.toCharArray()); // 设置心跳时间间隔,服务端实时了解客户端是否与其保持连接的情况 connOpts.setKeepAliveInterval(5); connOpts.setMaxReconnectDelay(10); connOpts.setConnectionTimeout(10); connOpts.setAutomaticReconnect(true); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, // 这里设置为true表示每次连接到服务器都以新的身份连接 connOpts.setCleanSession(true); // 设置回调 client.setCallback(new MqttMessageCallback()); // 建立连接 System.out.println("连接到mqtt服务器: " + broker); client.connect(connOpts); // 订阅主题 videoMqttService.subscribe(client); } catch (MqttException me) { me.printStackTrace(); } } public void setHost(String host) { this.host = host; } public void setPort(String port) { this.port = port; } public void setUser(String user) { this.user = user; } public void setPassword(String password) { this.password = password; } /** * @Description: Spring实例化该Bean之后马上执行此方法 * @param: * @Return: void * @Exception: */ @PostConstruct void init() { // 实例化mqtt对象 MqttReceiver client = new MqttReceiver(); // 从yml文件设置参数 // 实例化mqtt配置对象 MqttConfig mqttConfig =new MqttConfig(); client.setHost(mqttConfig.getHost()); client.setPort(mqttConfig.getPort()); client.setUser(mqttConfig.getUser()); client.setPassword(mqttConfig.getPassword()); // 启动 client.start(); }}
第二个类MqttService用来配置接收到订阅消息后,用来对消息的处理
package com.badao.demo.mqtt;import com.alibaba.fastjson.JSON;import com.badao.demo.entity.TVideoMsg;import com.badao.demo.entity.VideoAlarm;import com.badao.demo.service.serviceImpl.TVideoMsgServiceImpl;import com.badao.demo.utils.RedisCache;import com.badao.demo.utils.SpringUtils;import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.MqttTopic;import java.util.Date;/** * @ClassName: MqttService * @Description: mqtt订阅消息处理 */@Slf4jpublic class MqttService { private final static String MQTT_VIDEO_TOPIC = "badao"; private TVideoMsgServiceImpl tVideoMsgService= SpringUtils.getBean(TVideoMsgServiceImpl.class); private RedisCache redisCache= SpringUtils.getBean("redisCache"); /** * @Description: mqtt接受到订阅消息后,对消息的处理 * @param: topic * @param: message * @Return: void * @Exception: */ public void getMqttMessage(String topic, MqttMessage message) { try { System.out.println("received topic:"+topic+",message:"+message.toString()); if (MqttTopic.isMatched(MQTT_VIDEO_TOPIC, topic)) { String msg = new String(message.getPayload()); //序列化json数据到实体 VideoAlarm videoAlarm = JSON.parseObject(msg, VideoAlarm.class); //根据自己业务进行数据转换为需要入库的数据实体 TVideoMsg tVideoMsgDTO= new TVideoMsg(); tVideoMsgDTO.setTimestamp(videoAlarm.getTimestamp()); tVideoMsgDTO.setAlgId(videoAlarm.getAlg_id()); tVideoMsgDTO.setCameraId(videoAlarm.getCamera_id()); tVideoMsgDTO.setImagePath(videoAlarm.getImage_path()); tVideoMsgDTO.setAlarmDate(new Date()); //mysql入库存储逻辑 tVideoMsgService.insertTVideoMsg(tVideoMsgDTO); //redis缓存入库逻辑 String reKeyTemp="badaoTest:" + videoAlarm.getCamera_id(); redisCache.setCacheObject(reKeyTemp,tVideoMsgDTO); } } catch (Exception e) { log.error(e.getMessage(), e); } } /** * @Description: 订阅主题 * @param: client * @Return: void * @Exception: */ public void subscribe(MqttClient client) { try { client.subscribe(MQTT_VIDEO_TOPIC); } catch (Exception e) { e.printStackTrace(); } }}
上面这个类为业务处理关键类,配置的主题为badao,然后这里需要在非Spring管理环境中获取bean,比如需要获取redis
private RedisCache redisCache= SpringUtils.getBean("redisCache");
以及获取插入到mysql中的service
后面附SpringUtiles的具体实现以及service相关实现。
getMqttMessage用来对收到对应主题消息的处理,首先进行json序列化。端口、 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException) * * @param name * @return boolean * @throws NoSuchBeanDefinitionException * */ public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException { return beanFactory.isSingleton(name); } /** * @param name * @return Class 注册对象的类型 * @throws NoSuchBeanDefinitionException * */ public static Class<?> getType(String name) throws NoSuchBeanDefinitionException { return beanFactory.getType(name); } /** * 如果给定的bean名字在bean定义中有别名,则返回这些别名 * * @param name * @return * @throws NoSuchBeanDefinitionException * */ public static String[] getAliases(String name) throws NoSuchBeanDefinitionException { return beanFactory.getAliases(name); } /** * 获取aop代理对象 * * @param invoker * @return */ @SuppressWarnings("unchecked") public static <T> T getAopProxy(T invoker) { return (T) AopContext.currentProxy(); }}
首先建立一个mysql的表
然后上面收到mqtt消息对应的实体类TVideoMsg
package com.badao.demo.entity;import java.util.Date;public class TVideoMsg{ private static final long serialVersionUID = 1L; private Long id; private Long timestamp; private Long algId; private Long cameraId; private String algName; private String confirm; private String path; private String imagePath; private String alarmRule; private String status; private String handlePeople; private Date handleDate; private Date alarmDate; private String handleContent; public Long getId() { return id; } public void setId(Long id) { this.id = id; } public Long getTimestamp() { return timestamp; } public void setTimestamp(Long timestamp) { this.timestamp = timestamp; } public Long getAlgId() { return algId; } public void setAlgId(Long algId) { this.algId = algId; } public Long getCameraId() { return cameraId; } public void setCameraId(Long cameraId) { this.cameraId = cameraId; } public String getAlgName() { return algName; } public void setAlgName(String algName) { this.algName = algName; } public String getConfirm() { return confirm; } public void setConfirm(String confirm) { this.confirm = confirm; } public String getPath() { return path; } public void setPath(String path) { this.path = path; } public String getImagePath() { return imagePath; } public void setImagePath(String imagePath) { this.imagePath = imagePath; } public String getAlarmRule() { return alarmRule; } public void setAlarmRule(String alarmRule) { this.alarmRule = alarmRule; } public String getStatus() { return status; } public void setStatus(String status) { this.status = status; } public String getHandlePeople() { return handlePeople; } public void setHandlePeople(String handlePeople) { this.handlePeople = handlePeople; } public Date getHandleDate() { return handleDate; } public void setHandleDate(Date handleDate) { this.handleDate = handleDate; } public Date getAlarmDate() { return alarmDate; } public void setAlarmDate(Date alarmDate) { this.alarmDate = alarmDate; } public String getHandleContent() { return handleContent; } public void setHandleContent(String handleContent) { this.handleContent = handleContent; }}
消息数据转换后的DTO类VideoAlarm
import lombok.Data;@Datapublic class VideoAlarm { private Long id; private Long timestamp; private Long alg_id; private Long camera_id; private String alg_name; private Integer confirm; private String path; private String image_path; private String alarm_rule; private boolean status = false;}
其它mapper层、
其底层也是有对org.eclipse.paho.client.mqttv3的封装和引用。
MQTT相关连接、mosquitto、
第三个类是MqttMessageCallback用来对建立连接、
yml文件添加mqtt配置
application.yml文件中添加mqtt连接相关的ip、
测试mqtt连接、
项目启动后需要连接配置的mqtt的broker的地址,并订阅指定主题,当收到mqtt消息时进行解析并存储进mysql和redis中。
下面直接对org.eclipse.paho.client.mqttv3进行集成和简单的业务示例梳理。
#mqtt数据配置badao-mqtt: host: 127.0.0.1 port: 1883 user: admin password: 123456
然后新建配置类,用于获取yml中配置的内容
package com.badao.demo.config;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.stereotype.Component;/** * Class Name: mqttConfig * Description: 类功能说明 */@Component("MqttConfig")@ConfigurationProperties(prefix = "badao-mqtt")public class MqttConfig { /** * mqqt地址 */ private static String host; /** * mqtt端口 */ private static String port; /** * mqtt用户 */ private static String user; /** * mqtt密码 */ private static String password; public String getHost() { return host; } public void setHost(String host) { MqttConfig.host = host; } public String getPort() { return port; } public void setPort(String port) { MqttConfig.port = port; } public String getUser() { return user; } public void setUser(String user) { MqttConfig.user = user; } public String getPassword() { return password; } public void setPassword(String password) { MqttConfig.password = password; }}
注意这里注解的prefix要与yml中的前缀一致。mybatis、接受json数据解析入库存储流程
启动并连接mqtt服务端
测试断连重连
入库存储测试
全部代码资源、自动重连的回调处理。redis的依赖省略,见文末示例代码。断线重连、场景
SpringBoot整合MQTT服务器实现消息的发送与订阅(推送消息与接收推送):
SpringBoot整合MQTT服务器实现消息的发送与订阅(推送消息与接收推送)_服务端接收mqtt消息-CSDN博客
上面SpringBoot集成MQTT使用的是spring-integration-mqtt依赖,也是经常使用的方式。缓存相关的类
上面收到mqtt消息的处理中SpringUtils的代码实现
package com.badao.demo.utils;import org.springframework.aop.framework.AopContext;import org.springframework.beans.BeansException;import org.springframework.beans.factory.NoSuchBeanDefinitionException;import org.springframework.beans.factory.config.BeanFactoryPostProcessor;import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;import org.springframework.stereotype.Component;/** * spring工具类 方便在非spring管理环境中获取bean * */@Componentpublic final class SpringUtils implements BeanFactoryPostProcessor{ /** Spring应用上下文环境 */ private static ConfigurableListableBeanFactory beanFactory; @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { SpringUtils.beanFactory = beanFactory; } /** * 获取对象 * * @param name * @return Object 一个以所给名字注册的bean的实例 * @throws BeansException * */ @SuppressWarnings("unchecked") public static <T> T getBean(String name) throws BeansException { return (T) beanFactory.getBean(name); } /** * 获取类型为requiredType的对象 * * @param clz * @return * @throws BeansException * */ public static <T> T getBean(Class<T> clz) throws BeansException { T result = (T) beanFactory.getBean(clz); return result; } /** * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true * * @param name * @return boolean */ public static boolean containsBean(String name) { return beanFactory.containsBean(name); } /** * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。这里@Component("MqttConfig")注解指定了名称,后面使用。
实现mqtt消息入库、sql文件资源打包下载

下载地址:
https://download.csdn.net/download/BADAO_LIUMANG_QIZHI/90283310
断开连接、收到消息、package com.badao.demo.mqtt;import com.google.common.util.concurrent.ThreadFactoryBuilder;import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import java.util.concurrent.ExecutorService;import java.util.concurrent.LinkedBlockingDeque;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;/** * @ClassName: MqttMessageCallback * @Description: mqtt回调函数 */@Slf4jpublic class MqttMessageCallback implements MqttCallbackExtended { public static MqttService videoMqttService; ExecutorService newFixedThreadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() ,Runtime.getRuntime().availableProcessors() , 60 , TimeUnit.SECONDS , new LinkedBlockingDeque<>(500) , new ThreadFactoryBuilder().setNameFormat("mqtt-%d").build() , new ThreadPoolExecutor.AbortPolicy()); /** * @Description: 服务器断开与客户机的连接时客户机将调用此方法 * @param: cause * @Return: void * @Exception: */ @Override public void connectionLost(Throwable cause) { log.error("MQTT丢失连接"); try { MqttReceiver.client.reconnect(); } catch (MqttException e) { e.printStackTrace(); } } /** * @Description: 当客户机的与预订主题相匹配的预订到达时,就会调此方法 * @param: topic * @param: message * @Return: void * @Exception: */ @Override public void messageArrived(String topic, MqttMessage message) { videoMqttService = new MqttService(); try { newFixedThreadPool.execute(() -> videoMqttService.getMqttMessage(topic, message)); } catch (Exception e) { e.printStackTrace(); } } /** * @Description: 由mqtt客户机调用以将传递令牌传回客户机应用程序 * @param: token * @Return: void * @Exception: */ @Override public void deliveryComplete(IMqttDeliveryToken token) { } @Override public void connectComplete(boolean b, String s) { log.error("MQTT触发自动重连"); try { videoMqttService = new MqttService(); videoMqttService.subscribe(MqttReceiver.client); if (MqttReceiver.client.isConnected()) { log.error("MQTT重新连接成功!"); } else { log.error("MQTT重连失败!"); } } catch (Exception e) { e.printStackTrace(); } }}
场景
SpringBoot整合MQTT服务器实现消息的发送与订阅(推送消息与接收推送):
SpringBoot整合MQTT服务器实现消息的发送与订阅(推送消息与接收推送)_服务端接收mqtt消息-CSDN博客
上面SpringBoot集成MQTT使用的是spring-integration-mqtt依赖,也是经常使用的方式。缓存相关的类
上面收到mqtt消息的处理中SpringUtils的代码实现
package com.badao.demo.utils;import org.springframework.aop.framework.AopContext;import org.springframework.beans.BeansException;import org.springframework.beans.factory.NoSuchBeanDefinitionException;import org.springframework.beans.factory.config.BeanFactoryPostProcessor;import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;import org.springframework.stereotype.Component;/** * spring工具类 方便在非spring管理环境中获取bean * */@Componentpublic final class SpringUtils implements BeanFactoryPostProcessor{ /** Spring应用上下文环境 */ private static ConfigurableListableBeanFactory beanFactory; @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { SpringUtils.beanFactory = beanFactory; } /** * 获取对象 * * @param name * @return Object 一个以所给名字注册的bean的实例 * @throws BeansException * */ @SuppressWarnings("unchecked") public static <T> T getBean(String name) throws BeansException { return (T) beanFactory.getBean(name); } /** * 获取类型为requiredType的对象 * * @param clz * @return * @throws BeansException * */ public static <T> T getBean(Class<T> clz) throws BeansException { T result = (T) beanFactory.getBean(clz); return result; } /** * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true * * @param name * @return boolean */ public static boolean containsBean(String name) { return beanFactory.containsBean(name); } /** * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。这里@Component("MqttConfig")注解指定了名称,后面使用。
实现mqtt消息入库、sql文件资源打包下载

下载地址:
https://download.csdn.net/download/BADAO_LIUMANG_QIZHI/90283310
package com.badao.demo.mqtt;import com.google.common.util.concurrent.ThreadFactoryBuilder;import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import java.util.concurrent.ExecutorService;import java.util.concurrent.LinkedBlockingDeque;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;/** * @ClassName: MqttMessageCallback * @Description: mqtt回调函数 */@Slf4jpublic class MqttMessageCallback implements MqttCallbackExtended { public static MqttService videoMqttService; ExecutorService newFixedThreadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() ,Runtime.getRuntime().availableProcessors() , 60 , TimeUnit.SECONDS , new LinkedBlockingDeque<>(500) , new ThreadFactoryBuilder().setNameFormat("mqtt-%d").build() , new ThreadPoolExecutor.AbortPolicy()); /** * @Description: 服务器断开与客户机的连接时客户机将调用此方法 * @param: cause * @Return: void * @Exception: */ @Override public void connectionLost(Throwable cause) { log.error("MQTT丢失连接"); try { MqttReceiver.client.reconnect(); } catch (MqttException e) { e.printStackTrace(); } } /** * @Description: 当客户机的与预订主题相匹配的预订到达时,就会调此方法 * @param: topic * @param: message * @Return: void * @Exception: */ @Override public void messageArrived(String topic, MqttMessage message) { videoMqttService = new MqttService(); try { newFixedThreadPool.execute(() -> videoMqttService.getMqttMessage(topic, message)); } catch (Exception e) { e.printStackTrace(); } } /** * @Description: 由mqtt客户机调用以将传递令牌传回客户机应用程序 * @param: token * @Return: void * @Exception: */ @Override public void deliveryComplete(IMqttDeliveryToken token) { } @Override public void connectComplete(boolean b, String s) { log.error("MQTT触发自动重连"); try { videoMqttService = new MqttService(); videoMqttService.subscribe(MqttReceiver.client); if (MqttReceiver.client.isConnected()) { log.error("MQTT重新连接成功!"); } else { log.error("MQTT重连失败!"); } } catch (Exception e) { e.printStackTrace(); } }}
注意这里在收到消息的回调messageArrived中,使用线程池支持高并发的处理。