发布时间:2025-06-24 21:08:56 作者:北方职教升学中心 阅读量:564
二、
上述情况的优点在于:可以明确topic和消费者,启动时程序主动就创建好对应topic的消费容器和消费方法,直接消费即可。
Listener相关包含了:容器的创建、总结: kafka动态的topic创建和设置主要是通过客户端adminClient来操作,而监听容器则是通过监听注册的kafkaListenerEndpointRegistry来进行创建和管理。代码展示<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId</dependency>
importorg.apache.kafka.clients.admin.AdminClient;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.common.serialization.StringDeserializer;importorg.apache.kafka.common.serialization.StringSerializer;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.annotation.EnableKafka;importorg.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;importorg.springframework.kafka.core.*;importjava.util.HashMap;importjava.util.Map;importjava.util.Properties;/** *@ClassName KafkaConfig *@Description: TODO kafka的配置类 **/@Configuration@EnableKafkapublicclassKafkaConfig{privatestaticfinalStringkafkaServer ="kafka-ip:9092";//kafka地址/** * @Title producerFactory * @Description TODO 生产者工厂类,设置生产者相关配置 * @return org.springframework.kafka.core.ProducerFactory<java.lang.String,java.lang.Object> */@BeanpublicProducerFactory<String,Object>producerFactory(){Map<String,Object>props =newHashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,kafkaServer);//kafka 地址props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);//序列化props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);//序列化props.put(ProducerConfig.ACKS_CONFIG,"all");//确认机制,all是所有副本确认,1是一个副本确认,0是不需要副本确认props.put(ProducerConfig.BATCH_SIZE_CONFIG,"10");//批量发送大小props.put(ProducerConfig.LINGER_MS_CONFIG,"1");//批量发送等待时间 和上面的batch-size谁先到先发送returnnewDefaultKafkaProducerFactory<>(props);}/** * @Title kafkaTemplate * @Description TODO kafka生产者工具类 * @return org.springframework.kafka.core.KafkaTemplate<java.lang.String,java.lang.Object> */@BeanpublicKafkaTemplate<String,Object>kafkaTemplate(){returnnewKafkaTemplate<>(producerFactory());}/** * @Title consumerFactory * @Description TODO 消费者工厂类,配置消费者的一些配置 * @return org.springframework.kafka.core.ConsumerFactory<java.lang.String,java.lang.Object> */@BeanpublicConsumerFactory<String,Object>consumerFactory(){Map<String,Object>props =newHashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,kafkaServer);props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,5*1024*1024);//每次抓取消息的大小props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//是否自动提交props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,50*1000*1000);//请求超时时间props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,100);props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,60000000);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);returnnewDefaultKafkaConsumerFactory<>(props);}/** * @Title kafkaListenerContainerFactory * @Description TODO 监听容器的工厂类,创建监听容器时使用 * @return org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory<java.lang.String,java.lang.Object> */@BeanpublicConcurrentKafkaListenerContainerFactory<String,Object>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,Object>factory =newConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());returnfactory;}/** * @Title adminClient * @Description TODO kafka客户端 * @return org.apache.kafka.clients.admin.AdminClient */@BeanpublicAdminClientadminClient(){Propertiesprops =newProperties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,kafkaServer);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);AdminClientadminClient =AdminClient.create(props);returnadminClient;}}
importlombok.extern.slf4j.Slf4j;importorg.apache.kafka.clients.admin.*;importorg.apache.kafka.common.config.ConfigResource;importorg.apache.kafka.common.config.TopicConfig;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.config.KafkaListenerContainerFactory;importorg.springframework.kafka.config.KafkaListenerEndpointRegistry;importorg.springframework.kafka.config.MethodKafkaListenerEndpoint;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.listener.MessageListenerContainer;importorg.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;importorg.springframework.stereotype.Component;importjava.lang.reflect.Method;importjava.util.Collections;importjava.util.Set;/** * @ClassName: KafkaUtil * @Description: TODO 用于创建kafka Topic队列和listener监听容器的工具类 **/@Component@Slf4jpublicclassKafkaUtil{privatestaticAdminClientadminClient;privatestaticKafkaListenerEndpointRegistrykafkaListenerEndpointRegistry;privatestaticKafkaTemplatekafkaTemplate;/** * @Title KafkaUtil * @Description 构造函数注入 * @param adminClient kafka客户端对象 * @param kafkaListenerEndpointRegistry kafka监听容器注册对象 * @param kafkaListenerEndpointRegistry kafka生产者工具类 * @return */@AutowiredpublicKafkaUtil(AdminClientadminClient,KafkaListenerEndpointRegistrykafkaListenerEndpointRegistry,KafkaTemplatekafkaTemplate){KafkaUtil.adminClient =adminClient;KafkaUtil.kafkaListenerEndpointRegistry =kafkaListenerEndpointRegistry;KafkaUtil.kafkaTemplate =kafkaTemplate;}//region topic相关方法/** * @Title createTopic * @Description 创建kafka topic * @param topicName topic名 * @param partitions 分区数 * @param replicas 副本数(short) * @return void */publicstaticvoidcreateTopic(StringtopicName,intpartitions,shortreplicas)throwsException{NewTopicnewTopic =newNewTopic(topicName,partitions,replicas);CreateTopicsResulttopics =adminClient.createTopics(Collections.singleton(newTopic));topics.all().get();log.info("【{}】topic创建成功",topicName);}/** * @Title deleteTopic * @Description 删除topic * @param topicName topic名称 * @return void */publicstaticvoiddeleteTopic(StringtopicName)throwsException{DeleteTopicsResultdeleteTopicsResult =adminClient.deleteTopics(Collections.singleton(topicName));deleteTopicsResult.all().get();log.info("【{}】topic删除成功",topicName);}/** * @Title updateTopicRetention * @Description 修改topic的过期时间 * @param topicName topic名称 * @param ms 过期时间(毫秒值) * @return void */publicstaticvoidupdateTopicRetention(StringtopicName,Stringms)throwsException{ConfigResourceresource =newConfigResource(ConfigResource.Type.TOPIC,topicName);ConfigEntryconfigEntry =newConfigEntry(TopicConfig.RETENTION_MS_CONFIG,ms);Configconfig =newConfig(Collections.singleton(configEntry));// 创建AlterConfigsOptionsAlterConfigsOptionsalterConfigsOptions =newAlterConfigsOptions().timeoutMs(10000);// 执行修改操作adminClient.alterConfigs(Collections.singletonMap(resource,config),alterConfigsOptions).all().get();log.info("【{}】topic过期时间设置完成,过期时间为:{}毫秒",topicName,ms);}/** * @Title listTopic * @Description 获取topic列表 * @return java.util.Set<java.lang.String> */publicstaticSet<String>listTopic()throwsException{ListTopicsResultlistTopicsResult =adminClient.listTopics();Set<String>strings =listTopicsResult.names().get();returnstrings;}/** * @Title existTopic * @Description topic是否存在 * @param topicName topic名称 * @return boolean */publicstaticbooleanexistTopic(StringtopicName)throwsException{Set<String>strings =listTopic();if(strings ==null||strings.isEmpty()){returnfalse;}returnstrings.contains(topicName);}//endregion//region 生产者发送消息示例/** * @Title sendMsg * @Description 通过注册信息找到对应的容器并启动 * @param topic 队列名称 * @param msg 消息 * @return void */publicstaticvoidsendMsg(Stringtopic,Objectmsg)throwsException{kafkaTemplate.send(topic,msg);//kafkaTemplate.send(topic,2,"key",msg);//带有分区和key值的}//endregion//region 消费者监听容器相关方法/** * @Title existListenerContainer * @Description TODO 根据ID查询容器是否存在 * @param id 监听容器id * @return boolean */publicstaticbooleanexistListenerContainer(Stringid)throwsException{Set<String>listenerIds =kafkaListenerEndpointRegistry.getListenerContainerIds();returnlistenerIds.contains(id);}/** * @Title registerListener * @Description TODO 创建kafka监听容器并注册到注册信息中,一次可以注册多个topic的监听容器 * @param id 容器id,自定义 * @param consumerGroupId 消费者组id自定义 * @param processBean 处理消息的类 * @param processMethod 处理消息的方法 * @param topics 需要监听的topic数组 * @return void */publicstaticvoidregisterListenerContainer(Stringid,StringconsumerGroupId,ObjectprocessBean,MethodprocessMethod,String...topics)throwsException{//判断id是否存在if(existListenerContainer(id)){//如果当前id的容器已存在,不添加log.info("当前id为{}的容器已存在,不进行添加操作!",id);return;}//判断所有队列是否存在for(Stringtopic :topics){if(!existTopic(topic)){//如果存在topic不存在,不添加log.info("【{}】topic不存在,不进行添加操作!",topic);return;}}MethodKafkaListenerEndpoint<String,String>endpoint =newMethodKafkaListenerEndpoint<>();//设置监听器端点相关信息//设置Idendpoint.setId(id);//设置消费者组endpoint.setGroupId(consumerGroupId);//设置要监听的topic数组,可以是多个endpoint.setTopics(topics);//设置每个监听器线程数endpoint.setConcurrency(3);//设置批量监听endpoint.setBatchListener(true);//设置消息处理工厂类,这里用的是默认工厂endpoint.setMessageHandlerMethodFactory(newDefaultMessageHandlerMethodFactory());//设置实际处理的Bean对象,即实际的对象,比如new Class();endpoint.setBean(processBean);//设置实际处理的方法(包含方法名和参数)endpoint.setMethod(processMethod);//注册Container并启动,startImmediately表示立马启动kafkaListenerEndpointRegistry.registerListenerContainer(endpoint,SpringUtil.getBean(KafkaListenerContainerFactory.class),true);log.info("Kafka监听容器操作:ID为{}的容器已【注册】,监听的topics:{}",id,topics);// for (String topicName : topics) {// if (!KafkaConfig.notExistTopicCreateContainerFlag && !nameTopics.contains(topicName)) {// log.info("【{}】topic不存在,不创建容器!", topicName);// continue;//}// //创建一个kafka监听器端点对象// MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();// //设置监听器端点相关信息// //设置Id// endpoint.setId(topicName);// //设置消费者组// endpoint.setGroupId(topicName + "_consumer_group");// //设置主题// endpoint.setTopics(topicName);// //设置每个监听器线程数// endpoint.setConcurrency(3);// //设置批量监听// endpoint.setBatchListener(true);// //设置默认处理工厂// endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());// //设置实际处理的Bean对象// endpoint.setBean(new ConsumerController());// //设置实际处理的方法名和参数类型// endpoint.setMethod(ConsumerController.class.getMethod("consumeMessage", String.class));// //注册Container并启动// kafkaListenerEndpointRegistry.registerListenerContainer(endpoint, SpringUtil.getBean(KafkaListenerContainerFactory.class), true);// log.info("Kafka监听容器操作:ID为{}的容器已【注册】", topicName);//}}/** * @Title startListenerContainer * @Description 根据id开启监听容器的运行状态 * @param id 监听容器的id * @return void */publicstaticvoidstartListenerContainer(Stringid)throwsException{MessageListenerContainerlistenerContainer =kafkaListenerEndpointRegistry.getListenerContainer(id);if(listenerContainer ==null){log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!",id);return;}listenerContainer.start();log.info("Kafka监听容器操作:ID为{}的容器已【开启】",id);}/** * @Title stopListenerContainer * @Description TODO 根据id停止监听容器的运行状态 * @param id 监听容器的id * @return void */publicstaticvoidstopListenerContainer(Stringid)throwsException{MessageListenerContainerlistenerContainer =kafkaListenerEndpointRegistry.getListenerContainer(id);if(listenerContainer ==null){log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!",id);return;}listenerContainer.stop();log.info("Kafka监听容器操作:ID为{}的容器已【停止】",id);}/** * @Title pauseListenerContainer * @Description TODO 根据id暂停监听容器的监听状态 * @param id 监听容器的id * @return void */publicstaticvoidpauseListenerContainer(Stringid)throwsException{MessageListenerContainerlistenerContainer =kafkaListenerEndpointRegistry.getListenerContainer(id);if(listenerContainer ==null){log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!",id);return;}listenerContainer.pause();log.info("Kafka监听容器操作:ID为{}的容器已【暂停】",id);}/** * @Title resumeListenerContainer * @Description TODO 根据id恢复监听容器的监听状态 * @param id 监听容器的id * @return void */publicstaticvoidresumeListenerContainer(Stringid)throwsException{MessageListenerContainerlistenerContainer =kafkaListenerEndpointRegistry.getListenerContainer(id);if(listenerContainer ==null){log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!",id);return;}listenerContainer.resume();log.info("Kafka监听容器操作:ID为{}的容器已【恢复】",id);}/** * @Title isNormalStateListenerContainer * @Description 是否是正常状态的容器 * (kafka监听容器的运行状态标志是running,监听状态标志是pauseRequested,停止是关闭了资源,暂停是停止消费) * 只有running是true,并且pauseRequested是false,监听容器才能正常消费消息 * @param id 监听容器的id * @return boolean */publicstaticbooleanisNormalStateListenerContainer(Stringid)throwsException{MessageListenerContainerlistenerContainer =kafkaListenerEndpointRegistry.getListenerContainer(id);//如果不存在此id容器,则返回falseif(listenerContainer ==null){returnfalse;}//存在则返回容器的运行状态和非暂停状态returnlistenerContainer.isRunning()&&!listenerContainer.isPauseRequested();}/** * @Title getPauseStateListenerContainer * @Description 获取监听容器的暂停状态(监听的状态) * @param id 监听容器id * @return boolean */publicstaticbooleangetPauseStateListenerContainer(Stringid)throwsException{MessageListenerContainerlistenerContainer =kafkaListenerEndpointRegistry.getListenerContainer(id);if(listenerContainer ==null){returntrue;}returnlistenerContainer.isPauseRequested();}/** * @Title getRunningStateListenerContainer * @Description 获取监听容器的运行状态(容器的状态) * @param id 监听容器id * @return boolean */publicstaticbooleangetRunningStateListenerContainer(Stringid)throwsException{MessageListenerContainerlistenerContainer =kafkaListenerEndpointRegistry.getListenerContainer(id);if(listenerContainer ==null){returnfalse;}returnlistenerContainer.isRunning();}/** * @Title setStateNormalListenerContainer * @Description 使容器的运行状态和监听状态都是正常 * @param id 监听容器的id * @return boolean 正常返回true,非正常返回false */publicstaticbooleansetStateNormalListenerContainer(Stringid)throwsException{if(!existListenerContainer(id)){log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!",id);returnfalse;}//先判断容器运行状态是否正常,如果不正常,则开启if(!getRunningStateListenerContainer(id)){startListenerContainer(id);}//再判断容器监听状态是否正常,如果不正常,则恢复if(getPauseStateListenerContainer(id)){resumeListenerContainer(id);}//设置完后,再查询状态并返回。
🙉在小小的电脑上面敲呀敲呀敲,写短短的代码,埋小小的坑🙈
🙉在大大的电脑上面敲呀敲呀敲,写大大的代码,埋大大的坑🙈
🙉在特别大的电脑上面敲呀敲呀敲,写特别大的代码,埋特别大的坑🙈
🙉优秀的你肯定是一个不爱写Bug并且爱点赞关注的靓仔吧!🙈