使用关系型数据库实现

TODO

使用Redis实现

主要思路如下:

  1. 生产消息时,将消息的值作为 ZSET 的value,将当前时间加上delay时间作为score, zadd 添加到ZSET中。
  2. 消费消息时,使用 zrangebyscore 从ZSET中获取score在当前时间之前(0 < score <= 当前时间戳)的消息进行消费。可以每隔一定时间查询一次,也可以使用Redis的Keyspace Notification功能接收消息到达的通知进行消费。
  3. 如果一定时间内消息未消费,由于其score值仍然小于当前时间,所以会被重复消费,实现了消息的重试机制。
  4. 消息被成功消费后,需要从ZSET中移除,避免重复消费,使用 zrem 移除消息。

实现

依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

配置:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;

@Configuration
public class RedisConfig {
    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
        config.setHostName("localhost");
        config.setPort(6379);
        return new LettuceConnectionFactory(config);
    }
}

Redisson

使用MQ实现

RabbitMQ

RabbitMQ的延时消息主要有两个解决方案:

  • 消息的TTL + 死信Exchange
  • RabbitMQ Delayed Message Plugin

消息的TTL+死信Exchange

存活时间TTL

TTL 全称为:time to live,一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。当消息没有配置消费者,消息就一直停留在队列中,停留时间超过存活时间后,消息会被自动删除。

RabbitMQ 支持两种 TTL 设置:

  • 对消息本身设置存活时间,每条消息的存活时间可以灵活设置为不同的存活时间。
  • 对传递的队列设置存活时间,每条传到到队列的过期时间都一致。

如果同时配置了队列的TTL和消息的TTL,将会使用较小的那个值。

当消息过期还没有被消费,此时消息会变成死信消息(dead letter),这是实现延迟队列的关键。死信消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

消息变为死信的条件:

  • 消息被拒绝(通过basic.reject 或者 back.nack),并且设置 requeue=false
  • 消息的过期时间到期了
  • 队列达到最大长度,消息被丢弃

死信交换机 DLX

DLX(Dead Letter Exchanges)意思为死信交换机。死信交换机和普通交换机没什么区别,不同的是死信交换机会绑定在其他队列上,当队列的消息变成死信消息后,死信消息会发送到死信交换上。

消息变成死信消息之后,它不会立即被删除,首先它要看有没有对应的死信交换机,如果有绑定的死信交换机,消息就会从发送到对应的死信交换机上。

队列绑定死信交换机需要两个参数:

  • x-dead-letter-exchange:绑定的死信交换机名称,必须
  • x-dead-letter-routing-key:死信交换机转发到死信队列的路由键,可选

死信交换机和普通交换机的区别就是死信交换机的ExchangeroutingKey作为绑定参数,绑定在其他队列上。

步骤

一个延时消息的流程如下图:

image-20230523220318222

  1. 配置业务队列,绑定到业务交换机上
  2. 为业务队列配置死信交换机和路由key
  3. 为死信交换机配置死信队列

死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型【Direct、Fanout、Topic】。

  • 生产者将带有TTL的消息发送给交换机,由交换机路由到队列中;
  • 队列由于没有消费,消息一直停留在队列中,一直等到消息超时,变成死信消息;
  • 死信消息转发到死信交换机再路由到死信队列上,最后给消费者消费。
创建死信队列

坐标:

<!--消息队列相关依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置:

spring:
  rabbitmq:
    host: localhost #rabbitmq的连接地址
    port: 5672 #rabbitmq的连接端口号
    virtual-host: /yolo #rabbitmq的虚拟host
    username: yolo #rabbitmq的用户名
    password: yolo #rabbitmq的密码

配置交换机和队列的绑定:

  • 死信队列绑定死信交换机
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayQueueRabbitConfig {
    /* @@@@ 死信队列 @@@ */
    /**
     * 死信队列
     */
    public static final String DLX_QUEUE = "queue.dlx";

    /**
     * 死信交换机
     */
    public static final String DLX_EXCHANGE = "exchange.dlx";

    /**
     * 死信routing-key
     */
    public static final String DLX_ROUTING_KEY = "routingKey.dlx";

    /**
     * 死信队列
     *
     * @return
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(DLX_QUEUE, true);
    }

    /**
     * 死信交换机
     *
     * @return
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(DLX_EXCHANGE, true, false);
    }

    /**
     * 死信队列和死信交换机绑定
     *
     * @return
     */
    @Bean
    public Binding bindingDLX() {
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);
    }

    /* @@@@ 延迟队列 @@@ */
    /**
     * 订单延迟队列
     */
    public static final String ORDER_QUEUE = "queue.order";

    /**
     * 订单交换机
     */
    public static final String ORDER_EXCHANGE = "exchange.order";

    /**
     * 订单routing-key
     */
    public static final String ORDER_ROUTING_KEY = "routingkey.order";

    /**
     * 订单延迟队列
     *
     * @return
     */
    @Bean
    public Queue orderQueue() {
        Map<String, Object> params = new HashMap<>();
        // 关键参数
        params.put("x-dead-letter-exchange", DLX_EXCHANGE);
        params.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
        return new Queue(ORDER_QUEUE, true, false, false, params);
    }

    /**
     * 订单交换机
     *
     * @return
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(ORDER_EXCHANGE, true, false);
    }

    /**
     * 订单队列和交换机绑定
     *
     * @return
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
    }
}

绑定死信交换机通过添加x-dead-letter-exchangex-dead-letter-routing-key参数指定对应的交换机和路由。

生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.Date;

@RestController
public class SendController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/dlx")
    public String dlx(@RequestParam("msg") String msgStr) {
        String date = DateUtil.dateFormat(new Date());
        String delayTime = "5000"; //单位毫秒
        System.out.println("【发送消息】延迟 5 秒 发送时间 " + date);
        rabbitTemplate.convertAndSend(
                DelayQueueRabbitConfig.ORDER_EXCHANGE,
                DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
                msgStr,
                message -> {
                    // 设置过期时间
                    message.getMessageProperties().setExpiration(delayTime);
                    return message;
                });
        return "ok";
    }
}

class DateUtil {
    public static String dateFormat(Date date) {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
        return sdf.format(date);
    }
}
消费者
@RabbitListener(queues = DelayQueueRabbitConfig.DLX_QUEUE)
public void delayPrecss(String msgStr, Channel channel, Message message) {
    System.out.println("【接收消息】" + msgStr + " 接收时间" + DateUtil.dateFormat(new Date()));
}
测试

调用接口发送消息,可以看到,消息延迟了5秒后才被消费:

【发送消息】延迟 5 秒 发送时间 21:19:39
【接收消息】hello 接收时间21:19:44

消息时序问题

队列都有 先进先出 的特点。如果队列前面的消息延迟比队列后的消息延迟更长,需要等到队列前面的消息被消费后才能消费后面的消息。

测试下:

// 改造生产者
@GetMapping("/dlx")
public String dlx() {
    dlxSend("延迟10秒", "10000");
    dlxSend("延迟2 秒", "2000");
    dlxSend("延迟5 秒", "5000");
    return "ok";
}

private void dlxSend(String msgStr, String delayTime) {
    System.out.println("【发送消息】" + msgStr + "当前时间" + DateUtil.dateFormat(new Date()));
    rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE, DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
                                  msgStr,
                                  message -> {
                                      message.getMessageProperties().setExpiration(delayTime);
                                      return message;
                                  });
}

控制台输出:

【发送消息】延迟10秒当前时间21:45:00
【发送消息】延迟2 秒当前时间21:45:00
【发送消息】延迟5 秒当前时间21:45:00
【接收消息】延迟10秒 接收时间21:45:10
【接收消息】延迟2 秒 接收时间21:45:10
【接收消息】延迟5 秒 接收时间21:45:10

当前面的消息没被消费时,后面的消息都不会被消费到。如上面的第二条消息延迟2秒的过期时间达到时,第一条还没被消费,第二条等待第一条被消费后立刻就能被消费了。

RabbitMQ Delayed Message Plugin

RabbitMQ Delayed Message Plugin是官方提供的延时消息插件,虽然使用起来比较方便,但是不是高可用的,如果节点挂了会导致消息丢失。

官网:传送门

Delayed messages are stored in a Mnesia table (also see Limitations below) with a single disk replica on the current node. They will survive a node restart. While timer(s) that triggered scheduled delivery are not persisted, it will be re-initialised during plugin activation on node start. Obviously, only having one copy of a scheduled message in a cluster means that losing that node or disabling the plugin on it will lose the messages residing on that node.

安装插件

去RabbitMQ的官网下载插件rabbitmq_delayed_message_exchange,插件地址:

将插件文件复制到RabbitMQ安装目录的plugins目录下:

image-20230523222937490

进入RabbitMQ安装目录的sbin目录下,使用如下命令启用延迟插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

image-20230523223527215

启用插件成功,之后重新启动RabbitMQ服务,交换机会有新的类型x-delayed-message

image-20230524064307263

x-delayed-message类型的交换机,支持延迟投递消息。发送消息给x-delayed-message类型的交换机:

image-20230524071017694

  • x-delayed-message类型的交换机接收消息投递后,并未将直接路由到队列中,而是存储到mnesia(一个分布式数据系统),该系统会检测消息延迟时间。
  • 消息达到可投递时间,消息会被投递到目标队列。

实现延迟消息

配置交换机、队列以及绑定关系:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class XDelayedMessageConfig {
    /**
     * 队列
     */
    public static final String DIRECT_QUEUE = "queue.delayed";

    /**
     * 延迟交换机
     */
    public static final String DELAYED_EXCHANGE = "exchange.delayed";

    /**
     * 绑定的routing key
     */
    public static final String ROUTING_KEY = "routingKey.bind";

    @Bean
    public Queue directQueue() {
        return new Queue(DIRECT_QUEUE, true);
    }

    /**
     * 定义延迟交换机
     * 交换机的类型为 x-delayed-message
     *
     * @return
     */
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> map = new HashMap<>();
        map.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, map);
    }

    @Bean
    public Binding delayOrderBinding() {
        return BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
    }

}

生产者:

@GetMapping("/delay")
public String delay() {
    delaySend("延迟队列10 秒", 10000);
    delaySend("延迟队列5 秒", 5000);
    delaySend("延迟队列2 秒", 2000);
    return "ok";
}

private void delaySend(String msgStr, Integer delayTime) {
    System.out.println("【发送消息】" + msgStr + "当前时间" + DateUtil.dateFormat(new Date()));
    rabbitTemplate.convertAndSend(XDelayedMessageConfig.DELAYED_EXCHANGE, XDelayedMessageConfig.ROUTING_KEY,
                                  msgStr, message -> {
                                      message.getMessageProperties().setDelay(delayTime);
                                      //message1.getMessageProperties().setHeader("x-delay",delayTime);
                                      return message;
                                  });
}

消费者:

@RabbitListener(queues = XDelayedMessageConfig.DIRECT_QUEUE)
public void delayProcess(String msgStr, Channel channel, Message message) {
    System.out.println("【接收消息】" + msgStr + " 接收时间" + DateUtil.dateFormat(new Date()));
}

从控制台输出的结果,可以看出RabbitMQ的 rabbitmq_delayed_message_exchange 插件可以解决消息时序问题:

【发送消息】延迟队列10 秒当前时间06:53:18
【发送消息】延迟队列5 秒当前时间06:53:18
【发送消息】延迟队列2 秒当前时间06:53:18
【接收消息】延迟队列2 秒 接收时间06:53:20
【接收消息】延迟队列5 秒 接收时间06:53:23
【接收消息】延迟队列10 秒 接收时间06:53:28

比较

由于死信队列方式需要创建两个交换机(死信队列交换机+处理队列交换机)、两个队列(死信队列+处理队列),而延迟插件方式只需创建一个交换机和一个队列,所以后者使用起来更简单

RocketMQ

TODO

参考


YOLO