发布时间:2025-06-24 17:23:52  作者:北方职教升学中心  阅读量:316


系统中需要对接第三方的硬件比如摄像头报警系统,触发报警时会通过mqtt协议发送json格式的报警数据。service层代码等可自行根据表使用代码生成工具等生成。

然后使用一个对应json字段的实体对象接收,后面模仿业务层面的处理,转换为需要入库的DTO,

分别进行存储mysql数据库和reids操作。用户名、密码等需要配置的信息。

Mqtt的Broker的搭建使用如下方式:

Windows上Mqtt的Broker/服务端的搭建-使用mosquitto:

Windows上Mqtt的Broker/服务端的搭建-使用mosquitto-CSDN博客

注:

博客:
霸道流氓气质-CSDN博客

实现

新建SpringBoot项目并添加项目依赖

mqtt所需的依赖

        <!-- mqtt -->        <dependency>            <groupId>org.eclipse.paho</groupId>            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>            <version>1.2.4</version>        </dependency>

线程池使用guava依赖

        <!-- guava -->        <dependency>            <groupId>com.google.guava</groupId>            <artifactId>guava</artifactId>            <version>32.1.2-jre</version>            <scope>provided</scope>        </dependency>

当然也可使用Java自带的依赖,关于guava的使用参考如下

Java工具库Guava并发相关工具类的使用示例:

Java工具库Guava并发相关工具类的使用示例_moreexecutors.listeningdecorator-CSDN博客

Json解析相关的依赖

        <!-- 阿里JSON解析器 -->        <dependency>            <groupId>com.alibaba</groupId>            <artifactId>fastjson</artifactId>            <version>1.2.75</version>        </dependency>

Lombok工具集依赖

        <dependency>            <groupId>org.projectlombok</groupId>            <artifactId>lombok</artifactId>            <optional>true</optional>        </dependency>

其它连接mysql、回调方法实现

完整项目目录参考如下,新建三个mqtt相关的三个类

第一个类MqttReceiver,用来连接代理服务器,订阅主题

package com.badao.demo.mqtt;import com.badao.demo.config.MqttConfig;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import org.springframework.context.annotation.DependsOn;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;import javax.websocket.ClientEndpoint;import java.util.Random;/** * @ClassName: MqttReceiver * @Description: mqtt连接代理服务器,订阅主题 */@Component@ClientEndpoint@DependsOn("MqttConfig")public class MqttReceiver {    // mqtt配置    public  String host;    public  String port;    public  String user;    public  String password;    public static MqttClient client;    public static MqttService videoMqttService;    public static String clientId = "badao_" + new Random().nextInt(999999999);    public void start() {        try {            // broker为代理端,clientId即的客户端id,MemoryPersistence设置clientId的保存形式,默认为以内存保存            String broker = "tcp://"+this.host+":"+this.port;            client = new MqttClient(broker, clientId, new MemoryPersistence());            videoMqttService = new MqttService();            // MQTT连接设置            MqttConnectOptions connOpts = new MqttConnectOptions();            // 修复过多发布bug            connOpts.setMaxInflight(1000);            // 设置用户名和密码            connOpts.setUserName(this.user);            connOpts.setPassword(this.password.toCharArray());            // 设置心跳时间间隔,服务端实时了解客户端是否与其保持连接的情况            connOpts.setKeepAliveInterval(5);            connOpts.setMaxReconnectDelay(10);            connOpts.setConnectionTimeout(10);            connOpts.setAutomaticReconnect(true);            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,            // 这里设置为true表示每次连接到服务器都以新的身份连接            connOpts.setCleanSession(true);            // 设置回调            client.setCallback(new MqttMessageCallback());            // 建立连接            System.out.println("连接到mqtt服务器: " + broker);            client.connect(connOpts);            // 订阅主题            videoMqttService.subscribe(client);       } catch (MqttException me) {            me.printStackTrace();       }    }    public void setHost(String host) {        this.host = host;   }    public void setPort(String port) {        this.port = port;   }    public void setUser(String user) {        this.user = user;   }    public void setPassword(String password) {        this.password = password;   }    /**     * @Description: Spring实例化该Bean之后马上执行此方法     * @param:     * @Return: void     * @Exception:     */    @PostConstruct    void init() {        // 实例化mqtt对象        MqttReceiver client = new MqttReceiver();        // 从yml文件设置参数        // 实例化mqtt配置对象        MqttConfig mqttConfig =new MqttConfig();        client.setHost(mqttConfig.getHost());        client.setPort(mqttConfig.getPort());        client.setUser(mqttConfig.getUser());        client.setPassword(mqttConfig.getPassword());        // 启动        client.start();   }}

第二个类MqttService用来配置接收到订阅消息后,用来对消息的处理

package com.badao.demo.mqtt;import com.alibaba.fastjson.JSON;import com.badao.demo.entity.TVideoMsg;import com.badao.demo.entity.VideoAlarm;import com.badao.demo.service.serviceImpl.TVideoMsgServiceImpl;import com.badao.demo.utils.RedisCache;import com.badao.demo.utils.SpringUtils;import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.MqttTopic;import java.util.Date;/** * @ClassName: MqttService * @Description: mqtt订阅消息处理 */@Slf4jpublic class MqttService {    private final static String MQTT_VIDEO_TOPIC = "badao";    private TVideoMsgServiceImpl tVideoMsgService= SpringUtils.getBean(TVideoMsgServiceImpl.class);    private RedisCache redisCache= SpringUtils.getBean("redisCache");    /**     * @Description: mqtt接受到订阅消息后,对消息的处理     * @param: topic     * @param: message     * @Return: void     * @Exception:     */    public void getMqttMessage(String topic, MqttMessage message) {        try {            System.out.println("received topic:"+topic+",message:"+message.toString());            if (MqttTopic.isMatched(MQTT_VIDEO_TOPIC, topic)) {                String msg = new String(message.getPayload());                //序列化json数据到实体                VideoAlarm videoAlarm = JSON.parseObject(msg, VideoAlarm.class);                //根据自己业务进行数据转换为需要入库的数据实体                TVideoMsg tVideoMsgDTO= new TVideoMsg();                tVideoMsgDTO.setTimestamp(videoAlarm.getTimestamp());                tVideoMsgDTO.setAlgId(videoAlarm.getAlg_id());                tVideoMsgDTO.setCameraId(videoAlarm.getCamera_id());                tVideoMsgDTO.setImagePath(videoAlarm.getImage_path());                tVideoMsgDTO.setAlarmDate(new Date());                //mysql入库存储逻辑                tVideoMsgService.insertTVideoMsg(tVideoMsgDTO);                //redis缓存入库逻辑                String reKeyTemp="badaoTest:" + videoAlarm.getCamera_id();                redisCache.setCacheObject(reKeyTemp,tVideoMsgDTO);           }        } catch (Exception e) {            log.error(e.getMessage(), e);       }    }    /**     * @Description: 订阅主题     * @param: client     * @Return: void     * @Exception:     */    public void subscribe(MqttClient client) {        try {            client.subscribe(MQTT_VIDEO_TOPIC);       } catch (Exception e) {            e.printStackTrace();       }    }}

上面这个类为业务处理关键类,配置的主题为badao,然后这里需要在非Spring管理环境中获取bean,比如需要获取redis

private RedisCache redisCache= SpringUtils.getBean("redisCache");

以及获取插入到mysql中的service

后面附SpringUtiles的具体实现以及service相关实现。

getMqttMessage用来对收到对应主题消息的处理,首先进行json序列化。端口、 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)     *     * @param name     * @return boolean     * @throws NoSuchBeanDefinitionException     *     */    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException    {        return beanFactory.isSingleton(name);   }    /**     * @param name     * @return Class 注册对象的类型     * @throws NoSuchBeanDefinitionException     *     */    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException    {        return beanFactory.getType(name);   }    /**     * 如果给定的bean名字在bean定义中有别名,则返回这些别名     *     * @param name     * @return     * @throws NoSuchBeanDefinitionException     *     */    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException    {        return beanFactory.getAliases(name);   }    /**     * 获取aop代理对象     *     * @param invoker     * @return     */    @SuppressWarnings("unchecked")    public static <T> T getAopProxy(T invoker)    {        return (T) AopContext.currentProxy();   }}

首先建立一个mysql的表

然后上面收到mqtt消息对应的实体类TVideoMsg

package com.badao.demo.entity;import java.util.Date;public class TVideoMsg{    private static final long serialVersionUID = 1L;    private Long id;    private Long timestamp;    private Long algId;    private Long cameraId;    private String algName;    private String confirm;    private String path;    private String imagePath;    private String alarmRule;    private String status;    private String handlePeople;    private Date handleDate;    private Date alarmDate;    private String handleContent;    public Long getId() {        return id;   }    public void setId(Long id) {        this.id = id;   }    public Long getTimestamp() {        return timestamp;   }    public void setTimestamp(Long timestamp) {        this.timestamp = timestamp;   }    public Long getAlgId() {        return algId;   }    public void setAlgId(Long algId) {        this.algId = algId;   }    public Long getCameraId() {        return cameraId;   }    public void setCameraId(Long cameraId) {        this.cameraId = cameraId;   }    public String getAlgName() {        return algName;   }    public void setAlgName(String algName) {        this.algName = algName;   }    public String getConfirm() {        return confirm;   }    public void setConfirm(String confirm) {        this.confirm = confirm;   }    public String getPath() {        return path;   }    public void setPath(String path) {        this.path = path;   }    public String getImagePath() {        return imagePath;   }    public void setImagePath(String imagePath) {        this.imagePath = imagePath;   }    public String getAlarmRule() {        return alarmRule;   }    public void setAlarmRule(String alarmRule) {        this.alarmRule = alarmRule;   }    public String getStatus() {        return status;   }    public void setStatus(String status) {        this.status = status;   }    public String getHandlePeople() {        return handlePeople;   }    public void setHandlePeople(String handlePeople) {        this.handlePeople = handlePeople;   }    public Date getHandleDate() {        return handleDate;   }    public void setHandleDate(Date handleDate) {        this.handleDate = handleDate;   }    public Date getAlarmDate() {        return alarmDate;   }    public void setAlarmDate(Date alarmDate) {        this.alarmDate = alarmDate;   }    public String getHandleContent() {        return handleContent;   }    public void setHandleContent(String handleContent) {        this.handleContent = handleContent;   }}

消息数据转换后的DTO类VideoAlarm

import lombok.Data;@Datapublic class VideoAlarm {    private Long id;    private Long timestamp;    private Long alg_id;    private Long camera_id;    private String alg_name;    private Integer confirm;    private String path;    private String image_path;    private String alarm_rule;    private boolean status = false;}

其它mapper层、

其底层也是有对org.eclipse.paho.client.mqttv3的封装和引用。

MQTT相关连接、mosquitto、

第三个类是MqttMessageCallback用来对建立连接、

yml文件添加mqtt配置

application.yml文件中添加mqtt连接相关的ip、

测试mqtt连接、

项目启动后需要连接配置的mqtt的broker的地址,并订阅指定主题,当收到mqtt消息时进行解析并存储进mysql和redis中。

下面直接对org.eclipse.paho.client.mqttv3进行集成和简单的业务示例梳理。

#mqtt数据配置badao-mqtt:  host: 127.0.0.1  port: 1883  user: admin  password: 123456

然后新建配置类,用于获取yml中配置的内容

package com.badao.demo.config;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.stereotype.Component;/** * Class Name: mqttConfig * Description: 类功能说明 */@Component("MqttConfig")@ConfigurationProperties(prefix = "badao-mqtt")public class MqttConfig {    /**     * mqqt地址     */    private static String host;    /**     * mqtt端口     */    private static String port;    /**     * mqtt用户     */    private static String user;    /**     * mqtt密码     */    private  static String password;    public String getHost() {        return host;   }    public void setHost(String host) {        MqttConfig.host = host;   }    public String getPort() {        return port;   }    public void setPort(String port) {        MqttConfig.port = port;   }    public String getUser() {        return user;   }    public void setUser(String user) {        MqttConfig.user = user;   }    public String getPassword() {        return password;   }    public void setPassword(String password) {        MqttConfig.password = password;   }}

注意这里注解的prefix要与yml中的前缀一致。mybatis、接受json数据解析入库存储流程

启动并连接mqtt服务端

测试断连重连

入库存储测试

全部代码资源、自动重连的回调处理。redis的依赖省略,见文末示例代码。断线重连、

场景

SpringBoot整合MQTT服务器实现消息的发送与订阅(推送消息与接收推送):

SpringBoot整合MQTT服务器实现消息的发送与订阅(推送消息与接收推送)_服务端接收mqtt消息-CSDN博客

上面SpringBoot集成MQTT使用的是spring-integration-mqtt依赖,也是经常使用的方式。缓存相关的类

上面收到mqtt消息的处理中SpringUtils的代码实现

package com.badao.demo.utils;import org.springframework.aop.framework.AopContext;import org.springframework.beans.BeansException;import org.springframework.beans.factory.NoSuchBeanDefinitionException;import org.springframework.beans.factory.config.BeanFactoryPostProcessor;import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;import org.springframework.stereotype.Component;/** * spring工具类 方便在非spring管理环境中获取bean * */@Componentpublic final class SpringUtils implements BeanFactoryPostProcessor{    /** Spring应用上下文环境 */    private static ConfigurableListableBeanFactory beanFactory;    @Override    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException    {        SpringUtils.beanFactory = beanFactory;   }    /**     * 获取对象     *     * @param name     * @return Object 一个以所给名字注册的bean的实例     * @throws BeansException     *     */    @SuppressWarnings("unchecked")    public static <T> T getBean(String name) throws BeansException    {        return (T) beanFactory.getBean(name);   }    /**     * 获取类型为requiredType的对象     *     * @param clz     * @return     * @throws BeansException     *     */    public static <T> T getBean(Class<T> clz) throws BeansException    {        T result = (T) beanFactory.getBean(clz);        return result;   }    /**     * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true     *     * @param name     * @return boolean     */    public static boolean containsBean(String name)    {        return beanFactory.containsBean(name);   }    /**     * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。

这里@Component("MqttConfig")注解指定了名称,后面使用。

实现mqtt消息入库、sql文件资源打包下载

下载地址:

https://download.csdn.net/download/BADAO_LIUMANG_QIZHI/90283310

断开连接、收到消息、

package com.badao.demo.mqtt;import com.google.common.util.concurrent.ThreadFactoryBuilder;import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import java.util.concurrent.ExecutorService;import java.util.concurrent.LinkedBlockingDeque;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;/** * @ClassName: MqttMessageCallback * @Description: mqtt回调函数 */@Slf4jpublic class MqttMessageCallback implements MqttCallbackExtended {    public static MqttService videoMqttService;    ExecutorService newFixedThreadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors()            ,Runtime.getRuntime().availableProcessors()            , 60            , TimeUnit.SECONDS            , new LinkedBlockingDeque<>(500)            , new ThreadFactoryBuilder().setNameFormat("mqtt-%d").build()            , new ThreadPoolExecutor.AbortPolicy());    /**     * @Description: 服务器断开与客户机的连接时客户机将调用此方法     * @param: cause     * @Return: void     * @Exception:     */    @Override    public void connectionLost(Throwable cause) {        log.error("MQTT丢失连接");        try {            MqttReceiver.client.reconnect();       } catch (MqttException e) {            e.printStackTrace();       }    }    /**     * @Description: 当客户机的与预订主题相匹配的预订到达时,就会调此方法     * @param: topic     * @param: message     * @Return: void     * @Exception:     */    @Override    public void messageArrived(String topic, MqttMessage message) {        videoMqttService = new MqttService();        try {            newFixedThreadPool.execute(() -> videoMqttService.getMqttMessage(topic, message));       } catch (Exception e) {            e.printStackTrace();       }    }    /**     * @Description: 由mqtt客户机调用以将传递令牌传回客户机应用程序     * @param: token     * @Return: void     * @Exception:     */    @Override    public void deliveryComplete(IMqttDeliveryToken token) {   }    @Override    public void connectComplete(boolean b, String s) {        log.error("MQTT触发自动重连");        try {            videoMqttService = new MqttService();            videoMqttService.subscribe(MqttReceiver.client);            if (MqttReceiver.client.isConnected()) {                log.error("MQTT重新连接成功!");           } else {                log.error("MQTT重连失败!");           }        } catch (Exception e) {            e.printStackTrace();       }    }}

注意这里在收到消息的回调messageArrived中,使用线程池支持高并发的处理。