MQ对比 MQ:MessageQueue,消息队列。例如,某大型企业,可能在北京机房和长沙机房分别搭建RabbitMQ服务,然后希望长沙机房需要同步北京机房的消息,这样可以让长沙的消费者服务可以直接连接长沙本地的RabbitMQ,而不用费尽周折去连接北京机房的RabbitMQ服务。
其中有个特例就是Poison Message handling(处理有毒的消息)。发送批量消息之前单条确认的机制会对系统的吞吐量造成很大的影响,所以稍微中和一点的方式就是发送一批消息后,再一起确认。比如Kafka现在基本可以做到数据不丢失。类似的工具还有很多,比如F5,nginx等。
3、需要在搭建了普通集群之后再补充搭建。这种队列类似于RocketMQ当中的DLedger集群。
RabbitMQ高可用集群架构 RabbitMQ考虑了两种集群模式:
默认的普通集群模式 :
这种模式使用Erlang语言天生具备的集群方式搭建。比如Non-durable queues表示是非持久化的内存队列。
Topics 基于话题的路由 type为"topic" 的exchange
这个模式也就在上一个模式的基础上,对routingKey进行了模糊匹配。虚拟主机 virtual host
RabbitMQ出于服务器复用的想法,可以在一个RabbitMQ集群中划分出多个虚拟主机,每一个虚拟主机都有全套的基础服务组件,可以针对每个虚拟主机进行权限以及数据分配。通常情况下,第三种异步确认机制的性能是最好的。仲裁队列相比Classic经典队列,在分布式环境下对消息的可靠性保障更高。
对于RabbitMQ同样,针对单个队列,如何增加吞吐量呢? 消费者并不能对消息增加消费并发度,所以,RabbitMQ的集群机制并不能增加单个队列的吞吐量。这种模式下,队列数量最好不要过多。因为如果其中有个节点服务宕机了,那这个节点上的数据就无法消费了,需要等到这个节点服务恢复后才能消费,而这时,消费者端已经消费过的消息就有可能给不了服务端正确应答,服务起来后,就会再次消费这些消息,造成这部分消息重复消费。如果Haproxy服务崩溃了,整个应用程序就完全无法访问RabbitMQ了。信道 Channel
一旦客户端与RabbitMQ建立了连接,就会分配一个AMQP信道 Channel。其本质区别在于,这种模式会在镜像节点中间主动进行消息同步,而不是在客户端拉取消息时临时同步。
发送者确认模式默认是不开启的,所以如果需要开启发送者确认模式,需要手动在channel中进行声明。数据安全方面的要求比低延迟、消息:在不同应用程序之间传递的数据。当然,这是以增加磁盘IO为代价的。
这种模式的消息可靠性更高,因为每个节点上都存着全量的消息。消费者把requeue参数设置为true(false),并且在消费后,向RabbitMQ返回拒绝。Quorum队列会持续跟踪消息的失败投递尝试次数,并记录在"x-delivery-count"这样一个头部参数中。 Durable表示队列会将消息保存到硬盘,这样消息的安全性更高。Headers类型的Exchange就是一种忽略routingKey的路由方式。 但是,这其中也要注意一个很常见的BUG,就是如果所有的consumer都忘记调用basicAck()了,就会造成message被不停的分发,也就造成不断的消耗系统资源。RabbitMQ的Stream队列就是模拟Kafka的实现机制,消息吞吐量也提升了非常多。而Stream队列允许用户在日志的任何一个连接点开始重新读取数据。
默认情况下,RabbitMQ接收到消息时,会保存到内存以便使用,同时把消息写到硬盘。而接下来,死信交换机就可以像普通交换机一样,通过RoutingKey将消息转发到对应的死信队列中。**例如 电商系统的订单,引入MQ后,处理速度可以慢一点,但是订单不能丢失。
RabbitMQ的重要概念 1、消费时,如果消费的不是存有数据的节点, RabbitMQ会临时在节点之间进行数据传输,将消息从存有数据的节点传输到消费的节点。使用Stream队列可以比较轻松的在队列中积累百万级别的消息。简单理解就是quorum队列中的消息需要有集群中多半节点同意确认后,才会写入到队列中。简单理解,他是RabbitMQ对于未能正常消费的消息进行的一种补救机制。
然后,是中间件最为关键的分发方式。懒队列会尽可能早的将消息内容保存到硬盘当中,并且只有在用户请求到时,才临时从硬盘加载到RAM内存当中。keepalived是一个搭建高可用服务的常见工具。完成以后关闭连接,释放资源
Consumer主要有两种消费方式:
被动消费模式
Consumer等待rabbitMQ 服务器将message推送过来再消费。
在这里,x-dead-letter-exchange指定一个交换机作为死信交换机,然后x-dead-letter-routing-key指定交换机的RoutingKey。他们的部署也不麻烦,就是下载+配置+运行即可。这时要如何进行数据同步呢?搭建一个跨度这么大的内部子网显然就不太划算。
懒队列会尝试尽可能早的把消息写到硬盘中。还要注意,官方特意提到,所有的queue是不能被多次定义的。
为了防止这种情况情况,可以在RabbitMQ之前部署一个Haproxy,这是一个TCP负载均衡工具。
ActiveMQ、官方文档中表示,未来会使用Quorum仲裁队列代替传统Classic队列。
RabbitMQ常用消息场景 Work queues 工作序列 这是RabbitMQ最基础也是最常用的一种工作机制。
Publisher Confirms 发送者消息确认 RabbitMQ的消息可靠性是非常高的,但是他以往的机制都是保证消息发送到了MQ之后,可以推送到消费者消费,不会丢失消息。
2、Queue不需要Exchange也可以独立工作,只不过通常在业务场景中,会增加Exchange实现更复杂的消息分配策略。
producer: //只负责往exchange里发消息,后面的事情不管。队列 Queue
Queue是实际保存数据的最小单位。但是如何在消费者的处理能力有限的前提下提升消费者的消费速度呢?RabbitMQ提供的Sharding插件,就提供了一种思路。当消息的重复投递次数超过了Delivery limit参数阈值时,RabbitMQ就会删除这些毒消息。
另一种是主动消费模式
Comsumer主动到rabbitMQ服务器上去拉取messge进行消费。
RabbitMQ高级功能 选择合适的队列 Classic经典队列 这是RabbitMQ最为经典的队列类型。消息发送到RabbitMQ中后,会首先进入一个交换机,然后由交换机负责将数据转发到不同的队列中。
4、
镜像模式 :
这种模式是在普通集群模式基础上的一种增强方案,这也就是RabbitMQ的官方HA高可用方案。Haproxy反向代理
有了镜像集群之后,客户端应用就可以访问RabbitMQ集群中任意的一个节点了。
RabbitMQ核心编程模型 step1、
联邦插件 在企业中有很多大型的分布式场景,在这些业务场景下,希望服务也能够同样进行分布式部署。
Haproxy+Keeperalived 的组合是分布式场景中经常用到的一种高可用方案。这里,RabbitMQ默认是采用的fair dispatch,也叫round-robin模式,就是把消息轮询,在所有consumer中轮流发送。producer只负责发送消息,至于消息进入哪个queue,由exchange来分配。
在RabbitMQ中,经典队列是一种非常传统的队列结构。真正要提升RabbitMQ单队列的吞吐量,还是要从数据也就是消息入手,只有将数据真正的分开存储才行。也可以理解为是客户端与RabbitMQ实际进行数据交互的通道,我们后续的大多数的数据操作都是在信道 Channel 这个层面展开的。同时,Quorum是以牺牲很多高级队列特性为代价,来进一步保证消息在分布式环境下的高可靠。不同虚拟主机之间是完全隔离的,如果不考虑资源分配的情况,一个虚拟主机就可以当成一个独立的RabbitMQ服务使用。
消息分片存储插件 Lazy Queue懒队列机制提升了消息在RabbitMQ中堆积的能力,但是最终,消息还是需要消费者处理消化。
对于死信队列,在RabbitMQ中主要涉及到几个参数。
并且在集群内部有一个算法会选举产生master和slave,当一个master挂了后,也会自动选出一个来。
在官网的示例中,重点解释了三种策略:
1、当然,这并不是我们的重点,只要了解即可。
死信队列 死信队列是RabbitMQ中非常重要的一个特性。
消息达到预设的TTL时限还一直没有被消费。# 代表0个或多个单词。Consumer消费消息step7、
其中,Durability有两个选项,Durable和Transient。如果你熟悉Kafka或者RocketMQ,会对这种日志记录消息的方式非常熟悉。
Stream流式队列 Stream队列是RabbitMQ自3.9.0版本开始引入的一种新的数据队列类型。一般是启一个一直挂起的线程来等待。
RabbitMQ常见的高可用集群部署方案 镜像集群+Haproxy+Keepalived 1、首先创建连接,获取Channel
step2、未来如果需要搭建集群,就需要通过这些Broker来构建。RabbitMQ虽然针对写入硬盘速度做了很多算法优化,但是在长队列中,依然表现不是很理想,所以就有了懒队列的出现。这样就保证了Haproxy服务的高可用性。另一种是any,表示只要满足其中一个键值就可以了。
消息由于队列已经达到最长长度限制而被丢掉 对队列进行配置时,只有Classic经典队列和Quorum仲裁队列才能配置死信队列,而目前Stream流式队列,并不支持配置死信队列。
整体上来说,RabbitMQ的Stream队列,其实有很多地方借鉴了其他MQ产品的优点,在保证消息可靠性的基础上,着力提高队列的消息吞吐量以及消息转发性能。异步确认消息实现的方式也比较简单,Producer在channel中注册监听器来对消息进行确认。
交换机多用来与生产者打交道。RabbitMQ、
Headers 头部路由机制 官网示例中的集中路由策略, direct,fanout,topic等这些Exchange,都是以routingkey为关键字来进行消息路由的,但是这些Exchange有一个普遍的局限就是都是只支持一个字符串的形式,而不支持其他形式。因此,经典队列主要用在数据量比较小,并且生产消息和消费消息的速度比较稳定的业务场景 。RabbitMQ以往更专注于企业级的内部使用,但是从这些队列功能可以看到,Rabbitmq也在向更复杂的互联网环境靠拢,未来对于RabbitMQ的了解,也需要随着版本推进,不断更新。这样即可以提高数据的安全性,也能够提升消息读取的性能。
很显然,这种集群模式的消息可靠性不是很高。声明Exchange-可选
step3、声明Exchange与Queue的绑定关系-可选
step5、这种队列提供了RabbitMQ已有的其他队列类型不太好实现的四个特点: 1、连接 Connection
客户端与RabbitMQ进行交互,首先就需要建立一个TCP连接,这个连接就是Connection。RabbitMQ 3.8.X版本添加了Quorum队列,3.9.X又添加了Stream队列。此时,对于RabbitMQ来说,整个集群的服务是稳定的。队列可能会因为一些原因变得非常长-也就是数据堆积。匹配的方式有两种,一种是all,表示需要所有的键值对都满足才行。 当主Haproxy服务出现异常后,keepalived可以将虚拟IP转为绑定到从Haproxy服务的网卡上,这个过程称为VIP漂移。
谈到Sharding,你是不是就想到了分库分表?对于数据库的分库分表,分库可以减少数据库的IO性能压力,而真正要解决单表数据太大的问题,就需要分表。服务主机Broker
一个搭建RabbitMQ Server的服务器称为Broker。下方有几个属性也都是来定义日志文件的大小以及保存时间。三种确认机制的区别这三种确认机制都能够提升Producer发送消息的安全性。
工作任务模式,领导部署一个任务,由下面的一个员工来处理。
懒队列 RabbitMQ从3.6.0版本开始,就引入了懒队列(Lazy Queue)的概念。
4、消息被Consumer从队列中取出后就会从队列中删除。也就是把producer与Consumer进行进一步的解耦。他会真正将一个队列中的消息分散存储到不同的节点上,并提供多个节点的负载均衡策略实现对等的读与写功能。Producer根据应用场景发送消息到queue
step6、
Stream队列的核心是以append-only只添加的日志来记录消息,整体来说,就是消息将以append-only的方式持久化到日志文件中,然后通过调整每个消费者的消费进度offset,来实现消息的多次分发。既然是通道,那就需要尽量注意在停止使用时要关闭,释放资源。Consumer端的autoAck字段设置的是false,这表示consumer在接收到消息后不会自动反馈服务器已消费了message,而要改在对message处理完成了之后,再调用channel.basicAck来通知服务器已经消费了该message.这样即使Consumer在执行message过程中出问题了,也不会造成message被忽略,因为没有ack的message会被服务器重新进行投递。但是同时又带来了Haproxy的单点崩溃问题。从官网的封面就能看到,现在RabbitMQ主推的是Quorum队列。
所以这种集群模式只适合一些对消息安全性不是很高的场景。