SpringBoot整合RabbitMQ
2021-05-11 02:28
标签:caching date ring consumer bool info tostring control private 在这里我想吐槽一下Erlang的官网,为啥你这么特,下载个东西这么墨迹(呼---舒服了,来吧我继续了) 百度网盘:https://pan.baidu.com/s/1f2N40EjJOK9Vvdkz3EPEsQ 提取码:5fia 相信我,百度网盘虽然垃圾,但是绝对比Erlang网站强不少 博客链接:https://blog.csdn.net/zhm3023/article/details/82217222 在这里感谢这位大佬的博客给我提供了很多帮助,(合十) 问题: 1.RabbitMQ的几种Exchange消息调度策略 fanout,direct,topic(*代表一个单词,#代表零个或多个单词),headers(键值对) 2.使用情况 我最开始是想在前端和controller中间添加MQ,用作削峰和增加并发量,但是我想到的MQ不能实现实时的返回一些东西,所以没有实现,转而实现了日志系统,定义了两个监听器监听队列,当打印出日志的时候只进行入队列操作,存储日志交给监听器来做。 SpringBoot整合RabbitMQ 标签:caching date ring consumer bool info tostring control private 原文地址:https://www.cnblogs.com/frank9571/p/13155002.html 历时一下午,从安装环境到运行成功,心里还是有点小感慨的,解决了一个问题的感觉很爽。
首先是Erlang下载和环境配置以及RabbitMQ的下载
环境变量
maven依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
config
1 package com.zdj.config;
2
3
4
5
6
7 import org.springframework.amqp.core.AmqpAdmin;
8 import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
9 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
10 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
11 import org.springframework.amqp.rabbit.core.RabbitAdmin;
12 import org.springframework.amqp.rabbit.core.RabbitTemplate;
13 import org.springframework.context.annotation.Bean;
14 import org.springframework.context.annotation.Configuration;
15
16 @Configuration
17 public class RabbitMqConfig {
18 @Bean
19 public ConnectionFactory connectionFactory() {
20 //地址
21 CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1");
22 //端口
23 connectionFactory.setPort(5672);
24 //default username guest
25 connectionFactory.setUsername("guest");
26 //default password guest
27 connectionFactory.setPassword("guest");
28 // vhost can be null
29 //connectionFactory.setVirtualHost("虚拟主机名");
30 return connectionFactory;
31 }
32 //amqp 管理
33 @Bean
34 public AmqpAdmin amqpAdmin(){
35 return new RabbitAdmin(connectionFactory());
36 }
37 //rabbit模板
38 @Bean
39 public RabbitTemplate rabbitTemplate(){
40 return new RabbitTemplate(connectionFactory());
41 }
42 @Bean
43 public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(){
44 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
45 factory.setConnectionFactory(connectionFactory());
46 factory.setConcurrentConsumers(3);
47 factory.setMaxConcurrentConsumers(10);
48 return factory;
49 }
50 //定义exchange
51 public static final String EXCHANGE_A = "my-mq-exchange_1";
52 public static final String EXCHANGE_B = "my-mq-exchange_2";
53 public static final String EXCHANGE_C = "my-mq-exchange_3";
54 //queue
55 public static final String QUEUE_A = "QUEUE_1";
56 public static final String QUEUE_B = "QUEUE_2";
57 public static final String QUEUE_C = "QUEUE_3";
58 //routingKey
59 public static final String ROUTINGKEY_A = "spring-boot-routingKey_1";
60 public static final String ROUTINGKEY_B = "spring-boot-routingKey_2";
61 public static final String ROUTINGKEY_C = "spring-boot-routingKey_3";
62 }
DirectExchangeConfig
1 package com.zdj.config;
2
3 import org.springframework.amqp.core.Binding;
4 import org.springframework.amqp.core.BindingBuilder;
5 import org.springframework.amqp.core.DirectExchange;
6 import org.springframework.amqp.core.Queue;
7 import org.springframework.beans.factory.annotation.Qualifier;
8 import org.springframework.context.annotation.Bean;
9 import org.springframework.context.annotation.Configuration;
10
11 /**
12 * @author fqg
13 * @csdn
14 * @date 2020/6/17
15 *
16 * directExchange 直连交换机
17 */
18
19 @Configuration
20 public class DirectExchangeConfig {
21 //create directExchange
22 @Bean(value = "exchange_A")
23 public DirectExchange directExchange(){
24 DirectExchange directExchange = new DirectExchange(RabbitMqConfig.EXCHANGE_A);
25 return directExchange;
26 }
27 //connect exchange_A with queue_A
28 @Bean(value = "exchange_AtoQueue_A")
29 public Binding exchange_AtoQueue_A(@Qualifier("queue1") Queue queue){
30 return BindingBuilder.bind(queue).to(directExchange()).with(RabbitMqConfig.ROUTINGKEY_A);
31 }
32 //connect exchange_A with queue_B
33 @Bean(value = "exchange_AtoQueue_B")
34 public Binding exchange_AtoQueue_B(@Qualifier("queue2") Queue queue){
35 return BindingBuilder.bind(queue).to(directExchange()).with(RabbitMqConfig.ROUTINGKEY_B);
36 }
37 }
FanoutExchangeConfig
package com.zdj.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares a fanout exchange named "exchange2" and binds one queue to it.
 */
@Configuration
public class FanoutExchangeConfig {

    /**
     * Creates the fanout exchange. Both the bean name and the exchange name
     * are "exchange2".
     *
     * @return the fanout exchange
     */
    @Bean(value = "exchange2")
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("exchange2");
    }

    /**
     * Binds the injected queue to the fanout exchange; no routing key is supplied.
     *
     * @param queue the bean qualified as "queue3" (not declared in this class;
     *              presumably defined elsewhere in the project)
     * @return the binding between that queue and the exchange
     */
    @Bean(value = "exchange2toQueue3")
    public Binding exchange2toQueue3(@Qualifier("queue3") Queue queue) {
        FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }
}
Producer
package com.zdj.producer;
import com.zdj.config.RabbitMqConfig;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
 * Publishes text messages to {@link RabbitMqConfig#EXCHANGE_A} and acts as the
 * publisher-confirm callback of the {@link RabbitTemplate} it sends through.
 */
@Component
public class MsgProducer implements RabbitTemplate.ConfirmCallback {

    private RabbitTemplate rabbitTemplate;

    /**
     * Injects the template and registers this bean as its confirm callback.
     *
     * <p>Without this registration {@link #confirm} is never invoked, because
     * implementing the interface alone does not attach the callback to the
     * template. A {@code RabbitTemplate} supports a single confirm callback,
     * so no other component should register one on the same template.
     *
     * <p>NOTE(review): the callback only fires when publisher confirms are
     * enabled on the connection factory behind the template - confirm against
     * the connection-factory configuration.
     *
     * @param rabbitTemplate the template used for publishing
     */
    @Autowired
    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * Sends {@code content} to {@link RabbitMqConfig#EXCHANGE_A} with routing key
     * {@link RabbitMqConfig#ROUTINGKEY_A}, tagged with a random UUID as correlation id.
     *
     * @param content the message payload
     */
    public void sendMsg(String content) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(
                RabbitMqConfig.EXCHANGE_A, RabbitMqConfig.ROUTINGKEY_A, content, correlationId);
    }

    /**
     * Publisher-confirm callback: prints the correlation data and whether the
     * publish was acknowledged.
     *
     * @param correlationData correlation data passed when the message was sent
     * @param ack             {@code true} for an ack, {@code false} for a nack
     * @param cause           optional cause supplied with the confirm; currently not printed
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("callbackId" + correlationData);
        if (ack) {
            System.out.println("success consume");
        } else {
            System.out.println("fail consume");
        }
    }
}
Listener
package com.zdj.listener;
import com.zdj.config.RabbitMqConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listener on queue {@link RabbitMqConfig#QUEUE_A} that prints every
 * {@code String} payload it receives.
 */
@Component
@RabbitListener(queues = RabbitMqConfig.QUEUE_A)
public class Queue1Listener1 {

    /**
     * Handles one message by writing it to standard output.
     *
     * @param content the message payload
     */
    @RabbitHandler
    public void process(String content) {
        String line = "cpu1监听queue1:" + content;
        System.out.println(line);
    }
}
package com.zdj.listener;
import com.zdj.config.RabbitMqConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Second listener on queue {@link RabbitMqConfig#QUEUE_A}; prints every
 * {@code String} payload it receives.
 */
@Component
@RabbitListener(queues = RabbitMqConfig.QUEUE_A)
public class Queue1Listener2 {

    /**
     * Handles one message by writing it to standard output.
     *
     * @param content the message payload
     */
    @RabbitHandler
    public void process(String content) {
        String line = "cpu2监听queue1" + content;
        System.out.println(line);
    }
}
Test
package com.zdj.test;
import com.zdj.TestZdjMainApplication;
import com.zdj.producer.MsgProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = TestZdjMainApplication.class)
public class Test{
@Autowired
private MsgProducer msgProducer;
@Test
public void set(){
for(int i = 0; i ){
msgProducer.sendMsg("第" + i + "条");
}
}
}