EN
/video/26558734.html

Kafka 简介、应用场景及集群搭建与测试

2025-06-24 12:46:05 来源: 新华社
字号:默认 超大 | 打印 |

一、Kafka 简介

Apache Kafka 是一款分布式发布 - 订阅消息系统,由 LinkedIn 公司在 2010 年贡献给 Apache 基金会,并成为顶级开源项目。它有着独特的定位与特点,是一种快速、可扩展,且内在设计就是分布式、分区的以及可复制的提交日志服务。需要注意的是,Kafka 并未遵循 JMS 规范,仅提供发布和订阅这一通讯方式。其官方中文网站为 http://kafka.apachecn.org/quickstart.html,方便大家进一步了解相关信息。

(一)Kafka 的优点

高吞吐量、低延迟:Kafka 具备强大的消息处理能力,每秒能够处理几十万条消息,并且其延迟极低,最低可达到几毫秒。每个 topic(主题)可以划分成多个 partition(分区),consumer group(消费者组)能够对 partition 进行 consume(消费)操作,以此来实现高效的数据流转。
可扩展性:Kafka 集群支持热扩展,这意味着在不影响现有业务运行的情况下,可以方便地增加节点,以应对不断增长的数据量和业务需求。
持久性、可靠性:消息会被持久化存储到本地磁盘,同时还支持数据备份,从而有效防止数据丢失,保障了数据的安全性和完整性,为各类对数据可靠性要求较高的应用场景提供了有力支撑。
容错性:允许集群中节点出现故障,若副本数量设定为 n,则可以允许 n - 1 个节点失败,这种容错机制使得 Kafka 集群在面对部分节点异常时,依然能够稳定地提供服务。
高并发:能够支持数千个客户端同时进行读写操作,很好地满足了大规模并发业务场景下的数据交互需求。

(二)Kafka 的存储策略

以 topic 管理消息:Kafka 以 topic 来进行消息管理,每个 topic 包含多个 partition,每个 partition 对应一个逻辑 log,该逻辑 log 由多个 segment 组成,以此构建起层次分明的消息存储结构。
消息定位机制:每个 segment 中存储多条消息,消息的 id 由其逻辑位置决定,通过消息 id 可直接定位到消息的存储位置,避免了额外的 id 到位置的映射操作,提高了消息查找和读取的效率。
内存索引机制:每个 part 在内存中对应一个 index,用于记录每个 segment 中的第一条消息偏移,方便在处理消息时快速定位到相应 segment 的起始位置。
消息分布与写入规则:发布者发到某个 topic 的消息会被均匀地分布到多个 partition 上(也可根据用户指定的路由规则进行分布)。broker 收到发布消息后,会往对应 partition 的最后一个 segment 上添加该消息。当某个 segment 上的消息条数达到配置值,或者消息发布时间超过规定值时,segment 上的消息会被 flush(刷新)到磁盘,只有 flush 到磁盘上的消息订阅者才能订阅到。而且当 segment 达到一定的大小后,将不会再往该 segment 写数据,broker 会创建新的 segment 继续存储消息。

(三)Kafka 的相关名词解释

Broker:即 Kafka 节点,一个 Kafka 节点就是一个 broker,多个 broker 可以组成一个 Kafka 集群,共同承担消息的处理和存储等任务。
Topic:代表一类消息,可理解为消息存放的目录,也就是主题。例如 page view 日志、click 日志等都能以 topic 的形式存在,Kafka 集群能够同时负责多个 topic 的分发工作。
Message:是 Kafka 中最基本的传递对象,承载着实际需要传递的数据内容。
Partition:为 topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列,有助于提高消息处理的并行度和效率。
Segment:partition 物理上由多个 segment 组成,每个 Segment 存储着 message 信息,是消息存储的具体单元。
Producer:指生产者,负责生产 message 并发送到 topic,是消息流入 Kafka 系统的源头。
Consumer:即消费者,通过订阅 topic 来消费 message,在实际运行中可以作为一个线程来执行消费操作。
Consumer Group:消费者组,一个 Consumer Group 包含多个 consumer,组内的消费者可以协同工作,共同消费 topic 中的消息,实现负载均衡和消息的灵活分配。
Offset:偏移量,可简单理解为消息 partition 中的索引,用于记录消费者消费消息的位置,方便实现消息的顺序消费以及断点续传等功能。

二、Zookeeper 集群环境搭建

在这里插入图片描述

(一)安装 JDK 1.8 环境

在每台服务器节点上安装 JDK 1.8 环境,安装完成后使用 java -version 命令进行测试,确保 JDK 环境安装正确且可正常使用。

(二)安装 Zookeeper

下载并安装 Zookeeper 安装包
通过如下命令下载 Zookeeper 安装包(以 Zookeeper 3.9.3 版本为例):

wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.9.3/apache-zookeeper-3.9.3-bin.tar.gz

解压安装包
使用以下命令解压下载好的安装包:

tar -zxvf apache-zookeeper-3.9.3.tar.gz

重命名:
将解压后的文件夹重命名为 zookeeper,方便后续操作和管理,命令如下:

mv apache-zookeeper-3.9.3 zookeeper

注意:Zookeeper 从 3.5 版本以后,命名规则有所改变。形如 apache-zookeeper-3.5.5.tar.gz 的是未编译的包,而 apache-zookeeper-3.5.5-bin.tar.gz 这样命名的才是已编译的包。

(三)搭建 Zookeeper 集群环境

修改配置文件:
首先进入配置文件所在目录,然后将示例配置文件 zoo_sample.cfg 重命名为 zoo.cfg,并对其进行修改。具体操作如下:

cd /usr/local/zookeeper/confmv zoo_sample.cfg zoo.cfgvi zoo.cfg

修改内容主要有两处:

  • 设置 dataDir 参数,指定 Zookeeper 存储数据的目录,例如:dataDir=/usr/local/zookeeper/data(同时需要在 Zookeeper 中创建 data 目录)。
  • 在文件最后面添加集群节点信息,格式如下:
server.0=192.168.2.208:2888:3888server.1=192.168.2.216:2888:3888server.2=192.168.2.202:2888:3888

创建服务器标识:

  • 创建用于存放服务器标识的文件夹 data:
mkdir data
  • 创建文件 myid 并填写相应内容(内容为服务器标识,如当前节点为 0,则填写 0):
vi myid

复制 Zookeeper:
将配置好的 zookeeper 目录复制到其他相关服务器节点(如 192.168.2.216 和 192.168.2.202),同时复制 /etc/profile 文件。并且要将 192.168.2.216、192.168.2.202 中的 myid 文件里的值分别修改为 1 和 2(修改路径为 vi /usr/local/zookeeper/data/myid)。
关闭防火墙
在每台服务器节点上关闭防火墙,以避免防火墙规则对 Zookeeper 集群通信造成干扰,执行命令如下:

systemctl stop firewalld.service

添加环境变量
在系统环境中添加 Zookeeper 的环境变量,编辑 /etc/profile 文件,添加如下内容:

export ZOOKEEPER_HOME=/usr/local/zookeeperexport PATH=$PATH:$ZOOKEEPER_HOME/bin:$ZOOKEEPER_HOME/conf

启动 Zookeeper:
进入 Zookeeper 的启动脚本所在路径 /usr/local/zookeeper/bin,执行启动命令:

zkServer.sh start

需要注意的是,集群中的每台机器都要执行此启动操作。启动后,可以通过 zkServer.sh status 命令在三个节点上检验 Zookeeper 的运行模式(会有一个 leader 和两个 follower)。

三、Kafka 集群环境搭建

(一)下载与解压 Kafka 压缩包

在 3 台虚拟机上均执行以下操作,先进入 /usr/local/ 目录,下载 Kafka 压缩包(以 Kafka 3.9.0 版本为例),然后解压并重命名:

cd /usr/local/wget https://downloads.apache.org/kafka/3.9.0/kafka_2.12-3.9.0.tgztar -zxvf kafka_2.12-3.9.0.tgzmv kafka_2.12-3.9.0 kafka

(二)修改配置文件

分别在不同的虚拟机上,根据其自身的 IP 地址对 ./kafka/config/server.properties 文件进行修改,具体如下:
对于 192.168.2.208 这台虚拟机

broker.id=0listeners=PLAINTEXT://192.168.2.208:9092zookeeper.connect=192.168.2.208:2181,192.168.2.216:2181,192.168.2.202:2181

对于 192.168.2.216 这台虚拟机:

broker.id=1listeners=PLAINTEXT://192.168.2.216:9092zookeeper.connect=192.168.2.208:2181,192.168.2.216:2181,192.168.2.202:2181

对于 192.168.2.202 这台虚拟机:

broker.id=2listeners=PLAINTEXT://192.168.2.202:9092zookeeper.connect=192.168.2.208:2181,192.168.2.216:2181,192.168.2.202:2181

(三)配置系统环境变量

编辑 /etc/profile 文件,在文件最下方添加 Kafka 路径相关的环境变量,内容如下:

export KAFKA_HOME=/usr/local/kafkaexport PATH=${ KAFKA_HOME}/bin:$PATH

在这里插入图片描述

添加完成后,执行 source /etc/profile 命令使修改后的环境变量生效。

四、Kafka 集群环境测试

(一)启动 Zookeeper 程序并查看状态

在 3 台虚拟机上分别开启 Zookeeper 程序:

/usr/local/zookeeper/bin/zkServer.sh start

启动成功后,通过以下命令查看 Zookeeper 集群的状态:

/usr/local/zookeeper/bin/zkServer.sh status

若出现 Mode:follower或是 Mode:leader,则代表 Zookeeper 启动成功。

(二)启动 Kafka 程序并检查启动情况

在后台开启 3 台虚拟机的 Kafka 程序(先进入 /usr/local/kafka 目录):

./bin/kafka-server-start.sh -daemon config/server.properties

-daemon 参数表示 Kafka 服务器将在后台运行,不会占用当前的终端会话。启动后,可以通过 ps aux | grep ‘kafka’ 命令查看 Kafka 是否启动成功。

(三)创建 topic 并进行消息发送与消费测试

在这里插入图片描述

创建 topic:
在其中一台虚拟机(如 192.168.2.208)上创建 topic,执行命令如下:

/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.2.208:9092 --replication-factor 1 --partitions 3 --topic test

各参数含义如下:

  • –create:指定正在创建一个新的主题。
  • –bootstrap-server:9092:指定 kafka 实例的地址。
  • –replication-factor 1:每个分区的复制因子为 1,即每个分区将有一个副本。
  • -partitions:指定 topic 分区主体一共有多少个。
  • –topic:指定 topic 主体名称。
    查看 topic 信息:
    通过以下命令查看创建的 topic 信息:
./bin/kafka-topics.sh --list --bootstrap-server 192.168.2.208:9092

在这里插入图片描述

发送消息:
在 192.168.2.208 这台虚拟机上发送一些消息到创建的 topic 中,执行命令如下:

bin/kafka-console-producer.sh --broker-list 192.168.2.208:9092 --topic test

在这里插入图片描述

消费消息:
在另一台虚拟机(如 192.168.2.216)上启动一个 consumer(消费者)来消费消息,执行命令如下:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.216:9092 --topic test --from-beginning

在这里插入图片描述

通过以上一系列的操作,完成了从 Zookeeper 集群环境搭建、Kafka 集群环境搭建到最终的集群环境测试,能够验证 Kafka 集群是否可以正常工作,实现消息的生产与消费等功能。

【我要纠错】责任编辑:新华社