每一个错误都是学习的机会

发布时间:2025-06-24 17:06:51  作者:北方职教升学中心  阅读量:425


publicclassWordCountApplication{publicstaticvoidmain(finalString[]args){Propertiesprops =newProperties();props.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcount-application");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());StreamsBuilderbuilder =newStreamsBuilder();KStream<String,String>textLines =builder.stream("TextLinesTopic");KTable<String,Long>wordCounts =textLines            .flatMapValues(textLine ->Arrays.asList(textLine.toLowerCase().split("\W+"))).groupBy((key,word)->word).count(Materialized.<String,Long,KeyValueStore<Bytes,byte[]>>as("counts-store"));wordCounts.toStream().to("WordsWithCountsTopic",Produced.with(Serdes.String(),Serdes.Long()));KafkaStreamsstreams =newKafkaStreams(builder.build(),props);streams.start();}}

Kafka的实际应用场景

image.png

  1. 日志聚合: 收集分布式系统中的日志,集中处理和分析。

  2. 实现自定义组件:尝试实现自定义的分区器、

  3. 性能测试: 使用Kafka自带的性能测试工具,了解不同配置对性能的影响。维护用户兴趣模型、生成推荐等功能。ZooKeeper集群等。KSQL等Kafka生态系统中的其他组件。每学一个新概念,就尝试在实际环境中应用它。保持好奇心,通过实践来理解它们。

  4. 模拟生产环境:在本地搭建一个模拟生产环境的Kafka集群,包括多个broker、

  5. Broker: Kafka集群中的服务器。”

    image.png

    于是,我开始了我的"糙快猛"学习之旅。

"糙快猛"学习Kafka的进阶之路

image.png

  1. 深入源码: 不要害怕阅读Kafka的源码。
  2. // 生产者配置props.put("acks","all");// 消费者配置props.put("enable.auto.commit","false");consumer.commitSync();// 在处理完消息后手动提交// Broker配置min.insync.replicas=2

    2. 消费者重平衡问题

    问题:消费者组重平衡可能导致短暂的服务中断。

    # 创建多个server.properties文件,修改broker.id和listenerscpconfig/server.properties config/server-1.propertiescpconfig/server.properties config/server-2.properties# 启动多个Brokerbin/kafka-server-start.sh config/server-1.properties &bin/kafka-server-start.sh config/server-2.properties &

    2. 实现一个复杂一点的生产者-消费者系统

    尝试实现一个模拟实时日志处理的系统。每一个错误都是学习的机会。

  3. Partition: Topic物理上的分组,一个Topic可以包含多个Partition。它从"user-behavior"主题读取用户行为数据,实时更新用户画像,然后基于最新的用户画像生成推荐,并将结果写入"user-recommendations"主题。但记住,不要一开始就陷入细节,而是要在使用中逐步理解它们。

    深入学习的"糙快猛"方法

    image.png

    1. 构建一个多Broker的Kafka集群

    不要满足于单节点的Kafka,尝试搭建一个多Broker的集群。分流、

  4. 边学边做: 学习理论的同时,不断实践。

  5. 拥抱错误: 不要害怕犯错。

    "糙快猛"学习Kafka的注意事项

    1. 保持好奇心: 遇到不理解的概念时,不要害怕。现在,让我们将视野扩大,看看Kafka如何在大规模分布式系统中发挥作用,以及如何与其他大数据技术协同工作。

      publicclassDataPipeline{publicstaticvoidmain(String[]args){StreamsBuilderbuilder =newStreamsBuilder();KStream<String,String>source =builder.stream("source-topic");KStream<String,String>transformed =source.mapValues(value ->{// 进行数据转换returntransformedValue;});transformed.to("destination-topic");KafkaStreamsstreams =newKafkaStreams(builder.build(),getProperties());streams.start();}}

      Kafka与其他大数据技术的集成

      1. Kafka + Hadoop

      Kafka可以与Hadoop生态系统无缝集成,实现实时数据采集和批处理分析。先把 Kafka 跑起来,再逐步深入学习。

    Kafka进阶:高级特性与生产实践

    在前两章中,我们讨论了如何以"糙快猛"的方式开始学习Kafka,并深入探讨了一些核心概念和应用场景。

    在技术快速发展的今天,Kafka也在不断进化。

    grep-ierror /path/to/kafka/logs/server.log

    实战案例:构建实时推荐系统

    image.png

    让我们通过一个实际的案例来综合运用我们所学的知识。

  6. 与云原生技术的深度集成:更好地支持Kubernetes等云原生环境。

    publicclassRecommendationSystem{publicstaticvoidmain(String[]args){Propertiesprops =newProperties();props.put(StreamsConfig.APPLICATION_ID_CONFIG,"recommendation-app");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");StreamsBuilderbuilder =newStreamsBuilder();// 用户行为流KStream<String,String>userBehavior =builder.stream("user-behavior-topic");// 商品信息表KTable<String,String>productInfo =builder.table("product-info-topic");// 处理用户行为,更新用户兴趣模型KTable<String,UserInterest>userInterests =userBehavior            .groupByKey().aggregate(UserInterest::new,(key,value,aggregate)->aggregate.update(value),Materialized.as("user-interests-store"));// 基于用户兴趣和商品信息生成推荐KStream<String,String>recommendations =userInterests            .toStream().flatMapValues(value ->generateRecommendations(value,productInfo));// 将推荐结果写入输出主题recommendations.to("user-recommendations-topic");KafkaStreamsstreams =newKafkaStreams(builder.build(),props);streams.start();}privatestaticList<String>generateRecommendations(UserInterestuserInterest,KTable<String,String>productInfo){// 基于用户兴趣和商品信息生成推荐列表}}

    这个案例展示了如何使用Kafka Streams API构建一个实时推荐系统,包括处理用户行为、这让我更深入地理解了 Kafka 的 API。不要害羞,提出你的问题,分享你的经验。

    Propertiesprops =newProperties();props.put("bootstrap.servers","localhost:9092");props.put("transactional.id","my-transactional-id");Producer<String,String>producer =newKafkaProducer<>(props);producer.initTransactions();try{producer.beginTransaction();for(inti =0;i <100;i++)producer.send(newProducerRecord<>("my-topic",Integer.toString(i),Integer.toString(i)));producer.commitTransaction();}catch(ProducerFencedException|OutOfOrderSequenceException|AuthorizationExceptione){producer.close();}catch(KafkaExceptione){producer.abortTransaction();}producer.close();

    2. 压缩(Compaction)

    Kafka的日志压缩特性允许Kafka仅保留每个key的最新值,这在需要维护状态的场景中非常有用。处理到存储和分析。

  7. 模拟大规模场景: 在你的开发环境中模拟大规模数据处理场景,了解系统在高负载下的表现。

  8. 对于消费者,禁用自动提交位移,手动控制位移提交。
  9. Producer: 消息生产者,向Topic发送消息。
  10. Consumer: 消息消费者,从Topic读取消息。Kafka 可以处理企业中所有的实时数据馈送。但记住,大模型是辅助工具,不是替代品。

    bin/kafka-topics.sh --create--bootstrap-server localhost:9092 --replication-factor 1--partitions1--topiccompacted-topic --configcleanup.policy=compact

    3. 安全特性

    在生产环境中,安全性是非常重要的。

  11. 使用自定义分区器。问题解决与未来展望

    在之前的内容中,我们已经深入探讨了Kafka的核心概念、假设我们要为一个电商平台构建一个实时推荐系统。

Kafka实战案例、Elasticsearch等)集成,了解不同场景下的最佳实践。

exportJMX_PORT=9999bin/kafka-server-start.sh config/server.properties

然后可以使用JConsole或其他JMX客户端来查看这些指标。处理到存储和分析,全面使用Kafka生态系统。

理解这些概念对于掌握Kafka至关重要。记得我刚开始接触 Kafka 时,就像是站在一座高山脚下,不知从何下手。

publicclassKafkaHadoopIntegration{publicstaticvoidmain(String[]args)throwsException{Configurationconf =newConfiguration();Jobjob =Job.getInstance(conf,"kafka to hadoop");job.setInputFormatClass(KafkaInputFormat.class);KafkaInputFormat.addInputPath(job,newPath("kafka://localhost:9092/my-topic"));job.setOutputFormatClass(TextOutputFormat.class);TextOutputFormat.setOutputPath(job,newPath("/output"));job.setMapperClass(KafkaMapper.class);job.setReducerClass(KafkaReducer.class);System.exit(job.waitForCompletion(true)?0:1);}}

2. Kafka + Spark Streaming

Spark Streaming可以直接从Kafka读取数据,实现实时流处理。这会让你更好地理解Kafka的分布式特性。

publicclassSimpleProducer{publicstaticvoidmain(String[]args){Propertiesprops =newProperties();props.put("bootstrap.servers","localhost:9092");props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");Producer<String,String>producer =newKafkaProducer<>(props);producer.send(newProducerRecord<>("quickstart-events","Hello, Kafka!"));producer.close();}}

深入Kafka:从入门到实战

Kafka的核心概念

image.png

在深入学习之前,我们需要先理解Kafka的几个核心概念:

  1. Topic: 消息的类别,可以理解为一个消息队列。

    importorg.apache.spark.streaming._importorg.apache.spark.streaming.kafka010._valssc =newStreamingContext(sparkContext,Seconds(1))valkafkaParams =Map[String,Object]("bootstrap.servers"->"localhost:9092","key.deserializer"->classOf[StringDeserializer],"value.deserializer"->classOf[StringDeserializer],"group.id"->"spark-streaming-consumer","auto.offset.reset"->"latest","enable.auto.commit"->(false:java.lang.Boolean))valtopics =Array("my-topic")valstream =KafkaUtils.createDirectStream[String,String](ssc,PreferConsistent,Subscribe[String,String](topics,kafkaParams))stream.map(record =>(record.key,record.value)).print()ssc.start()ssc.awaitTermination()

    3. Kafka + Elasticsearch

    Kafka和Elasticsearch的结合可以构建强大的实时搜索和分析系统。

    # 创建主题bin/kafka-topics.sh --create--topicquickstart-events --bootstrap-server localhost:9092# 发送消息bin/kafka-console-producer.sh --topicquickstart-events --bootstrap-server localhost:9092# 消费消息bin/kafka-console-consumer.sh --topicquickstart-events --from-beginning --bootstrap-server localhost:9092

    image.png

    第三步: 编写简单的生产者和消费者程序

    然后,我开始编写简单的 Java 程序来生产和消费消息。

    糙快猛学习法则

    image.png

    1. 不求完美,先求上手: 不要陷入完美主义的陷阱。作为一个技术人,我们需要保持开放和好奇的心态,持续学习和实践。

      publicclassKafkaElasticsearchConnector{publicstaticvoidmain(String[]args){Propertiesprops =newProperties();props.put("bootstrap.servers","localhost:9092");props.put("group.id","elasticsearch-consumer");props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String,String>consumer =newKafkaConsumer<>(props);consumer.subscribe(Arrays.asList("my-topic"));RestClientrestClient =RestClient.builder(newHttpHost("localhost",9200,"http")).build();while(true){ConsumerRecords<String,String>records =consumer.poll(Duration.ofMillis(100));for(ConsumerRecord<String,String>record :records){// 将数据写入ElasticsearchRequestrequest =newRequest("POST","/my-index/_doc");request.setJsonEntity(record.value());restClient.performRequest(request);}}}}

      高级主题与最佳实践

      1. Kafka Connect

      Kafka Connect是一个强大的工具,可以轻松地将Kafka与外部系统集成。

    2. 增强的安全特性:更细粒度的访问控制和加密功能。

      2. Kafka Manager

      LinkedIn开源的Kafka Manager是一个非常有用的Kafka集群管理工具。遇到问题时,深入研究,这往往能带来意外的收获。

    "糙快猛"实践Kafka的建议

    image.png

    1. 构建端到端的数据管道:尝试构建一个完整的数据管道,从数据采集、

      Kafka的高级特性

      1. 精确一次语义(Exactly-Once Semantics)

      Kafka 0.11版本引入了精确一次语义,这是一个重要的特性,特别是在处理金融交易等对数据准确性要求极高的场景中。

      现在,让我们通过一些实际的应用案例来看看Kafka如何解决实际问题,同时探讨一些常见问题的解决方案,并对Kafka的未来发展进行展望。

    2. Consumer Group: 消费者组,用于实现高吞吐量的消费。保持"糙快猛"的态度,勇于尝试,不怕失败。现在,让我们更进一步,探索Kafka的一些高级特性,以及在生产环境中使用Kafka的最佳实践。

      1. 用户行为数据收集(点击、

        gitclone https://github.com/yahoo/CMAK.gitcdCMAK./sbt clean dist

        3. 日志分析

        定期分析Kafka的日志文件可以帮助发现潜在的问题。

        # 启动 Zookeeperbin/zookeeper-server-start.sh config/zookeeper.properties# 启动 Kafka 服务bin/kafka-server-start.sh config/server.properties

        第二步: 生产和消费消息

        接下来,我创建了一个主题,并尝试发送和接收消息。

      2. 故障演练: 故意制造一些故障(如关闭一个broker),观察系统的行为,学习如何处理这些情况。享受这个过程,你会发现,技术的世界是如此精彩。不要和别人比较,专注于自己的进步。

        2. 实时推荐系统

        假设我们要为一个电商平台构建实时推荐系统。序列化器等组件,深入理解Kafka的工作原理。

        CREATESTREAM pageviews (viewtime BIGINT,userid VARCHAR,pageid VARCHAR)WITH(KAFKA_TOPIC='pageviews',VALUE_FORMAT='JSON');CREATETABLEpageviews_per_user ASSELECTuserid,COUNT(*)ASpageviews  FROMpageviews  GROUPBYuserid  EMIT CHANGES;

        3. 多数据中心复制

        对于跨地域的大规模部署,Kafka的多数据中心复制是一个重要特性。今天,让我们一起探讨如何以"糙快猛"的方式学习 Kafka,在这个过程中,我们会发现学习的乐趣和效率。但我很快意识到,“不要一下子追求完美,在不完美的状态下前行才是最高效的姿势。

      3. 参与社区: Kafka有一个活跃的社区。

        解决方案:

        • 对于生产者,设置 acks=all确保所有副本都收到消息。多个主题和消费者组。
        • 消息系统: 作为传统消息中间件的替代,实现系统间的解耦。Spark、生产者模拟生成日志,消费者实时处理这些日志。
        props.put("partition.assignment.strategy","org.apache.kafka.clients.consumer.CooperativeStickyAssignor");props.put("group.instance.id","consumer-1");// 静态成员ID

        3. 数据倾斜问题

        问题:某些分区的数据量明显多于其他分区,导致处理不均衡。

      4. 使用静态成员机制(Kafka 2.3及以上版本)。从一个你感兴趣的特性开始,逐步深入。

        props.put("compression.type","snappy");

        监控与运维

        image.png

        1. JMX指标

        Kafka暴露了大量的JMX指标,可以用来监控集群的健康状况。

      5. 从简单开始,逐步复杂化: 先掌握基本的生产者-消费者模型,然后逐步引入更复杂的概念和功能。它主要用于构建实时数据管道和流式应用程序。这让我对 Kafka 的基本概念有了直观的理解。购买等)
      6. 实时处理这些数据
      7. 更新用户画像
      8. 生成推荐结果
      publicclassRecommendationSystem{publicstaticvoidmain(String[]args){// 配置Kafka StreamsPropertiesprops =newProperties();props.put(StreamsConfig.APPLICATION_ID_CONFIG,"user-recommendation-app");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());StreamsBuilderbuilder =newStreamsBuilder();// 从"user-behavior"主题读取用户行为数据KStream<String,String>userBehavior =builder.stream("user-behavior");// 处理用户行为数据,更新用户画像KTable<String,String>userProfiles =userBehavior            .groupByKey().aggregate(()->"",// 初始值(key,value,aggregate)->updateUserProfile(aggregate,value),Materialized.as("user-profiles-store"));// 基于用户画像生成推荐KStream<String,String>recommendations =userProfiles            .toStream().mapValues(profile ->generateRecommendations(profile));// 将推荐结果写入"user-recommendations"主题recommendations.to("user-recommendations");KafkaStreamsstreams =newKafkaStreams(builder.build(),props);streams.start();}privatestaticStringupdateUserProfile(Stringprofile,Stringbehavior){// 实现用户画像更新逻辑returnupdatedProfile;}privatestaticStringgenerateRecommendations(Stringprofile){// 实现推荐生成逻辑returnrecommendations;}}

      这个例子展示了如何使用Kafka Streams来构建一个实时推荐系统。

    3. 探索Kafka生态: 除了Kafka Core,也要了解Kafka Connect、

    4. 性能测试和调优:对你的Kafka应用进行全面的性能测试,并根据测试结果进行调优。性能调优等方面。作为一个分布式流处理平台,它以其高吞吐量、

      解决方案:

      • 实现自定义的分区分配策略。

      • 参与开源项目: 参与Kafka或其生态系统中其他项目的开发,这将极大地提升你的技能和对整个生态的理解。broker宕机等),并制定相应的恢复策略。尝试使用它来处理和转换数据流。

      • 利用大模型: 在学习过程中,可以将大模型作为24小时助教。

        Kafka在大规模分布式系统中的应用

        image.png

        1. 微服务架构中的Kafka

        在微服务架构中,Kafka常被用作服务间通信的骨干。

        最后,我想再次强调,学习的过程应该是充满乐趣的。

        publicclassLogProducer{privatefinalKafkaProducer<String,String>producer;publicLogProducer(){Propertiesprops =newProperties();props.put("bootstrap.servers","localhost:9092");props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");this.producer =newKafkaProducer<>(props);}publicvoidlog(Stringmessage){ProducerRecord<String,String>record =newProducerRecord<>("logs",message);producer.send(record);}}

        3. 实时数据管道

        Kafka可以作为实时数据管道的核心,将数据从源系统实时传输到目标系统。

        我的 Kafka 学习故事

        作为一个从零基础跨行到大数据领域的开发者,我深知学习新技术的挑战。Kafka的世界是如此丰富多彩,我们在这个"糙快猛"的学习过程中所涉及的内容,只是其中的一小部分。

      • 建立自己的节奏: 每个人的学习速度不同,找到适合自己的节奏很重要。

        bin/kafka-topics.sh --create--bootstrap-server localhost:9092 --replication-factor 3--partitions9--topicmy-topic

        2. 批量处理

        使用批量发送和批量获取可以显著提高吞吐量。

      • 用户活动跟踪: 收集用户在网站或应用上的活动数据,用于分析和个性化推荐。

        publicclassLogAnalysisSystem{publicstaticvoidmain(String[]args){Propertiesprops =newProperties();props.put(StreamsConfig.APPLICATION_ID_CONFIG,"logs-analysis-app");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());StreamsBuilderbuilder =newStreamsBuilder();KStream<String,String>logs =builder.stream("logs-topic");// 解析日志并提取重要信息KStream<String,LogEntry>parsedLogs =logs.mapValues(value ->parseLog(value));// 按错误级别分组KStream<String,LogEntry>[]branches =parsedLogs.branch((key,value)->value.getLevel().equals("ERROR"),(key,value)->value.getLevel().equals("WARN"),(key,value)->true);// 错误日志写入专门的主题branches[0].to("error-logs");// 统计每分钟的警告日志数branches[1].groupByKey().windowedBy(TimeWindows.of(Duration.ofMinutes(1))).count().toStream().to("warn-logs-count");// 所有日志写入ElasticsearchparsedLogs.foreach((key,value)->writeToElasticsearch(value));KafkaStreamsstreams =newKafkaStreams(builder.build(),props);streams.start();}privatestaticLogEntryparseLog(StringlogString){// 解析日志字符串,返回LogEntry对象}privatestaticvoidwriteToElasticsearch(LogEntrylogEntry){// 将日志写入Elasticsearch}}

        这个案例展示了如何使用Kafka Streams API构建一个实时日志分析系统,包括日志解析、

      • 监控: 收集各种指标数据,用于系统监控和报警。

    结语

    通过这一系列的文章,我们已经从Kafka的基础知识一路探索到了高级特性和实际应用案例。

    解决方案:

    • 设计合适的分区键。Kafka提供了多种安全特性,包括:

      • SSL/TLS加密
      • SASL认证
      • ACL权限控制
      security.protocol=SASL_SSLsasl.mechanism=PLAINsasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";

      性能优化

      1. 合理的分区数

      分区数的选择会直接影响Kafka的性能。

      name=elasticsearch-sinkconnector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnectortasks.max=1topics=my-topickey.ignore=trueconnection.url=http://localhost:9200type.name=kafka-connect

      2. KSQL

      KSQL允许您使用SQL语法处理Kafka中的流数据。没有什么比解决真实问题更能促进学习了。

    • 构建项目: 尝试在实际项目中使用Kafka。
    publicclassCustomPartitionerimplementsPartitioner{@Overridepublicintpartition(Stringtopic,Objectkey,byte[]keyBytes,Objectvalue,byte[]valueBytes,Clustercluster){// 自定义分区逻辑}}props.put("partitioner.class","com.example.CustomPartitioner");

    Kafka未来发展趋势

    image.png

    1. Kafka KRaft模式:去除对ZooKeeper的依赖,简化部署和管理。

      props.put("batch.size",16384);props.put("linger.ms",1);

      3. 压缩

      使用压缩可以减少网络传输和存储的数据量。它可以解耦服务,提供异步通信,并帮助实现事件驱动架构。高级特性、

    2. 模拟生产环境: 在你的开发机器上模拟一个小型的生产环境,包括多个broker、

      常见问题及解决方案

      image.png

      1. 消息丢失问题

      问题:在某些情况下,可能会出现消息丢失的情况。

    3. 改进的跨数据中心复制:更好地支持地理分布式部署。Kafka Streams、

      在大数据开发的世界里,Kafka 无疑是一个不可或缺的重要角色。

      让我们继续在Kafka和大数据的海洋中探索,相信不久的将来,你就能成为那个"可把我牛逼坏了,让我叉会腰儿"的Kafka大师!Remember, the journey of a thousand miles begins with a single step. Keep coding, keep learning, and most importantly, keep pushing your limits!

    4. 适当设置 min.insync.replicas参数。问题解决与未来展望
      • Kafka实战案例
        • 1. 实时日志分析系统
        • 2. 实时推荐系统
      • 常见问题及解决方案
        • 1. 消息丢失问题
        • 2. 消费者重平衡问题
        • 3. 数据倾斜问题
      • Kafka未来发展趋势
      • "糙快猛"实践Kafka的建议
      • 结语

但对于初学者来说,Kafka 可能看起来就像是一座难以攀登的高山。

image.png

Kafka生态系统:大规模应用与技术集成

在前面的文章中,我们从入门到进阶,深入探讨了Kafka的核心概念、尝试调整各种参数,观察它们如何影响性能。每一次的实践,每一个解决的问题,都是你宝贵的经验。Apache Kafka 是一个分布式流处理平台,最初由 LinkedIn 开发,现在已经成为 Apache 软件基金会的顶级开源项目之一。可靠性和可扩展性而闻名。同时,我们也要记住,技术是解决问题的工具,真正重要的是理解问题的本质,并找到最适合的解决方案。我没有纠结于理解每一个配置参数,而是使用默认配置快速启动了一个单节点的 Kafka 集群。

publicclassLogProducer{publicstaticvoidmain(String[]args){Propertiesprops =newProperties();props.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094");props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");Producer<String,String>producer =newKafkaProducer<>(props);for(inti =0;i <100;i++){producer.send(newProducerRecord<>("logs","Log message "+i));}producer.close();}}
publicclassLogConsumer{publicstaticvoidmain(String[]args){Propertiesprops =newProperties();props.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094");props.put("group.id","log-processing-group");props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String,String>consumer =newKafkaConsumer<>(props);consumer.subscribe(Arrays.asList("logs"));while(true){ConsumerRecords<String,String>records =consumer.poll(Duration.ofMillis(100));for(ConsumerRecord<String,String>record :records){System.out.printf("offset = %d, key = %s, value = %s%n",record.offset(),record.key(),record.value());}}}}

3. 探索Kafka Streams

Kafka Streams是一个强大的库,用于构建实时流处理应用。

第一步: 快速上手

我的第一步是迅速搭建一个 Kafka 环境。它可以帮助解答疑问,解释概念,甚至提供代码示例。高级特性和实际应用。

  • 故障演练:模拟各种故障场景(如网络分区、一般来说,分区数应该是集群中broker数量的整数倍,这样可以使负载均匀分布。随着对 Kafka 理解的深入,不断回顾和更新你的知识体系。

  • process.roles=broker,controllernode.id=1controller.quorum.voters=1@localhost:9093,2@localhost:9093,3@localhost:9093
    1. Tiered Storage:支持将数据分层存储,优化存储成本和性能。

    2. 参与开源社区: 不要只是使用Kafka,尝试为Kafka贡献代码,这将大大提升你的技能。

      目录

        • 糙快猛学习法则
        • Kafka 是什么?
        • 我的 Kafka 学习故事
          • 第一步: 快速上手
          • 第二步: 生产和消费消息
          • 第三步: 编写简单的生产者和消费者程序
      • 深入Kafka:从入门到实战
        • Kafka的核心概念
        • 深入学习的"糙快猛"方法
          • 1. 构建一个多Broker的Kafka集群
          • 2. 实现一个复杂一点的生产者-消费者系统
          • 3. 探索Kafka Streams
        • Kafka的实际应用场景
        • "糙快猛"学习Kafka的进阶之路
      • Kafka进阶:高级特性与生产实践
        • Kafka的高级特性
          • 1. 精确一次语义(Exactly-Once Semantics)
          • 2. 压缩(Compaction)
          • 3. 安全特性
        • 性能优化
          • 1. 合理的分区数
          • 2. 批量处理
          • 3. 压缩
        • 监控与运维
          • 1. JMX指标
          • 2. Kafka Manager
          • 3. 日志分析
        • 实战案例:构建实时推荐系统
        • "糙快猛"学习Kafka的注意事项
      • Kafka生态系统:大规模应用与技术集成
        • Kafka在大规模分布式系统中的应用
          • 1. 微服务架构中的Kafka
          • 2. 日志聚合与分析
          • 3. 实时数据管道
        • Kafka与其他大数据技术的集成
          • 1. Kafka + Hadoop
          • 2. Kafka + Spark Streaming
          • 3. Kafka + Elasticsearch
        • 高级主题与最佳实践
          • 1. Kafka Connect
          • 2. KSQL
          • 3. 多数据中心复制
        • "糙快猛"学习Kafka生态系统的建议
      • Kafka实战案例、
      • 流处理: 结合Kafka Streams或其他流处理框架,实现实时数据处理和分析。
      • 关注性能: Kafka以高性能著称。

        bootstrap.servers=broker1:9092,broker2:9092group.id=mirror-makerauto.offset.reset=latest# 源集群配置source.bootstrap.servers=source-broker1:9092,source-broker2:9092source.group.id=mirror-maker-source# 目标集群配置destination.bootstrap.servers=dest-broker1:9092,dest-broker2:9092destination.group.id=mirror-maker-destination# 复制的主题topics=topic1,topic2

        "糙快猛"学习Kafka生态系统的建议

        1. 构建端到端的数据管道: 尝试构建一个完整的数据管道,从数据采集、

          @ServicepublicclassOrderService{@AutowiredprivateKafkaTemplate<String,String>kafkaTemplate;publicvoidcreateOrder(Orderorder){// 处理订单逻辑...// 发送订单创建事件kafkaTemplate.send("order-created",order.getId(),order.toJson());}}@ServicepublicclassInventoryService{@KafkaListener(topics ="order-created")publicvoidhandleOrderCreated(ConsumerRecord<String,String>record){Orderorder =Order.fromJson(record.value());// 更新库存逻辑...}}

          2. 日志聚合与分析

          在大规模系统中,Kafka可以作为集中式日志收集的管道,将分散在各处的日志汇聚起来,然后送入Elasticsearch或Hadoop等系统进行分析。

        2. 持续迭代: 学习是一个循环的过程。

        Kafka 是什么?

        image.png

        在我们开始学习之旅之前,先简单介绍一下 Kafka。真正的挑战和乐趣在于将这些知识应用到实际的生产环境中,解决真实世界的问题。

        Kafka实战案例

        1. 实时日志分析系统

        image.png

        假设我们需要构建一个实时日志分析系统,用于监控和分析大规模分布式系统的日志。

      • 跨技术栈实践: 尝试将Kafka与不同的技术栈(如Hadoop、统计和存储等功能。