springboot整合rabbitmq
2021-01-25 22:15
标签:消息传递 设置 文件 ns-3 mes string rabbit 配置 rabl 首先介绍一下rabbitmq三种模式 Direct–路由模式
Fanout–发布/订阅模式
Topic–匹配订阅模式
pom文件配置 第一步配置生产者 接下来就是消费者 启动项目,分别访问127.0.0.1:8080/rabbit/direct,127.0.0.1:8080/rabbit/topic,127.0.0.1:8080/rabbit/fanout三个地址看看效果。
这种注解方式其实原理和上面一样,只是消费者的RabbitListener只要配置一个queue的名称,其他配置同意提取到一个配置类中 结果如下
这种方式比较麻烦,但是呢,有些老项目可能是这么用的,所以在这里也做一个demo。 第二步就是配置文件,这里我分了两个配置文件,把不改变的连接信息之类的放在单独的配置文件,这部分文件配置即可发送mq消息 接下来就是消费者的一些监听绑定的xml 接下来我们的接受类com.example.xml.demo.receiver.Receiver 生产者类代码 接下来看结果 springboot整合rabbitmq 标签:消息传递 设置 文件 ns-3 mes string rabbit 配置 rabl 原文地址:https://www.cnblogs.com/blogs2002/p/12859262.html
任何发送到Direct Exchange的消息都会被转发到RouteKey指定的Queue。
这种模式下不需要将Exchange进行任何绑定(binding)操作。
消息传递时需要一个“RouteKey”,可以简单的理解为要发送到的队列名字。
如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。
任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。
这种模式不需要RouteKey。
这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。
如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。
任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上。
就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。
这种模式需要RouteKey,也许要提前绑定Exchange与Queue。
在进行绑定时,要提供一个该队列关心的主题。
.“#”表示0个或若干个关键字,“*”表示一个关键字。
同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。
springboot整合rabbitmq基于注解方式(最简单方式)
接下来就是配置文件
spring.application.name=springboot-rabbitmq
server.port=8080
//默认地址就是127.0.0.1:5672,如果是服务器的rabbitmq就改下
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
//消息确认模式,还有一种事务模式,这里不讲解,有兴趣自己去查资料
spring.rabbitmq.publisher-confirms=true
//这里我把他看作是虚拟主机目录,相当于数据库的库名
spring.rabbitmq.virtual-host=/
package com.example.annotion.demo.sender;
import java.util.Date;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Sender {
//rabbitTemplate.convertAndSend(String exchange交换机名称(可省略), String routingKey路由键, Object object传递的消息)
@Autowired
private AmqpTemplate rabbitTemplate;
//direct方式交换机名字随便填,但是不能填direct,会造成两次消费
public void sendDirect() {
String msg1 = "hello " + new Date();
System.out.println("helloSender : " + msg1);
this.rabbitTemplate.convertAndSend("hello", msg1);
// this.rabbitTemplate.convertAndSend("direct","hello", msg1);
String msg2 = "user " + new Date();
System.out.println("userSender : " + msg2);
this.rabbitTemplate.convertAndSend("user", msg2);
// this.rabbitTemplate.convertAndSend("direct","user", msg2);
}
//topic方式
public void sendTopic() {
String msg1 = "I am topic.mesaage msg======";
System.out.println("topic.mesaage sender : " + msg1);
this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1);
String msg2 = "I am topic.mesaages msg########";
System.out.println("topic.mesaages sender : " + msg2);
this.rabbitTemplate.convertAndSend("exchange", "topic.messages", msg2);
}
//fanout方式routingKey随便填
public void sendFanout() {
String msg = "I am fanoutSender msg======";
System.out.println("fanoutSender : " + msg);
this.rabbitTemplate.convertAndSend("fanoutExchange", "suibiantian",msg);
}
}
package com.example.annotion.demo.receiver;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
* @RabbitListener()rabbit监听
* @QueueBinding()队列绑定 value绑定@Queue,exchange绑定@Exchange,key为路由键
* @Queue队列 value:名称;autoDelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除;durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库
* @Exchange交换器,type有五种,其余参数同@Queue
*/
@Component
public class Receiver {
//===============以下是验证direct Exchange的队列==========
// @RabbitListener(queues = "hello")
//direct模式,exchange名字随便填
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "hello",autoDelete = "false",declare = "true"),exchange = @Exchange(value = "suibianxie",type = ExchangeTypes.DIRECT),key = "user"
))
@RabbitHandler
public void processHello(String msg) {
System.out.println("helloReceiver : " + msg);
}
// @RabbitListener(queues = "user")
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "user",autoDelete = "false"),exchange = @Exchange(value = "suibianxie",type = ExchangeTypes.DIRECT),key = "hello"
))
@RabbitHandler
public void processUser(String msg) {
System.out.println("userReceiver : " + msg);
}
//===============以上是验证direct Exchange的队列==========
//===============以下是验证topic Exchange的队列==========
// @RabbitListener(queues = "topic.message")
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "topic.message",autoDelete = "false"),exchange = @Exchange(value = "exchange",type = ExchangeTypes.TOPIC),key = "topic.message"
))
@RabbitHandler
public void processTopicMessage(String msg) {
System.out.println("topicMessageReceiver : " + msg);
}
// @RabbitListener(queues = "topic.messages")
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "topic.messages",autoDelete = "false"),exchange = @Exchange(value = "exchange",type = ExchangeTypes.TOPIC),key = "topic.#"
))
@RabbitHandler
public void processTopicMessages(String msg) {
System.out.println("topicMessagesReceiver : " + msg);
}
//===============以上是验证topic Exchange的队列==========
//===============以下是验证fanout Exchange的队列==========
// @RabbitListener(queues = "fanout.A")
//fanout方式key不用填
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "fanout.A",autoDelete = "false"),exchange = @Exchange(value = "fanoutExchange",type = ExchangeTypes.FANOUT)
))
@RabbitHandler
public void processFanoutA(String msg) {
System.out.println("fanoutAReceiver : " + msg);
}
// @RabbitListener(queues = "fanout.B")
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "fanout.B",autoDelete = "false"),exchange = @Exchange(value = "fanoutExchange",type = ExchangeTypes.FANOUT)
))
@RabbitHandler
public void processFanoutB(String msg) {
System.out.println("fanoutBReceiver : " + msg);
}
// @RabbitListener(queues = "fanout.C")
@RabbitHandler
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "fanout.C",autoDelete = "false"),exchange = @Exchange(value = "fanoutExchange",type = ExchangeTypes.FANOUT)
))
public void processFanoutC(String msg) {
System.out.println("fanoutCReceiver : " + msg);
}
//===============以上是验证fanout Exchange的队列==========
}
再来一个controller方便测试
package com.example.annotion.demo.controller;
import com.example.annotion.demo.sender.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/rabbit")
public class RabbitController {
@Autowired
private Sender sender;
@GetMapping("/direct")
public void direct() {
sender.sendDirect();
}
@GetMapping("/topic")
public void topic() {
sender.sendTopic();
}
@GetMapping("/fanout")
public void fanout() {
sender.sendFanout();
}
}
springboot整合rabbitmq基于注解方式
消费者代码
package com.example.annotion.demo.receiver;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
public class Receiver {
//===============以下是验证direct Exchange的队列==========
@RabbitListener(queues = "hello")
@RabbitHandler
public void processHello(String msg) {
System.out.println("helloReceiver : " + msg);
}
@RabbitListener(queues = "user")
@RabbitHandler
public void processUser(String msg) {
System.out.println("userReceiver : " + msg);
}
//===============以上是验证direct Exchange的队列==========
//===============以下是验证topic Exchange的队列==========
@RabbitListener(queues = "topic.message")
@RabbitHandler
public void processTopicMessage(String msg) {
System.out.println("topicMessageReceiver : " + msg);
}
@RabbitListener(queues = "topic.messages")
@RabbitHandler
public void processTopicMessages(String msg) {
System.out.println("topicMessagesReceiver : " + msg);
}
//===============以上是验证topic Exchange的队列==========
//===============以下是验证fanout Exchange的队列==========
@RabbitListener(queues = "fanout.A")
@RabbitHandler
public void processFanoutA(String msg) {
System.out.println("fanoutAReceiver : " + msg);
}
@RabbitListener(queues = "fanout.B")
@RabbitHandler
public void processFanoutB(String msg) {
System.out.println("fanoutBReceiver : " + msg);
}
@RabbitListener(queues = "fanout.C")
@RabbitHandler
public void processFanoutC(String msg) {
System.out.println("fanoutCReceiver : " + msg);
}
//===============以上是验证fanout Exchange的队列==========
}
配置类代码package com.example.annotion.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
@Component
public class QueueCofig {
//===============以下是验证direct Exchange的队列==========
@Bean
public Queue helloQueue() {
return new Queue("hello");
}
@Bean
public Queue userQueue() {
return new Queue("user");
}
/**
*注入name为‘direct‘的DirectExchange,默认名字就是空字符串,可以不注入
*/
@Bean
DirectExchange directExchange() {
return new DirectExchange("direct");
}
/**
* 将队列hello与exchange绑定,binding_key为hello,就是完全匹配
*/
@Bean
Binding bindingHelloExchange(Queue helloQueue, DirectExchange exchange) {
return BindingBuilder.bind(helloQueue).to(exchange).with("hello");
}
/**
* 将队列user与exchange绑定,binding_key为hello,就是完全匹配
*/
@Bean
Binding bindingUserExchange(Queue userQueue, DirectExchange exchange) {
return BindingBuilder.bind(userQueue).to(exchange).with("user");
}
//===============以上是验证direct Exchange的队列==========
//===============以下是验证topic Exchange的队列==========
@Bean
public Queue queueMessage() {
return new Queue("topic.message");
}
@Bean
public Queue queueMessages() {
return new Queue("topic.messages");
}
/**
*注入name为exchange的TopicExchange
*/
@Bean
TopicExchange exchange() {
return new TopicExchange("exchange");
}
/**
* 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配
*/
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
/**
* 将队列topic.messages与exchange绑定,binding_key为topic.#,模糊匹配
*/
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
//===============以上是验证topic Exchange的队列==========
//===============以下是验证Fanout Exchange的队列==========
@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
/**
* 注入name为fanoutExchange的FanoutExchange
*/
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
/**
* 将队列fanout.A与FanoutExchange绑定
*/
@Bean
Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
/**
* 将队列fanout.B与FanoutExchange绑定
*/
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
/**
* 将队列fanout.C与FanoutExchange绑定
*/
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
//===============以上是验证Fanout Exchange的队列==========
}
springboot整合rabbitmq基于xml方式
pom文件和上面一样,配置文件把mq的配置去掉,第一步设置配置类,加载xml文件
package com.example.xml.demo.config;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportResource;
/**
* 实例化xml文件中定义的bean
**/
@Configuration
@EnableRabbit
@ImportResource({ "classpath:config/applicationContext-*.xml" })
public class XmlConfig {
}
引入了rabbitmq.propertiesrmq.ip=127.0.0.1
rmq.port=5672
rmq.manager.user=guest
rmq.manager.password=guest
package com.example.xml.demo.receiver;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
public class Receiver {
public void processHello(String msg) {
System.out.println("helloReceiver : " + msg);
}
public void processUser(String msg) {
System.out.println("userReceiver : " + msg);
}
public void processTopicMessage(String msg) {
System.out.println("topicMessageReceiver : " + msg);
}
public void processTopicMessages(String msg) {
System.out.println("topicMessagesReceiver : " + msg);
}
public void processFanoutA(String msg) {
System.out.println("fanoutAReceiver : " + msg);
}
public void processFanoutB(String msg) {
System.out.println("fanoutBReceiver : " + msg);
}
public void processFanoutC(String msg) {
System.out.println("fanoutCReceiver : " + msg);
}
}
package com.example.xml.demo.sender;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void sendQueue() {
String msg1 = "hello " + new Date();
System.out.println("helloSender : " + msg1);
this.rabbitTemplate.convertAndSend("hello", msg1);
// this.rabbitTemplate.convertAndSend("direct","hello", msg1);
String msg2 = "user " + new Date();
System.out.println("userSender : " + msg2);
this.rabbitTemplate.convertAndSend("user", msg2);
// this.rabbitTemplate.convertAndSend("direct","user", msg1);
}
public void sendTopic() {
String msg1 = "I am topic.mesaage msg======";
System.out.println("topic.mesaage sender : " + msg1);
this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1);
String msg2 = "I am topic.mesaages msg########";
System.out.println("topic.mesaages sender : " + msg2);
this.rabbitTemplate.convertAndSend("exchange", "topic.messages", msg2);
}
public void sendFanout() {
String msg = "I am fanoutSender msg======";
System.out.println("fanoutSender : " + msg);
this.rabbitTemplate.convertAndSend("fanoutExchange","keysuibiantian", msg);
}
}
上一篇:js--数组中的最值
文章标题:springboot整合rabbitmq
文章链接:http://soscw.com/index.php/essay/46990.html