Kafka简介kafka核心概念二、

发布时间:2025-06-24 20:08:02  作者:北方职教升学中心  阅读量:299


分区允许数据水平分布和并行处理。

1.2 安装包下载

官方下载地址:http://kafka.apache.org/downloads.html
在这里插入图片描述
我这里下载的是:kafka_2.12-3.6.0.tgz

# 在线下载安装包wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.12-3.6.0.tgz

2.安装KafKa

将安装包传送到服务器并解压,这里我放到opt下面

1. 解压安装包
cd  opttar -zxvf kafka_2.12-3.6.0.tgz
2. 配置kafka

在kafka解压目录同一路径下创建

mkdir -p /opt/software/kafkamkdir -p /opt/software/kafka/zookeeper  #zookeeper数据目录mkdir -p /opt/software/kafka/log        #kafka日志mkdir -p /opt/software/kafka/zookeeper/log  #zookeeper日志
3 进入配置文件目录
cd /opt/kafka_2.12-3.6.0/config/
4 修改配置文件server.properties,添加下面内容
broker.id=0port=9092#端口号host.name=localhost #服务器IP地址,修改为自己的服务器IPlog.dirs=/opt/software/kafka/log  #日志存放路径,上面创建的目录zookeeper.connect=localhost:2181#zookeeper地址和端口,单机配置部署,localhost:2181
5 配置zookeeper服务 zookeeper.properties
dataDir=/opt/software/kafka/zookeeper   #zookeeper数据目录dataLogDir=/opt/software/kafka/zookeeper/log #zookeeper日志目录clientPort=2181maxClientCnxns=100tickTimes=2000initLimit=10syncLimit=5
6 创建启动和关闭的 kafka 执行脚本
6.1 创建启动脚本
cd /opt/kafka_2.12-3.6.0/vi kafkaStart.sh

配置启动脚本 kafkaStart.sh

#启动zookeeper/opt/kafka_2.12-3.6.0/bin/zookeeper-server-start.sh /opt/kafka_2.12-3.6.0/config/zookeeper.properties &sleep 3#等3秒后执行 #启动kafka/opt/kafka_2.12-3.6.0/bin/kafka-server-start.sh /opt/kafka_2.12-3.6.0/config/server.properties &
6.2 创建关闭脚本 kafkaStop.sh
cd /opt/kafka_2.12-3.6.0/vi kafkaStop.sh

配置关闭脚本 kafkaStop.sh

#关闭zookeeper/opt/kafka_2.12-3.6.0/bin/zookeeper-server-stop.sh /opt/kafka_2.12-3.6.0/config/zookeeper.properties &sleep 3#等3秒后执行 #关闭kafka/opt/kafka_2.12-3.6.0/bin/kafka-server-stop.sh /opt/kafka_2.12-3.6.0/config/server.properties &
7 启动脚本,关闭脚本赋予权限
chmod 777kafkaStart.shchmod 777kafkaStop.sh

启动和关闭kafka

cd /opt/kafka_2.12-3.6.0/sh kafkaStart.sh #启动sh kafkaStop.sh  #关闭
8 创建生产者 topic 和 消费者 topic

cd /opt/kafka_2.12-3.6.0/bin/#进入kafka目录./kafka-console-producer.sh --broker-list localhost:9092--topic test  #创建生产者 test你要建立的topic名./kafka-console-consumer.sh --bootstrap-server localhost:9092--topic test #创建消费者

​ 查看 kafka 是否启动

[root@localhost kafka_2.12-3.6.0]# jps21324QuorumPeerMain 15211Jps21215Kafka

里面有QuorumPeerMain和kafkas说明启动成功了

查看当前的一些topic

cd /opt/kafka_2.12-3.6.0/bin/./kafka-topics.sh --zookeeper localhost:2181--list./kafka-topics.sh --list --bootstrap-server localhost:9092
9 Spring boot集成Kafka

1、安装Kafka

    • 1.准备工作
            • 1.1 Java
            • 1.2 安装包下载
    • 2.安装KafKa
          • 1. 解压安装包
          • 2. 配置kafka
          • 3 进入配置文件目录
          • 4 修改配置文件server.properties,添加下面内容
          • 5 配置zookeeper服务 zookeeper.properties
          • 6 创建启动和关闭的 kafka 执行脚本
            • 6.1 创建启动脚本
            • 6.2 创建关闭脚本 kafkaStop.sh
          • 7 启动脚本,关闭脚本赋予权限
          • 8 创建生产者 topic 和 消费者 topic
          • 9 Spring boot集成Kafka

  • 前言

    例如:随着人工智能的不断发展,机器学习这门技术也越来越重要,很多人都开启了学习机器学习,本文就介绍了机器学习的基础内容。生产者将消息发布到主题,消费者从主题中订阅消息。


    提示:以下是本篇文章正文内容,下面案例可供参考

    一、 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

    文章目录

    • 前言
    • 一、

      二、

      Offset: 每个消息在 Partition 中的唯一标识,消费者使用 Offset 来追踪已消费的消息。

      kafka核心概念

      在深入了解 Kafka 的使用教程之前,让我们先介绍一些 Kafka 的核心概念,这些概念是理解 Kafka 的基础:

      Broker: Kafka 集群中的每个服务器节点称为 Broker,它们负责存储和处理数据。

      Topic: 消息发布的主题,是数据流的类别。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka简介

    Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。

    Partition: 每个 Topic 可以分成多个 Partition,每个 Partition 是一个有序的消息队列。

    Consumer Group: 一组消费者的集合,共同消费一个 Topic 的消息。

    Consumer: 数据的订阅者,从一个或多个 Topic 中消费消息。pom依赖

    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency>

    2.消费者

    @Autowiredprivate KafkaTemplate<String,String>kafkaTemplate;@RequestMapping("/userGets")public Object gets(){// send 第一个参数为topic的名称,第二个参数为我们要发送的信息kafkaTemplate.send("topic.quick.default","1231235");return"发送成功";}@KafkaListener(topics ={"topic1"})public voidonMessage(ConsumerRecord<?,?>record){System.out.println(record.value());}@KafkaListener(topics ={"topic2"})public voidgetMessage(ConsumerRecord<String,String>record){String key =record.key();String value =record.value();}
    1. 测试
    //生产者public staticvoidmain(String[]args){Properties properties =new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());KafkaProducer<String,String>producer =new KafkaProducer<>(properties);String topic ="test-topic";for(inti =0;i <10;i++){String message ="Message "+i;producer.send(new ProducerRecord<>(topic,message));System.out.println("Sent: "+message);}producer.close();}//消费者public staticvoidmain(String[]args){Properties properties =new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());KafkaConsumer<String,String>consumer =new KafkaConsumer<>(properties);//消息者订阅主题consumer.subscribe(Collections.singletonList("test-topic"));//循环while(true){//每次拉取 1千条消息ConsumerRecords<String,String>records =consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,String>record :records){System.out.println("=============> 消费kafka消息:"+record.value());}}}

    4.配置文件

    server:port:8080spring:kafka:bootstrap-servers:localhost :9093producer:# 生产者            retries:3# 设置大于 0的值,则客户端会将发送失败的记录重新发送            batch-size:16384buffer-memory:33554432acks:1# 指定消息key和消息体的编解码方式            key-serializer:org.apache.kafka.common.serialization.StringSerializer            value-serializer:org.apache.kafka.common.serialization.StringSerializer        consumer:group-id:default-group            enable-auto-commit:false            auto-offset-reset:earliest            key-deserializer:org.apache.kafka.common.serialization.StringDeserializer            value-deserializer:org.apache.kafka.common.serialization.StringDeserializer            max-poll-records:500listener:# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交        #RECORD# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交        #BATCH# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交        #TIME# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交        #COUNT#TIME |COUNT 有一个条件满足时提交#COUNT_TIME# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,手动调用Acknowledgment.acknowledge()后提交        #MANUAL# 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种        #MANUAL_IMMEDIATEack-mode:MANUAL_IMMEDIATE

    可视化工具地址

    https://www.kafkatool.com/download.html