1、rabbitmq服务端打开延迟插件 (超过 4294967295毫秒 ≈ 1193 小时 ≈ 49.7 天 这个时间会立即触发)
注意:只有RabbitMQ 3.6.x以上才支持
在下载好之后,解压得到.ez
结尾的插件包,将其复制到RabbitMQ安装目录下的plugins
文件夹。
然后通过命令行启用该插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange |
该插件在通过上述命令启用后就可以直接使用,不需要重启。
2、添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3、配置交换机和队列
@Configuration
public class DelayedConfig {
public static String EXCHANGE_NAME = "delayed_exchange";
public static String QUEUE_NAME = "delayed_queue";
public static String KEY_NAME = "delayed_key";
/**
* 基于插件实现的交换机,必须是CustomExchange类型,标识这是一个延时类型的交换机
*/
@Bean()
public CustomExchange delayedExchange(){
Map<String,Object> params = new HashMap<>();
params.put("x-delayed-type","direct");
//参数1:交换机名字,参数2:交换机的类型,参数3:是否持久化,参数4:是否自动删除队列,参数5:交换机的额外参数设置
return new CustomExchange(EXCHANGE_NAME,"x-delayed-message",true,false,params);
}
@Bean()
public Queue delayedQueue(){
return new Queue(QUEUE_NAME);
}
@Bean
public Binding delayedBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(KEY_NAME).noargs();
}
}
4、发送和接收消息
@GetMapping("/t5")
public void t5(){
Date date = new Date();
System.out.println("发送时间:" + date.toString());
//发送消息
rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME,DelayedConfig.KEY_NAME,"1延迟消息wxm",
msg->{msg.getMessageProperties().setHeader(MessageProperties.X_DELAY, 15552000000L);
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;});
//发送消息
rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME,DelayedConfig.KEY_NAME,"延迟消息wxm",
msg->{msg.getMessageProperties().setDelay(10000);
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;});
log.info("发送成功");
}
@RabbitHandler
@RabbitListener(queues = "delayed_queue")
public void getDelayed(Message message, Channel channel) throws Exception{
Date date = new Date();
String rightNow = date.toString();
String msg = new String(message.getBody());
// 手动应答
System.out.println(message.getMessageProperties().getDeliveryTag());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
System.out.println("接受成功:"+msg+rightNow);
}
本站资源均来自互联网,仅供研究学习,禁止违法使用和商用,产生法律纠纷本站概不负责!如果侵犯了您的权益请与我们联系!
转载请注明出处: 免费源码网-免费的源码资源网站 » RabbitMq实现延迟队列功能
发表评论 取消回复