发布时间:2025-06-24 17:13:51 作者:北方职教升学中心 阅读量:026
我设置的是1秒
- 创建redis配置
@ConfigurationpublicclassRedisConfig{/** * RedisTemplate配置 */@Bean("redisTemplate")publicRedisTemplate<Object,Object>redisTemplate(RedisConnectionFactoryredisConnectionFactory){RedisTemplate<Object,Object>template =newRedisTemplate<>();template.setConnectionFactory(redisConnectionFactory);// 使用fastjson进行序列化处理,提高解析效率FastJsonRedisSerializer<Object>serializer =newFastJsonRedisSerializer<Object>(Object.class);// value值的序列化采用fastJsonRedisSerializertemplate.setValueSerializer(serializer);template.setHashValueSerializer(serializer);// key的序列化采用StringRedisSerializertemplate.setKeySerializer(newStringRedisSerializer());template.setHashKeySerializer(newStringRedisSerializer());template.setConnectionFactory(redisConnectionFactory);// 使用fastjson时需设置此项,否则会报异常not support typeParserConfig.getGlobalInstance().setAutoTypeSupport(true);returntemplate;}/** * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理 * * @param connectionFactory * @return */@BeanRedisMessageListenerContainercontainer(RedisConnectionFactoryconnectionFactory){RedisMessageListenerContainercontainer =newRedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);returncontainer;}}
- 序列化
/** * @Description:使用fastjson实现redis的序列化 */publicclassFastJsonRedisSerializer<T>implementsRedisSerializer<T>{publicstaticfinalCharsetDEFAULT_CHARSET=Charset.forName("UTF-8");privateClass<T>clazz;publicFastJsonRedisSerializer(Class<T>clazz){super();this.clazz =clazz;}@Overridepublicbyte[]serialize(Tt)throwsSerializationException{if(t ==null){returnnewbyte[0];}returnJSON.toJSONString(t,SerializerFeature.WriteClassName).getBytes(DEFAULT_CHARSET);}@OverridepublicTdeserialize(byte[]bytes)throwsSerializationException{if(bytes ==null||bytes.length <=0){returnnull;}Stringstr =newString(bytes,DEFAULT_CHARSET);return(T)JSON.parseObject(str,clazz);}}
- 创建消息类 DelayMessage
- 这里定义一个消息类 , 包含消息的id,消息内容,以及到期时间(消息的执行时间) , 代码如下
@Data@AllArgsConstructor@NoArgsConstructorpublicclassDelayMessageimplementsSerializable{/** * 切记实例化 */privatestaticfinallongserialVersionUID =-7671756385477179547L;/** * 消息 id */privateStringid;/** * 消息内容 */privateStringcontent;/** * 消息到期时间(指定当前消息在什么时间开始消费(时间戳)) */privatelongexpireTime;}
- 创建延迟队列类 DelayQueue
- 创建一个延迟队列类 , 提供,添加消息,删除消息,和获取消息的方法 , 具体代码如下
@ComponentpublicclassDelayQueue{/** * key后面拼接当前机器的内网ip : 用于集群区分,解决集群出现的并发问题 */privatestaticfinalStringKEY="delay_queue:"+getHostAddress();@AutowiredprivateRedisTemplateredisTemplate;/** * 添加消息到延时队列中 */publicvoidput(DelayMessagemessage){redisTemplate.opsForZSet().add(KEY,message,message.getExpireTime());}/** * 从延时队列中删除消息 */publicLongremove(DelayMessagemessage){Longremove =redisTemplate.opsForZSet().remove(KEY,message);returnremove;}/** * 获取延时队列中已到期的消息 */publicList<DelayMessage>getExpiredMessages(){// 1 : 获取到开始时间longminScore =0;// 2 : 获取到结束时间longmaxScore =System.currentTimeMillis();// 3 : 获取到指定范围区间的数据列表Set<Object>messages =redisTemplate.opsForZSet().rangeByScore(KEY,minScore,maxScore);if(messages ==null||messages.isEmpty()){returnCollections.emptyList();}// 4 : 把对象进行封装,返回List<DelayMessage>result =newArrayList<>();for(Objectmessage :messages){DelayMessagedelayMessage =JSONObject.parseObject(JSON.toJSONString(message),DelayMessage.class);result.add(delayMessage);}returnresult;}/** * 获取地址(服务器的内网地址)(内网ip) * * @return */publicstaticStringgetHostAddress(){InetAddresslocalHost =null;try{localHost =InetAddress.getLocalHost();}catch(UnknownHostExceptione){e.printStackTrace();}returnlocalHost.getHostAddress();}}
- 创建 DelayMessageHandler 消息处理类
- 创建一个消息处理累, 添加一个处理过期的消息,写个定时任务,间隔1s轮询延时队列中已到期的任务,如果获取不到为空,
则不进行消息处理的逻辑 , 反之继续轮询
@ComponentpublicclassDelayMessageHandler{publicstaticSimpleDateFormatdateTimeFormater =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");@AutowiredprivateDelayQueuedelayQueue;/** * 处理已到期的消息(轮询) */@Scheduled(fixedDelay =1000)publicvoidhandleExpiredMessages(){StringcurrentTime =getCurrentTime();// 1 : 扫描任务,并将需要执行的任务加入到任务队列中List<DelayMessage>messages =delayQueue.getExpiredMessages();List<DelayMessage>messages_2 =delayQueue.getExpiredMessages();System.out.println(currentTime +" 待处理消息数量:"+messages.size());// 2 : 开始处理消息if(!messages.isEmpty()){for(DelayMessagemessage :messages){System.out.println(message.getId()+" --> 消息开始处理");try{// 2.1.1 : 模拟睡眠3秒,任务的处理时间(实际可能会更长)Thread.sleep(3000);}catch(Exceptione){e.printStackTrace();}System.out.println(message.getId()+" --> 消息处理结束");// 2.2 : 处理完消息,删除消息delayQueue.remove(message);}}}/** * 获取到的当前时分秒 * * @return */publicstaticStringgetCurrentTime(){Stringformat =dateTimeFormater.format(newDate());returnformat;}}
执行结果 : (我们可以看到 , 消息正在慢慢的被消费)
2023-11-03 15:06:01 待处理消息数量:02023-11-03 15:06:02 待处理消息数量:02023-11-03 15:06:03 待处理消息数量:02023-11-03 15:06:04 待处理消息数量:0# 此处开始调用接口 , 往延迟队列中添加消息2023-11-03 15:06:05 待处理消息数量:42023-11-03 15:06:05 :1 -->消息开始处理2023-11-03 15:06:05 :1 -->消息处理结束2023-11-03 15:06:05 :13 -->消息开始处理2023-11-03 15:06:05 :13 -->消息处理结束2023-11-03 15:06:05 :5 -->消息开始处理2023-11-03 15:06:05 :5 -->消息处理结束2023-11-03 15:06:05 :9 -->消息开始处理2023-11-03 15:06:05 :9 -->消息处理结束2023-11-03 15:06:18 待处理消息数量:122023-11-03 15:06:18 :10 -->消息开始处理2023-11-03 15:06:18 :10 -->消息处理结束2023-11-03 15:06:18 :14 -->消息开始处理2023-11-03 15:06:18 :14 -->消息处理结束2023-11-03 15:06:18 :2 -->消息开始处理2023-11-03 15:06:18 :2 -->消息处理结束2023-11-03 15:06:18 :6 -->消息开始处理
此处我们会发现一个问题 , @Scheduled
注解是轮询执行的 , 如果上一个任务没执行完毕 , 定时器会等待 , 等待上一次执行完毕
也就是说 , @Scheduled
注解表示同步执行的 , 那么就会出现一个问题 , 每一个消息处理都会耗时3秒,
假设有 A B 两条消息 , 消息的过期时间是一致的 , 那么这两个消息会被同时从缓存中取出准备消费 ,假设A消息第一个开始消费 ,
那么B消息,就要等待3秒 , 等A消息执行完成,才开始消费B消息 , 那么就会出现消息堆积,延迟消费的情况 , 本来14:00就要消费的消息,等到了 14:10 才开始消费(可能会更晚) ,
如果消息量足够大的情况下 , 就会出现问题 , 内存泄漏 , 消息堆积 , 延迟消费等情况
解决办法 : 开线程去执行 (使用线程池) , 使用以下代码 , 我们消费一条消息,就需要创建一个线程去后台消费 , 就会解决了上面的问题 ,
(这里需要用到线程池,我为了偷懒 ,就简单模拟了一下)
/** * 处理已到期的消息(轮询) */@Scheduled(fixedDelay =1000)publicvoidhandleExpiredMessages(){StringcurrentTime =getCurrentTime();// 1 : 扫描任务,并将需要执行的任务加入到任务队列中List<DelayMessage>messages =delayQueue.getExpiredMessages();System.out.println(currentTime +" 待处理消息数量:"+messages.size());// 2 : 开始处理消息if(!messages.isEmpty()){for(DelayMessagemessage :messages){// 2.1 : 开启线程异步处理消息:不让处理消息的时间阻塞当前线程newThread(()->{System.out.println(currentTime +" :"+message.getId()+" --> 消息开始处理");try{// 2.1.1 : 模拟睡眠3秒,任务的处理时间(实际可能会更长)Thread.sleep(3000);}catch(Exceptione){e.printStackTrace();}System.out.println(currentTime +" :"+message.getId()+" --> 消息处理结束");// 2.2 : 处理完消息,删除消息delayQueue.remove(message);}).start();}}}
执行结果 : 开启线程异步执行消息
2023-11-03 15:18:33 待处理消息数量:02023-11-03 15:18:34 待处理消息数量:02023-11-03 15:18:35 待处理消息数量:02023-11-03 15:18:36 待处理消息数量:42023-11-03 15:18:36 :1 -->消息开始处理2023-11-03 15:18:36 :13 -->消息开始处理2023-11-03 15:18:36 :5 -->消息开始处理2023-11-03 15:18:36 :9 -->消息开始处理2023-11-03 15:18:37 待处理消息数量:42023-11-03 15:18:37 :1 -->消息开始处理 // 注意:(此消息被重复消费了)2023-11-03 15:18:37 :13 -->消息开始处理2023-11-03 15:18:37 :5 -->消息开始处理2023-11-03 15:18:37 :9 -->消息开始处理2023-11-03 15:18:38 待处理消息数量:82023-11-03 15:18:38 :1 -->消息开始处理2023-11-03 15:18:38 :5 -->消息开始处理2023-11-03 15:18:38 :9 -->消息开始处理2023-11-03 15:18:38 :13 -->消息开始处理2023-11-03 15:18:38 :10 -->消息开始处理2023-11-03 15:18:38 :6 -->消息开始处理2023-11-03 15:18:38 :2 -->消息开始处理2023-11-03 15:18:38 :14 -->消息开始处理2023-11-03 15:18:36 :9 -->消息处理结束2023-11-03 15:18:36 :5 -->消息处理结束2023-11-03 15:18:36 :1 -->消息处理结束2023-11-03 15:18:36 :13 -->消息处理结束
我们使用了开启新线程的方式来消费消息 , 消息延迟的问题解决了 , 但是又出现了新的问题 , 消息会出现重复消费的情况
问题的原因 : 我们第一次定时 , 取出了符合条件的4条过期的消息 , 我们开启了4个线程去执行 , 当第二秒 , 我们又获取了符合条件的消息 ,
因为第一次获取的消息执行需要时间 , 那么我们第二次拿消息的时候 , 就会有可能把第一次的4条消息 , 也拿出来 , 然后开线程再次消费 , 就会出现重复消费的情况了
解决方案 :
这个问题出现原因是 , 当前线程不知道这个消息已经被其他线程正在处理了 ,只要解决这个问题 ,
当前线程开始处理这个消息,先判断当前消息有没有被其他线程处理 , 如果正在处理,则不进行处理了 , 如果没处理,则开始进行处理
我们知道 redis删除元素的 remove() 方法 , 有一个返回值 , 表示删除的状态 ,
我们可以在消息处理前 , 先 remove() 这个消息 , 如果 remove()成功,则表示当前消息没有被消费 , 如果 remove()失败,则表示该消息已经被消费了
/** * 处理已到期的消息(轮询) */@Scheduled(fixedDelay =1000)publicvoidhandleExpiredMessages(){StringcurrentTime =getCurrentTime();// 1 : 扫描任务,并将需要执行的任务加入到任务队列中List<DelayMessage>messages =delayQueue.getExpiredMessages();System.out.println(currentTime +" 待处理消息数量:"+messages.size());// 2 : 开始处理消息if(!messages.isEmpty()){for(DelayMessagemessage :messages){// 2.1 : 处理消息:先删除消息,获取当前消息是否已经被其他人消费Longremove =delayQueue.remove(message);if(remove >0){// 2.2 : 开启线程异步处理消息:不让处理消息的时间阻塞当前线程newThread(()->{System.out.println(currentTime +" :"+message.getId()+" --> 消息开始处理");try{// 2.1.1 : 模拟睡眠3秒,任务的处理时间(实际可能会更长)Thread.sleep(3000);}catch(Exceptione){e.printStackTrace();}System.out.println(currentTime +" :"+message.getId()+" --> 消息处理结束");}).start();}}}}
- 执行结果 : 我们会发现 , 重复消费的问题 , 解决了
2023-11-03 15:31:36 待处理消息数量:42023-11-03 15:31:36 :1 -->消息开始处理2023-11-03 15:31:36 :13 -->消息开始处理2023-11-03 15:31:36 :5 -->消息开始处理2023-11-03 15:31:36 :9 -->消息开始处理2023-11-03 15:31:37 待处理消息数量:02023-11-03 15:31:38 待处理消息数量:42023-11-03 15:31:38 :10 -->消息开始处理2023-11-03 15:31:38 :14 -->消息开始处理2023-11-03 15:31:38 :2 -->消息开始处理2023-11-03 15:31:38 :6 -->消息开始处理2023-11-03 15:31:36 :9 -->消息处理结束2023-11-03 15:31:36 :5 -->消息处理结束2023-11-03 15:31:36 :13 -->消息处理结束2023-11-03 15:31:36 :1 -->消息处理结束2023-11-03 15:31:39 待处理消息数量:02023-11-03 15:31:40 待处理消息数量:02023-11-03 15:31:38 :10 -->消息处理结束2023-11-03 15:31:38 :2 -->消息处理结束2023-11-03 15:31:38 :6 -->消息处理结束2023-11-03 15:31:38 :14 -->消息处理结束2023-11-03 15:31:41 待处理消息数量:42023-11-03 15:31:41 :11 -->消息开始处理2023-11-03 15:31:41 :15 -->消息开始处理2023-11-03 15:31:41 :3 -->消息开始处理2023-11-03 15:31:41 :7 -->消息开始处理2023-11-03 15:31:42 待处理消息数量:02023-11-03 15:31:43 待处理消息数量:02023-11-03 15:31:41 :7 -->消息处理结束2023-11-03 15:31:41 :11 -->消息处理结束2023-11-03 15:31:41 :3 -->消息处理结束2023-11-03 15:31:41 :15 -->消息处理结束
但是还会出现问题 , 如果服务重启 , 或者服务宕机 , 那么当前执行中的消息 , 在下次服务启动的时候 , 就会出现消息丢失的情况
我给出的解决方案就是 : 创建一张临时数据表 , 当消息开始消费的时候 ,在表中添加一条记录,当消息消费成功,则把临时表中的记录删除
当服务重启 , 则把临时表中的记录,读到延迟队列中 , 就解决了消息丢失的情况
关键点
- 使用 缓存的key带内网ip的方式,解决了集群,多机器会出现的所有问题.
- 使用 后台线程,线程池,解决了消息堆积,延迟消费的问题.
- 使用 先删除key的方法 , 解决了消息重复消费的问题.
- 把当前处理的消息进行持久化,解决了消息丢失的问题.
这个只是我给出的解决方案 , 并不是完美的 , 如果想实现消息队列 , 最好是使用 RabbitMQ、
如下我先给同学们概括下,针对Spring Boot项目,如何利用Redis实现延迟队列的一些实现步骤?
- 引入相关依赖 (集成redis)
<!--集成redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
- 配置redis
#redis配置Spring:redis:database:0#Redis数据库索引(默认为0)host:127.0.0.1 #redis服务器ip,由于我是搭建在本地,固指向本地ipport:6379#redis服务器连接端口password:#redis服务器连接密码(默认为空)# 连接池配置jedis.pool:max-active:20#连接池最大连接数(使用负值表示没有限制)max-wait:-1#连接池最大阻塞等待时间(使用负值表示没有限制)max-idle:10#连接池中的最大空闲连接min-idle:0#连接池中的最小空闲连接timeout:1000#连接超时时间(毫秒)。Kafka、使用Redis实现延迟队列
实现思路
redis作为一款高性能的NoSQL数据库,具备快熟读写,高并发,数据持久化等特点,非常适用与实现延迟队列 ,redis提供了丰富的数据结构.
其中利用redis的ZSET集合 (有序集合)数据结构就可以实现一个简单的延迟队列
redis的zset数据结构中的每个元素都有一个分数score和一个值value,我们可以将任务的执行时间戳作为score,
将任务数据作为value,将任务插入到zset中,每个任务有一个唯一的id(比如订单id),以及任务执行时间(比如30min),
任务内容(比如订单超时支付系统自动取消)等信息体。同时,我们还将会给出对应的测试用例和测试结果。ActiveMQ、MetaMq等
详细步骤
本文将介绍如何使用Redis的Sorted Set数据结构来实现延迟队列,并提供一个完整的示例代码。然后另起一个线程,该线程会周期性地从zset中取出score最小
(即最早要执行的)的任务,如果该任务的score小于当前时间戳,则执行任务,否则等待一段时间再次检查,
直到任务可以执行,执行任务后,通过Redis的remove命令删除已经成功执行的任务即可。RocketMQ、ZeroMQ、