RabbitMQ - 02Spring整合XML形式

2021-03-07 17:30

阅读:570

标签:ace   one   @param   sys   use   turn   top   当前时间   out   

RabbitMQ - 02Spring整合XML形式

(1)XML形式:方式一

测试启动类

public class SpringTest {
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("rabbitmq.xml");
        RabbitTemplate template = context.getBean(RabbitTemplate.class);
        template.convertAndSend("Spring整合RabbitMQXML消息");
        ((ClassPathXmlApplicationContext) context).destroy();
    }
}

消费者

public class MyConsumer {

    /**
     * 用于接收消息
     * @param message
     */
    public void test(String message){
        System.out.println("接收到的消息为: "+message);
    }
}

resource/rabbitmq.xml

xml version="1.0" encoding="UTF-8" ?>
beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
           http://www.springframework.org/schema/beans
           https://www.springframework.org/schema/beans/spring-beans.xsd">
    
    rabbit:connection-factory id="connectionFactory"
                                 host="192.168.100.86" port="5672" username="admin" password="admin"
                                 virtual-host="/test" publisher-confirms="true">rabbit:connection-factory>
    
    rabbit:template id="template" connection-factory="connectionFactory" exchange="fanoutExchange"/>
    rabbit:admin connection-factory="connectionFactory"/>
    
    rabbit:queue name="myQueue2" auto-declare="true"/>
    
    rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
        rabbit:bindings>
            rabbit:binding queue="myQueue2">rabbit:binding>
        rabbit:bindings>
    rabbit:fanout-exchange>

    
    bean id="consumer" class="com.bearpx.rabbitmq.spring.MyConsumer"/>
    
    rabbit:listener-container connection-factory="connectionFactory">
        
        rabbit:listener ref="consumer" method="test" queue-names="myQueue2"/>
    rabbit:listener-container>
beans>
    rabbit:direct-exchange name="directExchange" durable="true" auto-declare="false">
        rabbit:bindings>
            rabbit:binding queue="myQueue2" key="key2">rabbit:binding>
        rabbit:bindings>
    rabbit:direct-exchange>
    rabbit:topic-exchange name="topicExchange" durable="true" auto-delete="false">
        rabbit:bindings>
            rabbit:binding pattern="key.*" queue="myQueue2">rabbit:binding>
        rabbit:bindings>
    rabbit:topic-exchange>

测试结果

 技术图片

 

(2)XML形式:方式二

(2.1)resource/applicationContext.xml

xml version="1.0" encoding="UTF-8" ?>
beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
           http://www.springframework.org/schema/beans
           https://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           https://www.springframework.org/schema/context/spring-context.xsd">

    context:component-scan base-package="com.bearpx.rabbitmq">context:component-scan>
    bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">bean>
    
    rabbit:connection-factory id="connectionFactory"
                                 host="192.168.100.86" port="5672" username="admin" password="admin"
                                 virtual-host="/test" publisher-confirms="true">rabbit:connection-factory>
    
    rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
                     confirm-callback="confirmCallBackListener"
                     return-callback="returnCallBackListener"
                     mandatory="true"/>
    rabbit:admin connection-factory="connectionFactory"/>

    rabbit:queue name="confirm_test" auto-declare="true"/>
    rabbit:direct-exchange name="directExchange" id="directExchange">
        rabbit:bindings>
            rabbit:binding queue="confirm_test">rabbit:binding>
        rabbit:bindings>
    rabbit:direct-exchange>

    rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
        rabbit:listener ref="receiveConfirmTestListener" queues="confirm_test"/>
    rabbit:listener-container>
beans>

ConfirmCallBackListener

@Component("confirmCallBackListener")
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("确认回调, ack: "+ack +" ; cause: " + cause);
    }
}

ReturnCallBackListener 

@Component("returnCallBackListener")
public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("失败 message:" + new String(message.getBody()) +" replyCode: "+ replyCode +  " replyText: "+ replyText +
                " exchange: "+ exchange +" routingKey: "+ routingKey);
    }
}

ReceiveConfirmTestListener 

@Component("receiveConfirmTestListener")
public class ReceiveConfirmTestListener implements ChannelAwareMessageListener {
    /**
     * 收到消息时 执行的监听*/
    @Override
    public void onMessage(Message message, Channel channel) throws Exception{
        try{
            System.out.println("消费者收到了消息: "+ message);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            e.printStackTrace();
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
}

工具类

@Component("publishUtil")
public class PublishUtil {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(String exchange, String routingKey, Object message){
        amqpTemplate.convertAndSend(exchange,routingKey, message);
    }
}

测试类

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:applicationContext.xml"})
public class TestMain {

    @Autowired
    private PublishUtil publishUtil;

    private static String EXCHANGE_NAME="directExchange";
    private static String QUEUE_NAME="confirm_test";

    // EXCHANGE QUEUE 都对, confirm 会执行, ack=true
    @Test
    public void test1() throws Exception{
        String message = "当前时间为:" + System.currentTimeMillis();
        publishUtil.send(EXCHANGE_NAME,QUEUE_NAME, message);
    }
}

test1()测试结果:

消费者收到了消息: (Body:‘当前时间为:1609902615382‘ MessageProperties [ 
headers={spring_listener_return_correlation=43ec414b-86aa-4b5d-a664-db5ede6a31dd},
contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0,
redelivered=false, receivedExchange=directExchange, receivedRoutingKey=confirm_test, deliveryTag=1, consumerTag=amq.ctag-Q5xlehjut9wdFxiMEUpiGw, consumerQueue=confirm_test
]) 确认回调, ack: true ; cause: null

 

test2()

    // EXCHANGE 错误  QUEUE 对, confirm 会执行, ack=false
    @Test
    public void test2() throws Exception{
        String message = "当前时间为:" + System.currentTimeMillis();
        publishUtil.send(EXCHANGE_NAME+"1",QUEUE_NAME, message);
    }

测试结果

确认回调, ack: false ; cause: channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange ‘directExchange1‘ in vhost ‘/test‘, class-id=60, method-id=40)

 

test3()

    // EXCHANGE 对  QUEUE 错误, confirm 会执行, ack=true,失败会执行
    @Test
    public void test3() throws Exception{
        String message = "当前时间为:" + System.currentTimeMillis();
        publishUtil.send(EXCHANGE_NAME,QUEUE_NAME+"1", message);
        Thread.sleep(2000);
    }

测试结果

失败 message:当前时间为:1609902810084 replyCode: 312 replyText: NO_ROUTE exchange: directExchange routingKey: confirm_test1
确认回调, ack: true ; cause: null

 

test4()

    // EXCHANGE   QUEUE 都错误, confirm 会执行, ack=false
    @Test
    public void test4() throws Exception{
        String message = "当前时间为:" + System.currentTimeMillis();
        publishUtil.send(EXCHANGE_NAME+"1",QUEUE_NAME+"1", message);
    }

测试结果 

确认回调, ack: false ; cause: channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange ‘directExchange1‘ in vhost ‘/test‘, class-id=60, method-id=40)

 

RabbitMQ - 02Spring整合XML形式

标签:ace   one   @param   sys   use   turn   top   当前时间   out   

原文地址:https://www.cnblogs.com/kingdomer/p/14240304.html


评论


亲,登录后才可以留言!