消息中间件
消息中间件
1.消息队列
0.什么是消息队列?
我们可以把消息队列看作是一个存放消息的容器,当我们需要使用消息的时候,直接从容器中取出消息供自己使用即可。
队列Queue是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。
1.为什么要使用消息队列/消息队列有什么用?
总结一下,主要三点原因:解耦、异步、削峰。
1、解耦。使用消息队列,可以避免模块之间直接调用,将所需共享的数据放在消息队列中,对于新增业务模块,只要对该类消息感兴趣,即可订阅该类消息,对原有系统和业务没有任何影响,降低了系统各个模块的耦合度,提高了系统的可扩展性。
2、异步。消息队列允许生产者将消息发送到队列中,而不需要等待消费者立即处理。这样可以实现异步通信,提高系统的响应速度和吞吐量。
3、削峰。消息队列可以将消息存储起来,供消费者在适当的时间进行处理,从而实现延迟处理。高峰期的消息可以被积压起来,在随后的时间内进行平滑的处理完成,而不至于让系统短时间内无法承载而导致崩溃。在电商网站的秒杀抢购这种突发性流量很强的业务场景中,消息队列的强大缓冲能力可以很好的起到削峰作用。
2. 使用了消息队列会有什么缺点
- 系统可用性降低。引入消息队列之后,如果消息队列挂了,可能会影响到业务系统的可用性。
- 系统复杂性增加。加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。
3.RabbitMQ和Kafka的对比
- 应用场景方面 RabbitMQ:用于实时的,对可靠性要求较高的消息传递上。性能极其好,延时很低,达到微秒级。社区成熟且活跃,提供完善的文档和插件支持 Kafka:用于处于活跃的流式数据,大数据量的数据处理上。提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性。kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集
- 架构模型方面
RabbitMQ:以broker为中心,有消息的确认机制。 Kafka:以consumer为中心,没有消息的确认机制。
- 吞吐量方面 RabbitMQ:支持消息的可靠的传递,支持事务,不支持批量操作,基于存储的可靠性的要求存储 可以采用内存或硬盘,吞吐量小。 Kafka:内部采用消息的批量处理,数据的存储和获取是本地磁盘顺序批量操作,消息处理的效率 高,吞吐量高。
- 集群负载均衡方面 RabbitMQ:本身不支持负载均衡,需要loadbalancer的支持。 Kafka:采用zookeeper对集群中的broker,consumer进行管理,可以注册topic到zookeeper 上,通过zookeeper的协调机制,producer保存对应的topic的broker信息,可以随机或者轮询发 送到broker上,producer可以基于语义指定分片,消息发送到broker的某个分片上。
3.MQ常用协议
- AMQP协议 AMQP即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。优点:可靠、通用
- MQTT协议 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。优点:格式简洁、占用带宽小、移动端通信、PUSH、嵌入式系统
- STOMP协议 STOMP(Streaming Text Orientated Message Protocol)是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。STOMP提供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。优点:命令模式(非topic/queue模式)
- XMPP协议 XMPP(可扩展消息处理现场协议,Extensible Messaging and Presence Protocol)是基于可扩展标记语言(XML)的协议,多用于即时消息(IM)以及在线现场探测。适用于服务器之间的准即时操作。核心是基于XML流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其操作系统和浏览器不同。优点:通用公开、兼容性强、可扩展、安全性高,但XML编码格式占用带宽大
- 其他基于TCP/IP自定义的协议:有些特殊框架(如:redis、kafka、zeroMq等)根据自身需要未严格遵循MQ规范,而是基于TCP\IP自行封装了一套协议,通过网络socket接口进行传输,实现了MQ的功能。
4.MQ的通讯模式
- 点对点通讯:点对点方式是最为传统和常见的通讯方式,它支持一对一、一对多、多对多、多对一等多种配置方式,支持树状、网状等多种拓扑结构。
- 多点广播:MQ适用于不同类型的应用。其中重要的,也是正在发展中的是”多点广播”应用,即能够将消息发送到多个目标站点(Destination List)。可以使用一条MQ指令将单一消息发送到多个目标站点,并确保为每一站点可靠地提供信息。MQ不仅提供了多点广播的功能,而且还拥有智能消息分发功能,在将一条消息发送到同一系统上的多个用户时,MQ将消息的一个复制版本和该系统上接收者的名单发送到目标MQ系统。目标MQ系统在本地复制这些消息,并将它们发送到名单上的队列,从而尽可能减少网络的传输量。
- 发布/订阅(Publish/Subscribe)模式:发布/订阅功能使消息的分发可以突破目的队列地理指向的限制,使消息按照特定的主题甚至内容进行分发,用户或应用程序可以根据主题或内容接收到所需要的消息。发布/订阅功能使得发送者和接收者之间的耦合关系变得更为松散,发送者不必关心接收者的目的地址,而接收者也不必关心消息的发送地址,而只是根据消息的主题进行消息的收发。在MQ家族产品中,MQ Event Broker是专门用于使用发布/订阅技术进行数据通讯的产品,它支持基于队列和直接基于TCP/IP两种方式的发布和订阅。
- **集群(Cluster)**:为了简化点对点通讯模式中的系统配置,MQ提供 Cluster 的解决方案。集群类似于一个 域(Domain) ,集群内部的队列管理器之间通讯时,不需要两两之间建立消息通道,而是采用 Cluster 通道与其它成员通讯,从而大大简化了系统配置。此外,集群中的队列管理器之间能够自动进行负载均衡,当某一队列管理器出现故障时,其它队列管理器可以接管它的工作,从而大大提高系统的高可靠性
5.多线程异步和MQ的区别
- CPU消耗。多线程异步可能存在CPU竞争,而MQ不会消耗本机的CPU。
- MQ 方式实现异步是完全解耦的,适合于大型互联网项目。
- 削峰或者消息堆积能力。当业务系统处于高并发,MQ可以将消息堆积在Broker实例中,而多线程会创建大量线程,甚至触发拒绝策略。
- 使用MQ引入了中间件,增加了项目复杂度和运维难度。
总的来说,规模比较小的项目可以使用多线程实现异步,大项目建议使用MQ实现异步。
6.MQ幂等性怎么保证/怎么解决消息重复消费问题?
MQ(消息队列)的幂等性可以通过多种措施来保证,确保系统即使在出现重复消息的情况下也能保持数据的一致性和正确性。具体如下:
- 生成全局唯一的inner-msg-id:在消息发送端(上半场),MQ客户端应为每条消息生成一个全局唯一且业务无关的inner-msg-id。这个ID由MQ保证其唯一性,并且对业务透明。这样,即使在网络波动或超时导致的重发情况下,MQ服务器可以利用这个ID进行去重,确保相同的消息不会被处理多次。
- 业务层的去重处理:在消息的消费端(下半场),业务系统需要根据业务特点带入业务相关的biz-id。消费端通过这个biz-id来进行去重,以保证对于同一业务操作,即使收到了多次相同消息,也只会被处理一次。这要求业务系统具备去重逻辑,通常涉及到数据库的唯一约束或者分布式锁等机制来保证操作的幂等性。
- 利用乐观锁机制:借鉴数据库中乐观锁的思想,可以为涉及状态变更的业务数据添加版本号。每次更新前先检查版本号,只有在版本号匹配的情况下才执行更新操作,并更新版本号。这种方式可以防止并发下的重复操作影响数据一致性。
- 消息确认机制:MQ客户端在发送消息给MQ服务器后,等待服务器的确认(ACK)。如果确认丢失或超时,客户端会重发消息。为了避免因此导致的重复消息处理,MQ服务器内部需要有机制识别并丢弃重复的消息,例如利用上述提到的inner-msg-id进行去重。
- 消费端幂等设计:在消费端实现业务逻辑时,需要考虑到消息可能会被重复投递的情况。因此,消费端的业务处理逻辑应当设计成幂等的,即多次执行相同的操作不会对最终结果产生影响。
- 事务性消息:对于需要保证精确一次消费的场景,可以使用事务性消息。这意味着消息的发送和消费是原子性的,要么都成功,要么都失败。这种方式下,系统能更好地控制幂等性,但可能会牺牲一些性能。
- 限流和补偿机制:在高并发场景下,适当的限流策略可以避免系统因过载而产生错误的重复操作。同时,建立补偿机制可以在检测到异常操作时进行修正。
综上所述,保证MQ幂等性是一个系统性工程,既涉及到技术层面的改进,如ID生成、去重、乐观锁等,也需要业务逻辑层面的支持,如业务去重、幂等设计等。
7.MQ 中的消息过期失效了怎么办?
如果使用的是RabbitMQ的话,RabbtiMQ 是可以设置过期时间的(TTL)。如果消息在 Queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。这时的问题就不是数据会大量积压在 MQ 里,而是大量的数据会直接搞丢。这个情况下,就不是说要增加 Consumer 消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。
我们可以采取一个方案,就是批量重导。就是大量积压的时候,直接将数据写到数据库,然后等过了高峰期以后将这批数据一点一点的查出来,然后重新灌入 MQ 里面去,把丢的数据给补回来。
8.消息队列里面拉取模式和推送模式的比较
在消息队列系统中,推送模式(Push)和拉取模式(Pull)是两种基本的消息传输机制。它们之间存在一定的区别。具体分析如下:
- 推送模式:在MQ中也就是Broker收到消息后主动推送给Consumer的操作,叫做推模式。推模式的实现是客户端会与服务端(Broker)建立长连接,当有消息时服务端会通过长连接通道将消息主动推送给客户端,这样客户端就能实时消费到最新的消息。优点: 实时性强,有消息立马推送给客户端,吞吐量大。客户端实现简单,只需要监听服务端的推送即可。缺点: 容易导致客户端发生消息堆积的情况,因为每个客户端的消费能力是不同的,如果简单粗暴的有消息就推送,就会会出现堆积情况。
- 拉取模式:在MQ中也就是客户端主动从服务器Broker端获取信息。很多拉模式都是基于长轮询来实现。长轮询就是客户端向服务端发起请求,如果此时有数据就直接返回,如果没有数据就保持连接,等到有数据时就直接返回。如果一直没有数据,超时后客户端再次发起请求,保持连接,这就是长轮询的实现原理。很多的开源框架都是用的这种方式,比如配置中心Apollo的推送优点: 不会造成客户端消息积压,消费完了再去拉取,主动权在自己手中。长轮询实现的拉模式实时性也能够保证。缺点: 实时性较差,针对于服务器端实时更新的信息,客户端难以获取实时信息;
推和拉都有各自的优势和劣势,不过目前主流的消息队列大部分都用的拉模式,比如RocketMQ,Kafka。
9.如果有100万消息堆积在MQ , 如何解决 ?
我在实际的开发中,没遇到过这种情况,不过,如果发生了堆积的问题,解决方案也所有很多的
第一:提高消费者的消费能力 ,可以使用多线程消费任务
第二:增加更多消费者,提高消费速度
使用工作队列模式, 设置多个消费者消费消费同一个队列中的消息
第三:扩大队列容积,提高堆积上限
可以使用RabbitMQ惰性队列,惰性队列的好处主要是
①接收到消息后直接存入磁盘而非内存
②消费者要消费消息时才会从磁盘中读取并加载到内存
③支持数百万条的消息存储
10.说一说生产者与消费者模式
所谓生产者-消费者问题,实际上主要是包含了两类线程。一种是生产者线程用于生产数据,另一种是消费者线程用于消费数据,为了解耦生产者和消费者的关系,通常会采用共享的数据区域,就像是一个仓库。生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为。而消费者只需要从共享数据区中去获取数据,就不再需要关心生产者的行为。但是,这个共享数据区域中应该具备这样的线程间并发协作的功能:
\1. 如果共享数据区已满的话,阻塞生产者继续生产数据放置入内;\2. 如果共享数据区为空的话,阻塞消费者继续消费数据。在Java语言中,实现生产者消费者问题时,可以采用三种方式:\1. 使用 Object 的 wait/notify 的消息通知机制;\2. 使用 Lock 的 Condition 的 await/signal 的消息通知机制;\3. 使用 BlockingQueue 实现。
2.RabbitMQ
1.什么是RabbitMQ
RabbitMQ 是一个在 AMQP(Advanced Message Queuing Protocol )基础上实现的整,体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。
什么是AMQP
RabbitMQ 就是 AMQP 协议的 Erlang
的实现(当然 RabbitMQ 还支持 STOMP2
、 MQTT3
等协议 ) AMQP 的模型架构 和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定 。
RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相 应的概念。目前 RabbitMQ 最新版本默认支持的是 AMQP 0-9-1。
AMQP 协议的三层:
- Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。
- Session Layer:中间层,主要负责客户端命令发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。
- TransportLayer:最底层,主要传输二进制数据流,提供帧的处理、信道服用、错误检测和数据表示等。
AMQP 模型的三大组件:
- **交换器 (Exchange)**:消息代理服务器中用于把消息路由到队列的组件。
- **队列 (Queue)**:用来存储消息的数据结构,位于硬盘或内存中。
- **绑定 (Binding)**:一套规则,告知交换器消息应该将消息投递给哪个队列。
2.RabbitMQ 核心概念?
0.Message消息体
Message:由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key、priority、delivery-mode(是否持久性存储)等。
1.Producer(生产者) 和 Consumer(消费者)
- Producer(生产者) :生产消息的一方
- Consumer(消费者) :消费消息的一方
消息一般由 2 部分组成:消息头(或者说是标签 Label)和 消息体。消息体也可以称为 payLoad ,消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。生产者把消息交由 RabbitMQ 后,RabbitMQ 会根据消息头把消息发送给感兴趣的 Consumer(消费者)。
2.Exchange(交换器)
消息并不是直接被投递到 Queue(消息队列) 中的,中间还必须经过 Exchange(交换器) 这一层,Exchange(交换器) 会把我们的消息分配到对应的 Queue(消息队列) 中。
Exchange(交换器) 用来接收生产者发送的消息并将这些消息路由给服务器中的队列中,如果路由不到,或许会返回给 Producer(生产者) ,或许会被直接丢弃掉 。
RabbitMQ 的 Exchange(交换器) 有 4 种类型,不同的类型对应着不同的路由策略:direct(默认),fanout, topic, 和 headers,不同类型的 Exchange 转发消息的策略有所区别。
生产者将消息发给交换器的时候,一般会指定一个 **RoutingKey(路由键)**,用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
RabbitMQ 中通过 Binding(绑定) 将 Exchange(交换器) 与 Queue(消息队列) 关联起来,在绑定的时候一般会指定一个 BindingKey(绑定建) ,这样 RabbitMQ 就知道如何正确将消息路由到队列了,如下图所示。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和 Queue 的绑定可以是多对多的关系。
4种交换机
fanout 类型的 Exchange ,它会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中,不需要做任何判断操作,所以 fanout 类型是所有的交换机类型里面速度最快的。anout 类型常用来广播消息。
**direct **类型的 Exchange 会把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中。常用在处理有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。
topic 类型的交换器使用routing key和binding key进行模糊匹配,匹配成功则将消息发送到相应的队列。routing key和binding key都是句点号“. ”分隔的字符串,binding key中可以存在两种特殊字符“”与“##”,用于做模糊匹配,其中“”用于匹配一个单词,“##”用于匹配多个单词。
headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。
3.Queue(消息队列)
Queue(消息队列) 用来保存消息直到发送给消费者。队列的特性是先进先出。一个消息可分发到一个或多个队列。
多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,这样避免消息被重复消费。
4.Broker(消息中间件的服务节点)
对于 RabbitMQ 来说,一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,或者 RabbitMQ 服务实例。大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。
消息服务器,作为server提供消息核心服务
3.Rabbit5种工作模式
RabbitMQ常用的工作模式有:简单队列模式、工作队列模式、发布订阅模式、路由模式、主题模式。
简单队列模式(Simple Queue)只包含一个生产者以及一个消费者,生产者Producer将消息发送到队列中,消费者Consumer从该队列接收消息。(单生产单消费)
上图中,“P”是我们的生产者,“C”是我们的消费者。
**工作队列模式(Work Queues)**多个消费者绑定到同一个队列上,一条消息只能被一个消费者进行消费。工作队列有轮训分发和公平分发两种模式。
发布-订阅模式(Publish/Subscribe)生产者将消息发送到交换器,然后交换器绑定到多个队列,监听该队列的所有消费者消费消息。
路由模式(Routing)生产者将消息发送到direct交换器,它会把消息路由到那些binding key与routing key完全匹配的Queue中,这样就能实现消费者有选择性地去消费消息。
主题(Topic)模式类似于正则表达式匹配的一种模式。主要使用#、*进行匹配。
4.RabbitMQ如何保证消息不丢失
消息丢失有3种可能:生产者丢消息,消费者丢消息,RabbitMQ丢消息
消息丢失从三个方面来解决:生产者确认机制、消费者确认机制和持久化。
1.开启生产者确认机制,确保生产者的消息能到达队列:(生产者丢消息)
- 只要消息成功发送到交换机之后,RabbitMQ就会发送一个ack给生产者(即使消息没有Queue接收,也会发送ack)。如果消息没有成功发送到交换机,就会发送一条nack消息,提示发送失败。
2.开启消费者确认机制:确保消费者成功消费(消费者丢消息)
- 消费者处理完消息后,需要发送一个ack回执给RabbitMQ,告知消息已成功处理。若在处理过程中发生异常,消费者可以发送nack回执,要求RabbitMQ重新投递该消息。这保证了即使消费者处理过程中出现问题,消息也不会丢失。
3.开启持久化功能:(RabbitMQ自己丢东西)
- 当生产者发布一条消息到交换机上时,Rabbit会先把消息写入持久化日志,然后才向生产者发送ack响应。一旦从队列中消费了一条消息的话并且做了确认,RabbitMQ会在持久化日志中移除这条消息。在消费消息前,如果RabbitMQ重启的话,服务器会自动重建交换机和队列,加载持久化日志中的消息到相应的队列或者交换机上,保证消息不会丢失
5.RabbitMQ重复消费问题如何解决/幂等性怎么保证?
消息重复的原因有两个:1.生产时消息重复,2.消费时消息重复。
生产者发送消息给MQ,在MQ确认的时候出现了网络波动,生产者没有收到确认,这时候生产者就会重新发送这条消息,导致MQ会接收到重复消息。
消费者消费成功后,给MQ确认的时候出现了网络波动,MQ没有接收到确认,为了保证消息不丢失,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。由于重复消息是由于网络原因造成的,无法避免。
解决方法:发送消息时让每个消息携带一个全局的唯一ID,在消费消息时先判断消息是否已经被消费过,保证消息消费逻辑的幂等性。具体消费过程为:
- 消费者获取到消息后先根据id去查询redis/db是否存在该消息
- 如果不存在,则正常消费,消费完毕后写入redis/db
- 如果存在,则证明消息被消费过,直接丢弃
其他解决办法
其实这个就是典型的幂等的问题,比如,redis分布式锁、数据库的锁都是可以的
6. 如何保证 RabbitMQ 消息的顺序性?
在生产中经常会有一些类似报表系统这样的系统,需要做 MySQL 的 binlog 同步。比如订单系统要同步订单表的数据到大数据部门的 MySQL 库中用于报表统计分析,通常的做法是基于 Canal 这样的中间件去监听订单数据库的 binlog,然后把这些 binlog 发送到 MQ 中,再由消费者从 MQ 中获取 binlog 落地到大数据部门的 MySQL 中。在这个过程中,可能会有对某个订单的增删改操作,比如有三条 binlog 执行顺序是增加、修改、删除。消费者愣是换了顺序给执行成删除、修改、增加,这样能行吗?肯定是不行的。不同的消息队列产品,产生消息错乱的原因,以及解决方案是不同的。RabbitMQ 的问题是由于不同的消息都发送到了同一个 queue 中,多个消费者都消费同一个 queue 的****消息,就会导致消费 binlog 执行到数据库的时候顺序错乱。解决这个问题,我们可以给 RabbitMQ 创建多个 queue,每个消费者固定消费一个 queue 的消息,生产者发送消息的时候,同一个订单号的消息发送到同一个 queue 中,由于同一个 queue 的消息是一定会保证有序的,那么同一个订单号的消息就只会被一个消费者顺序消费,从而保证了消息的顺序性。
6.什么是死信队列?如何导致的?
消费失败的消息存放的队列。
消息消费失败的原因:
- 消息被拒绝并且消息没有重新入队(requeue=false)
- 消息超时未消费
- 达到最大队列长度
当消息在一个队列中变成死信 消息之后,它能被重新被发送到一个死信交换机中,绑定 死信交换机 的队列就称之为死信队列。
7.什么是延迟队列?怎么实现的?
延迟队列指的是存储对应的延迟消息,消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
RabbitMQ 本身是没有延迟队列的,要实现延迟消息,一般有两种方式:
- 使用 RabbitMQ 的死信交换机(Exchange)和消息的存活时间 TTL(消息存活时间)。生产者将消息发送到一个特定的交换器,该交换器将这些消息路由到一个具有TTL属性的队列。当消息在队列中的时间超过设定的TTL值时,消息会被发送到死信队列。消费者可以从死信队列中获取这些消息并进行相应的处理。
- 可以使用rabbitmq_delayed_message_exchange插件:这个插件允许交换器直接支持延迟消息的发布。使用此插件,生产者可以在发送消息时指定一个延迟时间,然后交换器会在该时间过后才将消息路由到相应的队列中。这样一来,消费者只能在延迟时间过后才能接收到消息。
8.消息堆积怎么解决
面试官:如果有100万消息堆积在MQ , 如何解决 ?
候选人:
我在实际的开发中,没遇到过这种情况,不过,如果发生了堆积的问题,解决方案也所有很多的
第一:提高消费者的消费能力 ,可以使用多线程消费任务
第二:增加更多消费者,提高消费速度
使用工作队列模式, 设置多个消费者消费消费同一个队列中的消息
第三:扩大队列容积,提高堆积上限
可以使用RabbitMQ惰性队列,惰性队列的好处主要是
①接收到消息后直接存入磁盘而非内存
②消费者要消费消息时才会从磁盘中读取并加载到内存
③支持数百万条的消息存储
什么是RabbitMQ惰性队列
惰性队列是RabbitMQ从3.6.0版本开始引入的一种队列类型,它的主要特点是将消息尽可能存储在磁盘中,而不是内存中。这种设计使得惰性队列能够支持存储更多的消息,即使在消费者无法及时处理消息的情况下也能保持高效。具体来说,惰性队列的工作原理如下:
- 消息存储:当消息进入惰性队列时,它们会被直接存储到磁盘上,而不是像传统队列那样存储在内存中。
- 消息读取:只有在消费者请求消费消息时,这些消息才会从磁盘中读取出来并加载到内存中,以便消费者可以处理它们。
- 设计目标:惰性队列的设计目标是能够处理更长的队列,即支持存储数百万条消息,这对于需要处理大量数据或长时间运行的作业特别有用。
- 应用场景:当消费者因为维护、宕机或其他原因无法及时处理消息时,惰性队列可以有效地防止消息丢失,因为它可以将消息安全地存储在磁盘上,直到消费者准备好处理它们。
总的来说,惰性队列提供了一种在RabbitMQ中处理大量消息的有效方式,特别是在消费者可能无法立即处理这些消息的情况下。通过将消息存储在磁盘上,惰性队列确保了消息的安全性和可用性,同时也提高了系统的整体性能和可扩展性。
9.RabbitMQ高可用机制
我们当时项目在生产环境下,使用的集群,当时搭建是镜像模式集群,使用了3台机器。
镜像队列结构是一主多从,所有操作都是主节点完成,然后同步给镜像节点,如果主节点宕机后,镜像节点会替代成新的主节点,不过在主从同步完成前,主节点就已经宕机,可能出现数据丢失
面试官:那出现丢数据怎么解决呢?
候选人:
我们可以采用仲裁队列,与镜像队列一样,都是主从模式,支持主从数据同步,主从同步基于Raft协议,强一致。
并且使用起来也非常简单,不需要额外的配置,在声明队列的时候只要指定这个是仲裁队列即可
11.为什么选择RabbitMQ实习订单超时取消而不是用Kafka
选择RabbitMQ作为实习订单超时取消的解决方案,而不是使用Kafka,可能是基于以下几个考虑:
- 消息顺序性:RabbitMQ对发送到队列的消息的顺序性提供了较强的保证。在某些业务场景下,如订单状态的变更,需要保证消息的严格顺序,RabbitMQ能够确保消费者按照生产者发送消息的顺序来处理消息。这对于订单超时取消的场景非常重要,因为订单状态的更新需要严格按照流程来执行。
- 消息的可靠性:RabbitMQ支持消息的可靠传递,并且支持事务。这意味着在处理订单超时取消的场景时,可以确保每一个订单状态的变化都被可靠地传递给所有关心订单变化的系统,从而避免因为消息丢失导致的订单状态不一致的问题。
- 重试逻辑和死信交换:RabbitMQ内置了对重试逻辑和死信交换的支持。这意味着如果一个消息处理失败,RabbitMQ可以配置死信队列来处理这些未能成功处理的消息,而Kafka则需要用户自己来实现这部分逻辑。
- 吞吐量和性能:虽然Kafka在吞吐量和性能方面通常优于RabbitMQ,但是在处理订单超时取消这种对消息顺序性和可靠性要求较高的场景时,这些特性并不是首要考虑的因素。相反,RabbitMQ提供的高可靠性和顺序保证更加符合此类场景的需求。
- 系统的复杂性:Kafka作为一个分布式流处理平台,适合处理大规模的数据流。但是,如果业务场景不需要处理如此大规模的数据流,使用Kafka可能会引入不必要的复杂性。相比之下,RabbitMQ作为一个消息代理中间件,可能更适合规模较小、需要高可靠性和顺序性保证的场景。
综上所述,选择RabbitMQ作为实习订单超时取消的解决方案,是基于其对消息顺序性的保证、可靠性、内置的重试逻辑和死信交换以及对系统复杂性的考虑。当然,具体选择哪种技术还需要根据实际业务需求和技术栈来决定。
kafka
1.Kafka 都有哪些特点?
- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
- 高并发:支持数千个客户端同时读写
2.Kafka 的设计架构你知道吗?
Kafka 架构分为以下几个部分:
- Producer :消息生产者,就是向 kafka broker 发消息的客户端。
- Consumer :消息消费者,向 kafka broker 取消息的客户端。
- Topic :可以理解为一个队列,一个 Topic 又分为一个或多个分区,
- Consumer Group:这是 kafka 用来实现一个 topic 消息的广播(发给所有的 consumer)和单播(发给任意一个 consumer)的手段。一个 topic 可以有多个 Consumer Group。
- Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
- Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker上,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的id(offset)。将消息发给 consumer,kafka 只保证按一个 partition 中的消息的顺序,不保证一个 topic 的整体(多个 partition 间)的顺序。
- Offset:kafka 的存储文件都是按照 offset.kafka 来命名,用 offset 做名字的好处是方便查找。例如你想找位于 2049 的位置,只要找到 2048.kafka 的文件即可。当然 the first offset 就是 00000000000.kafka。
3.请简述下你在哪些场景下会选择 Kafka?
- 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、HBase、Solr等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
- 流式处理:比如spark streaming和 Flink
4.Kafka 的多副本机制了解吗?带来了什么好处?
还有一点我觉得比较重要的是 Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。
生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。当 leader 副本发生故障时会从 follower 中选举出一个 leader,但是 follower 中如果有和 leader 同步程度达不到要求的参加不了 leader 的竞选。
Kafka 的多分区(Partition)以及多副本(Replica)机制有什么好处呢?
- Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力(负载均衡)。
- Partition 可以指定对应的 Replica 数, 这也极大地提高了消息存储的安全性, 提高了容灾能力,不过也相应的增加了所需要的存储空间。
ISR、OSR、AR 是什么?
ISR:In-Sync Replicas 副本同步队列
OSR:Out-of-Sync Replicas
AR:Assigned Replicas 所有副本
ISR是由leader维护,follower从leader同步数据有一些延迟(具体可以参见 图文了解 Kafka 的副本复制机制),超过相应的阈值会把 follower 剔除出 ISR, 存入OSR(Out-of-Sync Replicas )列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR
ISR详解
ISR的意思是in-sync replica,就是需要同步复制保存的follower
其中分区副本有很多的follower,分为了两类,一个是ISR,与leader副本同步保存数据,另外一个普通的副本,是异步同步数据,当leader挂掉之后,会优先从ISR副本列表中选取一个作为leader,因为ISR是同步保存数据,数据更加的完整一些,所以优先选择ISR副本列表
5.kafka如何保证消息不丢失
嗯,这个保证机制很多,在发送消息到消费者接收消息,在每个阶段都有可能会丢失消息,所以我们解决的话也是从多个方面考虑
第一个是生产者发送消息的时候,可以使用异步回调发送,如果消息发送失败,我们可以通过回调获取失败后的消息信息,可以考虑重试或记录日志,后边再做补偿都是可以的。同时在生产者这边还可以设置消息重试,有的时候是由于网络抖动的原因导致发送不成功,就可以使用重试机制来解决
第二个在broker中消息有可能会丢失,我们可以通过kafka的复制机制来确保消息不丢失,在生产者发送消息的时候,可以设置一个acks,就是确认机制。我们可以设置参数为all,这样的话,当生产者发送消息到了分区之后,不仅仅只在leader分区保存确认,在follwer分区也会保存确认,只有当所有的副本都保存确认以后才算是成功发送了消息,所以,这样设置就很大程度了保证了消息不会在broker丢失
第三个有可能是在消费者端丢失消息,kafka消费消息都是按照offset进行标记消费的,消费者默认是自动按期提交已经消费的偏移量,默认是每隔5s提交一次,如果出现重平衡的情况,可能会重复消费或丢失数据。我们一般都会禁用掉自动提价偏移量,改为手动提交,当消费成功以后再报告给broker消费的位置,这样就可以避免消息丢失和重复消费了
6.Kafka中消息的重复消费问题如何解决的?
kafka消费消息都是按照offset进行标记消费的,消费者默认是自动按期提交已经消费的偏移量,默认是每隔5s提交一次,如果出现重平衡的情况,可能会重复消费或丢失数据。我们一般都会禁用掉自动提价偏移量,改为手动提交,当消费成功以后再报告给broker消费的位置,这样就可以避免消息丢失和重复消费了
为了消息的幂等,我们也可以设置唯一主键来进行区分,或者是加锁,数据库的锁,或者是redis分布式锁,都能解决幂等的问题
7.Kafka是如何保证消费的顺序性
kafka默认存储和消费消息,是不能保证顺序性的,因为一个topic数据可能存储在不同的分区中,每个分区都有一个按照顺序的存储的偏移量,如果消费者关联了多个分区不能保证顺序性
如果有这样的需求的话,我们是可以解决的,把消息都存储同一个分区下就行了,有两种方式都可以进行设置,第一个是发送消息时指定分区号,第二个是发送消息时按照相同的业务设置相同的key,因为默认情况下分区也是通过key的hashcode值来选择分区的,hash值如果一样的话,分区肯定也是一样的
8.Kafka的高可用机制
嗯,主要是有两个层面,第一个是集群,第二个是提供了复制机制
kafka集群指的是由多个broker实例组成,即使某一台宕机,也不耽误其他broker继续对外提供服务
复制机制是可以保证kafka的高可用的,一个topic有多个分区,每个分区有多个副本,有一个leader,其余的是follower,副本存储在不同的broker中;所有的分区副本的内容是都是相同的,如果leader发生故障时,会自动将其中一个follower提升为leader,保证了系统的容错性、高可用性
9.Kafka数据清理机制
嗯,了解过~~
Kafka中topic的数据存储在分区上,分区如果文件过大会分段存储segment
每个分段都在磁盘上以索引(xxxx.index)和日志文件(xxxx.log)的形式存储,这样分段的好处是,第一能够减少单个文件内容的大小,查找数据方便,第二方便kafka进行日志清理。
在kafka中提供了两个日志的清理策略:
第一,根据消息的保留时间,当消息保存的时间超过了指定的时间,就会触发清理,默认是168小时( 7天)
第二是根据topic存储的数据大小,当topic所占的日志文件大小大于一定的阈值,则开始删除最久的消息。这个默认是关闭的
这两个策略都可以通过kafka的broker中的配置文件进行设置
10.Kafka中实现高性能的设计有了解过嘛
Kafka 高性能,是多方面协同的结果,包括宏观架构、分布式存储、ISR 数据同步、以及高效的利用磁盘、操作系统特性等。主要体现有这么几点:
消息分区:不受单台服务器的限制,可以不受限的处理更多的数据
顺序读写:磁盘顺序读写,提升读写效率
页缓存:把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问
零拷贝:减少上下文切换及数据拷贝
消息压缩:减少磁盘IO和网络IO
分批发送:将消息打包批量发送,减少网络开销
11.Kafka消费者分区分配策略
Kafka主要支持以下三种消费者分区分配策略:
- RoundRobin(轮询): 在这种策略中,如果当前没有消费者的分区会均匀地分配给各个消费者。如果有新的消费者加入或者有消费者离开,那么分区会重新进行均匀分配。这种策略确保了分区的均匀分布,但可能会导致消费者之间的负载不均衡,因为每个分区的数据量和处理时间可能会有所不同。
- Range(范围): 在这种策略中,每个主题的分区会被划分为多个范围,每个消费者负责一个或多个范围。这种策略可以保证具有相同键的消息总是被相同的消费者处理,但是当消费者数量发生变化时,需要进行分区的重平衡操作,这可能会导致一些性能问题。
- Sticky(粘性): 在这种策略中,每个分区都会被分配给上次消费该分区的消费者。这种策略可以提高消息处理的效率,因为它减少了分区重平衡的操作。但是,如果某个消费者宕机,那么它所负责的分区将需要被重新分配。
- Rebalance(重平衡): 这不是一种具体的分配策略,而是一个过程。当消费者组中的消费者数量发生变化时,Kafka会自动触发重平衡过程,重新分配分区到各个消费者。这个过程是必要的,以确保所有的分区都能被正确地消费。
注意,以上的分配策略并不是互斥的,而是可以结合使用的。例如,你可以首先使用Range策略对分区进行初步分配,然后使用Sticky策略来优化分区的分配。