RabbitMQ常见面试题

RabbitMQ如何保证消息不丢失

RabbitMQ消息丢失的三种情况

image-20251225114230266

消息丢失的三种情况:

  1. 消息未到达交换机
  2. 消息到达交换机但是未到达队列,或到达队列但是未莱得及消费
  3. 消费者未接收到消息

解决以上问题,可以从以下三个方面解决

生产者确认机制

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

image-20251225114948932

消息失败之后如何处理?

  • 回调方法即时重发
  • 记录日志
  • 保留数据到数据库,定时重发,成功后删除表中数据

消息持久化

MQ默认是内存存储的,开启持久化功能可以确保缓存到MQ中的消息不丢失。

  1. 交换机持久化 (Exchange Persistence)

    @Bean
    public DirectExchange simpleExchange() {
        // 三个参数:交换机名称、是否持久化、当没有 queue 与其绑定时是否自动删除
        return new DirectExchange("simple.direct", true, false);
    }
    
  2. 队列持久化 (Queue Persistence)

    @Bean
    public Queue simpleQueue() {
        // 使用 QueueBuilder 构建队列,durable 就是持久化的
        return QueueBuilder.durable("simple.queue").build();
    }
    
  3. 消息持久化 (Message Persistence)

    SpringAMQP 中的消息默认是持久化的,可以通过 MessageProperties 中的 DeliveryMode 来指定。

    Message msg = MessageBuilder
        .withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化
        .build();
    

说明:

  • 交换机持久化:确保 MQ 重启后交换机依然存在。
  • 队列持久化:确保 MQ 重启后队列依然存在。
  • 消息持久化:确保 MQ 重启后,队列中的消息依然存在(前提是交换机和队列也必须持久化)。

消费者确认

RabbitMQ支持消费者确认机制,即:消费者处理消息之后可以想MQ发送ACK回执。MQ收到ACK回执之后才删除该消息。而SpingAMQP则允许配置三种确认方式:

  • manual: 手动ack,需要在业务代码结束后,调用api发送ack。
  • auto: 自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  • none: 关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

image-20251225120039828

我们可以使用Spring的retry机制,在消费者出现异常的时候利用本地重试,设置重试次数,当次数达到上限之后,如果消息依然没有成功被消费,那么就把消息投递到异常交换机,然后交由人工处理。

RabbitMQ如何防止消息重复消费

1. 利用数据库唯一索引(Unique Index)

这是最简单直接的方法。如果你的业务是将数据写入数据库,可以利用数据库的强一致性。

  • 做法:在表中设置一个业务相关的唯一键(如 order_id)。
  • 效果:当重复的消息到达时,执行 INSERT 操作会触发唯一索引冲突,数据库报错。你只需要捕获异常并忽略即可。

2. 利用 Redis 的原子性操作(去重表机制)

如果你的业务不是简单的数据库写入(比如调用第三方 API 或发送邮件),可以使用 Redis 来做拦截。

  • 流程
    1. 每条消息在生产者发送时,附带一个全局唯一 ID(如 UUID 或雪花 ID)。
    2. 消费者接收到消息后,先去 Redis 查一下这个 ID 是否存在:
      • 不存在:说明没消费过。将 ID 存入 Redis(可设置过期时间),然后执行业务逻辑。
      • 存在:说明是重复消息,直接丢弃。
  • 注意:为了防止“逻辑执行了一半挂掉”导致后续无法重试,建议结合 Lua 脚本分布式锁 来保证查重与写入的原子性。

3. 状态机幂等(Status Machine)

适用于有明显状态流转的业务(如订单系统)。

  • 做法:在更新数据时,带上状态条件。

    UPDATE orders SET status = 'PAID' WHERE id = 123 AND status = 'UNPAID';

  • 效果:如果消息重复触发,第二次执行时 status 已经是 PAID 了,SQL 不会更新任何行,从而保证了业务正确性。


4. RabbitMQ 的 redelivered 标志位

RabbitMQ 协议层面提供了一个辅助工具。

  • 原理:当消息是第二次及以上投递给消费者时,RabbitMQ 会在消息头中将 redelivered 字段设为 true
  • 策略
    • 如果 redeliveredfalse:直接消费。
    • 如果 redeliveredtrue:进入“去重检查”逻辑(如查 Redis),这样可以减少对外部缓存的压力。

实际业务当中,一般都是采用的数据库唯一索引来保证的,无论咋操作,你的数据最后都是落库。

RabbitMQ中死信交换机(延时队列)

死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信:

  • 消息被拒绝:消费者使用了 basic.rejectbasic.nack,且设置 requeue=false
  • 消息过期:消息达到了存活时间(TTL)。
  • 队列达到最大长度:最早的消息会被丢弃或变成死信。

如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就被称为死信交换机(Dead Letter Exchange,简称DLX)。

image-20251225142508820

代码示例:

@Bean
public Queue ttlQueue(){
    return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
            .ttl(10000) // 设置队列的超时时间,10秒
            .deadLetterExchange("dl.direct") // 指定死信交换机
            .build();
}

TTL

TTL,也就是 Time-To-Live。如果一个队列中的消息 TTL 结束仍未消费,则会变为死信,TTL 超时分为两种情况:

  • 消息所在的队列设置了存活时间
  • 消息本身设置了存活时间

image-20251225142914115

发信的时候可以携带一个过期时间,队列本身也可以设置一个过期时间,两者以较小的为准。到时间就自动成为死信。

// 创建消息
Message message = MessageBuilder
        .withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
        .setExpiration("5000")
        .build();

// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

// 发送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);

延迟队列插件

DelayExchange插件,需要安装在 RabbitMQ 中

RabbitMQ 有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html

rabbitmq_delayed_message_exchange

A plugin that adds delayed-messaging (or scheduled-messaging) to RabbitMQ.

DelayExchange 的本质还是官方的三种交换机,只是添加了延迟功能。 因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定 delayed 属性为 true 即可。

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "delay.queue", durable = "true"),
    exchange = @Exchange(name = "delay.direct", delayed = "true"),
    key = "delay"
))
public void listenDelayedQueue(String msg){
    log.info("接收到 delay.queue 的延迟消息:{}", msg);
}

RabbitMQ如果有100万消息积压如何解决

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中消息积压,直到队列存储的消息达到上限。之后发送的消息就会成为死信,可能被丢弃,这就是消息堆积的问题。

解决消息堆积一般有三种思路:

  1. 增加更多消费者,提高消费速度
  2. 在消费者内开启线程池来消费加快处理速度
  3. 扩大队列容量,提高堆积上限

惰性队列

文字内容: 惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储
  1. 通过 Bean 声明惰性队列:

    @Bean
    public Queue lazyQueue(){
        return QueueBuilder
                .durable("lazy.queue")
                .lazy() // 开启 x-queue-mode 为 lazy
                .build();
    }
    
  2. 通过注解声明并监听惰性队列:

    @RabbitListener(queuesToDeclare = @Queue(
        name = "lazy.queue",
        durable = "true",
        arguments = @Argument(name = "x-queue-mode", value = "lazy")
    ))
    public void listenLazyQueue(String msg){
        log.info("接收到 lazy.queue 的消息:{}", msg);
    }
    

RabbitMQ的高可用机制

  • 在生产环境下,使用集群来保证高可用性
  • 普通集群、镜像集群、仲裁集群

普通集群

普通集群,或者叫标准集群(classic cluster),具备下列特征:

  • 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
  • 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
  • 队列所在节点宕机,队列中的消息就会丢失

一般不使用这个集群方式

镜像集群

镜像集群:本质是主从模式,具备下面的特征:

  • 交换机、队列、队列中的消息会在各个 mq 的镜像节点之间同步备份。
  • 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点
  • 一个队列的主节点可能是另一个队列的镜像节点。
  • 所有操作都是主节点完成,然后同步给镜像节点。
  • 主宕机后,镜像节点会替代成新的主。

image-20251225151455449

仲裁队列

仲裁队列:仲裁队列是 3.8 版本以后才有新功能,用来替代镜像队列,具备下列特征:

  • 与镜像队列一样,都是主从模式,支持主从数据同步
  • 使用非常简单,没有复杂的配置
  • 主从同步基于 Raft 协议,强一致

Java (Spring AMQP) 中声明仲裁队列的代码示例:

@Bean
public Queue quorumQueue() {
    return QueueBuilder
            .durable("quorum.queue") // 持久化
            .quorum() // 仲裁队列
            .build();
}