fetch.min.bytes:提取的最小字节数。
fetch.max.bytes:提取的最大字节数。
max.partition.fetch.bytes:分区拉取的最大字节数。
fetch.max.wait.ms:最大等待时间拉动操作。
max.poll.records:最大记录数量的拉取操作。
上述参数决定了一次消费能消费多少条消息。
@Bean public KafkaListenerContainerFactory> kafkaBatchConsumerFactory() { return batchFactory(consumerConfig(KafkaConsumerGroupIdConstant.KAFKA_CONSUMER_BATCH_GROUP, KafkaConsumerResetOffsetEnum.EARLIEST.getType())); }。
@KafkaListener(groupId = KafkaConsumerGroupIdConstant.KAFKA_CONSUMER_BATCH_GROUP, topics = KafkaTopicConstant.KAFKA_TOPIC, containerFactory = "kafkaBatchConsumerFactory") public void batchConsume(List> records, Acknowledgment ack) { String topicName = KafkaTopicConstant.KAFKA_TOPIC; try { List valueList = records.stream() .map(ConsumerRecord::value).collect(Collectors.toList()); for (String value : valueList) { log.info("topic批量消费:{ }, value:{ }", topicName, value); } ack.acknowledge(); } catch (Exception e) { log.error("kafka消费{ }:", topicName, e); } }。
2.1 配置参数。
2.2 消费代码。
2.3 消费结果 。
注:有些代码在博客中无法反映,点击以下链接跳转到Giteexudongbase项目的kafka分支。