前言
其实,“通过Redis手动更新Ribbon缓存来解决Eureka微服务架构中服务下线感知的问题”是一种解,但不是最优解
1.痛点
上一篇文章的标题是:
通过Redis手动更新Ribbon缓存来解决Eureka微服务架构中服务下线感知的问题
当时在文章的末尾就指出,使用Redis+AOP的方式有很多漏洞,只有在服务调用方发送调用请求的情况下才会触发切面中更新Ribbon缓存的逻辑。但是如果告警接口数量众多,并且无法定位,上述方法就有些不够看了。
2.解决方案
于是,基于此种困境,我想到了用mq的事件驱动模式来推进Ribbon缓存更新(“下线”这一事件驱动,而不是“发送跨服务调用请求”这一事件),具体如下:

即,当服务被调用方中调用了下线接口下线了指定服务,会生产消息到MQ里,服务被调用方会监听这个队列去消费消息,并通过消费消息这一事件(消费下线服务端口信息)去驱动更新Ribbon缓存。可能这里大家看到去改配置有点鸡肋并且在实际场景中也不太现实,但别急,暂时先往下看,后面我会专门写一篇文章来解决这一问题
3.具体实现
3.1配置RabbitMQ
1.配置RabbitMQ(安装这些大家可以去看看平台比较成熟的文章)这里就不写了,我是直接在服务器上用docker容器化运行的:

在调用方与被调用方都配好MQ
3.2生产下线消息
首先声明一个队列:
@Configuration@EnableRabbitpublicclassRabbitMqConfig{@BeanpublicQueuetheQueue(){returnnewQueue("SERVER_LIST");}}
服务下线接口处,生产下线消息到MQ,向这接口/service-down-list
发送GET请求,传递指定的下线服务实例信息即可下线服务,即http://localhost:8081/control/service-down-list?portParams=8083
就下线了8083服务实例
@Value("${eureka-server.ipAddress}")privateStringipAddress;@Value("${eureka-server.appName}")privateStringappName;@Value("${DIY_QUEUE.VALUE}")privateStringqueueName;@GetMapping(value ="/service-down-list")publicStringoffLine(@RequestParamList<Integer>portParams){List<Integer>successList =newArrayList<>();List<InstanceInfo>instances =eurekaClient.getInstancesByVipAddress(appName,false);List<Integer>servicePorts =instances.stream().map(InstanceInfo::getPort).collect(Collectors.toList());OkHttpClientclient =newOkHttpClient();log.error("开始时间:{}",System.currentTimeMillis());portParams.parallelStream().forEach(temp ->{if(servicePorts.contains(temp)){Stringurl ="http://"+ipAddress +":"+temp +"/control/service-down";try{Responseresponse =client.newCall(newRequest.Builder().url(url).build()).execute();if(response.code()==200){log.debug(temp +"服务下线成功");successList.add(temp);}else{log.debug(temp +"服务下线失败");}}catch(IOExceptione){log.error(e.toString());}}});HashMap<String,List<Integer>>portInfo =newHashMap<>();portInfo.put(appName,successList);rabbitTemplate.convertAndSend(queueName,portInfo);returnsuccessList +"优雅下线成功";}
这里向MQ的队列里传递了下线的服务实例端口信息
3.3更新Ribbon缓存
服务调用方通过“下线“这一事件驱动Ribbon缓存更新
@Slf4j@ComponentpublicclassConsumer{@ResourceSpringClientFactoryspringClientFactory;@ResourceClearRibbonCacheBeanclearRibbonCacheBean;@RabbitListener(queues ="SERVER_LIST")publicvoidlistenWorkQueue1(HashMap<String,List<Integer>>message){log.debug("消费者1接收到消息——"+message +"时间为:"+LocalTime.now());for(Stringkey :message.keySet()){List<Integer>value =message.get(key);log.debug("Key: "+key);log.debug("Value: "+value);if(ObjectUtils.isNotEmpty(value)){clearRibbonCacheBean.clearRibbonCache(springClientFactory,value.toString(),key);}log.debug("现在的所有服务列表:{}",springClientFactory.getLoadBalancer(key).getAllServers());}}}
清理Ribbon缓存的Bean:
@Configuration@Slf4jpublicclassClearRibbonCacheBean{publicstaticbooleancutDown(List<Integer>ports,Serverindex){returnports.contains(index.getPort());}publicvoidclearRibbonCache(SpringClientFactoryclientFactory,StringportParams,StringappName){ILoadBalancerloadBalancer =clientFactory.getLoadBalancer(appName);List<Server>reachableServers =loadBalancer.getReachableServers();List<Integer>portList =StringChange.stringToList(portParams);List<Server>ableServers =reachableServers.stream().filter(temp ->!cutDown(portList,temp)).collect(Collectors.toList());log.debug("可用服务列表:{}",ableServers);((BaseLoadBalancer)loadBalancer).setServersList(ableServers);}
3.4压测
运行项目,调用下线接口并压测来模拟一下线上场景:
此时我们调用下线接口,下线8083服务实例:

压测结果,均无异常:
观察服务实例的日志输出:
未下线的8081,8084


下线的8083

这说明,Eureka服务下线感知的延迟已经完全被消除
4.优化
以上的MQ还是采用简单队列的模式,即生产者生产一条消息到队列中,该消息也只能被一个消费者消费到。