RocketMQ(三)——————javaAPI(7.事务消息)
2021-06-07 00:03
标签:cer status except str 生产 currently res 提交 定时 Half Message: 预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中 检查事务状态: Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息, 每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调。 超时: 如果超过回查次数,默认回滚消息 TransactionListener的两个方法 executeLocalTransaction 半消息发送成功触发此方法来执行本地事务 checkLocalTransaction broker将发送检查消息来检查事务状态,并将调用此方法来获取本地事务状态 本地事务执行状态 LocalTransactionState.COMMIT_MESSAGE 执行事务成功,确认提交 LocalTransactionState.ROLLBACK_MESSAGE 回滚消息,broker端会删除半消息 LocalTransactionState.UNKNOW 暂时为未知状态,等待broker回查 1、生产者样例 2、消费者样例 RocketMQ(三)——————javaAPI(7.事务消息) 标签:cer status except str 生产 currently res 提交 定时 原文地址:https://www.cnblogs.com/lifan12589/p/14597995.html//1.发送事务消息
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("TransactionGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setTransactionListener(new TransactionListener() {
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("==e xecuteLocalTransaction==");
System.out.println("message-body : "+message.getBody());
System.out.println("message-TransactionId : "+message.getTransactionId());
try {
//业务
}catch (Exception e){
//回滚消息,broker端会删除半消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
//执行事务成功,确认提交
return LocalTransactionState.COMMIT_MESSAGE;
}
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("==c heckLocalTransaction==");
System.out.println("message-body : "+new String(messageExt.getBody()));
System.out.println("message-TransactionId : "+messageExt.getTransactionId());
//暂时为未知状态,等待broker回查
//return LocalTransactionState.UNKNOW;
//回滚消息,broker端会删除半消息
//return LocalTransactionState.ROLLBACK_MESSAGE;
//执行事务成功,确认提交
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
TransactionSendResult sendResult = producer.sendMessageInTransaction(new Message
("TransactionTopic", "事务消息!".getBytes()), null);
System.out.println("sendResult : "+sendResult);
producer.shutdown();
System.out.println("生产者下线!");
}
//1.接收事务消息
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TransactionGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TransactionTopic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List
文章标题:RocketMQ(三)——————javaAPI(7.事务消息)
文章链接:http://soscw.com/index.php/essay/91498.html