Spring Boot 2.x: Integrating RocketMQ to Implement Transactional Messages

2020-12-13 05:41


1. Add the required Maven dependencies:


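<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.xxx</groupId>
    <artifactId>rocket</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-actuator-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.14</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>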

 

2. Configure the producer:

  2.1 Configure application.properties as follows:

# producer
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group
rocketmq.producer.send-message-timeout=300000
rocketmq.producer.compress-message-body-threshold=4096
rocketmq.producer.max-message-size=4194304
rocketmq.producer.retry-times-when-send-async-failed=0
rocketmq.producer.retry-next-server=true
rocketmq.producer.retry-times-when-send-failed=2

  2.2 Transaction listener:

package com.xxx.listener;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;

@Slf4j
@RocketMQTransactionListener(txProducerGroup = "rocket")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    // Called once the half message has been sent; the local transaction belongs here.
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        log.info("Local transaction and message send: {}", JSON.toJSONString(message));

        // UNKNOWN leaves the message pending, so the broker later calls checkLocalTransaction.
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    // Called by the broker to check the outcome of a pending transaction.
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info("Check-back message: {}", JSON.toJSONString(message));

        // COMMIT makes the message visible to consumers.
        return RocketMQLocalTransactionState.COMMIT;
    }
}
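
The listener above returns UNKNOWN from the first call and COMMIT from the check-back. To make that two-phase flow easier to follow, here is a minimal plain-Java sketch that simulates it without a broker. Everything in it (`TransactionFlowSketch`, the `outcomes` map, the `txId` strings, the `maxChecks` parameter) is hypothetical and only illustrates the idea; it is not RocketMQ's API or the original author's code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TransactionFlowSketch {

    /** Mirrors the three outcomes a local transaction listener can report. */
    public enum State { COMMIT, ROLLBACK, UNKNOWN }

    /** Hypothetical in-memory record of local transaction outcomes, keyed by transaction id. */
    private static final Map<String, State> outcomes = new ConcurrentHashMap<>();

    /** Phase 1: do the local work and record it, but report UNKNOWN like the listener above. */
    public static State executeLocalTransaction(String txId) {
        outcomes.put(txId, State.COMMIT); // assume the local work succeeded and was recorded
        return State.UNKNOWN;
    }

    /** Phase 2: answer a check-back from the recorded outcome. */
    public static State checkLocalTransaction(String txId) {
        return outcomes.getOrDefault(txId, State.UNKNOWN);
    }

    /** Simulated broker: checks back while the state is UNKNOWN, delivers only on COMMIT. */
    public static boolean deliveredToConsumers(String txId, int maxChecks) {
        State state = executeLocalTransaction(txId);
        for (int i = 0; state == State.UNKNOWN && i < maxChecks; i++) {
            state = checkLocalTransaction(txId);
        }
        return state == State.COMMIT;
    }

    public static void main(String[] args) {
        System.out.println(deliveredToConsumers("tx-1", 3));
    }
}
```

In a real listener, the check-back would usually look up a persisted record (for example a database row written inside the local transaction) rather than always returning COMMIT.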

  2.3 Send a transactional message:

package com.xxx;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@Slf4j
@SpringBootApplication
public class SpringBootRocketMqApplication {
    public static void main(String[] args) throws InterruptedException {
        ConfigurableApplicationContext context = SpringApplication.run(SpringBootRocketMqApplication.class, args);

        RocketMQTemplate template = context.getBean(RocketMQTemplate.class);
        // Demo loop: sends one transactional message every 10 ms until the process is stopped.
        while (true) {
            String msg = "demo msg test";
            log.info("Sending message: {}", msg);

            Message<String> message = MessageBuilder.withPayload(msg).build();
            // "rocket" must match the listener's txProducerGroup; "ts" is the topic;
            // the last argument is passed through to executeLocalTransaction.
            TransactionSendResult result = template.sendMessageInTransaction("rocket", "ts", message, null);
            log.info("Send result: {}", result);

            Thread.sleep(10);
        }
    }
}

 

3. Configure the consumer:

  3.1 Configure application.properties as follows:

    rocketmq.name-server=127.0.0.1:9876

 

  3.2 Consumer listener:

package com.xxx.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;

import java.util.List;

@Slf4j
@Component
@RocketMQMessageListener(topic = "ts", consumerGroup = "my-consumer-group")
public class ConsumerLifecycleListener implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    @Override
    public void onMessage(String s) {
        // Not called here: prepareStart below registers its own listener on the consumer.
        log.info("Not called once a listener is registered in prepareStart");
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        // Register a native listener to get access to MessageExt fields such as the retry count.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt messageExt : msgs) {
                    log.info("Retry count: {}", messageExt.getReconsumeTimes());
                    log.info("Received message: {}", new String(messageExt.getBody()));
                }

                // Acknowledge the batch only after every message in it has been handled.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
    }
}

 

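4. Delayed messages
  RocketMQ currently supports only scheduled messages with fixed delay levels, not arbitrary delay durations.

  Delay levels (18 levels)

  Levels 1 to 18 correspond to: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

  Message message = MessageBuilder.withPayload(msg).build();

  rocketMQTemplate.syncSend(topic, message, 1000, 2); // send timeout of 1000 ms; delay level 2 means a 5-second delay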
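
Because only the listed levels are available, a small helper that maps a level to its delay (and picks a level for a desired delay) can make calling code easier to read. The following is a plain-Java sketch built from the level table above; `DelayLevels`, `delayFor`, and `levelAtLeast` are hypothetical names, not part of RocketMQ, and the table assumes the broker's default level configuration has not been changed.

```java
import java.time.Duration;

public class DelayLevels {

    // Level table as listed above; array index 0 holds level 1.
    private static final String[] LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split(" ");

    /** Returns the delay that a given level (1 to 18) stands for. */
    public static Duration delayFor(int level) {
        if (level < 1 || level > LEVELS.length) {
            throw new IllegalArgumentException("level must be between 1 and " + LEVELS.length);
        }
        String token = LEVELS[level - 1];
        long amount = Long.parseLong(token.substring(0, token.length() - 1));
        switch (token.charAt(token.length() - 1)) {
            case 's': return Duration.ofSeconds(amount);
            case 'm': return Duration.ofMinutes(amount);
            case 'h': return Duration.ofHours(amount);
            default: throw new IllegalStateException("unexpected unit in " + token);
        }
    }

    /** Returns the smallest level whose delay is at least the wanted one (capped at the last level). */
    public static int levelAtLeast(Duration wanted) {
        for (int level = 1; level <= LEVELS.length; level++) {
            if (delayFor(level).compareTo(wanted) >= 0) {
                return level;
            }
        }
        return LEVELS.length;
    }

    public static void main(String[] args) {
        System.out.println(delayFor(2).getSeconds());         // level 2 is 5s
        System.out.println(levelAtLeast(Duration.ofSeconds(7))); // next level up is 3 (10s)
    }
}
```

With such a helper, the delay level passed to syncSend could be computed as `DelayLevels.levelAtLeast(Duration.ofSeconds(5))` instead of a bare `2`.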

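5. Ordered messages
  asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback)

     Ordering is achieved by specifying a hashKey: messages with the same hashKey are sent to the same queue and are consumed in order.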
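
The idea behind the hashKey can be shown with a few lines of plain Java: hash the key, take it modulo the number of queues, and every message carrying the same key lands in the same queue. This is only an illustration of the principle under assumed names (`OrderlyQueueSketch`, `selectQueue`, the `order-N` keys, a queue count of 4); it is not RocketMQ's actual selector code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OrderlyQueueSketch {

    /** Maps a hashKey to a queue index; the same key always yields the same index. */
    public static int selectQueue(String hashKey, int queueCount) {
        // floorMod keeps the result in [0, queueCount) even for negative hash codes.
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    /** Groups messages by the queue their key selects, preserving send order within each queue. */
    public static Map<Integer, List<String>> route(String[][] keyedMessages, int queueCount) {
        Map<Integer, List<String>> queues = new LinkedHashMap<>();
        for (String[] keyed : keyedMessages) {
            int queue = selectQueue(keyed[0], queueCount);
            queues.computeIfAbsent(queue, q -> new ArrayList<>()).add(keyed[1]);
        }
        return queues;
    }

    public static void main(String[] args) {
        String[][] messages = {
            {"order-1", "created"}, {"order-2", "created"},
            {"order-1", "paid"}, {"order-1", "shipped"}, {"order-2", "paid"}
        };
        // Each order's events stay in one queue, in the order they were sent.
        System.out.println(route(messages, 4));
    }
}
```

On the consumer side, in-order processing also depends on the listener consuming orderly, for example by setting `consumeMode = ConsumeMode.ORDERLY` on `@RocketMQMessageListener`.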

  


Original article: https://www.cnblogs.com/yx88/p/11146484.html
