发布时间:2025-06-24 20:02:12  作者:北方职教升学中心  阅读量:531


文章目录

  • 一、消费者
    • 1.引入库
    • 2.配置类
      • PublicConfig.java
      • MessageConsumer.java
    • 3.业务类

一、

生产者

1.引入库

引入需要依赖的jar包,引入POM文件:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>

2.配置文件

配置Kafka的相关参数(或者你项目的cacos或者yaml文件里添加)

以下是一个示例配置:application.properties

ccm.kafka.servers:192.168.1.95:9092,192.168.1.96:9092,192.168.1.97:9092ccm.kafka.topics.xxx:xxx_content_dev

Tip:建议topic命名规则:租户简称+项目关键词+系统环境的方式,更容易区分

3.配置类

PublicConfig.java

@Data@Configuration@ConfigurationProperties(prefix ="ccm.kafka")//配置信息nacos中配置publicclassPublicConfig{privateStringservers;privateStringalertTopic;}

MessageProducer.java

@Slf4j@ComponentpublicclassMessageProducer{privateProducerproducerKafka;@AutowiredPublicConfigpublicConfig;/**     * 初始化方法     */@PostConstructpublicStringinit(){Propertiesprops =newProperties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,publicConfig.getServers());props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"PLAINTEXT");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,String.valueOf(30*1000));props.put(ProducerConfig.ACKS_CONFIG,"all");producerKafka =newKafkaProducer(props);log.info("kafka message channel created successfully");return"OK";}publicResponseDatasend(Stringcontent,Stringtopic){longstartTime =System.currentTimeMillis();try{Stringkey =UUID.randomUUID().toString().replace("-","");ProducerRecord<String,String>kafkaMessage =newProducerRecord<>(topic,key,content);log.info("MessageProducer send key {},message{}",key,content);Future<RecordMetadata>send =producerKafka.send(kafkaMessage);send.get();log.info("MessageProducer send cost time:{}",System.currentTimeMillis()-startTime);}catch(Exceptione){log.error("MessageProducer Failed to push message:{}",e.getMessage());returnResponseData.errorWithMsg("MessageProducer Failed to push message:"+e.getMessage());}returnnull;}}

4.业务处理类

示例代码的业务场景:定时生成预警消息发送给下游系统调用。消费者

1.引入库

在消费者工程pom文件中配置依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>

2.配置类

同样根据该项目情况编写配置类,示例代码中仍为读取naco配置

PublicConfig.java

@Data@Configuration@Slf4j@ConfigurationProperties(prefix ="xman.kafka")publicclassPublicConfig{privateStringservers;privateMap<String,String>topics;publicStringgetTopic(StringappCode){if(Objects.isNull(topics)||topics.isEmpty()){returnnull;}returntopics.get(appCode);}privateStringalertTopic;privateStringgroup;}

MessageConsumer.java

@Slf4j@ComponentpublicabstractclassMessageConsumer{// 用于持续监听kafka消息的专用线程池privateExecutorServicethreadPool;// 用于持续消费kafka消息的专用线程池privateExecutorServiceconsumerThreadPool;@ResourceprivatePublicConfigpublicConfig;/**     * 初始化方法     */@PostConstructpublicStringinit(){MessageConfigFieldmessageConfig =MessageConfigField.builder().servers(publicConfig.getServers()).topic(publicConfig.getAlertTopic()).group(publicConfig.getGroup()).build();if(StringUtils.isBlank(messageConfig.getServers())){//没有配置kafka信息return"OK";}initThreadPool();KafkaConsumer<String,String>instance =kafkaInstance(messageConfig.getServers(),messageConfig.getGroup(),messageConfig.getTopic(),messageConfig.getClientName(),messageConfig.getUsername(),messageConfig.getPassword());startListen(instance);log.info("ccm kafka消息订阅成功:clientId:"+messageConfig.getClientName());return"OK";}privatevoidinitThreadPool(){if(null==threadPool){log.info("initThreadPool start");threadPool =Executors.newFixedThreadPool(1);log.info("initThreadPool done");}}privatevoidstartListen(KafkaConsumer<String,String>consumer){threadPool.submit(()->{TenantContext.setContextCode(CommonConstants.TENANT_CODE);while(true){try{ConsumerRecords<String,String>records =consumer.poll(Duration.ofSeconds(10));if(records ==null||records.isEmpty()){continue;}for(ConsumerRecord<String,String>record :records){Optional<String>kafkaMessage =Optional.ofNullable(record.value());if(kafkaMessage.isPresent()){Stringmsg =kafkaMessage.get();if(StringUtils.isNotBlank(msg)){log.info("msgJson:"+msg);consumeMsg(msg);}}}}catch(Exceptione){TimeUnit.SECONDS.sleep(1);log.error("consume error",e);}}});}publicstaticKafkaConsumer<String,String>kafkaInstance(Stringservers,Stringgroup,Stringtopic,StringclientId,Stringusername,Stringpassword){Propertiesprops =newProperties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,servers);if(StringUtils.isNotBlank(group)){props.put(ConsumerConfig.GROUP_ID_CONFIG,group);}props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String,String>consumer =newKafkaConsumer<>(props);List<String>subscribedTopics =newArrayList<>();subscribedTopics.add(topic);consumer.subscribe(subscribedTopics);returnconsumer;}/**     * 核心逻辑,由子类继承实现     *     * @param msgData msg     */publicabstractvoidconsumeMsg(StringmsgData)throwsException;}

3.业务类

@Slf4j@Service@RefreshScopepublicclassCmsInfoConsumerextendsMessageConsumer{@ResourceprivateInfoServiceinfoService;@OverridepublicvoidconsumeMsg(StringmsgData)throwsException{log.info("CmsWeatherConsumer收到mq消息message:{}",msgData);CcmAlertInfoDTOalertInfoDTO =JSONObject.parseObject(msgData,CcmAlertInfoDTO.class);try{//to_do 处理消费内容infoService.saveInfoContent(alertInfoDTO);}catch(Exceptione){e.printStackTrace();log.info("同步用户消息失败:"+e);}}}

至此,一个简单的通过kafka同步预警消息的应用就开发完了。生产者

    • 1.引入库
    • 2.配置文件
    • 3.配置类
      • PublicConfig.java
      • MessageProducer.java
    • 4.业务处理类
  • 三、

    //启动类注意增加定时注解的支持@SpringBootApplication@MapperScan(basePackages ={"com.xx.xx.mapper","com.xx.xx.crawler.mapper"})@EnableSchedulingpublicclassCATApp{publicstaticvoidmain(String[]args){SpringApplication.run(CATApp.class,args);}}@Service@Slf4jpublicclassCrawlerService{@Scheduled(cron ="${crawler.scheduled.cron:0 */1 * * * ?}")// 每5分钟执行一次//   @Scheduled(cron = "${crawler.scheduled.cron:0 0 0/1 * * ?}") // 每小时执行一次publicvoidcrawlAndSaveAlertInfos(){log.info(">>>>>>>>>>>>> crawlAndSaveAlertInfos  ");//替换成具体的业务场景 List<AlertInfo>alertInfos =fetchAlertInfoList();if(!alertInfos.isEmpty()){for(AlertInfoalertInfo :alertInfos){//发送预警信息到kafka供下游调用crawlerAlertSyncService.sendCrawlerAlertMsgKafka(alertInfo);}}}/** * * 预警消息通过Kafka异步同步其他应用 */publicinterfaceCrawlerAlertSyncService{voidsendCrawlerAlertMsgKafka(AlertInfoalertInfo);}@Slf4j@ServicepublicclassCrawlerAlertSyncServiceImplimplementsCrawlerAlertSyncService{@AutowiredprivateMessageProducermessageProducer;@ResourceprivatePublicConfigpublicConfig;@OverridepublicvoidsendCrawlerAlertMsgKafka(AlertInfoalertInfo){Stringtopic =publicConfig.getAlertTopic();Stringservers =publicConfig.getServers();log.info("send publish msg to kafka  ,topic:{},bizId:{}",topic,alertInfo.getAlertid());log.info("send publish msg to kafka  ,servers:{}",servers);Stringcontent =JSON.toJSONString(alertInfo);log.info("send publish msg to kafka  ,content:{}",content);if(StringUtils.isNotBlank(topic)){messageProducer.send(content,topic);}}}

    三、