[SpringBoot] Spring Boot(14)RabbitMQ延迟队列
2021-01-19 23:13
标签:isa 场景 edr inter config boot1 runner www 注册 延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单;2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度;3.过1分钟给新注册会员的用户,发送注册邮件等。 实现延迟队列的方式有两种: 1)通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能; 注意: 延迟插件rabbitmq-delayed-message-exchange是在RabbitMQ 3.5.7及以上的版本才支持的,依赖Erlang/OPT 18.0及以上运行环境。 由于使用死信交换器相对曲折,本文重点介绍第二种方式,使用rabbitmq-delayed-message-exchange插件完成延迟队列的功能。 打开官网下载:http://www.rabbitmq.com/community-plugins.html 选择相应的对应的版本“3.7.x”点击下载。 注意: 下载的是.zip的安装包,下载完之后需要手动解压。 拷贝插件到Docker: RabbitMQ在Docker的安装,请参照本系列的上一篇文章:http://www.apigo.cn/2018/09/11/springboot13/ 进入docker内部: 开启插件: 查询安装的所有插件: 安装正常,效果如下图:
重启RabbitMQ,使插件生效 执行结果如下: 完整代码访问我的GitHub:https://github.com/vipstone/springboot-example/tree/master/springboot-rabbitmq 到此为止我们已经使用“rabbitmq-delayed-message-exchange”插件实现了延迟功能,但是需要注意的一点是,如果使用命令“rabbitmq-plugins disable rabbitmq_delayed_message_exchange”禁用了延迟插件,那么所有未发送的延迟消息都将丢失。 原文链接:https://blog.csdn.net/sufu1065/article/details/84134500 [SpringBoot] Spring Boot(14)RabbitMQ延迟队列 标签:isa 场景 edr inter config boot1 runner www 注册 原文地址:https://www.cnblogs.com/wxxujian/p/12905914.html1 前言
2)使用rabbitmq-delayed-message-exchange插件实现延迟功能;2 安装延迟插件
2.1 下载插件
2.2 安装插件
docker cp D:\rabbitmq_delayed_message_exchange-20171201-3.7.x.ez rabbit:/plugins
2.3 启动插件
docker exec -it rabbit /bin/bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
rabbitmq-plugins list
docker restart rabbit
3 代码实现
3.1 配置队列
import com.example.rabbitmq.mq.DirectConfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedConfig {
final static String QUEUE_NAME = "delayed.goods.order";
final static String EXCHANGE_NAME = "delayedec";
@Bean
public Queue queue() {
return new Queue(DelayedConfig.QUEUE_NAME);
}
// 配置默认的交换机
@Bean
CustomExchange customExchange() {
Map
3.2 发送消息
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class DelayedSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String msg) {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("发送时间:" + sf.format(new Date()));
rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("x-delay", 3000);
return message;
}
});
}
}
3.3 消费消息
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {
@RabbitHandler
public void process(String msg) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("接收时间:" + sdf.format(new Date()));
System.out.println("消息内容:" + msg);
}
}
3.4 测试队列
import com.example.rabbitmq.RabbitmqApplication;
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.text.SimpleDateFormat;
import java.util.Date;
@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {
@Autowired
private DelayedSender sender;
@Test
public void Test() throws InterruptedException {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
sender.send("Hi Admin.");
Thread.sleep(5 * 1000); //等待接收程序执行之后,再退出测试
}
}
发送时间:2018-09-11 20:47:51
接收时间:2018-09-11 20:47:54
消息内容:Hi Admin.
4 总结
备注
文章标题:[SpringBoot] Spring Boot(14)RabbitMQ延迟队列
文章链接:http://soscw.com/essay/44268.html