ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

RocketMQ(三)——————javaAPI(7.事务消息)

2021-03-30 18:05:11  阅读:206  来源: 互联网

标签:事务 System 消息 javaAPI println LocalTransactionState RocketMQ out


 

Half Message:

预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中

检查事务状态:

Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,

 

每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调。

 

超时:

如果超过回查次数,默认回滚消息

 

TransactionListener的两个方法

 executeLocalTransaction

半消息发送成功触发此方法来执行本地事务

checkLocalTransaction

broker将发送检查消息来检查事务状态,并将调用此方法来获取本地事务状态

 本地事务执行状态

LocalTransactionState.COMMIT_MESSAGE

执行事务成功,确认提交

LocalTransactionState.ROLLBACK_MESSAGE

回滚消息,broker端会删除半消息

LocalTransactionState.UNKNOW

暂时为未知状态,等待broker回查

 

 

1、生产者样例

//1.发送事务消息
    public static void main(String[] args) throws Exception {

        TransactionMQProducer producer = new TransactionMQProducer("TransactionGroup");

        producer.setNamesrvAddr("127.0.0.1:9876");

        producer.setTransactionListener(new TransactionListener() {
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {

                System.out.println("==e xecuteLocalTransaction==");
                System.out.println("message-body : "+message.getBody());
                System.out.println("message-TransactionId : "+message.getTransactionId());

                try {

                    //业务

                }catch (Exception e){

                    //回滚消息,broker端会删除半消息
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
                //执行事务成功,确认提交
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {

                System.out.println("==c heckLocalTransaction==");
                System.out.println("message-body : "+new String(messageExt.getBody()));
                System.out.println("message-TransactionId : "+messageExt.getTransactionId());

                //暂时为未知状态,等待broker回查
                //return LocalTransactionState.UNKNOW;
                //回滚消息,broker端会删除半消息
                //return LocalTransactionState.ROLLBACK_MESSAGE;

                //执行事务成功,确认提交
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        producer.start();
        TransactionSendResult sendResult = producer.sendMessageInTransaction(new Message
                ("TransactionTopic", "事务消息!".getBytes()), null);

        System.out.println("sendResult : "+sendResult);
        producer.shutdown();
        System.out.println("生产者下线!");

    }

 

 

 

2、消费者样例

//1.接收事务消息
    public static void main(String[] args) throws Exception {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TransactionGroup");

        consumer.setNamesrvAddr("127.0.0.1:9876");

        consumer.subscribe("TransactionTopic","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

                for (MessageExt mes: list) {

                    System.out.println("mes : "+new String(mes.getBody()));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer  start...");
    }

 

标签:事务,System,消息,javaAPI,println,LocalTransactionState,RocketMQ,out
来源: https://www.cnblogs.com/lifan12589/p/14597995.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有