RabbitMQ - 02Spring整合XML形式
2021-03-07 17:30
标签:ace one @param sys use turn top 当前时间 out 测试启动类 消费者 resource/rabbitmq.xml 测试结果 ConfirmCallBackListener ReturnCallBackListener ReceiveConfirmTestListener 工具类 测试类 test1()测试结果: test2() 测试结果 确认回调, ack: false ; cause: channel error; protocol method: #method test3() 测试结果 失败 message:当前时间为:1609902810084 replyCode: 312 replyText: NO_ROUTE exchange: directExchange routingKey: confirm_test1 test4() 测试结果 确认回调, ack: false ; cause: channel error; protocol method: #method RabbitMQ - 02Spring整合XML形式 标签:ace one @param sys use turn top 当前时间 out 原文地址:https://www.cnblogs.com/kingdomer/p/14240304.htmlRabbitMQ - 02Spring整合XML形式
(1)XML形式:方式一
public class SpringTest {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("rabbitmq.xml");
RabbitTemplate template = context.getBean(RabbitTemplate.class);
template.convertAndSend("Spring整合RabbitMQXML消息");
((ClassPathXmlApplicationContext) context).destroy();
}
}
public class MyConsumer {
/**
* 用于接收消息
* @param message
*/
public void test(String message){
System.out.println("接收到的消息为: "+message);
}
}
xml version="1.0" encoding="UTF-8" ?>
beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd">
rabbit:connection-factory id="connectionFactory"
host="192.168.100.86" port="5672" username="admin" password="admin"
virtual-host="/test" publisher-confirms="true">rabbit:connection-factory>
rabbit:template id="template" connection-factory="connectionFactory" exchange="fanoutExchange"/>
rabbit:admin connection-factory="connectionFactory"/>
rabbit:queue name="myQueue2" auto-declare="true"/>
rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
rabbit:bindings>
rabbit:binding queue="myQueue2">rabbit:binding>
rabbit:bindings>
rabbit:fanout-exchange>
bean id="consumer" class="com.bearpx.rabbitmq.spring.MyConsumer"/>
rabbit:listener-container connection-factory="connectionFactory">
rabbit:listener ref="consumer" method="test" queue-names="myQueue2"/>
rabbit:listener-container>
beans>
rabbit:direct-exchange name="directExchange" durable="true" auto-declare="false">
rabbit:bindings>
rabbit:binding queue="myQueue2" key="key2">rabbit:binding>
rabbit:bindings>
rabbit:direct-exchange>
rabbit:topic-exchange name="topicExchange" durable="true" auto-delete="false">
rabbit:bindings>
rabbit:binding pattern="key.*" queue="myQueue2">rabbit:binding>
rabbit:bindings>
rabbit:topic-exchange>
(2)XML形式:方式二
(2.1)resource/applicationContext.xml
xml version="1.0" encoding="UTF-8" ?>
beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd">
context:component-scan base-package="com.bearpx.rabbitmq">context:component-scan>
bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">bean>
rabbit:connection-factory id="connectionFactory"
host="192.168.100.86" port="5672" username="admin" password="admin"
virtual-host="/test" publisher-confirms="true">rabbit:connection-factory>
rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
confirm-callback="confirmCallBackListener"
return-callback="returnCallBackListener"
mandatory="true"/>
rabbit:admin connection-factory="connectionFactory"/>
rabbit:queue name="confirm_test" auto-declare="true"/>
rabbit:direct-exchange name="directExchange" id="directExchange">
rabbit:bindings>
rabbit:binding queue="confirm_test">rabbit:binding>
rabbit:bindings>
rabbit:direct-exchange>
rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
rabbit:listener ref="receiveConfirmTestListener" queues="confirm_test"/>
rabbit:listener-container>
beans>
@Component("confirmCallBackListener")
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("确认回调, ack: "+ack +" ; cause: " + cause);
}
}
@Component("returnCallBackListener")
public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("失败 message:" + new String(message.getBody()) +" replyCode: "+ replyCode + " replyText: "+ replyText +
" exchange: "+ exchange +" routingKey: "+ routingKey);
}
}
@Component("receiveConfirmTestListener")
public class ReceiveConfirmTestListener implements ChannelAwareMessageListener {
/**
* 收到消息时 执行的监听*/
@Override
public void onMessage(Message message, Channel channel) throws Exception{
try{
System.out.println("消费者收到了消息: "+ message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
e.printStackTrace();
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
@Component("publishUtil")
public class PublishUtil {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(String exchange, String routingKey, Object message){
amqpTemplate.convertAndSend(exchange,routingKey, message);
}
}
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:applicationContext.xml"})
public class TestMain {
@Autowired
private PublishUtil publishUtil;
private static String EXCHANGE_NAME="directExchange";
private static String QUEUE_NAME="confirm_test";
// EXCHANGE QUEUE 都对, confirm 会执行, ack=true
@Test
public void test1() throws Exception{
String message = "当前时间为:" + System.currentTimeMillis();
publishUtil.send(EXCHANGE_NAME,QUEUE_NAME, message);
}
}
消费者收到了消息: (Body:‘当前时间为:1609902615382‘ MessageProperties [
headers={spring_listener_return_correlation=43ec414b-86aa-4b5d-a664-db5ede6a31dd},
contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0,
redelivered=false, receivedExchange=directExchange, receivedRoutingKey=confirm_test, deliveryTag=1, consumerTag=amq.ctag-Q5xlehjut9wdFxiMEUpiGw, consumerQueue=confirm_test
])
确认回调, ack: true ; cause: null // EXCHANGE 错误 QUEUE 对, confirm 会执行, ack=false
@Test
public void test2() throws Exception{
String message = "当前时间为:" + System.currentTimeMillis();
publishUtil.send(EXCHANGE_NAME+"1",QUEUE_NAME, message);
}
// EXCHANGE 对 QUEUE 错误, confirm 会执行, ack=true,失败会执行
@Test
public void test3() throws Exception{
String message = "当前时间为:" + System.currentTimeMillis();
publishUtil.send(EXCHANGE_NAME,QUEUE_NAME+"1", message);
Thread.sleep(2000);
}
确认回调, ack: true ; cause: null // EXCHANGE QUEUE 都错误, confirm 会执行, ack=false
@Test
public void test4() throws Exception{
String message = "当前时间为:" + System.currentTimeMillis();
publishUtil.send(EXCHANGE_NAME+"1",QUEUE_NAME+"1", message);
}