SpringBoot整合RabbitMq

2021-03-01 10:29

阅读:366

标签:context   core   短信   src   catch   direct   info   actor   auth   

1.pom文件引入maven依赖: 

    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
        

2.application.properties文件添加自定义配置,也可使用默认的key:

# RabbitMQ connection used for sending SMS.
# These are custom keys; RabbitMqConfig.smsConnectionFactory reads them via @Value.
spring.rabbitmq.sms.addresses=10.0.193.28:5672,10.0.193.29:5672
spring.rabbitmq.sms.username=admin
spring.rabbitmq.sms.password=123456
spring.rabbitmq.sms.virtual-host=msg
spring.rabbitmq.sms.publisher-confirms=true
spring.rabbitmq.sms.publisher-returns=true
# RabbitMQ connection timeout, in milliseconds
spring.rabbitmq.sms.connect.timeout=10000
# Direct exchange, queue and routing key used for SMS messages (bound in BasicConfig)
sms.direct.exchange=exc.msgtosend
sms.direct.queue=que.msg.fast.batch
sms.direct.routingkey=msg_fast

3.添加配置类RabbitMqConfig,配置RabbitTemplate模板  

技术图片技术图片
 1 package cn.drz.config;
 2 
 3 import javax.annotation.Resource;
 4 
 5 import org.slf4j.Logger;
 6 import org.slf4j.LoggerFactory;
 7 import org.springframework.amqp.core.Binding;
 8 import org.springframework.amqp.core.BindingBuilder;
 9 import org.springframework.amqp.core.DirectExchange;
10 import org.springframework.amqp.core.Queue;
11 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
12 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
13 import org.springframework.amqp.rabbit.core.RabbitTemplate;
14 import org.springframework.beans.factory.annotation.Qualifier;
15 import org.springframework.beans.factory.annotation.Value;
16 import org.springframework.context.annotation.Bean;
17 import org.springframework.context.annotation.Configuration;
18 
19 /**
20  * 使用RabbitMq Direct交换器模式推送消息
21  * @author drz
22  *
23  */
24 @Configuration
25 public class RabbitMqConfig {
26 
27     private final Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class);
28 
29     @Resource
30     private BasicConfig basicConfig;
31 
32     @Bean
33     public Queue smsDirectQueue() {
34         return new Queue(basicConfig.getSmsDirectQueue());
35     }
36 
37     @Bean
38     public DirectExchange smsDirectExchange() {
39         return new DirectExchange(basicConfig.getSmsDirectExchange());
40     }
41 
42     @Bean
43     public Binding bindQueueToExchangeWithKey() {
44         return BindingBuilder.bind(smsDirectQueue()).to(smsDirectExchange())
45                 .with(basicConfig.getSmsRoutingKey());
46     }
47 
48     @Bean(name = "smsConnectionFactory")
49     public ConnectionFactory smsConnectionFactory(
50             @Value("${spring.rabbitmq.sms.addresses}") String addresses,
51             @Value("${spring.rabbitmq.sms.username}") String username,
52             @Value("${spring.rabbitmq.sms.password}") String password,
53             @Value("${spring.rabbitmq.sms.virtual-host}") String virtualHost,
54             @Value("${spring.rabbitmq.sms.publisher-confirms}") boolean publisherConfirms,
55             @Value("${spring.rabbitmq.sms.publisher-returns}") boolean publisherReturns) {
56         CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
57         connectionFactory.setAddresses(addresses);
58         connectionFactory.setUsername(username);
59         connectionFactory.setPassword(password);
60         connectionFactory.setVirtualHost(virtualHost);
61         connectionFactory.setPublisherConfirms(publisherConfirms);
62         connectionFactory.setPublisherReturns(publisherReturns);
63         connectionFactory.setConnectionTimeout(basicConfig.getSmsConnectTimeout());
64         return connectionFactory;
65     }
66 
67     @Bean(name = "smsRabbitTemplate")
68     public RabbitTemplate smsRabbitTemplate(
69             @Qualifier("smsConnectionFactory") ConnectionFactory connectionFactory) {
70         RabbitTemplate smsRabbitTemplate = new RabbitTemplate(connectionFactory);
71         smsRabbitTemplate.setMandatory(true);
72         logger.info("加载smsRabbitTemplate完成");
73         return smsRabbitTemplate;
74     }
75 }
View Code

  BasicConfig配置基类如下,也可将application.properties文件中的配置全部映射到BasicConfig; 

技术图片技术图片
 1 package cn.drz.config;
 2 
 3 import org.springframework.beans.factory.annotation.Value;
 4 import org.springframework.stereotype.Component;
 5 
 6 import lombok.Getter;
 7 import lombok.Setter;
 8 
 9 @Getter
10 @Setter
11 @Component
12 public class BasicConfig {
13 
14     @Value("${sms.direct.queue}")
15     private String smsDirectQueue;
16 
17     @Value("${sms.direct.exchange}")
18     private String smsDirectExchange;
19 
20     @Value("${sms.direct.routingkey}")
21     private String smsRoutingKey;
22 
23     @Value("${spring.rabbitmq.sms.connect.timeout}")
24     private int smsConnectTimeout;
25 
26 }
View Code

4.以上配置完毕,即可使用RabbitTemplate发消息,并能通过实现ConfirmCallback、ReturnCallback接口来接收消息发送结果的回调(注意:实现类需调用RabbitTemplate的setConfirmCallback/setReturnCallback注册自身,回调才会生效):

  如下是发短信的demo:

  接口: 

技术图片技术图片
 1 package cn.drz.service.open;
 2 
 3 import cn.drz.model.Message;
 4 
 5 public interface SmsService {
 6     /**
 7      * 发送短信
 8      */
 9     void sendMsg(Message message) throws Exception;
10 
11 }
View Code

  model 类 Message: 

技术图片技术图片
 1 package cn.drz.model;
 2 
 3 import java.io.Serializable;
 4 
 5 import lombok.Getter;
 6 import lombok.Setter;
 7 @Getter
 8 @Setter
 9 public class Message implements Serializable {
10 
11 
12     String mobile;
13 
14     String content;
15 
16 }
View Code

  实现类:  

技术图片技术图片
  1 package cn.drz.service.open.impl;
  2 
  3 import java.io.IOException;
  4 import java.util.ArrayList;
  5 import java.util.HashMap;
  6 import java.util.List;
  7 import java.util.Map;
  8 import java.util.UUID;
  9 
 10 import javax.annotation.Resource;
 11 
 12 import org.codehaus.jackson.map.ObjectMapper;
 13 import org.slf4j.Logger;
 14 import org.slf4j.LoggerFactory;
 15 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 16 import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
 17 import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
 18 import org.springframework.amqp.rabbit.support.CorrelationData;
 19 import org.springframework.beans.factory.annotation.Autowired;
 20 import org.springframework.stereotype.Service;
 21 
 22 import cn.drz.config.BasicConfig;
 23 import cn.drz.Message;
 24 import cn.drz.service.open.SmsService;
 25 
 26 @Service("smsService")
 27 public class SmsServiceImpl implements SmsService, ConfirmCallback, ReturnCallback {
 28     private static final Logger logger = LoggerFactory.getLogger(SmsServiceImpl.class);
 29 
 30     @Resource
 31     private BasicConfig basicConfig;
 32 
 33     @Resource
 34     private RabbitTemplate smsRabbitTemplate;
 35 
 36     /**
 37      * 实现消息发送到RabbitMQ交换器后接收ack回调
 38      */
 39     @Override
 40     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
 41 
 42         String mqId = correlationData.getId();
 43         if (ack) {
 44             logger.info("RabbitMq消息发送到交换器{}成功,mqid={}", smsRabbitTemplate.getExchange(), mqId);
 45         } else {
 46             logger.error("RabbitMq消息发送到交换器{}失败,mqid={},失败原因={}", smsRabbitTemplate.getExchange(),
 47                     mqId, cause);
 48             //do something
 49         }
 50     }
 51 
 52     /**
 53      * 实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调
 54      */
 55     @Override
 56     public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode,
 57             String replyText, String exchange, String routingKey) {
 58         String errInfo = "消息发送失败,未找到指定队列:replyCode=" + replyCode + ",replyText=" + replyText
 59                 + ",exchange=" + exchange + ",routingKey=" + routingKey + ",message=" + message;
 60         logger.error(errInfo);
 61       //do something
 62     }
 63 
 64     @Override
 65     public void sendMsg(Message message) throws Exception {
 66         String mobiles = message.getMobile();
 67         String[] mobileList = mobiles.split(",");
 68         for (int i = 0; i ) {
 69             sendRabbitMQ(mobileList[i], message.getContent());
 70         }
 71 
 72     }
 73 
 74     private void sendRabbitMQ(String mobile, String content) throws Exception {
 75 
 76         String mqId = "NGDP" + System.currentTimeMillis()
 77                 + UUID.randomUUID().toString().replace("-", "").substring(0, 20);
 78 
 79         List> requestBody = new ArrayList>();
 80         Map requestMap = new HashMap();
 81         requestMap.put("serviceid", "10001");
 82         requestMap.put("sourcesystem", "NGDP");
 83         requestMap.put("sendtarget", mobile);
 84         requestMap.put("smcontent", content.length() > 990 ? content.substring(0, 990) : content);
 85         requestMap.put("msg_type", "sms");
 86         requestMap.put("pipeline_number", mqId);
 87         requestBody.add(requestMap);
 88         String message = null;
 89         ObjectMapper objectMapper = new ObjectMapper();
 90         try {
 91             message = objectMapper.writeValueAsString(requestBody);
 92         } catch (IOException e) {
 93             logger.error("生成Json数据失败,失败原因:{}", e);
 94         }
 95         send(basicConfig.getSmsDirectExchange(), basicConfig.getSmsRoutingKey(), message, mqId);
 96     }
 97 
 98     private void send(String exchange, String routingKey, String msg, String mqId)
 99             throws ServiceException {
100 
101         logger.info("开始发送rabbitmq消息 :exchange={}, queue={}, routingkey={}, mqId={}, msg={}",
102                 basicConfig.getSmsDirectExchange(), basicConfig.getSmsDirectQueue(),
103                 basicConfig.getSmsRoutingKey(), mqId, msg);
104         CorrelationData correlationId = new CorrelationData(mqId);
105         smsRabbitTemplate.convertAndSend(exchange, routingKey, msg, correlationId);
106         logger.info("发送消息完成,mqId={}", mqId);
107     }
108 
109    
110 
111 }
View Code

 

SpringBoot整合RabbitMq

标签:context   core   短信   src   catch   direct   info   actor   auth   

原文地址:https://www.cnblogs.com/itfeng813/p/14442960.html


评论


亲,登录后才可以留言!