个人主页:哈__

期待您的关注 

目录

 一、死信队列

RabbitMQ的工作模式

 死信队列的工作模式

 二、RabbitMQ相关的安装 

三、SpringBoot引入RabbitMQ

1.引入依赖

2.创建队列和交换器

2.1 变量声明 

2.2 创建延迟交换器

2.3 创建延迟队列

2.4 延迟队列绑定延迟交换器

2.5 死信队列配置

3. 添加application.yml

4. 添加RabbitMQListener (消费者)

5. 创建DelayMessageSender 

6. 创建Controller 

7.测试 

四、死信队列的应用场景


 一、死信队列

RabbitMQ的死信队列(Dead Letter Queue,DLQ)是一种特殊的队列,用于接收其他队列中的“死信”消息。所谓“死信”,是指满足一定条件而无法被消费者正确处理的消息,这些条件包括消息被拒绝、消息过期、消息达到最大重试次数等。

当消息成为死信时,RabbitMQ会将其重新发送到指定的死信队列,而不是丢弃它们。这样做的好处是可以对死信进行分析和处理,例如记录日志、重新入队或者进一步处理。

死信队列通常与RabbitMQ的延迟队列(Delayed Message Queue)一起使用,通过延迟队列延迟消息的处理时间,可以更容易地触发消息成为死信的条件,从而进行测试和调试。

死信队列在消息中间件中有许多实际应用场景,主要用于处理无法被正常消费的消息,增强了消息的可靠性和处理能力。以下是一些常见的应用场景:

  1. 延迟消息处理:通过将消息发送到延迟队列,在指定的时间后再将消息发送到目标队列,实现延迟处理消息的功能。

  2. 消息重试:当消费者无法处理消息时,消息可以被重新发送到队列并设置重试次数,达到最大重试次数后转发到死信队列,以便进行进一步处理。

  3. 异常处理:当消息无法被消费者正常处理时(如格式错误、业务异常等),将消息转发到死信队列,用于记录日志、报警或人工处理。

  4. 消息超时处理:当消息在队列中等待时间过长时,可以设置消息的过期时间(TTL),超过时间后将消息转发到死信队列。

  5. 消息路由失败:当消息无法被正确路由到目标队列时,可以将消息发送到死信队列,避免消息丢失。

  6. 消息版本兼容性处理:当消息的格式或内容发生变化时,通过死信队列可以处理老版本消息,确保新版本系统的兼容性。


RabbitMQ的工作模式


 死信队列的工作模式

今天我要实现的就是这个延迟队列和死信队列。生产者首先向延迟队列发送消息,待达到TTL后消息会被转送到死信队列当中,消费者会从死信队列中获取消息进行消费。

 二、RabbitMQ相关的安装 

win10 安装rabbitMQ详细步骤_rabbitmq 安装-CSDN博客

我这里直接引用别人的文章了,下载需要大家去看一看。

RabbitMQ延迟插件的安装。

[超详细]RabbitMQ安装延迟消息插件_rabbitmq安装延迟插件-CSDN博客

三、SpringBoot引入RabbitMQ

1.引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </dependency>

2.创建队列和交换器

这一步是很重要的,如果你配置错误了,消息很可能无法正确的传送。要实现延迟队列和死信队列,我们一共要创建以下几个组件:

  1. 延迟队列
  2. 延迟队列的交换器
  3. 死信队列
  4. 死信队列的交换器

在我们创建了这几个组件之后,我们还要干一些事情,我们需要把这些组件进行组装,如果你不了解RabbitMQ的基础,你可以先看看基础教学,我这里简单的说一下。RabbitMQ中有一种绑定方式,这种绑定方式会把BindingKey和RoutingKey完全匹配的进行绑定,如下图所示,生产者发送了一个BindingKey为“warning”的消息,那么这个消息就会被发送到Queue1和Queue2,这并不难理解。

我们要做的就是把队列和交换器通过一个RoutingKey绑定在一起。


2.1 变量声明 

 接下来的代码要好好看了,首先我们把我们后边要用到的名称变量全部定义出来。因为这个名称起的很长,我们不方便直接使用。创建DeadRabbitConfig。在类中定义如下变量,延迟队列交换器名称、延迟队列名称、延迟队列Routing名称。除此之外还有死信队列交换器名称、死信队列名称和死信Routing名称。

  // 延迟队列交换器名称
    public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";
    // 延迟队列A名称
    public static final String DELAY_QUEUE_A_NAME = "delay.queue.demo.business.queue_a";
    // 延迟队列B名称
    public static final String DELAY_QUEUE_B_NAME = "delay.queue.demo.business.queue_b";
    // 延迟队列routingA名称
    public static final String DELAY_QUEUE_ROUTING_A_NAME = "delay.queue.demo.business.queue_a.routing_key";
    // 延迟队列routingB名称
    public static final String DELAY_QUEUE_ROUTING_B_NAME = "delay.queue.demo.business.queue_b.routing_key";

    // 死信队列
    public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUE_A_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routing_key";
    public static final String DEAD_LETTER_QUEUE_B_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routing_key";
    public static final String DEAD_LETTER_QUEUE_A_NAME = "delay.queue.demo.deadletter.queue_a";
    public static final String DEAD_LETTER_QUEUE_B_NAME = "delay.queue.demo.deadletter.queue_b";

2.2 创建延迟交换器

// 注册延迟交换器delayExchange
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return  new DirectExchange(DELAY_EXCHANGE_NAME);
    }

2.3 创建延迟队列

这里的延迟队列需要我们额外的配置一些参数,用于和死信队列进行信息发送。这里我是用了两种不同的方式构建延迟队列A和延迟队列B,在延迟队列A种我没有设置TTL参数,而是通过RabbitMQ的延迟插件实现的,而延迟队列B我设置了TTL为10000ms,也就是十秒,十秒内消息如果没有被消费掉就会发送到死信队列。

// 注册延迟队列A   还要绑定死信交换器和死信routingA
    @Bean("delayQueueA")
    public Queue delayQueueA(){
        Map<String,Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_A_ROUTING_KEY);
        //args.put("x-message-ttl",6000);
        return QueueBuilder.durable(DELAY_QUEUE_A_NAME).withArguments(args).build();
    }
    // 注册延迟队列B   还要绑定死信交换器和死信routingB
    @Bean("delayQueueB")
    public Queue delayQueueB(){
        Map<String,Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUE_B_ROUTING_KEY);
        args.put("x-message-ttl",10000);
        return QueueBuilder.durable(DELAY_QUEUE_B_NAME).withArguments(args).build();
    }

2.4 延迟队列绑定延迟交换器

 // 延迟队列A绑定交换器
    @Bean
    public Binding delayQueueABinding(@Qualifier("delayQueueA") Queue queue, @Qualifier("delayExchange") DirectExchange delayExchange){
        return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_A_NAME);
    }
    // 延迟队列B绑定交换器
    @Bean
    public Binding delayQueueBBinding(@Qualifier("delayQueueB") Queue queue,@Qualifier("delayExchange") DirectExchange delayExchange){
        return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_QUEUE_ROUTING_B_NAME);
    }

2.5 死信队列配置

与延迟队列不同的是,死信队列并没有配置延迟参数。

// 注册死信队列A
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUE_A_NAME);
    }
    // 注册死信队列B
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB(){
        return new Queue(DEAD_LETTER_QUEUE_B_NAME);
    }
    // 注册死信交换器
    @Bean
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }
    // 死信队列A绑定死信交换器
    @Bean
    public Binding deadLetterQueueABinding(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange deadLetterExchange){
        return BindingBuilder.bind(queue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_A_ROUTING_KEY);
    }
    // 死信队列B绑定死信交换器
    @Bean
    public Binding deadLetterQueueBBinding(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange")DirectExchange deadLetterExchange){
        return BindingBuilder.bind(queue).to(deadLetterExchange).with(DEAD_LETTER_QUEUE_B_ROUTING_KEY);
    }

到此为止,RabbitMQ的组件配置完成。


3. 添加application.yml

server:
  port: 8081
spring:
  application:
    name: test-rabbitmq-producer
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

4. 添加RabbitMQListener (消费者)

下方的代码一共有两个消费者,一个消费者获取死信队列A中的消息,另一个消费者获取死信队列B中的消息。

@Component
public class DeadLetterQueueConsumer {
    public static final Logger LOGGER = LoggerFactory.getLogger(DeadLetterQueueConsumer.class);

    @RabbitListener(queues = DeadRabbitConfig.DEAD_LETTER_QUEUE_A_NAME,ackMode = "MANUAL")
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        LOGGER.info("当前时间:{},死信队列A收到消息:{}", new Date().toString(), msg);
        System.out.println(message.getMessageProperties().getDeliveryTag());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = DeadRabbitConfig.DEAD_LETTER_QUEUE_B_NAME,ackMode = "MANUAL")
    public void receiveB(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        LOGGER.info("当前时间:{},死信队列B收到消息:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

5. 创建DelayMessageSender 

这里采用的就是两种不同的方式,一种方式是使用插件来延迟消息的发送,另一种是通过TTL参数。

@Component
public class DelayMessageSender {
    @Resource
    RabbitTemplate rabbitTemplate;


    public void sendMessage(String msg,Integer delayTimes){
        switch (delayTimes){
            case 6:
                rabbitTemplate.convertAndSend(DeadRabbitConfig.DELAY_EXCHANGE_NAME, DeadRabbitConfig.DELAY_QUEUE_ROUTING_A_NAME,msg,new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setExpiration(String.valueOf(6000));
                        return message;
                    }
                });
                break;
            case 10:
                rabbitTemplate.convertAndSend(DeadRabbitConfig.DELAY_QUEUE_B_NAME,msg);
                break;
        }
    }
}

6. 创建Controller 

@RestController
@RequestMapping("/student")
public class StudentController {
    @Autowired
    DelayMessageSender messageSender;
    @RequestMapping("/send-message")
    public String sendMessage(String msg,Integer delayTimes){
        System.out.println(new Date());
        messageSender.sendMessage(msg,delayTimes);
        return "发送成功";
    }
}

7.测试 

在浏览器中输入以下地址进入RabbitMQ界面。账号密码都是guest。

http://localhost:15672/

 先来看看我们的初始队列。这里是什么都没有的。


然后我们启动项目后在看。我们刚才创建出来的四个队列全部都被加载了出来。


 使用PostMan发送一次请求。


 我们的请求在17s的时候发送到后端,消息打印在23s,说明我们的延迟队列有效果。


接下来我们测试10s的延迟队列。


 10s后死信队列B成功的接收到了消息。

四、死信队列的应用场景

延迟队列通常用于需要延迟执行某些任务或触发某些事件的场景。例如,在电子商务中,可以使用延迟队列实现订单超时未支付自动取消功能。

  • 1.订单创建

    • 用户下单后,系统生成订单,并将订单信息发送到一个普通队列,同时设置一个TTL(Time-To-Live)为30分钟。
    • 这个队列配置了死信交换机(Dead Letter Exchange, DLX),当消息过期后会被转发到死信队列。
  • 2.等待支付

    • 在30分钟内,用户可以完成支付。如果用户在30分钟内支付完成,系统会从普通队列中移除对应的消息并正常处理订单。
  • 3.订单超时处理

    • 如果用户未在30分钟内完成支付,消息会自动过期并转发到死信交换机,进而转发到死信队列。
  • 4.取消订单

    • 系统有一个专门的消费者监听死信队列。当有消息进入死信队列时,消费者会自动处理这些消息,即取消订单、释放库存,并通知用户订单已取消。
  • 5.定时任务(可选):

    • 虽然死信队列已经提供了超时订单的处理,但为了防止消息丢失或处理延迟,可以设置一个定时任务定期检查订单状态,确保所有超时未支付的订单都得到了处理。

点赞(0) 打赏

评论列表 共有 0 条评论

暂无评论

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部