SpringBoot整合RabbitMQ

2021-05-11 02:28

阅读:502

标签:caching   date   ring   consumer   bool   info   tostring   control   private   

历时一下午,从安装环境到运行成功,心里还是有点小感慨的,解决了一个问题的感觉很爽。

首先是Erlang下载和环境配置以及RabbitMQ的下载

在这里我想吐槽一下Erlang的官网,为啥你这么特,下载个东西这么磨叽(呼---舒服了,来吧我继续了)

百度网盘:https://pan.baidu.com/s/1f2N40EjJOK9Vvdkz3EPEsQ

提取码:5fia

相信我,百度网盘虽然垃圾,但是绝对比erlang网站强不少

环境变量

博客链接:https://blog.csdn.net/zhm3023/article/details/82217222

在这里感谢这位大佬的博客给我提供了很多帮助,(合十)

maven依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

 config

 1 package com.zdj.config;
 2 
 3 
 4 
 5 
 6 
 7 import org.springframework.amqp.core.AmqpAdmin;
 8 import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
 9 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
10 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
11 import org.springframework.amqp.rabbit.core.RabbitAdmin;
12 import org.springframework.amqp.rabbit.core.RabbitTemplate;
13 import org.springframework.context.annotation.Bean;
14 import org.springframework.context.annotation.Configuration;
15 
16 @Configuration
17 public class RabbitMqConfig {
18     @Bean
19     public ConnectionFactory connectionFactory() {
20         //地址
21         CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1");
22         //端口
23         connectionFactory.setPort(5672);
24         //default username guest
25         connectionFactory.setUsername("guest");
26         //default password guest
27         connectionFactory.setPassword("guest");
28         //  vhost can be null
29         //connectionFactory.setVirtualHost("虚拟主机名");
30         return connectionFactory;
31     }
32     //amqp 管理
33     @Bean
34     public AmqpAdmin amqpAdmin(){
35         return new RabbitAdmin(connectionFactory());
36     }
37     //rabbit模板
38     @Bean
39     public RabbitTemplate rabbitTemplate(){
40         return new RabbitTemplate(connectionFactory());
41     }
42     @Bean
43     public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(){
44         SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
45         factory.setConnectionFactory(connectionFactory());
46         factory.setConcurrentConsumers(3);
47         factory.setMaxConcurrentConsumers(10);
48         return factory;
49     }
50     //定义exchange
51     public static final String EXCHANGE_A = "my-mq-exchange_1";
52     public static final String EXCHANGE_B = "my-mq-exchange_2";
53     public static final String EXCHANGE_C = "my-mq-exchange_3";
54     //queue
55     public static final String  QUEUE_A = "QUEUE_1";
56     public static final String  QUEUE_B = "QUEUE_2";
57     public static final String  QUEUE_C = "QUEUE_3";
58     //routingKey
59     public static final String ROUTINGKEY_A = "spring-boot-routingKey_1";
60     public static final String ROUTINGKEY_B = "spring-boot-routingKey_2";
61     public static final String ROUTINGKEY_C = "spring-boot-routingKey_3";
62 }

DirectExchangeConfig

 1 package com.zdj.config;
 2 
 3 import org.springframework.amqp.core.Binding;
 4 import org.springframework.amqp.core.BindingBuilder;
 5 import org.springframework.amqp.core.DirectExchange;
 6 import org.springframework.amqp.core.Queue;
 7 import org.springframework.beans.factory.annotation.Qualifier;
 8 import org.springframework.context.annotation.Bean;
 9 import org.springframework.context.annotation.Configuration;
10 
11 /**
12  * @author fqg
13  * @csdn
14  * @date 2020/6/17
15  *
16  * directExchange 直连交换机
17  */
18 
19 @Configuration
20 public class DirectExchangeConfig {
21     //create directExchange
22     @Bean(value = "exchange_A")
23     public DirectExchange directExchange(){
24         DirectExchange directExchange = new DirectExchange(RabbitMqConfig.EXCHANGE_A);
25         return directExchange;
26     }
27     //connect exchange_A with queue_A
28     @Bean(value = "exchange_AtoQueue_A")
29     public Binding exchange_AtoQueue_A(@Qualifier("queue1") Queue queue){
30         return BindingBuilder.bind(queue).to(directExchange()).with(RabbitMqConfig.ROUTINGKEY_A);
31     }
32     //connect exchange_A with queue_B
33     @Bean(value = "exchange_AtoQueue_B")
34     public Binding exchange_AtoQueue_B(@Qualifier("queue2") Queue queue){
35         return BindingBuilder.bind(queue).to(directExchange()).with(RabbitMqConfig.ROUTINGKEY_B);
36     }
37 }

 

 

FanoutExchangeConfig

package com.zdj.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares a fanout exchange named {@code exchange2} and binds the queue bean qualified as
 * {@code queue3} to it. Fanout bindings carry no routing key.
 */
@Configuration
public class FanoutExchangeConfig {

    /**
     * Creates the fanout exchange, registered as bean {@code exchange2}.
     *
     * @return the fanout exchange
     */
    @Bean("exchange2")
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("exchange2");
    }

    /**
     * Binds the {@code queue3} queue bean to the fanout exchange.
     *
     * @param queue the queue bean qualified as {@code queue3}
     * @return the resulting binding
     */
    @Bean("exchange2toQueue3")
    public Binding exchange2toQueue3(@Qualifier("queue3") Queue queue) {
        FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }
}

 

Producer

package com.zdj.producer;

import com.zdj.config.RabbitMqConfig;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * Publishes text messages to {@link RabbitMqConfig#EXCHANGE_A} with
 * {@link RabbitMqConfig#ROUTINGKEY_A} and reports the broker's publisher confirms.
 */
@Component
public class MsgProducer implements RabbitTemplate.ConfirmCallback {

    private RabbitTemplate rabbitTemplate;

    /**
     * Injects the template and registers this producer as its confirm callback.
     * Without this registration {@link #confirm} is never invoked.
     *
     * <p>NOTE(review): confirms are only delivered when publisher confirms are enabled on the
     * connection factory; the connection factory configuration should be checked for that.
     *
     * @param rabbitTemplate the template used for publishing
     */
    @Autowired
    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * Sends {@code content} to the direct exchange, tagged with a fresh random correlation id
     * so that the matching confirm can be identified.
     *
     * @param content the message payload
     */
    public void sendMsg(String content){
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // Routed to whichever queue is bound to EXCHANGE_A under ROUTINGKEY_A.
        rabbitTemplate.convertAndSend(
                RabbitMqConfig.EXCHANGE_A, RabbitMqConfig.ROUTINGKEY_A, content, correlationData);
    }

    /**
     * Publisher-confirm callback: reports whether the broker accepted the published message.
     * This says nothing about whether a consumer has processed it.
     *
     * @param correlationData the correlation data passed to the send call
     * @param ack {@code true} if the broker acknowledged the message, {@code false} on nack
     * @param cause the reason supplied with a nack; may be absent
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("callbackId" + correlationData);
        if (ack) {
            System.out.println("broker confirmed message");
        } else {
            System.out.println("broker rejected message, cause: " + cause);
        }
    }
}

Listener

package com.zdj.listener;

import com.zdj.config.RabbitMqConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


/**
 * First consumer attached to {@link RabbitMqConfig#QUEUE_A}; prints every String payload it
 * receives to standard output.
 */
@Component
@RabbitListener(queues = RabbitMqConfig.QUEUE_A)
public class Queue1Listener1 {

    /**
     * Handles a String message taken from the queue.
     *
     * @param content the message payload
     */
    @RabbitHandler
    public void process(String content) {
        final String line = "cpu1监听queue1:" + content;
        System.out.println(line);
    }
}
package com.zdj.listener;

import com.zdj.config.RabbitMqConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Second consumer attached to {@link RabbitMqConfig#QUEUE_A}; prints every String payload it
 * receives to standard output.
 */
@Component
@RabbitListener(queues = RabbitMqConfig.QUEUE_A)
public class Queue1Listener2 {

    /**
     * Handles a String message taken from the queue.
     *
     * @param content the message payload
     */
    @RabbitHandler
    public void process(String content) {
        final String line = "cpu2监听queue1" + content;
        System.out.println(line);
    }
}

 

Test

package com.zdj.test;

import com.zdj.TestZdjMainApplication;
import com.zdj.producer.MsgProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;


@RunWith(SpringRunner.class)
@SpringBootTest(classes = TestZdjMainApplication.class)
public class Test{
    @Autowired
    private MsgProducer msgProducer;
    @Test
    public void set(){
        for(int i = 0; i ){
            msgProducer.sendMsg("第" + i + "条");
        }
    }
}

技术图片

 

问题:

1.RabbitMQ的几种Exchange消息调度策略

fanout,direct,topic(* 匹配一个单词,# 匹配零个或多个单词),headers(键值对)

2.使用情况

我最开始是想在前端和controller中间添加MQ,用作削峰和增加并发量,但是我想到MQ不能实现实时地返回一些东西,所以没有实现,转而实现了日志系统:定义了两个监听器监听队列,当打印出日志的时候只进行入队列操作,存储日志交给监听器来做。

 

SpringBoot整合RabbitMQ

标签:caching   date   ring   consumer   bool   info   tostring   control   private   

原文地址:https://www.cnblogs.com/frank9571/p/13155002.html


评论


亲,登录后才可以留言!