例子:先决条件

发布时间:2025-06-24 18:52:17  作者:北方职教升学中心  阅读量:454


class。将“ 。所以,下面列出了 Spring boot 一些主要功能。

例子:

先决条件。 KafkaConsumer {。

操作下列命令从 Kafka 主题发送消息。Spring for Apache Kafka。

使用此命令进行操作 Apache Zookeeper 服务器。 java.util.Map;

import。 KafkaConfig {。广义上讲,Apache Kafka 可以定义和进一步处理主题(主题可能是类别)的软件。 。端口号在文件中更改。

// Importing required classes。 。 。

    // Creating a Listener。

  • 运行 Apache Zookeeper 服务器。信息系统允许您在流程、

    C:\kafka>.\bin\windows\kafka-server-start.bat .\config\server.properties。 

      。

    package。指标和外部配置。

        {。

      。

    @Component。

    // Class。 DefaultKafkaConsumerFactory<>(config);

        }。

    }。确保已在。

  • 从 Kafka 主题发送消息。这里,我们将讨论如何使用它 Kafka 并使用主题新闻 Spring Boot 将它们显示在控制台中,其中, 。直接运行。健康检查、

    public。

// Java Program to Illustrate Kafka Consumer。void。return。 。

  • Java。

            // Adding the Configuration。

            System.out.println("message = " + message);

        }。”。应用程序和服务器之间发送信息。

        concurrentKafkaListenerContainerFactory()。

    // Class。

    @Configuration。

      。 org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

    import。它是一个发布-订阅消息系统。

    Spring Boot 是 Java 编程语言中最流行和使用最多的框架之一。

        consume(String message)。

  • 运行 Apache Kafka 服务器。

    步骤2:创建一个名为KafkaConfig的公司。 java.util.HashMap;

    import。Jetty 或 Undertow。触发任何其他事件。

      。

        。 HashMap<>();

      。

      。 org.springframework.context.annotation.Bean;

    import。

    // Importing required classes。

                = new。

        {。

  • 尽可能自动配置 Spring 和第三方库。创建一个链接 Spring Boot 项目。 org.apache.kafka.common.serialization.StringDeserializer;

    import。 。Spring Boot 基于独立和生产水平的创建可以很容易地 Spring 应用程序,您可以“。

      。

    C:kafka>.binwindowskafka-console-producer.bat --broker-list localhost:9092 --topic NewTopic。 ConcurrentKafkaListenerContainerFactory。并将信息传输到主题上。

    Spring Boot Kafka 消费者示例。

            ConcurrentKafkaListenerContainerFactory<

                String, String> factory。

    }。public。 ConsumerFactory<String, String> consumerFactory()。

  • 为简化构建配置提供“启动器”依赖项。
    • Java。以下是。 。

步骤 4:现在我们必须做以下事情才能使用它 Spring Boot 从 Kafka 主题消费新闻。如何在 Windows 安装和运行 Apache Kafka?

步骤 1:转到这个链接, com.amiya.kafka.apachekafkaconsumer.consumer;

  。

import。它是一个基于微服务的框架,使用 Spring Boot 制作生产就绪应用程序只需要很少的时间。#xff0c;实时显示在控制台上。

输出:输出中,当你走的时候,class。 com.amiya.kafka.apachekafkaconsumer.config;

  。 org.springframework.kafka.annotation.KafkaListener;

import。

  • 直接嵌入 Tomcat、);

      。

    类似,使用此命令进行操作 Apache Kafka 服务器。

        。

  • // Java Program to Illustrate Kafka Configuration。:确保您已经安装在本地机器上了 Apache Kafka,所以你应该知道。

            // Print statement。

  • 几乎不需要代码生成󿀌也不需要 XML 配置。 org.apache.kafka.clients.consumer.ConsumerConfig;

    import。

    // Annotations。信息可以包含任何类型的信息,任何来自你个人博客的事件,它也可以是一个非常简单的文本新闻,

                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

                StringDeserializer.。public。

    • 创建独立的 Spring 应用程序。

  • 步骤 3:创建名为Kafkaconsumer。);

            config.put(。class。KafkaConfig.java。 org.springframework.context.annotation.Configuration;

    import。

    server.port=8081。 org.springframework.kafka.annotation.EnableKafka; 

    import。

    C:\kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties。文件代码。配置文件。

    让我们在 ApacheKafkaConsumerApplication 文件中运行 Spring Boot 应用程序。Kafka 是先决条件。

        // Method。 ConcurrentKafkaListenerContainerFactory<>();

            factory.setConsumerFactory(consumerFactory());

            。class。

      。

    Apache Kafka。

      。

        @Bean。 org.springframework.kafka.core.DefaultKafkaConsumerFactory;

      。

    import。

    步骤 5:现在运行你的 Spring Boot 应用程序。

        。 依赖项添加到你身上 Spring Boot 项目中。

    public。

            Map<String, Object> config = new。 org.springframework.stereotype.Component;

      。

            // Creating a Map of string-object pairs。

        {。

    @EnableKafka。public。

      。

     factory;

        }。application.properties。你可以看到 Kafka 当主题发送消息时,return。应用程序可以连接到该系统,消费者文件。 org.springframework.kafka.core.ConsumerFactory;

    import。

    package。

            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,

                       "127.0.0.1:9092");

            config.put(ConsumerConfig.GROUP_ID_CONFIG,

                       "group_id");

            config.put(。

                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

                StringDeserializer.。new。

        @KafkaListener(topics = "NewTopic",

                       groupId = "group_id")。

  • 提供可用于生产的功能,例如,

            。