消息丢失的场景

RabbitMQ丢失消息的3种情况:

image-20250324142239134

  1. 生产者丢失消息
    • 场景: 生产者发送消息后,RabbitMQ 服务器崩溃或网络中断,导致消息未到达 RabbitMQ。
    • 解决方案: 使用 Confirm 模式,确保消息成功到达 RabbitMQ
  2. RabbitMQ 服务器丢失消息

    • 场景: RabbitMQ 服务器崩溃,且消息未持久化。
    • 解决方案: 将队列和消息都设置为持久化(durable=truedeliveryMode=2)。
  3. 消费者丢失消息

    • 场景: 消费者处理消息后,未发送 ACK,且消费者崩溃,导致 RabbitMQ 认为消息未被处理。
    • 解决方案 使用 手动 ACK,确保消息处理成功后再发送 ACK。
  4. 消息过期
    • 场景: 消息设置了 TTL(Time-To-Live),在过期后会被自动删除。
    • 解决方案: 合理设置消息的 TTL,或监控过期消息。
  5. 队列溢出
    • 场景: 队列达到最大长度限制,新消息会被丢弃。
    • 解决方案: 监控队列长度,合理设置队列的最大长度。

RabbitMQ 提供了以下机制来确保消息传输的可靠性:

  1. 消息持久化

    • 队列持久化: 将队列设置为持久化(durable=true),即使 RabbitMQ 服务器重启,队列也不会丢失。

    • 消息持久化 将消息设置为持久化(deliveryMode=2),即使 RabbitMQ 服务器重启,消息也不会丢失。

  2. 生产者确认机制(Publisher Confirm)

    • Confirm 模式: 生产者发送消息后,RabbitMQ 会返回一个确认(ack),表示消息已经成功到达 RabbitMQ 服务器。

    • Return 模式: 如果消息无法路由到队列,RabbitMQ 会返回一个 return,生产者可以处理这种异常情况。

  3. 消费者确认机制(Consumer Ack)

    • 手动 ACK: 消费者处理完消息后,手动发送一个确认(ack)给 RabbitMQ,RabbitMQ 才会将消息从队列中移除。

    • NACK 或 Reject: 如果消费者处理消息失败,可以发送 nackreject,RabbitMQ 会将消息重新入队或丢弃。

  4. 事务机制

    • 事务模式: 生产者可以使用事务机制确保消息的原子性,但性能较差,通常不推荐使用。

如何确保消息不丢失

image-20250324143814782

为了确保消息不丢失,可以采取以下措施:

  1. 生产者端

    • 使用 Confirm 模式,确保消息成功到达 RabbitMQ。

    • 实现消息重试机制,在消息发送失败时重试。

  2. RabbitMQ 端

    • 将队列和消息都设置为持久化。

    • 使用镜像队列(Mirrored Queues),确保消息在多个节点上备份。

  3. 消费者端

    • 使用 手动 ACK,确保消息处理成功后再发送 ACK。

    • 实现消息重试机制,在消息处理失败时重试。

  4. 监控和报警

    • 监控 RabbitMQ 的状态,包括队列长度、消息堆积情况等。

    • 设置报警机制,及时发现和处理异常情况。

实现

parent:org.springframework.boot:spring-boot-starter-parent:2.7.10

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
</dependency>

config:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    // 定义队列
    @Bean
    public Queue myQueue() {
        return QueueBuilder.durable("myQueue") // 持久化队列
                // 将处理失败的消息转移到死信队列,避免阻塞正常队列
                .withArgument("x-dead-letter-exchange", "myDLX") // 绑定死信交换机
                .withArgument("x-dead-letter-routing-key", "myDLRoutingKey") // 绑定死信路由键
                .build();
    }

    // 定义交换机
    @Bean
    public DirectExchange myExchange() {
        return new DirectExchange("myExchange", true, false);
    }

    // 绑定队列和交换机
    @Bean
    public Binding binding(Queue myQueue, DirectExchange myExchange) {
        return BindingBuilder.bind(myQueue)
                .to(myExchange)
                .with("myRoutingKey");
    }

    // 定义死信队列
    @Bean
    public Queue myDLQueue() {
        return QueueBuilder.durable("myDLQueue") // 持久化死信队列
                .build();
    }

    // 定义死信交换机
    @Bean
    public DirectExchange myDLX() {
        return ExchangeBuilder.directExchange("myDLX")
                .durable(true) // 持久化死信交换机
                .build();
    }

    // 绑定死信队列和死信交换机
    @Bean
    public Binding dlBinding(Queue myDLQueue, DirectExchange myDLX) {
        return BindingBuilder.bind(myDLQueue)
                .to(myDLX)
                .with("myDLRoutingKey");
    }
}

死信队列(Dead Letter Queue, DLQ):是 RabbitMQ 的一种容错机制,用于接收无法被正常消费的消息(即“死信”)。可以将无法处理的消息从主队列分离,避免阻塞正常业务。

  • 持久化机制:默认继承原消息属性。原始消息设置了 deliveryMode=2(持久化),则死信消息自动持久化到磁盘。
  • 超时机制(TTL):死信消息本身没有默认TTL,除非显式设置,否则死信消息会永久保留在队列中。

配置 confirm 回调和 return 回调:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class RabbitMQTemplateConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 开启强制标志(确保消息路由失败时能触发ReturnCallback)
        rabbitTemplate.setMandatory(true);
        // 设置Confirm回调
        rabbitTemplate.setConfirmCallback(confirmCallback());
        // 设置Return回调(新版本推荐使用setReturnsCallback)
        rabbitTemplate.setReturnsCallback(returnsCallback());
        return rabbitTemplate;
    }

    // Confirm 回调
    private RabbitTemplate.ConfirmCallback confirmCallback() {
        return (correlationData, ack, cause) -> {
            if (correlationData == null) {
                log.warn("Confirm回调收到null correlationData");
                return;
            }

            if (ack) {
                log.info("消息发送成功: [ID={}]", correlationData.getId());
            } else {
                log.error("消息发送失败: [ID={}] [原因:{}]", correlationData.getId(), cause);
                // 这里可以添加失败处理逻辑,如记录到数据库等待重试
            }
        };
    }

    // Return 回调处理路由失败(当消息无法路由到队列时触发)
    private RabbitTemplate.ReturnsCallback returnsCallback() {
        return returned -> {
            Message message = returned.getMessage();
            MessageProperties properties = message.getMessageProperties();

            log.error("消息无法路由到队列: [Body={}] [原因:{}] [Exchange={}] [RoutingKey={}] [Headers={}]",
                    new String(message.getBody()),
                    returned.getReplyText(),
                    returned.getExchange(),
                    returned.getRoutingKey(),
                    properties.getHeaders());

            // 这里可以添加路由失败处理逻辑,如记录到数据库等待人工干预
        };
    }
}

生产者:

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Slf4j
@Service
public class MessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        // 1. 构建消息属性(设置消息持久化)
        MessageProperties properties = new MessageProperties();
        properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

        // 2. 设置消息头(包括重试次数和唯一 correlationId)
        Map<String, Object> headers = new HashMap<>();
        // 添加 retryCount 头信息,初始值为 0
        headers.put("retryCount", 0);
        String correlationId = UUID.randomUUID().toString(); // 全局唯一ID
        headers.put("correlationId", correlationId);
        headers.put("traceId", UUID.randomUUID().toString()); // 用于全链路追踪
        properties.setHeaders(headers);

        // 3. 创建消息和 CorrelationData
        Message msg = new Message(message.getBytes(), properties);
        CorrelationData correlationData = new CorrelationData(correlationId);

        // 4. 发送消息(携带 CorrelationData)
        rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", msg, correlationData);

        log.debug("消息已发送: [ID={}] [TraceID={}]", correlationId, headers.get("traceId"));
    }
}

CorrelationData 是 Spring AMQP 提供的一个消息关联标识对象,主要用于 Publisher Confirm 模式下关联发送的消息和确认结果。通过 id 字段(通常是业务ID或UUID)唯一标识消息。

几个ID:

  • correlationId:关联ID。用于生产者确认(Publisher Confirm)模式,关联发送的消息和服务器确认回执。在需要幂等性处理的场景中,用于识别重复消息。
  • traceId:追踪ID。用于全链路追踪,跨服务、跨系统的调用链追踪;在分布式系统中,通过TraceId将不同服务的日志串联起来。

消费者:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@Slf4j
@Service
public class MessageConsumer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static final int MAX_RETRY_COUNT = 3; // 最大重试次数

    @RabbitListener(queues = "myQueue")
    public void handleMessage(Message message, Channel channel) throws IOException {
        try {
            // 1. 解析消息
            String msgBody = new String(message.getBody());
            Map<String, Object> headers = message.getMessageProperties().getHeaders();
            log.info("收到消息: [Body={}] [TraceID={}]",
                    msgBody, headers.get("traceId"));

            // 2. 模拟业务处理失败
            if (msgBody.contains("error")) {
                throw new RuntimeException("模拟业务处理失败");
            }

            // 3. 处理成功,手动ACK
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 4. 处理失败逻辑
            handleRetry(message, channel, e);
        }
    }

    private void handleRetry(Message message, Channel channel, Exception e) throws IOException {
        MessageProperties originalProps = message.getMessageProperties();
        Map<String, Object> headers = originalProps.getHeaders();
        // 获取消息头中的重试次数
        int retryCount = (int) headers.getOrDefault("retryCount", 0);

        if (retryCount >= MAX_RETRY_COUNT) {
            // 超过最大重试次数,转入死信队列
            channel.basicNack(originalProps.getDeliveryTag(), false, false);
            log.error("消息处理失败,已达到最大重试次数,转移到死信队列: [TraceID={}]", headers.get("traceId"));
        } else {
            // 未超最大重试次数,重新入队
            headers.put("retryCount", retryCount + 1); // 更新重试次数
            // 创建新消息属性
            MessageProperties newProps = new MessageProperties();
            // 1) 拷贝持久化设置
            MessageDeliveryMode receivedMode = originalProps.getReceivedDeliveryMode();
            if (receivedMode != null) {
                newProps.setDeliveryMode(receivedMode);
            } else {
                // 默认设置为持久化(安全策略)
                newProps.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            }
            // 2) 拷贝其他必要属性
            newProps.setHeaders(new HashMap<>(originalProps.getHeaders())); // 保留原始 correlationId 和 traceId
            newProps.setContentType(originalProps.getContentType());
            newProps.setContentEncoding(originalProps.getContentEncoding());
            newProps.setCorrelationId(originalProps.getCorrelationId());
            newProps.setReplyTo(originalProps.getReplyTo());

            // 发布新消息到队列(必须传递 CorrelationData)
            rabbitTemplate.convertAndSend(
                    "myExchange",
                    "myRoutingKey",
                    new Message(message.getBody(), newProps),
                    new CorrelationData(headers.get("correlationId").toString()) // 防止生产者 Confirm 回调出现 NPE
            );

            // 确认原消息已处理(即使失败)
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.warn("消息重试中: [次数={}] [TraceID={}]",
                    retryCount + 1,
                    headers.get("traceId"));
        }
    }

    // 指数退避算法
    private long calculateBackoffDelay(int retryCount) {
        long baseDelay = 5000; // 基础延迟5秒
        long maxDelay = 60000; // 最大延迟60秒
        return Math.min(baseDelay * (1 << retryCount), maxDelay);
    }

    // 增加死信处理
    @RabbitListener(queues = "myDLQueue")
    public void handleDeadLetter(Message message, Channel channel) throws IOException {
        try {
            String msg = new String(message.getBody());
            log.error("收到死信消息: {}", msg);

            // 可选操作:
            // 1. 记录到数据库(如MySQL、Elasticsearch)
            // 2. 发送告警(邮件/短信/钉钉)
            // 3. 人工介入或自动修复

            // 手动确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.info("死信消息已确认处理完成");
        } catch (Exception e) {
            // 处理失败时拒绝消息(避免死循环)
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            log.error("死信消息处理失败: {}", e.getMessage());
        }
    }
}
  • CorrelationData 传递:重试时从消息 Header 中读取原始 correlationId 并重建
  • 消息追踪:生产者在消息头中添加追踪ID(traceId)字段,通过唯一ID串联消息经过的所有服务(生产者→MQ→消费者→其他服务),以支持完整的消息生命周期追踪。消费者重试逻辑中需要保持原始 traceId。
  • 消费者重试机制的逻辑是 重建新消息+ACK原消息 的设计,主要是因为如果直接 basicNack(requeue=true) 会导致消息头(如 retryCount )无法更新。
  • originalProps.getReceivedDeliveryMode()

controller:

@RestController
public class TestController {
    @Autowired
    private MessageProducer messageProducer;
    @GetMapping("/send")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        messageProducer.sendMessage(message);
        return new ResponseEntity<>("消息已发送: " + message, HttpStatus.OK);
    }
}

配置信息:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    # 开启生产者确认模式
    # NONE-禁用 Confirm 模式
    # SIMPLE-启用 Confirm 模式,并同步等待确认结果
    # CORRELATED-启用 Confirm 模式,并异步处理确认结果
    publisher-confirm-type: correlated
    # 开启生产者返回模式
    publisher-returns: true
    # 开启消费者手动 ACK
    listener:
      simple:
        acknowledge-mode: manual
        retry:
          enabled: false # 禁用Spring的自动重试,使用我们自定义的重试逻辑

启动类:

@SpringBootApplication
@EnableRabbit // 启用 RabbitMQ 监听功能
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

测试

  • 访问 http://localhost:8080/send?message=Hello RabbitMQ,发送正常消息
  • 访问 http://localhost:8080/send?message=error,发送会失败的消息

失败的消息大致生命周期:

sequenceDiagram
    participant Broker
    participant Consumer

    Broker->>Consumer: 消息A(retryCount=0)
    Consumer->>Broker: ACK消息A + 发送消息B(retryCount=1)
    Broker->>Consumer: 消息B(retryCount=1)
    Consumer->>Broker: ACK消息B + 发送消息C(retryCount=2)
    Broker->>Consumer: 消息C(retryCount=2)
    Consumer->>Broker: NACK消息C → 转入死信队列

扩展

优化重试机制

消费者重试不建议立即重试,而是应该采用延迟重试策略。很多情况下立即重试仍会失败,浪费资源;消息持续失败导致高频重试,CPU/内存飙升,拖垮整个服务。

立即重试的问题分析:

问题 风险 解决方案
资源风暴 消息处理失败后立即重试,可能引发瞬时高负载 引入指数退避延迟
无效重试 若失败原因是下游服务短暂不可用,立即重试大概率再次失败 逐步增加重试间隔
消息堆积 高频重试会占用队列资源,阻塞其他消息处理 使用独立延迟队列

常见的重试策略:

  1. 指数退避重试 (Exponential Backoff)
    • 重试间隔随时间指数增长(如1s, 2s, 4s, 8s…)
    • 防止短时间内大量重试导致系统雪崩
  2. 固定间隔重试
    • 每次重试之间保持固定时间间隔
    • 比立即重试更温和,但不如指数退避灵活
  3. 死信队列(DLX) + 延迟队列
    • 处理失败的消息先进入死信队列
    • 通过TTL或插件实现延迟重试
    • 这是RabbitMQ中最常见的生产级方案

接下来基于指数退避重试机制,结合延迟交换机(Delayed Exchange)优化重试机制。

消息流转大致流程:

sequenceDiagram
    participant Producer
    participant MainQueue
    participant Consumer
    participant RetryQueue
    participant DLQ

    Producer->>MainQueue: 发送消息
    Consumer->>MainQueue: 消费消息
    alt 处理成功
        Consumer->>MainQueue: ACK
    else 处理失败且可重试
        Consumer->>RetryQueue: 转发消息(带延迟)
        RetryQueue-->>MainQueue: TTL到期后重新投递
    else 彻底失败
        Consumer->>DLQ: NACK转入死信
    end

需安装 rabbitmq_delayed_message_exchange 插件,声明 x-delayed-message 类型交换机。RabbitMQ官方插件支持每条消息独立延迟:

# linux
# 进入RabbitMQ容器执行
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 验证插件列表应包含
rabbitmq-plugins list | grep delay

# windows
rabbitmq_server-3.7.14\sbin>rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange
rabbitmq_server-3.7.14\sbin>rabbitmq-plugins.bat list | find "delay"

修正RabbitMQ配置类(确保交换机、队列、绑定关系一致)

@Configuration
public class RabbitMQConfig {
    // 主业务队列(保持不变)
    @Bean public Queue myQueue() { /* ... */ }
    @Bean public DirectExchange myExchange() { /* ... */ }
    @Bean public Binding binding() { /* ... */ }
    // 死信队列和交换机(保持不变)
    @Bean public Queue myDLQueue() { /* ... */ }
    @Bean public DirectExchange myDLX() { /* ... */ }
    @Bean public Binding dlBinding() { /* ... */ }

    // 延迟交换机(插件支持)(新增)
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct"); // 底层仍用direct路由
        return new CustomExchange("myDelayExchange", "x-delayed-message", true, false, args);
    }

    // 绑定延迟交换机到延迟队列(新增)
    @Bean
    public Binding delayBinding(Queue myQueue, CustomExchange delayExchange) {
        return BindingBuilder.bind(myQueue)
                .to(delayExchange)
                .with("myRoutingKey") // 和主队列的 routingKey 一致
                .noargs();
    }
}

延迟消息不会 立即 进入队列,而是由 x-delayed-message 插件 暂存,直到延迟时间到期后才投递,会导致 ReturnCallback 可能在消息被暂存时误触发。所以需要修改 RabbitMQTemplateConfig#returnsCallback() 方法:

private RabbitTemplate.ReturnsCallback returnsCallback() {
    return returned -> {
        // 如果是延迟交换机,忽略 ReturnCallback
        if ("myDelayExchange".equals(returned.getExchange())) {
            log.debug("延迟消息暂存,忽略 ReturnCallback");
            return;
        }

        Message message = returned.getMessage();
        MessageProperties properties = message.getMessageProperties();

        log.error("消息无法路由到队列: [Body={}] [原因:{}] [Exchange={}] [RoutingKey={}] [Headers={}]",
                new String(message.getBody()),
                returned.getReplyText(),
                returned.getExchange(),
                returned.getRoutingKey(),
                properties.getHeaders());

        // 这里可以添加路由失败处理逻辑,如记录到数据库等待人工干预
    };
}

生产者无需修改,消费者需要修改 handleRetry() 方法,将重试消息发布到延迟交换机而不是直接重发给主队列:

private void handleRetry(Message message, Channel channel, Exception e) throws IOException {
    // ... 原有业务逻辑 ...

    if (retryCount >= MAX_RETRY_COUNT) {
        // ... 原有业务逻辑 ...
    } else {
        // ... 原有业务逻辑 ...

        // 计算动态延迟(指数退避:5s, 10s, 20s...)
        long delayMs = calculateBackoffDelay(retryCount);

        // 设置重试相关属性
        newProps.setHeader("x-delay", delayMs); // 添加延迟头
        newProps.setHeader("retryCount", retryCount + 1); // 更新重试次数

        // 发布到延迟交换机
        rabbitTemplate.convertAndSend(
                "myDelayExchange", // 使用延迟交换机
                "myRoutingKey", // 使用主队列的 routingKey
                new Message(message.getBody(), newProps),
                m -> {
                    // 设置消息级延迟(单位:毫秒)
                    m.getMessageProperties().setHeader("x-delay", delayMs);
                    return m;
                },
                new CorrelationData(headers.get("correlationId").toString())
        );

        // 确认原消息
        channel.basicAck(originalProps.getDeliveryTag(), false);
        log.warn("消息将在 {}ms 后重试 (第{}次)", delayMs, retryCount + 1);
    }
}
// 指数退避算法
private long calculateBackoffDelay(int retryCount) {
    long baseDelay = 5000; // 基础延迟5秒
    long maxDelay = 60000; // 最大延迟60秒
    return Math.min(baseDelay * (1 << retryCount), maxDelay);
}

其他

RabbitMQ命令

查看持久化配置

消息持久化需要三个条件同时满足:

  • 交换机持久化
  • 队列持久化
  • 消息发布时设置 properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
# 1、列出所有交换机及其属性
rabbitmqctl list_exchanges name type durable auto_delete arguments
# durable 列为 true 表示交换机设置了持久化

# 2、列出所有队列及其属性
rabbitmqctl list_queues name durable auto_delete arguments
# durable 列为 true 表示队列设置了持久化

# 3、查看消息持久化 (取决于发布时设置的delivery_mode:1-非持久化、2-持久化)
# 获取队列中的消息详情:properties.delivery_mode (需要安装rabbitmq_management插件)
# linux
rabbitmqadmin get queue=<queue_name> count=1
# windows
python rabbitmqadmin get queue=myQueue count=1 --format=raw_json

绑定关系验证

# 列出所有交换机
rabbitmqadmin list exchanges name type arguments
# 列出队列
rabbitmqadmin list queues name messages
# 列出绑定关系
rabbitmqadmin list bindings

windows 需要从 http://localhost:15672/cli/ 下载 rabbitmqadmin,并且用 python rabbitmqadmin ... 运行命令。

消费者重试时丢失了deliveryMode=2

在上面代码中,重试机制里面有段代码:

// 1) 拷贝持久化设置
MessageDeliveryMode receivedMode = originalProps.getReceivedDeliveryMode();
if (receivedMode != null) {
    newProps.setDeliveryMode(receivedMode);
} else {
    // 默认设置为持久化(安全策略)
    newProps.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
}

这里如果只是通过 message.getMessageProperties() 获取原消息属性 originalProps 直接设置给新消息属性 newProps,这样子就会导致新消息会丢失 deliveryMode=2 (持久化)配置信息,最后导致消息不会进行持久化了。

原因:

  • 当生产者发送消息时,deliveryMode=2(持久化)是 AMQP 协议层的属性,保存在消息的 BasicProperties 中。
  • Spring AMQP 在 @RabbitListener 中会将原始 AMQP 消息转换为 Message 对象时,会丢失部分协议层属性(包括 deliveryMode)。

在 Spring AMQP 2.3 及以上版本中,可以直接通过 MessagePropertiesgetReceivedDeliveryMode() 方法获取生产者设置的原始 deliveryMode 值。


YOLO