Kafka 是一个分布式、多副本的消息流处理平台。早期版本依赖 ZooKeeper 管理集群元数据(3.3 起 KRaft 模式可用于生产环境,4.0 起已彻底移除 ZooKeeper)。Kafka 最初由 LinkedIn 开发,用于解决数据管道(data pipeline)问题;2011 年捐献给 Apache 软件基金会,后来成为 Apache 的顶级开源项目。
Kafka 的未来:实时数据流平台。(出自饶军)
相关词汇:
- 消息 - message
- 分区 - partition
- 主题 - topic
- 生产者 - producer
- 消费者 - consumer
- 消费者群组 - consumer group
- 偏移量 - consumer offset
- 缓存代理 - broker
- 副本 - replica
- 领导者副本 - leader replica
- 追随者副本 - follower replica
- 重平衡 - rebalance
引用库
<PackageReference Include="Confluent.Kafka" Version="2.8.0" />
// Demo: a Confluent.Kafka consumer and producer running side by side against a local broker.
// NOTE(review): presumably also needs the Newtonsoft.Json (JsonConvert) and Bogus (Faker)
// packages; KafkaMessage is a type defined elsewhere in the project.
const string topic = "test";
const string host = "localhost:9092";

// One token source shared by both loops: Ctrl+C (or pressing Enter, see the end of the
// script) stops the consumer and the producer, so the final Task.WhenAll can complete.
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (s, e) =>
{
    e.Cancel = true; // keep the process alive so both loops can shut down cleanly
    cts.Cancel();
};

#region Consumer
var consumerTask = Task.Run(async () =>
{
    var config = new ConsumerConfig
    {
        GroupId = "test-consumer-group-console",
        BootstrapServers = host,
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnableAutoCommit = true, // commit offsets automatically
    };

    using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
    consumer.Subscribe(topic);
    try
    {
        while (true)
        {
            try
            {
                // Blocks until a record arrives; throws OperationCanceledException once cts is cancelled.
                var result = consumer.Consume(cts.Token);
                Console.WriteLine($"Message: {result.Message.Value} received from {result.TopicPartitionOffset}");
                // consumer.Commit(result); // manual commit — pair with EnableAutoCommit = false
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error occured: {e.Error.Reason}");
            }
            await Task.Yield();
        }
    }
    catch (OperationCanceledException ex)
    {
        Console.WriteLine($"ex: {ex}");
    }
    finally
    {
        // Always leave the consumer group cleanly, not only on the cancellation path.
        consumer.Close();
    }
});
#endregion Consumer

#region Producer
var producerTask = Task.Run(async () =>
{
    var config = new ProducerConfig { BootstrapServers = host };

    using var producer = new ProducerBuilder<Null, string>(config).Build();
    while (!cts.IsCancellationRequested)
    {
        try
        {
            await producer.ProduceAsync(topic, new Message<Null, string>
            {
                Value = JsonConvert.SerializeObject(new KafkaMessage
                {
                    Ts = DateTimeOffset.Now,
                    Text = new Faker("en").Address.FullAddress(),
                }),
            });
        }
        catch (ProduceException<Null, string> ex)
        {
            // The type arguments must match the producer's <Null, string>; a
            // ProduceException<Null, KafkaMessage> filter never matches, and errors would fault the task.
            Console.WriteLine($"ex: {ex.Error.Reason}");
        }
        await Task.Delay(TimeSpan.FromMilliseconds(200));
    }
});
#endregion Producer

Console.ReadLine();
cts.Cancel(); // pressing Enter also stops both loops; otherwise WhenAll would wait forever
await Task.WhenAll(consumerTask, producerTask);
CAP(DotNetCore.CAP)是一个处理分布式事务的开源库,基于本地消息表实现最终一致性,配合持久化存储时能够保证事件消息不会丢失(下文示例使用的 InMemoryStorage 仅适合测试,进程退出后消息即丢失)。它也可以作为 EventBus 使用。
引用库
<PackageReference Include="DotNetCore.CAP.InMemoryStorage" Version="8.3.2" />
<PackageReference Include="DotNetCore.CAP.Kafka" Version="8.3.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="9.0.0" />
// Demo: DotNetCore.CAP over Kafka with in-memory storage — publishes a message every 200 ms
// until Ctrl+C; SubscriberService receives them.
var services = new ServiceCollection();
services.AddLogging(config =>
{
    config.SetMinimumLevel(LogLevel.Debug);
    config.AddConsole();
});
services.AddCap(config =>
{
    config.UseInMemoryStorage();
    config.UseKafka(opt =>
    {
        opt.Servers = "localhost:9092";
        // Custom headers; cap-msg-id and cap-msg-name are required.
        opt.CustomHeadersBuilder = (kr, provider) => new List<KeyValuePair<string, string>>
        {
            new KeyValuePair<string, string>("cap-msg-id", Guid.NewGuid().ToString()),
            new KeyValuePair<string, string>("cap-msg-name", "test"),
            new KeyValuePair<string, string>("kafka.offset", kr.Offset.ToString()),
        };
    });
    config.FailedRetryInterval = 10; // seconds between retries (default is 60s)
    config.DefaultGroupName = "test-group";
});
services.AddTransient<ICapSubscribe, SubscriberService>();
var sp = services.BuildServiceProvider();

var cts = new CancellationTokenSource();
Console.CancelKeyPress += (s, e) =>
{
    e.Cancel = true; // keep the process alive so the loop can exit and CAP can be disposed
    cts.Cancel();
};

// Start CAP.
var bootstrap = sp.GetRequiredService<IBootstrapper>();
await bootstrap.BootstrapAsync(cts.Token);
await Task.Delay(500); // NOTE(review): presumably gives the subscriber time to come up before publishing

// Producer
ICapPublisher capBus = sp.GetRequiredService<ICapPublisher>();
while (!cts.IsCancellationRequested)
{
    // Publish the message. PublishAsync is awaited rather than calling the blocking Publish,
    // which would block this async program on an async operation (sync-over-async).
    await capBus.PublishAsync("test", new KafkaMessage
    {
        Ts = DateTimeOffset.Now,
        Text = new Faker().Address.FullAddress(),
    });
    await Task.Delay(TimeSpan.FromMilliseconds(200));
}

await bootstrap.DisposeAsync();
Console.ReadLine();
消费者
/// <summary>
/// CAP subscriber that writes every message published under the name "test" to the console.
/// </summary>
internal class SubscriberService : ICapSubscribe
{
    /// <summary>Prints one "test" message; a null payload is ignored.</summary>
    /// <param name="msg">The deserialized message body; may be null.</param>
    /// <param name="header">CAP message headers (not used here).</param>
    [CapSubscribe("test")]
    public void ReceiveMessage(KafkaMessage msg, [FromCap] CapHeader header)
    {
        if (msg is null)
        {
            return;
        }

        var line = $"Message: {msg.Ts}| {msg.Text}";
        Console.WriteLine(line);
    }
}
MassTransit 是一个基于消息驱动的分布式应用框架,既可作为分布式应用的消息总线,也可以用作单体应用的事件总线。
引用库
<PackageReference Include="MassTransit.Kafka" Version="8.3.4" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="9.0.0" />
// Demo: MassTransit Kafka rider — produces KafkaMessage to a topic every 200 ms until Ctrl+C,
// while KafkaMessageConsumer consumes the same topic.
var services = new ServiceCollection();
services.AddMassTransit(config =>
{
    const string topic = "test";
    const string groupId = "test-consumer-group";
    const string host = "localhost:9092";

    // Riders are attached to a bus; an in-memory transport serves as that bus here.
    config.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
    config.AddRider(rider =>
    {
        // Register the producer.
        rider.AddProducer<KafkaMessage>(topic);
        // Register the consumer.
        rider.AddConsumer<KafkaMessageConsumer>();
        // Kafka connection settings.
        rider.UsingKafka((context, k) =>
        {
            k.Host(host);
            // Configure the topic endpoint.
            k.TopicEndpoint<KafkaMessage>(topic, groupId, e =>
            {
                e.AutoOffsetReset = AutoOffsetReset.Earliest;
                e.ConfigureConsumer<KafkaMessageConsumer>(context);
                e.CreateIfMissing();
            });
        });
    });
});
var sp = services.BuildServiceProvider();

var cts = new CancellationTokenSource();
Console.CancelKeyPress += (s, e) =>
{
    e.Cancel = true; // keep the process alive so the bus can be stopped in the finally block
    cts.Cancel();
};

var bus = sp.GetRequiredService<IBusControl>();
// Allow up to 10 seconds for the bus (and its Kafka rider) to start; awaited instead of the blocking Start.
using (var startTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(10.0)))
{
    await bus.StartAsync(startTimeout.Token);
}

try
{
    // Producer
    var producer = sp.GetRequiredService<ITopicProducer<KafkaMessage>>();
    while (!cts.IsCancellationRequested)
    {
        try
        {
            // Publish the message.
            string value = new Faker().Address.FullAddress();
            await producer.Produce(new KafkaMessage { Ts = DateTimeOffset.Now, Text = value }, cts.Token);
        }
        catch (KafkaException kex)
        {
            // ProduceException<TKey, TValue> derives from KafkaException. A filter such as
            // ProduceException<Ignore, KafkaMessage> never matches, because Ignore is a
            // deserialization-only key type that no producer uses.
            // NOTE(review): confirm whether MassTransit wraps producer errors in its own exception type.
            Console.WriteLine(kex.Error.Reason);
        }

        // Outside the try, so a failing broker does not turn the loop into a busy spin.
        await Task.Delay(TimeSpan.FromMilliseconds(200), cts.Token);
    }
}
catch (OperationCanceledException) when (cts.IsCancellationRequested)
{
    // Expected on Ctrl+C: Produce/Delay observed the cancelled token.
}
finally
{
    await bus.StopAsync();
}

Console.ReadLine();
消费者
/// <summary>
/// MassTransit consumer that prints each KafkaMessage, plus the Kafka offset when the
/// receive context is a Kafka receive context.
/// </summary>
internal class KafkaMessageConsumer : IConsumer<KafkaMessage>
{
    /// <summary>Writes the message timestamp, text and (if available) offset to the console.</summary>
    /// <param name="context">MassTransit consume context carrying the message.</param>
    /// <returns>An already-completed task; the work is synchronous.</returns>
    public Task Consume(ConsumeContext<KafkaMessage> context)
    {
        // Null when the receive context is not a Kafka one; the offset then prints as empty.
        var kafkaContext = context.ReceiveContext as KafkaReceiveContext<Ignore, KafkaMessage>;
        var offset = kafkaContext?.Offset;
        KafkaMessage message = context.Message;

        Console.WriteLine($"Message: {message.Ts}| {message.Text}, Offset: {offset}");
        return Task.CompletedTask;
    }
}
Kafka Streams 流式计算(以下示例使用 Kafka Streams 的 .NET 实现 Streamiz.Kafka.Net)
// Demo: a stream topology (NOTE(review): types look like Streamiz.Kafka.Net) that reads topic
// "test", rewrites each value twice and writes the result to "test-output".

// Kafka connection settings.
var config = new StreamConfig<StringSerDes, StringSerDes>();
config.ApplicationId = "test-app";
config.BootstrapServers = "localhost:9092";

var builder = new StreamBuilder();
builder.Stream<string, string>("test") // input topic (subscribes to "test")
    // Processing step 1: append a marker and the record's offset.
    .MapValues((value, context) => $"{value} ---- 1111 ---- {context.Offset}")
    // Processing step 2: append a second marker and the record's offset.
    .MapValues((value, context) => $"{value} ---- 2222 ---- {context.Offset}")
    // ...further processing steps go here...
    .To("test-output"); // output topic (processing results are written to "test-output")

Topology t = builder.Build();
var stream = new KafkaStream(t, config);
try
{
    await stream.StartAsync();
    Console.ReadLine();
}
finally
{
    // Dispose even if StartAsync or ReadLine throws, so the stream is always released.
    stream.Dispose();
}
输入(写入 topic test):a
输出(从 topic test-output 读取):a ---- 1111 ---- 4910 ---- 2222 ---- 4910(4910 为该消息在 test 中的 offset)