java Kafka批量消费和单消费消息
时间:2025-06-24 12:31:35 来源:新华社
【字体:  

1、批量消费新闻。

1.1 配置参数。

fetch.min.bytes:提取的最小字节数。

fetch.max.bytes:提取的最大字节数。

max.partition.fetch.bytes:分区拉取的最大字节数。

fetch.max.wait.ms:最大等待时间拉动操作。

max.poll.records:最大记录数量的拉取操作。

上述参数决定了一次消费能消费多少条消息。

@Bean    public KafkaListenerContainerFactory kafkaBatchConsumerFactory() {         return batchFactory(consumerConfig(KafkaConsumerGroupIdConstant.KAFKA_CONSUMER_BATCH_GROUP, KafkaConsumerResetOffsetEnum.EARLIEST.getType()));    }。

 1.2 消费代码。

@KafkaListener(groupId = KafkaConsumerGroupIdConstant.KAFKA_CONSUMER_BATCH_GROUP,            topics = KafkaTopicConstant.KAFKA_TOPIC,            containerFactory = "kafkaBatchConsumerFactory")    public void batchConsume(List> records, Acknowledgment ack) {         String topicName = KafkaTopicConstant.KAFKA_TOPIC;        try {             List valueList = records.stream()                    .map(ConsumerRecord::value).collect(Collectors.toList());            for (String value : valueList) {                 log.info("topic批量消费:{ }, value:{ }", topicName, value);            }            ack.acknowledge();        } catch (Exception e) {             log.error("kafka消费{ }:", topicName, e);        }    }。

1.3 消费结果 。

1.3 消费结果 。

2、单一消费信息。

2.1 配置参数。

@Bean public KafkaListenerContainerFactory kafkaSingleConsumerFactory() { ConcurrentKafkaListenerContainerFactory resultFactory = batchFactory(consumerConfig(KafkaConsumerGroupIdConstant.KAFKA_CONSUMER_SINGLE_GROUP, KafkaConsumerResetOffsetEnum.EARLIEST.getType())); resultFactory.setBatchListener(false); return resultFactory; }。

2.2 消费代码。

@KafkaListener(groupId = KafkaConsumerGroupIdConstant.KAFKA_CONSUMER_SINGLE_GROUP, topics = KafkaTopicConstant.KAFKA_TOPIC, containerFactory = "kafkaSingleConsumerFactory") public void singleConsume(ConsumerRecord records, Acknowledgment ack) { String topicName = KafkaTopicConstant.KAFKA_TOPIC; try { log.info("单个消费 topic:{ }, value:{ }", topicName, records.value()); ack.acknowledge(); } catch (Exception e) { log.error("kafka消费{ }:", topicName, e); } }。

2.3 消费结果 。

2.3 消费结果 。

注:icon-default.png?t=O83A有些代码在博客中无法反映,点击以下链接跳转到Giteexudongbase项目的kafka分支。

xudongbase: 主要是项目中常用的方法,easyexcel现有分支正在不断更新。欢迎Star和Issues提交。easyexcel分支:批量设置风格󿀌批量添加批注󿀌批量合并单元格󿀌设置冻结行和列,设置行高列宽󿀌隐藏行和列󿀌绑定下拉框数据,设置水印󿀌插入图片 - Gitee.com。https://gitee.com/xudong_master/xudongbase/tree/kafka/。

[责任编辑:百度一下]
检察日报数字报 | 正义网 |
Copyrights©最高人民检察院 All Rights Reserved.