【C#】Kafka for .NET 入坑记录
时间:2025-06-24 12:27:54 来源:新华社
【字体:  

【C#】Kafka for .NET 入坑记录

一、有关Kafka

Kafka 是一个分布式、多副本、基于Zookeeper的消息流处理平台。Kafka 最初由Linkedin开发,为了解决 data pipeline 问题。2011年捐献给 Apache基金会,后来成为Apache的顶级开源项目。

Kafka 的未来:实时数据流平台。(出自饶军)

1-1、基本概念

  • 消息:Kafka 中的数据单元,也被称为记录。
  • 批次:消息会分批次写入Kafka,批次就是指一组消息。
  • 主题:消息的种类,也可以说一个主题代表一类消息。
  • 分区:物理上最小的存储单元。一个主题可以分为多个分区,每个分区内消息是有序的。
  • 生产者:生产消息,发送到主题。
  • 消费者:从主题获取消息,处理消息。
  • 消费者群组:由一个或多个消费者组成的群体。在同一个消费组中,一个消息只能被一个消息者消息。
  • 偏移量:一个不断自增的整数值,分区中一条消息的唯一标示符。可表示消费者在分区的位置。
  • broker:一个独立的Kafka服务器。
  • broker集群:由一个或多个broker组成的集群。
  • 副本:消息的备份。Kafka定义了两类副本(领导者副本、追随者副本)。
  • 重平衡:其个消费者实例挂掉后,其他消息者实例自动重新分配的过程。

相关词汇:

消息 - message分区 - partition主题 - topic生产者 - producer消费者 - consumer消费者群组 - consumer group偏移量 - consumer offset缓存代理 - broker副本 - replica领导者副本 - leader replica追随者副本 - follower replica重平衡 - rebalance

1-2、Kafka发行版本:

  • Apache Kafak:也称社区版 Kafka。
  • Confluent Kafka:Confluent 公司提供的 Kafka。
    2014年,Kafka的3个创始人 Jay Kreps、Naha Narkhede 和 Jun Rao 离开 LinkedIn,创立了Confluent公司,专注于提供基于 Kafka 的企业流处理解决方案。Confluent Kafka 提供了一些 Apache Kafka 没有的高级特性,如跨数据中心备份、Schema 注册中心以及集群监控工具等。
  • Cloudera/Hortonworks Kafka:Cloudera 提供的 CDH Kafka 和 Hortonworks 提供的 HDP Kafka。集成了 Apache Kafka,通过便捷化的界面操作将 Kafka 的安装、运维、管理、监控全部统一在控制台中。

二、Technology Libraries

2-1、Confluent.Kafka

引用库

<PackageReferenceInclude="Confluent.Kafka"Version="2.8.0"/>
conststringtopic ="test";conststringhost ="localhost:9092";#region消费者vart1 =Task.Run(async()=>{ varcfg =newConsumerConfig(){ GroupId =$"test-consumer-group-console",BootstrapServers =host,AutoOffsetReset =AutoOffsetReset.Earliest,EnableAutoCommit =true// 自动提交};usingvarbuilder =newConsumerBuilder<Ignore,string>(cfg).Build();builder.Subscribe(topic);varcts =newCancellationTokenSource();Console.CancelKeyPress +=(s,e)=>{ e.Cancel =true;cts.Cancel();};try{ while(true){ try{ varconsumer =builder.Consume(cts.Token);Console.WriteLine($"Message: { consumer.Message.Value}received from { consumer.TopicPartitionOffset}");// builder.Commit(consumer); // 手动提交}catch(ConsumeExceptione){ Console.WriteLine($"Error occured: { e.Error.Reason}");}awaitTask.Yield();}}catch(OperationCanceledExceptionex){ builder.Close();Console.WriteLine($"ex: { ex}");}});#endregion消费者#region生产者vart2 =Task.Run(async()=>{ varcfg =newProducerConfig{ BootstrapServers =host,};varcts =newCancellationTokenSource();Console.CancelKeyPress +=(s,e)=>{ e.Cancel =true;cts.Cancel();};// 生产者usingvarproducer =newProducerBuilder<Null,string>(cfg).Build();while(true){ if(cts.IsCancellationRequested)break;try{ awaitproducer.ProduceAsync(topic,newMessage<Null,string>{ Value =JsonConvert.SerializeObject(newKafkaMessage{ Ts =DateTimeOffset.Now,Text =newFaker("en").Address.FullAddress()})});}catch(ProduceException<Null,KafkaMessage>ex){ Console.WriteLine($"ex: { ex.Error.Reason}");}awaitTask.Delay(TimeSpan.FromMilliseconds(200));}});#endregion生产者Console.ReadLine();awaitTask.WhenAll(t1,t2);

2-2、DotNetCore.CAP.Kafka

CAP 是一种处理分布式事务的解决方案,能够保证任务情况下事件消息都不会丢失。可作 EventBus 来使用。

引用库

<PackageReferenceInclude="DotNetCore.CAP.InMemoryStorage"Version="8.3.2"/><PackageReferenceInclude="DotNetCore.CAP.Kafka"Version="8.3.2"/><PackageReferenceInclude="Microsoft.Extensions.DependencyInjection"Version="9.0.0"/><PackageReferenceInclude="Microsoft.Extensions.Logging.Console"Version="9.0.0"/>
varservices =newServiceCollection();services.AddLogging(config =>{ config.SetMinimumLevel(LogLevel.Debug);config.AddConsole();});services.AddCap(config =>{ config.UseInMemoryStorage();config.UseKafka(opt =>{ opt.Servers ="localhost:9092";// 自定义Headers, cap-msg-id 和 cap-msg-name 必选项opt.CustomHeadersBuilder =(kr,sp)=>newList<KeyValuePair<string,string>>{ newKeyValuePair<string,string>("cap-msg-id",Guid.NewGuid().ToString()),newKeyValuePair<string,string>("cap-msg-name","test"),newKeyValuePair<string,string>("kafka.offset",kr.Offset.ToString()),};});config.FailedRetryInterval =10;// 重试的间隔时间,默认是60sconfig.DefaultGroupName ="test-group";});services.AddTransient<ICapSubscribe,SubscriberService>();varsp =services.BuildServiceProvider();varcts =newCancellationTokenSource();Console.CancelKeyPress +=(s,e)=>{ e.Cancel =true;cts.Cancel();};// 启动CAPvarbootstrap =sp.GetRequiredService<IBootstrapper>();awaitbootstrap.BootstrapAsync(cts.Token);awaitTask.Delay(500);// 生产者ICapPublishercapBus =sp.GetRequiredService<ICapPublisher>();while(true){ if(cts.IsCancellationRequested)break;// 推送消息capBus.Publish("test",newKafkaMessage{ Ts =DateTimeOffset.Now,Text =newFaker().Address.FullAddress()});awaitTask.Delay(TimeSpan.FromMilliseconds(200));}awaitbootstrap.DisposeAsync();Console.ReadLine();

消费者

internalclassSubscriberService:ICapSubscribe{ [CapSubscribe("test")]publicvoidReceiveMessage(KafkaMessagemsg,[FromCap]CapHeaderheader){ if(msg !=null){ Console.WriteLine($"Message: { msg.Ts}| { msg.Text}");}}}

2-3、MassTransit.Kafka

MassTransit 基于消息驱动的分布式应用框架,可作为分布式应用的消息总线,也可以用作单体应用的事件总线。

引用库

<PackageReferenceInclude="MassTransit.Kafka"Version="8.3.4"/><PackageReferenceInclude="Microsoft.Extensions.DependencyInjection"Version="9.0.0"/>
varservices =newServiceCollection();services.AddMassTransit(config =>{ conststringtopic ="test";conststringgroupId ="test-consumer-group";conststringhost ="localhost:9092";config.UsingInMemory((context,cfg)=>cfg.ConfigureEndpoints(context));config.AddRider(rider =>{ // 注册生产者rider.AddProducer<KafkaMessage>(topic);// 注册消费者rider.AddConsumer<KafkaMessageConsumer>();// Kafka连接配置rider.UsingKafka((context,k)=>{ k.Host(host);// 配置 topick.TopicEndpoint<KafkaMessage>(topic,groupId,e =>{ e.AutoOffsetReset =AutoOffsetReset.Earliest;e.ConfigureConsumer<KafkaMessageConsumer>(context);e.CreateIfMissing();});});});});varsp =services.BuildServiceProvider();varcts =newCancellationTokenSource();Console.CancelKeyPress +=(s,e)=>{ e.Cancel =true;cts.Cancel();};varbus =sp.GetRequiredService<IBusControl>();bus.Start(TimeSpan.FromSeconds(10.0));try{ // 生产者varproducer =sp.GetRequiredService<ITopicProducer<KafkaMessage>>();while(true){ if(cts.IsCancellationRequested)break;try{ // 推送消息stringvalue=newFaker().Address.FullAddress();awaitproducer.Produce(newKafkaMessage{ Ts =DateTimeOffset.Now,Text =value});awaitTask.Delay(TimeSpan.FromMilliseconds(200));}catch(ProduceException<Ignore,KafkaMessage>pex){ Console.WriteLine(pex.Error.Reason);}}}catch(OperationCanceledExceptioncce){ }finally{ awaitbus.StopAsync();}Console.ReadLine();

消费者

internalclassKafkaMessageConsumer:IConsumer<KafkaMessage>{ publicTaskConsume(ConsumeContext<KafkaMessage>context){ varctx =(context.ReceiveContext asKafkaReceiveContext<Ignore,KafkaMessage>);Console.WriteLine($"Message: { context.Message.Ts}| { context.Message.Text}, Offset: { ctx?.Offset}");returnTask.CompletedTask;}}

2-4、Streamiz.Kafka.Net

Kafka Streams 流式计算

// kafka连接配置varconfig =newStreamConfig<StringSerDes,StringSerDes>();config.ApplicationId ="test-app";config.BootstrapServers ="localhost:9092";StreamBuilderbuilder =newStreamBuilder();builder.Stream<string,string>("test")// 输入 topic (订阅 topic = test)// 定义处理过程.MapValues((value,context)=>{ return$"{ value}---- 1111 ---- { context.Offset}";})// 定义处理过程.MapValues((value,context)=>{ return$"{ value}---- 2222 ---- { context.Offset}";})// 定义处理过程// ...// 定义处理过程.To("test-output");// 输出 topic (把处理结束发到 topic = test-output)Topologyt =builder.Build();KafkaStreamstream =newKafkaStream(t,config);awaitstream.StartAsync();Console.ReadLine();stream.Dispose();

输入:a
输出:a ---- 1111 ---- 4910 ---- 2222 ---- 4910

[责任编辑:百度一下]
检察日报数字报 | 正义网 |
Copyrights©最高人民检察院 All Rights Reserved.