发布时间:2025-06-24 19:45:21  作者:北方职教升学中心  阅读量:075


  • 发送消息(可选键)
    • send(String topic, Object message, String key): 发送一个对象到指定主题,并指定消息的键。
    • 架构:
      在这里插入图片描述

      分区(Partitions):

      1. 消息分区:一个主题可以有一个或多个分区,每个分区是一个有序且不可变的消息序列。它提供了一系列方法来发送消息到Kafka主题,包括同步发送、
      2. send(String topic, K key, V value, Map<String, Object> headers): 发送一个键值对到指定主题,并附带消息头。在分区内部,消息是有序的,在不同的分区之间,消息的顺序是不保证的,如果需要全局有序,则只能使用单个分区。

      常见消息中间件:

      1. Apache Kafka:适用于高吞吐量、可扩展的分布式消息系统;
      2. RabbitMQ:基于AMQP协议的开源消息代理软件,适用于复杂的消息路由;
      3. ActiveMQ:也是一个基于JMS的开源消息代理,支持多种跨语言的通讯协议;
      4. Amazon SQS:亚马逊提供的简单队列服务,适用于云环境中的消息队列;
      5. Apache Pulsar:是一个用于服务器到服务器的消息传递系统,具有高吞吐量、

      2.Kafka介绍

      Kafka是一个由LinkedIn公司开发的分布式流处理平台,它具有高吞吐量、

    • send(String topic, K key, V value): 发送一个键值对到指定主题。一致性、可容错等特点,被广泛用于构建实时的数据管道和流式应用程序。Kafka确保每个分区只由消费者组中的一个消费者来消费,以避免重复消费;
    • 代理(Broker):Kafka集群中的服务器,存储数据并处理客户端的请求;
    • ZooKeeper:Kafka使用ZooKeeper来协调broker,并保持集群配置的一致性。消息中间件允许系统组件之间通过发送和接收消息进行交互,而无需知道彼此的具体实现细节,从而提高了系统的可扩展性、
  • 发送消息和对象
    • send(String topic, Object message, Map<String, Object> headers): 发送一个对象到指定主题,并附带消息头。

  • 访问Kafka-Eagle Web界面: 打开浏览器,访问http://localhost:8048/,使用默认用户名admin和密码123456登录。

  • --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092: 设置环境变量,指定Kafka的listeners。
  • send(String topic, K key, V value, Map<String, Object> headers): 发送一个键值对到指定主题,并附带消息头。灵活性和可靠性。
  • wurstmeister/kafka: 使用的Docker镜像名字。
  • 发布/订阅消息模型的特点:

    1. 持久化:Kafka将消息持久化到磁盘,即使在系统故障的情况下也不会丢失消息;
    2. 高吞吐量:通过分区和并行处理,Kafka能够处理大量的消息;
    3. 可扩展性:可以轻松地向Kafka集群添加更多的broker,无需停机,以增加处理能力;
    4. 容错性:通过副本机制,Kafka能够在broker失败的情况下继续工作;
    5. 有序性:在单个分区内部,消息是有序的,消费者按照顺序读取消息。
    6. send(String topic, K key, V value): 发送一个键值对到指定主题。隔离性和持久性(ACID特性)。
    7. send(String topic, K key, V value, Map<String, Object> headers): 发送一个键值对到指定主题,并附带消息头。

    3.Kafka安装

    百度网盘链接:kafka_2.13-3.7.1.tgz

    Windows安装

    前提条件:Java环境(JDK 1.8或更高版本)

    1. 下载Kafka: 从Kafka官网下载下载Windows版本的Kafka
      在这里插入图片描述

    2. 解压Kafka安装包: 将下载的Kafka压缩包解压到一个目录下,调整一下目录,把kafka_2.13-3.7.1改成kafka,其中bin文件夹里还有个windows文件夹,是Windows使用的脚本文件

      在这里插入图片描述

    3. 启动Zookeeper服务: Kafka本身包含了Zookeeper

      启动Zookeeper之前先配置一下,在config文件夹下zookeeper.properties文件,主要修改下面两项

      # 数据存储dataDir=E:/projects/Kafka/kafka/zookeeper-data# 日志文件dataLogDir=E:/projects/Kafka/kafka/zookeeper-logs

      命令行进入/bin/windows目录下,启动,没有报错停止就是启动成功了

      zookeeper-server-start.bat ../../config/zookeeper.properties
    4. 启动Kafka服务

      启动Kafka之前也要配置一下,在config文件夹下server.properties文件

      # 集群模式下,每台Kafka服务器需要一个唯一的broker.id值broker.id=0# 日志文件log.dirs=E:/projects/Kafka/kafka/kafka-logs# zookeeper连接zookeeper.connect=localhost:2181# 端口(不用改)listeners=PLAINTEXT://:9092

      另起命令行窗口,进入/bin/windows目录下,然后运行以下命令,没有报错停止就是启动成功了

      kafka-server-start.bat ../../config/server.properties

    Linux(CentOS7)安装

    前提条件:Java环境(JDK 1.8或更高版本)

    1. 下载Kafka: 从Kafka官网下载下载,右键复制链接

      在这里插入图片描述

    2. 通过wget下载

      wgethttps://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz
    3. 解压Kafka安装包

      tar-xzfkafka_2.13-3.7.1.tgz -C/usr/localmv/usr/local/Kafka/kafka_2.13-3.7.1 /usr/local/kafka
    4. 启动Zookeeper服务: Kafka本身包含了Zookeeper

      启动Zookeeper之前先配置一下,在config文件夹下zookeeper.properties文件,主要修改下面两项

      # vim编辑zookeeper配置文件vi/usr/local/kafka/config/zookeeper.properties# i进入编辑i# esc退出编辑esc# 保存退出:qw# 数据存储dataDir=/usr/local/kafka/zookeeper-data# 日志文件dataLogDir=/usr/local/kafka/zookeeper-logs

      进入/bin目录下,启动,没有报错停止就是启动成功了

      cd/usr/local/kafka/binzookeeper-server-start.sh -daemon/usr/local/kafka/config/zookeeper.properties
    5. 启动Kafka服务

      启动Kafka之前也要配置一下,在config文件夹下server.properties文件,

      # 集群模式下,每台Kafka服务器需要一个唯一的broker.id值broker.id=0# 日志文件log.dirs=/usr/local/kafka/kafka-logs# zookeeper连接zookeeper.connect=localhost:2181# 端口(不用改)listeners=PLAINTEXT://:9092

      另起命令行窗口,进入/bin目录下,然后运行以下命令,没有报错停止就是启动成功了

      cd/usr/local/kafka/binkafka-server-start.sh -daemon/usr/local/kafka/config/zookeeper.properties

    Docker安装

    前提条件:Java环境(JDK 1.8或更高版本),确保安装了Docker

    1. 拉取ZooKeeper镜像(Kafka依赖ZooKeeper):

      dockerpull wurstmeister/zookeeper
    2. 拉取Kafka镜像: 使用以下命令拉取最新的Kafka镜像:

      dockerpull wurstmeister/kafka
    3. 启动ZooKeeper容器:容器启动起来就成功了

      dockerrun -d--namezookeeper -p2181:2181 -twurstmeister/zookeeper
    4. 启动Kafka容器:容器启动起来就成功了

      dockerrun -d--namekafka -p9092:9092 --linkzookeeper:zookeeper --envKAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --envKAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 --envKAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 --envKAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1wurstmeister/kafka
      • --name kafka: 设置容器的名字为“kafka”。
      • send(String topic, K key, V value, long timeout, TimeUnit unit): 异步发送一个键值对到指定主题,并等待响应。

        在这里插入图片描述

    在安装过程中出现三个问题

    1. Tomcat日志输出乱码

      解决:在efak-web-3.0.1\kms\conf目录下修改logging.properties配置文件,将Tomcat日志输出编码改为GBK

      # 大概在51行,将UTF-8改为GBKjava.util.logging.ConsoleHandler.encoding = GBK
    2. MySQL时区异常

      解决:用root用户设置时区

      set global time_zone='+8:00';
    3. 表缺失,下载建表sql脚本,百度网盘链接:kafka-eagle-createTable.sql

    Linux(CentOS7)安装

    前提条件

    • Kafka已经安装并运行
    • Java环境(JDK 1.8或更高版本)
    • MySQL数据库(用于存储Kafka-Eagle的元数据)
    1. 下载Kafka-Eagle: 找到对应下载地址,右键复制链接,wget下载

      Kafka-Eagle官网

      Github地址

      wget https://github.com/smartloli/kafka-eagle-bin/archive/v3.0.1.tar.gz
    2. 解压Kafka-Eagle安装包

      # 解压、主题类似于消息队列,但它可以拥有多个订阅者;
    3. 生产者(Producers):生产者是发布消息到Kafka主题的实体。移动目录等自己看着改,简洁一些就行tar -zxvf kafka-eagle-bin-3.0.1.tar.gz -d /usr/localmv /usr/local/kafka-eagle-bin-3.0.1 /usr/local/kafka-eaglecd /usr/local/kafka-eagletar -zxvf efak-web-3.0.1-bin.tar.gz
    4. 设置环境变量,在/etc/profile文件里

      # 使用vim编辑文件vi /etc/profile

      设置环境

      # 设置值,根据自己文件位置设置export JAVA_HOME=/usr/local/jdk/java-1.8.0-openjdk-1.8.0.161-3.b14.el6_9.x86_64export JRE_HOME=${JAVA_HOME}/jreexport KE_HOME=/usr/local/kafka-eagle/efak-web-3.0.1-binexport CLASSPATH=$CLASSPATH:.:${JAVA_HOME}/lib:${JAVA_HOME}/jre/libexport PATH=${KE_HOME}/bin:${JAVA_HOME}/bin:${JAVA_HOME}/jre/bin:$PATH

      使其生效

      # 退出,执行使其生效source /etc/profile
    5. 创建数据库(MySQL)ke,启动项目会自动创建表

    6. 配置Kafka-Eagle: 编辑\usr\local\kafka-eagle\efak-web-3.0.1\conf\system-config.properties文件,配置Kafka和数据库信息:

      # 配置Zookeeper(5行附近)efak.zk.cluster.alias=cluster1cluster1.zk.list=localhost:2181#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181# 配置JXM地址(59行附近)cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi# 配置MySQL数据库(124行附近)efak.driver=com.mysql.cj.jdbc.Driverefak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNullefak.username=rootefak.password=123456# 配置sqlite数据库(116行附近),和MySQL二选一# efak.driver=org.sqlite.JDBC# efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db# efak.username=root# efak.password=www.kafka-eagle.org
    7. 运行以下命令

      cd /usr/local/kafka-eagle/efak-web-3.0.1./bin/ke.sh start
    8. 访问Kafka-Eagle Web界面: 打开浏览器,访问http://xxxip:8048/,使用默认用户名admin和密码123456登录。

    9. 发送消息和对象(可选键)
      • send(String topic, Object message, String key, Map<String, Object> headers): 发送一个对象到指定主题,并指定消息的键和消息头。

    4.可视化工具kafka-eagle的安装

    百度网盘链接:kafka-eagle-bin-3.0.1.tar.gz

    Windows安装

    大部分借鉴大佬的安装,做个记录方便以后查看,原文地址

    前提条件

    • Kafka已经安装并运行
    • Java环境(JDK 1.8或更高版本)
    • MySQL数据库(用于存储Kafka-Eagle的元数据)
    1. 下载Kafka-Eagle

      Kafka-Eagle官网

      Github地址

    2. 解压Kafka-Eagle安装包: 将下载的Kafka-Eagle压缩包解压到一个目录下,可以修改一下目录

    在这里插入图片描述

    1. 配置两个环境变量:JDK(JAVA_HOME)和Kafka-Eagle(KE_HOME),以Kafka-Eagle为例:

      在这里插入图片描述

      在这里插入图片描述

    2. 创建数据库(MySQL)ke,启动项目会自动创建表

    3. 为了能监控Kafka数据,Kafka需要开启JMX,对外暴露更多数据,方便某些监控之类的插件来使用,修改\kafka\bin\windows目录下的kafka-server-start.bat脚本,然后重启Kafka

      # 在35行下面另起一行添加如下setJMX_PORT=9999
    4. 配置Kafka-Eagle: 编辑E:\projects\Kafka\kafka-eagle\efak-web-3.0.1\conf\system-config.properties文件,配置Kafka和数据库信息:

      # 配置Zookeeper(5行附近)efak.zk.cluster.alias=cluster1cluster1.zk.list=localhost:2181#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181# 配置JXM地址(59行附近)cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi# 配置MySQL数据库(124行附近)efak.driver=com.mysql.cj.jdbc.Driverefak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNullefak.username=rootefak.password=123456# 配置sqlite数据库(116行附近),和MySQL二选一efak.driver=org.sqlite.JDBCefak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.dbefak.username=rootefak.password=www.kafka-eagle.org
    5. 进入bin目录,运行以下命令

      ke.bat start
    6. 访问Kafka-Eagle Web界面: 打开浏览器,访问http://localhost:8048/,使用默认用户名admin和密码123456登录。可扩展性强、在Kafka中,消费者可以是属于消费者组的一部分;

    7. 消费者组(Consumer Groups):消费者组是一组消费者的集合,它们共同消费一个主题的消息,但每个消费者只能消费到消息的一个子集。生产者可以选择将消息发送到主题内的任何一个分区;
    8. 消费者(Consumers):消费者是订阅主题并读取消息的实体。Kafka的消息模型主要基于发布/订阅(Publish/Subscribe)模式,但也包含了点对点(Point-to-Point)模型的某些特性。
    9. 分区副本:为了提高可用性,Kafka允许为每个分区创建多个副本,这些副本分布在不同的broker上。异步发送、
    10. --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092: 设置环境变量,指定Kafka的advertised listeners。
    11. 异步发送
      • send(String topic, Object message, long timeout, TimeUnit unit): 异步发送一个对象到指定主题,并等待响应。
      • --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181: 设置环境变量,指定ZooKeeper的连接字符串。不可变的ID,称为偏移量(Offset)。分区内的每条消息都会被分配一个顺序的、

    Docker安装

    1. 拉取Kafka-Eagle镜像: 使用以下命令拉取Kafka-Eagle镜像:

      docker pull soulstone/kafka-eagle
    2. 启动Kafka-Eagle容器

      docker run -d --name kafka-eagle -p 8048:8048 \-e ZK_HOSTS="zookeeper:2181" \-e KAFKA_EAGLE_URL="jdbc:mysql://localhost:3306/ke" \-e KAFKA_EAGLE_USER="root" \-e KAFKA_EAGLE_PASSWORD="123456" soulstone/kafka-eagle

      请确保将MySQL的URL、

      关键特性:

      1. 异步通信:消息发送者不需要等待接收者的即时响应,可以继续执行其他任务;
      2. 解耦:消息中间件降低了服务之间的耦合度,使得各个服务可以独立开发和部署;
      3. 可靠传输:消息中间件通常提供消息持久化功能,确保消息不会因系统故障而丢失;
      4. 负载均衡:消息中间件可以平衡不同服务之间的负载,避免单个服务的过载;
      5. 消息排序:保证消息按照特定的顺序进行处理;
      6. 事务管理:支持事务性消息,确保消息处理的原子性、

      消息模型:

      1. 点对点(Point-to-Point):消息从一个发送者发送到一个接收者;
      2. 发布/订阅(Publish/Subscribe):消息从一个发送者发送到多个接收者;
      3. 请求/回复(Request/Reply):发送者发送消息并等待接收者的响应
      4. -p 9092:9092: 将容器的9092端口映射到宿主机的9092端口。以下是一些主要的KafkaTemplateAPI方法:

        1. 同步发送
          • send(String topic, Object message): 发送一个对象到指定主题。
        2. 发送消息(键)
          • send(String topic, Object message, String key): 发送一个对象到指定主题,并指定消息的键。
        3. 发送消息
          • send(String topic, Object message, String key): 发送一个对象到指定主题,并指定消息的键。
          • --link zookeeper:zookeeper: 连接到名为“zookeeper”的另一个Docker容器,并且在当前的容器中可以通过zookeeper这个别名来访问它。低延迟的特点。

            1.消息中间件简介

            消息中间件(Message Middleware)是一种在分布式系统中用于解耦不同服务或组件的软件,它通过异步消息传递的方式来实现服务之间的通信

          • send(String topic, K key, V value): 发送一个键值对到指定主题。副本分为领导者副本(Leader)和跟随者副本(Follower),生产者和消费者只与领导者副本交互,跟随着副本只是被动跟随。
          • send(String topic, K key, V value, Map<String, Object> headers): 发送一个键值对到指定主题,并附带消息头。

        5.Kafka消息队列的使用

        SpringBoot整合Demo

        1. 创建两个Maven项目:一个是生产者KafkaProducerDemo,用于发布消息,另一个是消费者KafkaConsumerDemo,用于接受消息,两项目目录结构如下:

          在这里插入图片描述

          在这里插入图片描述

        2. 引入依赖:两个项目都在pom.xml中添加Spring Kafka的依赖,整个pom.xml如下

          <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">    <modelVersion>4.0.0</modelVersion>    <groupId>com.thkl</groupId>    <artifactId>KafkaDemo</artifactId>    <packaging>jar</packaging>    <version>1.0-SNAPSHOT</version>    <parent>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-parent</artifactId>    <version>2.3.5.RELEASE</version>    </parent>    <dependencies>    <dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-web</artifactId>      <version>2.3.5.RELEASE</version>    </dependency>    <dependency>      <groupId>org.springframework.kafka</groupId>      <artifactId>spring-kafka</artifactId>      <version>2.8.3</version>    </dependency>    <dependency>      <groupId>org.projectlombok</groupId>      <artifactId>lombok</artifactId>      <version>1.18.10</version>    </dependency>    </dependencies>    <build>    </build></project>
        3. 配置Kafka:两个项目都在application.yml中配置Kafka的属性,注意端口号别一样

          server:  # 端口号  port: 8080spring:  kafka:    # Kafka服务器的地址和端口    bootstrap-servers: localhost:9092    consumer:      # 消费者组的ID      group-id: thkl-group
        4. 在KafkaProducerDemo中创建Kafka生产者:

          @Service// 可以通过lombok的@AllArgsConstructor注解自动用构造函数注入Beans@AllArgsConstructorpublicclassKafkaProducerService{privateKafkaTemplatekafkaTemplate;publicvoidsendMessage(Stringmessage){kafkaTemplate.send("thkl-topic",message);}}
          @RestController@AllArgsConstructorpublicclassKafkaProducerController{privateKafkaProducerServicekafkaProducerService;// 发送消息接口@GetMapping("/send")publicStringsendMessage(Stringmessage){kafkaProducerService.sendMessage(message);return"Message sent successfully";}}
        5. 在KafkaConsumerDemo中创建Kafka消费者:

          @Servicepublic class KafkaConsumerService {    // 使用@KafkaListener注解来创建Kafka消费者    @KafkaListener(topics = "thkl-topic", groupId = "thkl-group")    public void receiveMessage(String message) {        System.out.println("Received message: " + message);    }}
        6. 启动类:两项目都添加

          @SpringBootApplicationpublic class KafkaApplication {    public static void main(String[] args) {        SpringApplication.run(KafkaApplication.class, args);    }}
        7. 启动两个项目,端口号不可以一样,不然启动不起来

          KafkaProducerDemo发布消息

          在这里插入图片描述

          就可以在KafkaConsumerDemo接收到消息了

          在这里插入图片描述

        KafkaTemplate 主要API

        KafkaTemplate是Spring Framework提供的一个用于简化Kafka消息发送的抽象类。

      5. send(String topic, K key, V value, Map<String, Object> headers): 发送一个键值对到指定主题,并附带消息头。
      6. send(String topic, K key, V value): 发送一个键值对到指定主题。
      7. 发送消息(可选键和消息头)
        • send(String topic, Object message, String key, Map<String, Object> headers): 发送一个对象到指定主题,并指定消息的键和消息头。

      常见使用场景:

      1. 应用解耦:在不同的服务或应用之间传递消息,降低它们之间的直接依赖;
      2. 事件驱动架构:在事件驱动的系统中,消息中间件作为事件总线,传递事件消息;
      3. 分布式系统:在分布式环境中,消息中间件用于服务间的通信;
      4. 大数据处理:在数据分析和处理系统中,消息中间件用于收集和分发大量数据;
      5. 微服务架构:微服务之间通过消息中间件进行通信,实现服务的独立性和动态扩展。
      6. 发送消息(消息头)
        • send(String topic, Object message, Map<String, Object> headers): 发送一个对象到指定主题,并附带消息头。用户名和密码替换为你自己的数据库信息。
      7. 发送消息(键和消息头)
        • send(String topic, Object message, String key, Map<String, Object> headers): 发送一个对象到指定主题,并指定消息的键和消息头。

      注:这些方法的具体实现可能会根据Kafka客户端版本和Spring Kafka版本有所不同。

    3. --env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1: 设置环境变量,指定offsets topic的副本因子。可持久化、
    4. 发送对象
      • send(String topic, Object message): 发送一个对象到指定主题。

        发布/订阅模型:

        1. 主题(Topics):在Kafka中,消息被发布到称为“主题”的类别中。发送消息和对象等。