ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RocketMQ入门

2021-10-08 20:30:42  阅读:152  来源: 互联网

标签:入门 producer 消息 msg new consumer public RocketMQ


RocketMQ

consumer消费消息的两种模式:

1、并发消费

2、顺序消费

consumer如何消费:

1、broker推送消息到consumer

2、consumer拉取broker中的消息

一、Windows环境下载及安装

1、下载地址:

https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.0/rocketmq-all-4.9.0-bin-release.zip

2、安装:

解压到任意文件夹下,我的:D:\rocketMQ\rocketmq-all-4.9.0-source-release

3、配置环境变量

ROCKETMQ_HOME=D:\rocketMQ\rocketmq-all-4.9.0-bin-release
NAMESRV_ADDR=localhost:9876

4、启动nameServer:

.\bin\mqnamesrv.cmd

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LNxmQHdt-1633695784222)(/1625489325760.png)]

5、启动broker:

.\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

start mqbroker.cmd -n 127.0.0.1:9876 -c ../conf/broker.conf

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YCGlwY8S-1633695784235)(/1625489342147.png)]

6、测试:

发送消息:

.\tools.cmd org.apache.rocketmq.example.quickstart.Producer

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hKD4A5sc-1633695784238)(/1625489666961.png)]

接收消息:

.\tools.cmd org.apache.rocketmq.example.quickstart.Consumer

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qRAumKev-1633695784240)(/1625489684782.png)]

7、安装可视化界面:

下载:https://github.com/apache/rocketmq-externals

进入项目的:D:\rocketMQ\插件\rocketmq-externals\rocketmq-console\src\main\resources下,修改application.properties文件。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aZiSic77-1633695784241)(/1625490397542.png)]

进入D:\rocketMQ\插件\rocketmq-externals\rocketmq-console目录,cmd之后:

对项目打包编译:

mvn clean package -Dmaven.test.skip=true

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ea852CQr-1633695784243)(/1625490830927.png)]

打包成功,在target目录下生成jar包:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sjE0CRQQ-1633695784244)(/1625490861047.png)]

运行:

java -jar rocketmq-console-ng-2.0.0.jar &

输入:

127.0.0.1:端口

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Peei1e7K-1633695784245)(/1625491439440.png)]

二、简单例子快速入门

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>

三种方式发送消息:同步、异步、单向传输

同步:

可靠的同步传输应用于广泛的场景,如重要通知消息、短信通知、短信营销系统等

public class SyncProducer {

    /**
     * 同步发送消息
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        //实例化生产组名称
        DefaultMQProducer producer = new DefaultMQProducer("syncProducer");
        // 指定nameserver器地址
        producer.setNamesrvAddr("localhost:9876");
        //启动实例
        producer.start();
        for (int i = 0; i < 10; i++) {
            //创建一个消息实例,指定主题、标记和消息主体。
            // new Message(主题、标记、消息体)
            Message msg = new Message("syncProducer-Topic" , "syncProducer-Tag" , ("syncProducer-Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) );
            //发送消息到一个 broker.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //关闭生产者
        producer.shutdown();
    }

}

异步:

异步传输一般用于响应时间敏感的业务场景。

public class AsyncProducer {

    /**
     * 发送异步消息
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        //实例化
        DefaultMQProducer producer = new DefaultMQProducer("asyncProducer");
        // 指定nameserver地址
        producer.setNamesrvAddr("localhost:9876");
        //启动实例
        producer.start();
        //消息发送失败,重试次数
        producer.setRetryTimesWhenSendAsyncFailed(0);

        int messageCount = 10;
        //等待10个线程执行完后再执行
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            try {
                final int index = i;
                Message msg = new Message("asyncProducer-Topic", "asyncProducer-Tag", "asyncProducer-key", "asyncProducer-Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                //消息发送的回调执行SendCallback
                producer.send(msg, new SendCallback() {

                    /**
                     * 消息发送成功的回调
                     * @param sendResult
                     */
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    /**
                     * 消息发送出现异常的回调
                     * @param e
                     */
                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // 10s后主线程被唤醒,不管countDown()被执行多少次
        countDownLatch.await(10, TimeUnit.SECONDS);
        // 关闭
        producer.shutdown();
    }
}

单向传输:

单向传输用于需要中等可靠性的情况,例如日志收集。

public class OneWayProducer {
    /**
     * 单向发送消息
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception{
        //实例化生产组
        DefaultMQProducer producer = new DefaultMQProducer("oneWayProducer");
        // 指定nameServer地址
        producer.setNamesrvAddr("localhost:9876");
        //启动实例
        producer.start();
        for (int i = 0; i < 10; i++) {
            //创建一个消息实例、指定主题、标记、消息体
            Message msg = new Message("oneWayProducer-Topic", "oneWayProducer-Tag", ("oneWayProducer-Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            //发送消息到 broker
            producer.sendOneway(msg);
        }
        //等待发送完成
        Thread.sleep(5000);
        producer.shutdown();
    }
}

消费消息:

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 实例化消费组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("oneWayProducer");

        // 指定nameServer地址
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅一个主题来使用
        consumer.subscribe("oneWayProducer-Topic", "*");
        // 注册回调,以便从broker中及时拉取消息执行,并发消费
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + "线程======>");
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody(),0,msg.getBody().length));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

三、广播消息

广播正在向主题的所有订阅者发送消息。如果您希望所有订阅者都收到有关某个主题的消息,广播是一个不错的选择。默认集群模式。

生产者:

public class RadioProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("radioProducer");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        for (int i = 0; i < 10; i++){
            Message msg = new Message("radioProducer-TopicTest", "radioProducer-Tag", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

消费者订阅:

public class RadioConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("radioProducer");
        consumer.setNamesrvAddr("localhost:9876");
        // 设置从队列头部开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //设置为广播模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //订阅一个主题来使用
        consumer.subscribe("radioProducer-TopicTest", "radioProducer-Tag");
        //注册监听,并发消费
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("11111111111111111111111");
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }
}

四、顺序消息

RocketMQ 使用 FIFO 顺序提供有序消息,即消息发送时,将消息发送至同一个MessageQueue中,实现消息的局部顺序消费。

生产者:

public class OrderProducer {

    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        MQProducer producer = new DefaultMQProducer("orderGroupName");
        //Launch the instance.
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 100; i++) {
            int orderId = i % 10;
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("order-Topic", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);

            System.out.printf("%s%n", sendResult);
        }
        //server shutdown
        producer.shutdown();
    }
}

消费者:

public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderGroupName");
        //从消息队列头部开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //订阅
        consumer.subscribe("order-Topic", "TagA || TagC || TagD");
        //注册消息监听,有序消费
        consumer.registerMessageListener(new MessageListenerOrderly() {

            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(false);
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                this.consumeTimes.incrementAndGet();
                if ((this.consumeTimes.get() % 2) == 0) {
                    //正确消费
                    return ConsumeOrderlyStatus.SUCCESS;
                } else if ((this.consumeTimes.get() % 3) == 0) {
                    //回滚
                    return ConsumeOrderlyStatus.ROLLBACK;
                } else if ((this.consumeTimes.get() % 4) == 0) {
                    //提交
                    return ConsumeOrderlyStatus.COMMIT;
                } else if ((this.consumeTimes.get() % 5) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    //稍后消费
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;

            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

五、延迟消费

消息发送至broker后,等待一段时间之后,再推送至消费者那消费。

生产者:

public class SyncProducer {

    /**
     * 同步发送消息-----延迟消息
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        //实例化生产组名称
        DefaultMQProducer producer = new DefaultMQProducer("syncProducer");
        // 指定nameserver器地址
        producer.setNamesrvAddr("localhost:9876");
        //启动实例
        producer.start();
        for (int i = 0; i < 10; i++) {
            //创建一个消息实例,指定主题、标记和消息主体。
            // new Message(主题、标记、消息体)
            Message msg = new Message("syncProducer-Topic" , "syncProducer-Tag" , ("syncProducer-Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) );
            //延迟消息设置延迟等级,开源版的RocketMQ不支持自定义的消息等级,只能修改18个等级中的时间
            //delayTimeLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
            //上述等级可以在拓展的集群配置中修改
            msg.setDelayTimeLevel(3);
            //发送消息到一个 broker.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //关闭生产者
        producer.shutdown();
    }

}

消费者:

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 实例化消费组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("oneWayProducer");

        // 指定nameServer地址
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅一个主题来使用
        consumer.subscribe("oneWayProducer-Topic", "*");
        // 注册回调,以便从broker中及时拉取消息执行
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + "线程======>");
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody(),0,msg.getBody().length));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

六、批量消息

将多条消息合成一个批量消息,一次发送出去。好处:减少网络IO、提高吞吐量

使用限制:

同一批次的消息应该具有:相同的主题,相同的 waitStoreMsgOK 并且没有调度支持.

一批消息的总大小不应超过 1MiB

生产者:

1、如果一次发送的数据不大于1M时,可以采用如下的方式发送:

public class BatchProducer {
    public static void main(String[] args) throws Exception {
        //实例化生产组名称
        DefaultMQProducer producer = new DefaultMQProducer("batchProducer");
        // 指定nameserver器地址
        producer.setNamesrvAddr("localhost:9876");
        //启动实例
        producer.start();

        String topic = "Batch-Topic";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic,"TagA","batchKey01","Hello batch 0".getBytes(StandardCharsets.UTF_8)));
        messages.add(new Message(topic,"TagA","batchKey02","Hello batch 1".getBytes(StandardCharsets.UTF_8)));
        messages.add(new Message(topic,"TagA","batchKey03","Hello batch 2".getBytes(StandardCharsets.UTF_8)));
        //发送数据
        SendResult sendResult = producer.send(messages);
        System.out.printf("%s%n", sendResult);
        //关闭生产者
        producer.shutdown();
    }
}

2、如果发送的消息一次超过1M,可以拆分为列表:

public class BatchSplitProducer {
    public static void main(String[] args) throws Exception {
        //实例化生产组名称
        DefaultMQProducer producer = new DefaultMQProducer("batchProducer");
        // 指定nameserver器地址
        producer.setNamesrvAddr("localhost:9876");
        //启动实例
        producer.start();

        String topic = "Batch-Topic";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic,"TagA","batchKey01","Hello batch 0".getBytes(StandardCharsets.UTF_8)));
        messages.add(new Message(topic,"TagA","batchKey02","Hello batch 1".getBytes(StandardCharsets.UTF_8)));
        messages.add(new Message(topic,"TagA","batchKey03","Hello batch 2".getBytes(StandardCharsets.UTF_8)));

        //然后你可以将大列表拆分为小列表
        ListSplitter splitter = new ListSplitter(messages);
        while (splitter.hasNext()) {
            try {
                List<Message>  listItem = splitter.next();
                producer.send(listItem);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //关闭生产者
        producer.shutdown();
    }
}
/**
 * 消息切割
 */
public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1000 * 1000; //最大数据限制
    private final List<Message> messages;  //数据
    private int currIndex;  //当前消息下标

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }
    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }
    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            //计算单个消息的大小
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; //for log overhead

            if (tmpSize > SIZE_LIMIT) {
                //单个消息大于最大限制,则丢弃,否则会阻塞在此
                if (nextIndex - currIndex == 0) {
                    //如果是最后一个元素,如果下一个列表没有元素,则加入这个消息之后就退出,否则退出
                   nextIndex++;  
                }
                break;
            }

            //如果当前单个消息 + 已经计算的消息大小 > 最大限制,则退出,否则累加
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }

        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}

消费者:

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 实例化消费组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batchProducer");

        // 指定nameServer地址
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅一个主题来使用
        consumer.subscribe("Batch-Topic", "*");
        // 注册回调,以便从broker中及时拉取消息执行
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + "线程======>");
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody(),0,msg.getBody().length));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

七、过滤消费

rocketMQ有sql进行消息过滤,消息过滤是在broker端进行的,consumer将sql过滤表达式推送至broker,broker过滤数据之后推送消息到consumer,减少网络数据传输。

消息过滤的两种方式:

1、TAG

2、SQL92

前提:需要在/conf/broker.conf配置文件加入支持过滤的属性:

enablePropertyFilter=true

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DmhTCbNC-1633695784246)(/1625883963295.png)]

生产者:

public class FilterProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("filter_group");
        producer.start();

        String[] tags = {"tagA","tagB","tagC"};
        for (int i = 0; i < 15; i++) {
            Message msg = new Message("filterTopic",
                    tags[i% tags.length],
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            //设置property:a 就是下标
            msg.putUserProperty("a", String.valueOf(i));
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        }

        producer.shutdown();
    }
}

消费者:

public class FilterConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_group");

        // only subsribe messages have property a, also a >=0 and a <= 3
        consumer.subscribe("filterTopic", MessageSelector.bySql("a between 0 and 3"));

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                msgs.forEach(System.out::println);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

sql表达式:

基本语法:
1、数值比较,如>, >=, <, <=, BETWEEN, =;
2、字符比较,如=, <>, IN;
3、IS NULL或IS NOT NULL;
4、逻辑AND, OR, NOT;
常量类型:
1、数字,如 123、3.1415;
2、字符,如'abc',必须用单引号引起;
3、NULL,特殊常数;
4、布尔值,TRUE或FALSE;

八、事务消息

什么是事务性消息?

它可以被认为是两阶段提交消息的实现,以确保分布式系统中的最终一致性。事务性消息确保本地事务的执行和消息的发送可以原子地执行。

使用限制

(1) 事务性消息没有调度和批处理支持。
(2) 为了避免单条消息被检查次数过多导致半队列消息堆积,我们默认将单条消息的检查次数限制为15次,但用户可以通过更改“transactionCheckMax ”参数在broker的配置中,如果一条消息被检查了“transactionCheckMax”次,broker默认会丢弃这条消息并同时打印错误日志。用户可以通过覆盖“AbstractTransactionCheckListener”类来更改此行为。
(3) 交易消息在一定时间后将被检查,该时间由代理配置中的参数“transactionTimeout”确定。并且用户也可以在发送事务消息时通过设置用户属性“CHECK_IMMUNITY_TIME_IN_SECONDS”来改变这个限制,这个参数优先于“transactionMsgTimeout”参数。
(4) 一条交易消息可能被检查或消费不止一次。
(5) 向用户目标主题提交的消息回复可能会失败。目前,这取决于日志记录。高可用是由 RocketMQ 本身的高可用机制来保证的。如果要保证事务消息不丢失,保证事务完整性,建议使用同步双写。机制。
(6) 事务性消息的生产者ID不能与其他类型消息的生产者ID共享。与其他类型的消息不同,事务性消息允许向后查询。MQ Server 通过生产者 ID 查询客户端。

Transaction状态

事务消息有三种状态:
(1) TransactionStatus.CommitTransaction:提交事务,表示允许消费者消费这条消息。
(2) TransactionStatus.RollbackTransaction:回滚事务,表示消息将被删除,不允许消费。
(3) TransactionStatus.Unknown:中间状态,表示需要MQ回检来确定状态。

生产者:

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        //事务监听器
        TransactionListener transactionListener = new TransactionListenerImpl();
        //事务生产者
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        //线程池,作用?
        ExecutorService executorService = new ThreadPoolExecutor(
            2,
                5,
            100,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });
        //设置线程池
        producer.setExecutorService(executorService);
        //设置事务监听器
        producer.setTransactionListener(transactionListener);
        producer.start();
        //发送消息,分别由5个tag,每个tag2条消息
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                //null,所有消息都是事务消息,也可以指定某一条消息是事务消息
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        //等待
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}
/**
 * 事务监听器
 */
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * 执行本地事务
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        Integer status = transactionIndex.getAndIncrement() % 3;
        //map保存<key,value>
        localTrans.put(msg.getTransactionId(), status);
        System.out.println("transactionId:" + msg.getTransactionId() + "----status:" + status);
        //消息未确定,需要消息回查确定
        return LocalTransactionState.UNKNOW;
    }

    /**
     * 消息回查
     * @param msg
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

消费者:

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 实例化消费组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // 指定nameServer地址
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅一个主题来使用
        consumer.subscribe("TopicTest1234", "*");
        // 注册回调,以便从broker中及时拉取消息执行
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("=====================消息======================");
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody(),0,msg.getBody().length));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

标签:入门,producer,消息,msg,new,consumer,public,RocketMQ
来源: https://blog.csdn.net/Linging_24/article/details/120659175

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有