springboot整合rabbitMQ
2021-06-17 22:05
标签:repo 请求 ide ... tin 步骤 中间件 2.0 inf 当前社区活跃度最好的消息中间件就是kafka和rabbitmq了,前面对kafaka的基础使用做了一些总结,最近开始研究rabbitmq,查看了很多资料,自己仿着写了一些demo,在博客园记录一下。 关于rabbitmq基础知识,可以看这篇博客,介绍的很详细了:https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html。 rabbitmq的安装很简单,我们可以根据自己的系统去网上找对应的安装说明,这里我为了方便,采用docker镜像的方式,我的虚拟机装的是centos7。步骤如下: 1、启动docker,关闭防火墙 2、拉取镜像:docker pull rabbitmq,如需要管理界面:docker pull rabbitmq:management 3、执行指令启动RabbitMQ 无管理界面: docker run --hostname rabbit-host --name rabbit -d -p 5672:5672 rabbitmq 有管理界面: docker run --hostname rabbit-host --name rabbit -d -p 5672:5672 -p 15672:15672 rabbitmq:management 4、启动后输入你的虚拟机地址+端口号15672,即可访问到rabbitmq登录界面,默认用户名和密码都是guest。 IDE:STS,这是spring官方推荐的开发工具,构建springboot项目非常方便。JDK:1.8 1、pom.xml 2、定义常量 3、rabbitmq配置 4、生产者 5、消费者 6、控制器类 7、application.properties 至此,springboot整合rabbitmq基本demo完毕,这里不再贴出演示截图。 RabbitMQ的消息确认有两种。 一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。 第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。 1、ConfirmCallback 确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调,使用该功能需要开启确认,spring-boot中配置如下: spring.rabbitmq.publisher-confirms = true 在MessageProducer.java加入如下代码: 2、ReturnCallback 通过实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发),使用该功能需要开启确认,spring-boot中配置如下:spring.rabbitmq.publisher-returns = true 在MessageProducer.java加入如下代码: 消费确认模式有三种:NONE、AUTO、MANUAL。 开启手动确认需要在配置中加入:spring.rabbitmq.listener.direct.acknowledge-mode=manual 消息在处理失败后将再次返回队列,重新尝试消费,如果再次失败则直接拒绝。 实例代码如下: DLX, Dead-Letter-Exchange。利用DLX, 当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这样我们就可以重新去处理这个消息。DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列,可以监听这个队列中消息做相应的处理 。消息变成死信一向有一下几种情况: 利用DLX,我们可以实现消息的延迟消费,可参考:https://www.jianshu.com/p/b74a14c7f31d,还可以像我的demo那样,对于有问题的消息进行重新处理,实例代码如下 首先在MyRabbitMqConfiguration上加入如下配置: 其次,修改我们的消息发送者,发送消息到我们新加入的交换器和路由键上,如下: 新添加一个消费者,同时将原来的消费者的监听队列换成新加入的 启动项目,发送请求http://localhost:8082/index?str=asdfgh,可以看到后台日志如下: rabbitmq支持四种交换器,同时还支持很多种插件,功能非常强大,这里我自己还没亲手用过,所以不再展开。 springboot整合rabbitMQ 标签:repo 请求 ide ... tin 步骤 中间件 2.0 inf 原文地址:https://www.cnblogs.com/hhhshct/p/9718063.htmlrabbitmq基础知识
rabbitmq安装
springboot与rabbitmq整合
xml version="1.0" encoding="UTF-8"?>
project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
modelVersion>4.0.0modelVersion>
groupId>powerx.iogroupId>
artifactId>springboot-rabbitmqartifactId>
version>0.0.1-SNAPSHOTversion>
packaging>jarpackaging>
name>springboot-rabbitmqname>
description>Demo project for Spring Bootdescription>
parent>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-starter-parentartifactId>
version>2.0.1.RELEASEversion>
relativePath/>
parent>
properties>
project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
project.reporting.outputEncoding>UTF-8project.reporting.outputEncoding>
java.version>1.8java.version>
properties>
dependencies>
dependency>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-starter-amqpartifactId>
dependency>
dependency>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-starter-webartifactId>
dependency>
dependency>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-starter-testartifactId>
scope>testscope>
dependency>
dependencies>
build>
plugins>
plugin>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-maven-pluginartifactId>
plugin>
plugins>
build>
project>
package com.example.demo.constant;
public interface QueueConstants {
// 消息交换
String MESSAGE_EXCHANGE = "message.direct.myexchange";
// 消息队列名称
String MESSAGE_QUEUE_NAME = "message.myqueue";
// 消息路由键
String MESSAGE_ROUTE_KEY = "message.myroute";
// 死信消息交换
String MESSAGE_EXCHANGE_DL = "message.direct.dlexchange";
// 死信消息队列名称
String MESSAGE_QUEUE_NAME_DL = "message.dlqueue";
// 死信消息路由键
String MESSAGE_ROUTE_KEY_DL = "message.dlroute";
}
package com.example.demo.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.example.demo.constant.QueueConstants;
@Configuration
public class MyRabbitMqConfiguration {
/**
* 交换配置
*
* @return
*/
@Bean
public DirectExchange messageDirectExchange() {
return (DirectExchange) ExchangeBuilder.directExchange(QueueConstants.MESSAGE_EXCHANGE)
.durable(true)
.build();
}
/**
* 消息队列声明
*
* @return
*/
@Bean
public Queue messageQueue() {
return QueueBuilder.durable(QueueConstants.MESSAGE_QUEUE_NAME)
.build();
}
/**
* 消息绑定
*
* @return
*/
@Bean
public Binding messageBinding() {
return BindingBuilder.bind(messageQueue())
.to(messageDirectExchange())
.with(QueueConstants.MESSAGE_ROUTE_KEY);
}
}
package com.example.demo.producer;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.example.demo.constant.QueueConstants;
@Component
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String str) {
rabbitTemplate.convertAndSend(QueueConstants.MESSAGE_EXCHANGE, QueueConstants.MESSAGE_ROUTE_KEY, str);
}
}
package com.example.demo.consumer;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.QueueConstants;
import com.rabbitmq.client.Channel;
@Component
public class MessageConsumer {
@RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_NAME)
public void processMessage(Channel channel,Message message) {
System.out.println("MessageConsumer收到消息:"+new String(message.getBody()));
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
}
}
}
package com.example.demo.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.example.demo.producer.MessageProducer;
@RestController
public class TestController {
@Autowired
private MessageProducer messageProducer;
@RequestMapping(value = "/index")
public String index(String str) {
// 将实体实例写入消息队列
messageProducer.sendMessage(str);
return "Success";
}
}
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#服务器ip
spring.rabbitmq.host=192.168.1.124
#虚拟空间地址
spring.rabbitmq.virtual-host=/
#端口号
spring.rabbitmq.port=5672
消息确认
消息发送确认
rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause) -> {
System.out.println("消息唯一标识" + correlationData);
System.out.println("消息确认结果" + ack);
System.out.println("失败原因" + cause);
});
rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText,
String exchange, String routingKey) ->{
System.out.println("消息主体message" + message);
System.out.println("消息replyCode" + replyCode);
System.out.println("消息replyText" + replyText);
System.out.println("消息使用的交换器" + exchange);
System.out.println("消息使用的路由键" + routingKey);
});
消息消费确认
package com.example.demo.consumer;
import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.QueueConstants;
import com.rabbitmq.client.Channel;
@Component
public class MessageConsumer {
@RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_NAME)
public void processMessage(Channel channel, Message message) {
System.out.println("MessageConsumer收到消息:" + new String(message.getBody()));
try {
//模拟消息处理失败
int a = 3 / 0;
// false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
System.out.println("消息已重复处理失败,拒绝再次接收...");
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);//requeue为false,拒绝
} catch (IOException e1) {
}
} else {
System.out.println("消息即将再次返回队列处理...");
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // requeue为true重新回到队列
} catch (IOException e1) {
}
}
}
}
}
死信队列
@Bean
DirectExchange messagedlDirect() {
return (DirectExchange) ExchangeBuilder.directExchange(QueueConstants.MESSAGE_EXCHANGE_DL).durable(true)
.build();
}
@Bean
Queue messagedlQueue() {
return QueueBuilder.durable(QueueConstants.MESSAGE_QUEUE_NAME_DL)
// 配置到期后转发的交换
.withArgument("x-dead-letter-exchange", QueueConstants.MESSAGE_EXCHANGE)
// 配置到期后转发的路由键
.withArgument("x-dead-letter-routing-key", QueueConstants.MESSAGE_ROUTE_KEY).build();
}
@Bean
public Binding messageTtlBinding(Queue messagedlQueue, DirectExchange messagedlDirect) {
return BindingBuilder.bind(messagedlQueue).to(messagedlDirect).with(QueueConstants.MESSAGE_ROUTE_KEY_DL);
}
rabbitTemplate.convertAndSend(QueueConstants.MESSAGE_EXCHANGE_DL, QueueConstants.MESSAGE_ROUTE_KEY_DL, str);
package com.example.demo.consumer;
import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.QueueConstants;
import com.rabbitmq.client.Channel;
@Component
public class MessageConsumer {
@RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_NAME_DL)
public void processMessage(Channel channel, Message message) {
System.out.println("MessageConsumer收到消息:" + new String(message.getBody()));
try {
//模拟消息处理失败
int a = 3 / 0;
// false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
System.out.println("消息已重复处理失败,拒绝再次接收...");
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);//requeue为false,拒绝
} catch (IOException e1) {
}
} else {
System.out.println("消息即将再次返回队列处理...");
try {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // requeue为true重新回到队列
} catch (IOException e1) {
}
}
}
}
}
package com.example.demo.consumer;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.QueueConstants;
import com.rabbitmq.client.Channel;
@Component
public class MessageConsumer2 {
@RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_NAME)
public void processMessage(Channel channel,Message message) {
System.out.println("MessageConsumer2收到消息:"+new String(message.getBody()));
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
}
}
}
文章标题:springboot整合rabbitMQ
文章链接:http://soscw.com/index.php/essay/95232.html