Spring boot 2.x 集成Rocketmq实现事物消息
2020-12-13 05:41
标签:定时 hold stat ali try img sum none str 1.引入相关Maven依赖: 2.配置生产者: 2.1 application.properties 配置如下: 2.2 事务监听: 2.3 发送事物消息: 3. 配置消费者: 3.1 application.properties 配置如下: rocketmq.name-server=127.0.0.1:9876 3.2 消费者监听: 4. 延时消息 延迟级别(18个等级) 1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h Message message = MessageBuilder.withPayload(msg).build(); rocketMQTemplate.syncSend(topic, message,1000,2);//表示延时5秒 5. 顺序消息 Spring boot 2.x 集成Rocketmq实现事物消息 标签:定时 hold stat ali try img sum none str 原文地址:https://www.cnblogs.com/yx88/p/11146484.html####producer
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group= my-group
rocketmq.producer.send-message-timeout= 300000
rocketmq.producer.compress-message-body-threshold= 4096
rocketmq.producer.max-message-size= 4194304
rocketmq.producer.retry-times-when-send-async-failed= 0
rocketmq.producer.retry-next-server= true
rocketmq.producer.retry-times-when-send-failed= 2
package com.xxx.listener;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "rocket")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("本地事务和消息发送:" + JSON.toJSONString(message));
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
System.out.println("回查信息:" + JSON.toJSONString(message));
return RocketMQLocalTransactionState.COMMIT;
}
}
package com.xxx;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
@Slf4j
@SpringBootApplication
public class SpringBootRocketMqApplication {
public static void main(String[] args) throws InterruptedException {
ConfigurableApplicationContext context = SpringApplication.run(SpringBootRocketMqApplication.class, args);
RocketMQTemplate template = context.getBean(RocketMQTemplate.class);
while (true) {
String msg = "demo msg test";
log.info("开始发送消息:"+msg);
Message message = MessageBuilder.withPayload(msg).build();
TransactionSendResult result = template.sendMessageInTransaction("rocket", "ts", message, null);
log.info("消息发送响应信息:"+result.toString());
Thread.sleep(10);
}
}
}
package com.xxx.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
@RocketMQMessageListener(topic = "ts", consumerGroup = "my-consumer-group")
public class ConsumerLifecycleListener implements RocketMQListener
RocketMQ 目前只支持固定精度的定时消息。
asyncSendOrderly(String destination, Object payload, String hashKey, SendCallback sendCallback)
通过指定hashkey实现顺序消费,同步的hashkey会按顺序消费
上一篇:Web服务之Nginx浅析
下一篇:CSS选取器
文章标题:Spring boot 2.x 集成Rocketmq实现事物消息
文章链接:http://soscw.com/essay/31574.html