SpringBoot整合RabbitMq
2021-03-01 10:29
标签:context core 短信 src catch direct info actor auth
1. pom文件引入maven依赖:
2. application.properties文件添加自定义配置,也可使用默认的key:
3. 添加配置类RabbitMqConfig,配置RabbitTemplate模板。BasicConfig配置基类如下,也可将application.properties文件中的配置全部映射到BasicConfig;
4. 以上配置完毕,即可使用RabbitTemplate发消息,并能通过实现ConfirmCallback, ReturnCallback来接收发送消息结果的回调:
如下是发短信的demo:
接口:
model 类 Message:
实现类:
SpringBoot整合RabbitMq
标签:context core 短信 src catch direct info actor auth
原文地址:https://www.cnblogs.com/itfeng813/p/14442960.html
# Use RabbitMQ to send SMS messages
# Comma-separated host:port list of the broker nodes
spring.rabbitmq.sms.addresses=10.0.193.28:5672,10.0.193.29:5672
spring.rabbitmq.sms.username=admin
spring.rabbitmq.sms.password=123456
spring.rabbitmq.sms.virtual-host=msg
spring.rabbitmq.sms.publisher-confirms=true
spring.rabbitmq.sms.publisher-returns=true
# RabbitMQ service connection timeout, in milliseconds
spring.rabbitmq.sms.connect.timeout=10000
# Direct exchange, queue and routing key read by BasicConfig
sms.direct.exchange=exc.msgtosend
sms.direct.queue=que.msg.fast.batch
sms.direct.routingkey=msg_fast
1 package cn.drz.config;
2
3 import javax.annotation.Resource;
4
5 import org.slf4j.Logger;
6 import org.slf4j.LoggerFactory;
7 import org.springframework.amqp.core.Binding;
8 import org.springframework.amqp.core.BindingBuilder;
9 import org.springframework.amqp.core.DirectExchange;
10 import org.springframework.amqp.core.Queue;
11 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
12 import org.springframework.amqp.rabbit.connection.ConnectionFactory;
13 import org.springframework.amqp.rabbit.core.RabbitTemplate;
14 import org.springframework.beans.factory.annotation.Qualifier;
15 import org.springframework.beans.factory.annotation.Value;
16 import org.springframework.context.annotation.Bean;
17 import org.springframework.context.annotation.Configuration;
18
19 /**
20 * 使用RabbitMq Direct交换器模式推送消息
21 * @author drz
22 *
23 */
24 @Configuration
25 public class RabbitMqConfig {
26
27 private final Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class);
28
29 @Resource
30 private BasicConfig basicConfig;
31
32 @Bean
33 public Queue smsDirectQueue() {
34 return new Queue(basicConfig.getSmsDirectQueue());
35 }
36
37 @Bean
38 public DirectExchange smsDirectExchange() {
39 return new DirectExchange(basicConfig.getSmsDirectExchange());
40 }
41
42 @Bean
43 public Binding bindQueueToExchangeWithKey() {
44 return BindingBuilder.bind(smsDirectQueue()).to(smsDirectExchange())
45 .with(basicConfig.getSmsRoutingKey());
46 }
47
48 @Bean(name = "smsConnectionFactory")
49 public ConnectionFactory smsConnectionFactory(
50 @Value("${spring.rabbitmq.sms.addresses}") String addresses,
51 @Value("${spring.rabbitmq.sms.username}") String username,
52 @Value("${spring.rabbitmq.sms.password}") String password,
53 @Value("${spring.rabbitmq.sms.virtual-host}") String virtualHost,
54 @Value("${spring.rabbitmq.sms.publisher-confirms}") boolean publisherConfirms,
55 @Value("${spring.rabbitmq.sms.publisher-returns}") boolean publisherReturns) {
56 CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
57 connectionFactory.setAddresses(addresses);
58 connectionFactory.setUsername(username);
59 connectionFactory.setPassword(password);
60 connectionFactory.setVirtualHost(virtualHost);
61 connectionFactory.setPublisherConfirms(publisherConfirms);
62 connectionFactory.setPublisherReturns(publisherReturns);
63 connectionFactory.setConnectionTimeout(basicConfig.getSmsConnectTimeout());
64 return connectionFactory;
65 }
66
67 @Bean(name = "smsRabbitTemplate")
68 public RabbitTemplate smsRabbitTemplate(
69 @Qualifier("smsConnectionFactory") ConnectionFactory connectionFactory) {
70 RabbitTemplate smsRabbitTemplate = new RabbitTemplate(connectionFactory);
71 smsRabbitTemplate.setMandatory(true);
72 logger.info("加载smsRabbitTemplate完成");
73 return smsRabbitTemplate;
74 }
75 }
1 package cn.drz.config;
2
3 import org.springframework.beans.factory.annotation.Value;
4 import org.springframework.stereotype.Component;
5
6 import lombok.Getter;
7 import lombok.Setter;
8
9 @Getter
10 @Setter
11 @Component
12 public class BasicConfig {
13
14 @Value("${sms.direct.queue}")
15 private String smsDirectQueue;
16
17 @Value("${sms.direct.exchange}")
18 private String smsDirectExchange;
19
20 @Value("${sms.direct.routingkey}")
21 private String smsRoutingKey;
22
23 @Value("${spring.rabbitmq.sms.connect.timeout}")
24 private int smsConnectTimeout;
25
26 }
1 package cn.drz.service.open;
2
3 import cn.drz.model.Message;
4
5 public interface SmsService {
6 /**
7 * 发送短信
8 */
9 void sendMsg(Message message) throws Exception;
10
11 }
1 package cn.drz.model;
2
3 import java.io.Serializable;
4
5 import lombok.Getter;
6 import lombok.Setter;
7 @Getter
8 @Setter
9 public class Message implements Serializable {
10
11
12 String mobile;
13
14 String content;
15
16 }
1 package cn.drz.service.open.impl;
2
3 import java.io.IOException;
4 import java.util.ArrayList;
5 import java.util.HashMap;
6 import java.util.List;
7 import java.util.Map;
8 import java.util.UUID;
9
10 import javax.annotation.Resource;
11
12 import org.codehaus.jackson.map.ObjectMapper;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15 import org.springframework.amqp.rabbit.core.RabbitTemplate;
16 import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
17 import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
18 import org.springframework.amqp.rabbit.support.CorrelationData;
19 import org.springframework.beans.factory.annotation.Autowired;
20 import org.springframework.stereotype.Service;
21
22 import cn.drz.config.BasicConfig;
23 import cn.drz.Message;
24 import cn.drz.service.open.SmsService;
25
26 @Service("smsService")
27 public class SmsServiceImpl implements SmsService, ConfirmCallback, ReturnCallback {
28 private static final Logger logger = LoggerFactory.getLogger(SmsServiceImpl.class);
29
30 @Resource
31 private BasicConfig basicConfig;
32
33 @Resource
34 private RabbitTemplate smsRabbitTemplate;
35
36 /**
37 * 实现消息发送到RabbitMQ交换器后接收ack回调
38 */
39 @Override
40 public void confirm(CorrelationData correlationData, boolean ack, String cause) {
41
42 String mqId = correlationData.getId();
43 if (ack) {
44 logger.info("RabbitMq消息发送到交换器{}成功,mqid={}", smsRabbitTemplate.getExchange(), mqId);
45 } else {
46 logger.error("RabbitMq消息发送到交换器{}失败,mqid={},失败原因={}", smsRabbitTemplate.getExchange(),
47 mqId, cause);
48 //do something
49 }
50 }
51
52 /**
53 * 实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调
54 */
55 @Override
56 public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode,
57 String replyText, String exchange, String routingKey) {
58 String errInfo = "消息发送失败,未找到指定队列:replyCode=" + replyCode + ",replyText=" + replyText
59 + ",exchange=" + exchange + ",routingKey=" + routingKey + ",message=" + message;
60 logger.error(errInfo);
61 //do something
62 }
63
64 @Override
65 public void sendMsg(Message message) throws Exception {
66 String mobiles = message.getMobile();
67 String[] mobileList = mobiles.split(",");
68 for (int i = 0; i ) {
69 sendRabbitMQ(mobileList[i], message.getContent());
70 }
71
72 }
73
74 private void sendRabbitMQ(String mobile, String content) throws Exception {
75
76 String mqId = "NGDP" + System.currentTimeMillis()
77 + UUID.randomUUID().toString().replace("-", "").substring(0, 20);
78
79 List