springBoot整合RabbitMQ(新手整合请勿喷)
2021-02-09 16:19
标签:转换 app prope 消费 重试 nfa public oid container。搭建步骤如下:首先搭建配置项,在 Spring Boot 项目的 application.properties 中增加 RabbitMQ 配置;然后编写 RabbitMQ 配置类 RabbitMQConfig;接着定义初始化监听器的配置类 MQListenerConfig,其初始化监听方法以注解形式获取消费者的队列以及监听器;再定义初始化交换机以及绑定关系的接口,其实现类分别为 DirectMQServiceImpl、FanoutMQServiceImpl、TopicMQServiceImpl;然后是自定义注解;最后是自定义消费者 AbstractConsumer,此消费者是通用基类,每多一个消费者只需继承它,然后处理业务逻辑即可。有什么不完美的地方请各位多多指教!新手第一次入坑。springBoot整合RabbitMQ(新手整合请勿喷) 标签:转换 app prope 消费 重试 nfa public oid container 原文地址:https://blog.51cto.com/11152994/2489181
# RabbitMQ settings
# RabbitMQ host name
spring.rabbitmq.host=127.0.0.1
# RabbitMQ port
spring.rabbitmq.port=5672
# RabbitMQ account (user name)
spring.rabbitmq.username=
# RabbitMQ password
spring.rabbitmq.password=
# Enable the confirm callback (producer -> exchange)
spring.rabbitmq.publisher-confirms=true
# Enable the returnedMessage callback (exchange -> queue)
spring.rabbitmq.publisher-returns=true
# Use manual acknowledgement (queue -> consumer)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100
spring.rabbitmq.template.mandatory=true
# Enable consumer-side retry
spring.rabbitmq.listener.simple.retry.enabled=true
# Maximum number of retry attempts (per the original author: if 5 attempts still fail the
# message is dropped; keeping the count under 10 is recommended).
# NOTE(review): the original comment said the default is unlimited; Spring Boot documents a
# default of 3 for this property - confirm against the Spring Boot version in use.
spring.rabbitmq.listener.simple.retry.max-attempts=5
# Interval between retries
spring.rabbitmq.listener.simple.retry.initial-interval=3000
spring.rabbitmq.virtual-host=/
/**
 * Spring configuration that declares the RabbitMQ infrastructure beans: the JSON message
 * converter, the {@link RabbitTemplate} used for publishing, and the {@link RabbitAdmin}.
 */
@Configuration
public class RabbitMQConfig {

    private static final Logger logger = LoggerFactory.getLogger(RabbitMQConfig.class);

    @Autowired
    private CachingConnectionFactory connectionFactory;

    /**
     * Converter that serialises and deserialises message payloads as JSON.
     *
     * @return a new {@link Jackson2JsonMessageConverter}
     */
    @Bean("messageConverter")
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Builds the publishing template with publisher confirms, publisher returns and the
     * mandatory flag enabled, and installs callbacks that log failed or unroutable messages.
     *
     * @return the configured {@link RabbitTemplate}
     */
    @Bean("rabbitTemplate")
    public RabbitTemplate rabbitTemplate() {
        // Confirms/returns are switched on at the connection factory before callbacks are set.
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);

        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                // Only negative confirms are reported; they are failures, so log above INFO.
                if (!ack) {
                    logger.warn("消息发送失败:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                }
            }
        });

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange,
                    String routingKey) {
                // A returned message was not routed to any queue; report it as a warning.
                logger.warn("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey,
                        replyCode, replyText, message);
            }
        });
        return rabbitTemplate;
    }

    /**
     * Creates the {@link RabbitAdmin} on top of the given connection factory.
     *
     * @param connectionFactory connection factory the admin operates on
     * @return the configured {@link RabbitAdmin}
     */
    @Bean("rabbitAdmin")
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        logger.info("RabbitAdmin启动了。。。");
        // Start together with the Spring container. Per the original author this already
        // defaults to true, so the call is optional and kept only for explicitness.
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}
/**
 * Spring configuration that exposes a {@link MessageListenerConfig} bean and runs its
 * initialisation while the bean is being created.
 */
@Configuration
public class MQListenerConfig {

    /**
     * Instantiates a {@link MessageListenerConfig}, calls its {@code init} method with the
     * supplied admin and connection factory, and returns the initialised instance.
     *
     * @param admin                   the RabbitAdmin handed to {@code init}
     * @param rabbitConnectionFactory the connection factory handed to {@code init}
     * @return the initialised listener configuration
     * @throws ClassNotFoundException propagated from {@code init}
     * @throws InstantiationException propagated from {@code init}
     * @throws IllegalAccessException propagated from {@code init}
     * @throws IOException            propagated from {@code init}
     */
    @Bean
    public MessageListenerConfig messageListenerConfig(RabbitAdmin admin,
            CachingConnectionFactory rabbitConnectionFactory)
            throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
        final MessageListenerConfig listenerConfig = new MessageListenerConfig();
        listenerConfig.init(admin, rabbitConnectionFactory);
        return listenerConfig;
    }
}
@Component
public class MessageListenerConfig {
public void init(RabbitAdmin admin, CachingConnectionFactory rabbitConnectionFactory)
throws ClassNotFoundException, IOException, InstantiationException, IllegalAccessException {
Map
/**
 * Strategy for creating an exchange of a particular type and binding a queue to it.
 */
public interface AbstractMQService {

    /** Service name constant. */
    String SERVICE_NAME = "MQService";

    /**
     * Initialises an exchange.
     *
     * @param exchangeName name of the exchange to create
     * @return the created exchange
     */
    AbstractExchange initExchange(String exchangeName);

    /**
     * Initialises the binding between a queue and an exchange.
     *
     * @param queue    queue to bind
     * @param exchange exchange the queue is bound to
     * @param routeKey routing key for the binding
     * @return the created binding
     */
    Binding initBinding(Queue queue, AbstractExchange exchange, String routeKey);
}
/**
 * {@link AbstractMQService} implementation for direct exchanges.
 */
@Service("directMQService")
public class DirectMQServiceImpl implements AbstractMQService {

    /**
     * Creates a {@link DirectExchange} with the given name.
     *
     * @param exChangeName name of the exchange
     * @return the new direct exchange
     */
    @Override
    public AbstractExchange initExchange(String exChangeName) {
        return new DirectExchange(exChangeName);
    }

    /**
     * Binds the queue to the given exchange using the routing key.
     *
     * @param queue    queue to bind
     * @param exChange exchange to bind to; cast to {@link DirectExchange}
     * @param routeKey routing key of the binding
     * @return the resulting binding
     */
    @Override
    public Binding initBinding(Queue queue, AbstractExchange exChange, String routeKey) {
        // Cast first so that a wrong exchange type fails before any builder call is made.
        final DirectExchange direct = (DirectExchange) exChange;
        return BindingBuilder.bind(queue).to(direct).with(routeKey);
    }
}
/**
 * {@link AbstractMQService} implementation for fanout exchanges.
 */
@Service("fanoutMQService")
public class FanoutMQServiceImpl implements AbstractMQService {

    /**
     * Creates a {@link FanoutExchange} with the given name.
     *
     * @param exChangeName name of the exchange
     * @return the new fanout exchange
     */
    @Override
    public AbstractExchange initExchange(String exChangeName) {
        return new FanoutExchange(exChangeName);
    }

    /**
     * Binds the queue to the given exchange. The routing key is not used by this
     * implementation.
     *
     * @param queue    queue to bind
     * @param exChange exchange to bind to; cast to {@link FanoutExchange}
     * @param routeKey ignored
     * @return the resulting binding
     */
    @Override
    public Binding initBinding(Queue queue, AbstractExchange exChange, String routeKey) {
        // Cast first so that a wrong exchange type fails before any builder call is made.
        final FanoutExchange fanout = (FanoutExchange) exChange;
        return BindingBuilder.bind(queue).to(fanout);
    }
}
/**
 * {@link AbstractMQService} implementation for topic exchanges.
 */
@Service("topicMQService")
public class TopicMQServiceImpl implements AbstractMQService {

    /**
     * Creates a {@link TopicExchange} with the given name.
     *
     * @param exChangeName name of the exchange
     * @return the new topic exchange
     */
    @Override
    public AbstractExchange initExchange(String exChangeName) {
        return new TopicExchange(exChangeName);
    }

    /**
     * Binds the queue to the given exchange using the routing key.
     *
     * @param queue    queue to bind
     * @param exChange exchange to bind to; cast to {@link TopicExchange}
     * @param routeKey routing key of the binding
     * @return the resulting binding
     */
    @Override
    public Binding initBinding(Queue queue, AbstractExchange exChange, String routeKey) {
        // Cast first so that a wrong exchange type fails before any builder call is made.
        final TopicExchange topic = (TopicExchange) exChange;
        return BindingBuilder.bind(queue).to(topic).with(routeKey);
    }
}
/**
 * Custom annotation carrying the RabbitMQ settings (queue, exchange, routing key, ...)
 * for the annotated field or type. Available at runtime.
 */
@Target({ ElementType.FIELD, ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface RabbitMq {

    /**
     * Queue.
     *
     * @return the queue; empty string by default
     */
    String queues() default "";

    /**
     * Exchange.
     *
     * @return the exchange; empty string by default
     */
    String exchange() default "";

    /**
     * Routing rule.
     *
     * @return the routing key; empty string by default
     */
    String routingKey() default "";

    /**
     * Whether to persist.
     *
     * @return {@code true} by default
     */
    boolean isPersistence() default true;

    /**
     * Acknowledgement mode.
     *
     * @return {@link AcknowledgeMode#MANUAL} by default
     */
    AcknowledgeMode mode() default AcknowledgeMode.MANUAL;

    /**
     * Number of consumers per queue.
     *
     * @return {@code 1} by default
     */
    int consumersPerQueue() default 1;

    /**
     * Exchange type.
     *
     * @return {@link ExchangeTypes#DIRECT} by default
     */
    String exchangeTypes() default ExchangeTypes.DIRECT;
}
/**
 * Generic base class for queue consumers. Subclasses implement
 * {@link #handleMessage(String)} with their business logic; this class decodes the body,
 * acknowledges the delivery on success, and on failure records a failure count in Redis
 * and rejects the delivery without requeueing.
 */
public abstract class AbstractConsumer extends MessagingMessageListenerAdapter {

    /** Name of the message header from which the correlation id is read. */
    protected static final String MQ_CORRELATIONDATA_KEY = "spring_returned_message_correlation";

    /** Prefix of the Redis key under which the per-message failure count is stored. */
    public static final String MQ_CACHE_MQ_KEY = "rabbitMQ.queues:";

    /** Once the stored failure count exceeds this value it is no longer incremented. */
    public static final Integer FAIL_MAX_COUNT = 5;

    private final RedisService redisService = SpringUtil.getBean(RedisService.class);

    /**
     * Decodes the message body as UTF-8, passes it to {@link #handleMessage(String)} and
     * settles the delivery: ack on success, nack without requeue on failure.
     *
     * @param message the delivered message
     * @param channel the channel the delivery arrived on; used for ack/nack
     * @throws IOException if acknowledging or rejecting the delivery fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws IOException {
        MessageProperties messageProperties = message.getMessageProperties();
        long deliveryTag = messageProperties.getDeliveryTag();
        String correlationId = (String) messageProperties.getHeaders().get(MQ_CORRELATIONDATA_KEY);
        String cacheKey = MQ_CACHE_MQ_KEY + messageProperties.getConsumerQueue() + ":" + correlationId;

        try {
            this.handleMessage(new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
        } catch (Exception e) {
            // Boundary catch: any handler failure must end in the delivery being settled.
            logger.warn("Message handling failed, rejecting delivery; key=" + cacheKey, e);
            rejectFailedMessage(channel, deliveryTag, cacheKey);
            return;
        }

        // Ack and cleanup run outside the try so that a failure here is never mistaken for
        // a handler failure (which would nack a delivery tag that was already acked).
        channel.basicAck(deliveryTag, false);
        clearFailureCount(cacheKey);
    }

    /**
     * Records the failure (unless the count already exceeds {@link #FAIL_MAX_COUNT}) and
     * rejects the delivery without requeueing it.
     */
    private void rejectFailedMessage(Channel channel, long deliveryTag, String cacheKey) throws IOException {
        // No counter exists before the first failure, so a missing value counts as zero.
        Object cached = redisService.get(cacheKey);
        int failCount = cached == null ? 0 : ((Number) cached).intValue();
        if (failCount <= FAIL_MAX_COUNT) {
            redisService.incr(cacheKey, 1, Long.valueOf(CacheTime.CACHE_EXP_THIRTY_SECONDS));
        }
        // Always settle the delivery: in manual-ack mode an unsettled delivery stays
        // outstanding on the channel.
        channel.basicNack(deliveryTag, false, false);
    }

    /**
     * Best-effort removal of the failure counter after a successful ack. The delivery is
     * already settled at this point, so a cleanup error is logged rather than propagated.
     */
    private void clearFailureCount(String cacheKey) {
        try {
            redisService.del(cacheKey);
        } catch (Exception e) {
            logger.warn("Could not clear failure counter; key=" + cacheKey, e);
        }
    }

    /**
     * Business logic for a single message.
     *
     * @param message the message body decoded as a UTF-8 string
     */
    public abstract void handleMessage(String message);
}
上一篇:数组的操作方法
文章标题:springBoot整合RabbitMQ(新手整合请勿喷)
文章链接:http://soscw.com/index.php/essay/53171.html