host.name是当前计算机的ip地址
发布时间:2025-06-24 19:02:58 作者:北方职教升学中心 阅读量:769
安装jdk可以参考:Windows和Linux安装jdk,此处使用kafka自带的zookeeper,不单独安装。查看消费者组以及消息是否积压
查看消费者组的命令:
bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --list
查看某个消费者组的消息是否有积压的命令:
bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group 消费者组的名称
查看所有消费者组的消息是否有积压的命令:
#查看所有组的积压情况(Linux命令)bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
上图中GROUP表示消费者组,TOPIC表示消息主题,PARTITION表示分区,CURRENT-OFFSET表示当前消费的消息条数,LOG-END-OFFSET表示kafka中生产的消息条数,LAG表示kafka中有多少条消息还未消费,也就是有多少条积压的消息。
1.数据丢失:若开启自动提交,且自动提交的间隔时间(默认是5秒)到了,消费者会将拉取的这批数据的offset保存到_consumer_offsets中。
package com.example.kafka.producer;import com.example.kafka.config.MyKafkaConfig;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;/** * kafka生产者 * 创建kafka生产者并生产消息的步骤: * 1.启动zookeeper和kafka * 2.创建topic * 3.启动producer * * @Author: 倚天照海 */public class MyProducer { /** * 1.创建topic:进入到kafka安装目录的bin目录下,执行kafka-topics.sh(Linux系统)或windows\kafka-topics.bat(windows系统)脚本 * Linux系统: bin/kafka-topics.sh --zookeeper 192.168.10.188:2181/kafka --create --replication-factor 2 --partitions 2 --topic testKafka * windows系统: bin\windows\kafka-topics.bat --zookeeper 192.168.10.188:2181 --create --replication-factor 2 --partitions 2 --topic testKafka * 2.启动producer * Linux系统: bin/kafka-console-producer.sh --broker-list 192.168.10.188:9092 --topic testKafka * windows系统: bin\windows\kafka-console-producer.bat --broker-list 192.168.10.188:9092 --topic testKafka */ public void produce() throws ExecutionException, InterruptedException { MyKafkaConfig kafkaConfig = new MyKafkaConfig(); Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServer()); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaConfig.getKeySerializerClass()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaConfig.getValueSerializerClass()); String topic = kafkaConfig.getTopic(); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); doProduce(producer, topic); } private void doProduce(KafkaProducer<String, String> producer, String topic) throws ExecutionException, InterruptedException { while (true) { for (int i = 0; i < 3; i++) { for (int j = 0; j < 3; j++) { ProducerRecord<String, String> record = new ProducerRecord<>(topic, "item" + j, "price" + i); Future<RecordMetadata> future = producer.send(record); RecordMetadata recordMetadata = future.get(); int partition = recordMetadata.partition(); long offset = recordMetadata.offset(); System.out.println("key=" + record.key() + ", value=" + record.value() + ", partition=" + partition + ", offset=" + offset); } } } }}
3、编写producer
根据公司业务逻辑编写producer,用于生产消息。
bin\windows\kafka-console-consumer.bat --bootstrap-server 192.168.10.188:9092 --topic kjTest --from-beginning
参数--zookeeper 192.168.10.188:2181中的ip和port是zookeeper节点的ip和zookeeper的port,参数--bootstrap-server 192.168.10.188:9092中的ip和port是kafka节点的ip和kafka的port。编写consumer
编写consumer,用于接受消息。自动提交是异步提交,开启自动提交可能会造成数据丢失或重复消费数据 // properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自动提交的间隔时间(多长时间会触发自动提交),默认是5秒 // properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10000"); // kafka的消费者是按批次拉取数据,该参数是设置一批最多拉取多少条数据 // properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); List<String> topics = Collections.singletonList(kafkaConfig.getTopic()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); doConsume(consumer, topics); } private void doConsume(KafkaConsumer<String, String> consumer, List<String> topics) { // 消费者订阅主题消息,多个consumer会动态负载均衡多个分区 // 例如有两个分区,最开始只启动一个consumer,会给这个consumer分配两个分区,它会消费两个分区的数据。
在kafka中,消费者是按批次拉取数据的,每一批次拉取的数据条数是0-n条,每个消费者可以拉取多个分区的数据,但是一个分区的数据只能被同一个消费者组中的一个消费者拉取。后来的版本都统一由broker管理,所以在启动consumer时就用bootstrap-server。producer和consumer都相当于这个服务端的客户端。配置kafka相关参数
package com.example.kafka.config;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.common.serialization.StringSerializer;/** * 此处为了简化,直接将kafka配置信息写到代码中, * 实际项目中需要从application.yml配置文件中读取 * * @Author: 倚天照海 */public class MyKafkaConfig { /** * kafka集群地址,多个地址用逗号分隔 */ private String bootstrapServer = "localhost:9092"; private String topic = "testKafka"; /** * 消费者组 */ private String consumerGroupId = "consumer-group-01"; /** * kafka中保存的是将数据序列化后的字节数组,需要指定key和value的序列化方式 */ private String keySerializerClass = StringSerializer.class.getName(); private String valueSerializerClass = StringSerializer.class.getName(); /** * kafka中key和value的反序列化方式 */ private String keyDeserializerClass = StringDeserializer.class.getName(); private String valueDeserializerClass = StringDeserializer.class.getName(); public String getBootstrapServer() { return bootstrapServer; } public String getTopic() { return topic; } public String getConsumerGroupId() { return consumerGroupId; } public String getKeySerializerClass() { return keySerializerClass; } public String getValueSerializerClass() { return valueSerializerClass; } public String getKeyDeserializerClass() { return keyDeserializerClass; } public String getValueDeserializerClass() { return valueDeserializerClass; }}
2、listeners、异常
启动consumer时可能会报下面的异常:
kafka的consumer:java.nio.channels.ClosedChannelException
解决方法:
出现以上异常是由于服务器没有做kafka的主机名与ip的映射,
linux的目录是/etc/hosts
windows的目录是C:\Windows\System32\drivers\etc\hosts
二、host.name、旧版本(0.9以前)的kafka,消费的进度(offset)是写在zk中的,所以启动consumer需要知道zk的地址。
3.bootstrap-servers指的是kafka目标集群的服务器地址,这和broker-list功能一样,不过在启动producer时要求用broker-list,在启动consumer时用bootstrap-servers。启动zk和kafka
启动zk:
进入到kafka的安装目录kafka_2.11-1.0.0下,同时按住shift和鼠标右键,选择“打开命令窗口”选项,或者win+R输入cmd,打开命令行窗口。
启动kafka:
bin\windows\kafka-server-start.bat config\server.properties
4、编写测试类
package com.example.kafka;import com.example.kafka.consumer.MyConsumer;import com.example.kafka.producer.MyProducer;import org.junit.Test;import java.util.concurrent.ExecutionException;/** * @Author: 倚天照海 */public class Main { MyProducer myProducer = new MyProducer(); MyConsumer myConsumer = new MyConsumer(); @Test public void produceTest() { produceData(); } @Test public void consumeTest() { consumeData(); } private void produceData() {// MyProducer myProducer = new MyProducer(); try { myProducer.produce(); } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } } private void consumeData() {// MyConsumer myConsumer = new MyConsumer(); myConsumer.consume();}}
测试结果:
生产者:
消费者:
Kafka中消息积压情况: