ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

java-Spring AMQP集成-用户手册确认

2019-11-22 08:12:55  阅读:328  来源: 互联网

标签:spring-amqp rabbitmq spring-rabbit java spring-integration


我正在测试具有Spring-Integration支持的Spring-AMQP,我已经进行了配置和测试:

<rabbit:connection-factory id="connectionFactory" />
<rabbit:queue name="durableQ"/>
<int:channel id="consumingChannel">
    <int:queue capacity="2"/> <!-- Message get Acked as-soon-as filled in Q -->
</int:channel>

<int-amqp:inbound-channel-adapter 
    channel="consumingChannel"
    queue-names="durableQ" 
    connection-factory="connectionFactory"
    concurrent-consumers="1"
    acknowledge-mode="AUTO"
    />


public static void main(String[] args) {
System.out.println("Starting consumer with integration..");
    AbstractApplicationContext context = new ClassPathXmlApplicationContext(
    "classpath:META-INF/spring/integration/spring-integration-context-consumer.xml");

    PollableChannel consumingChannel = context.getBean("consumingChannel",   
                                                          PollableChannel.class);           
        int count = 0;
        while (true) {
            Message<?> msg = consumingChannel.receive(1000);
            System.out.println((count++) + " \t -> " + msg);

            try { //sleep to check number of messages in queue
                Thread.sleep(50000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

在这种配置中,很明显,一旦消息到达consumptionChannel,它们就会被Acked并因此从队列中删除.我通过在接收并检查队列大小后设置高睡眠来验证这一点.对此没有进一步的控制.

现在,如果我设置了accept-mode = MANUAL,似乎没有办法通过spring集成来进行手动确认.

我需要处理消息,并在处理后执行手动确认,以便直到确认消息在持久性Q中保持不变.

有什么方法可以通过spring-amqp-integration处理MANUAL ack?我想避免将ChannelAwareMessageListener传递给inbound-channel-adapter,因为我想控制消费者的接收.

更新:

当将自己的listener-container与inbound-channel-adapter一起使用时,这似乎甚至是不可能的:

// Below creates a default direct-channel (spring-integration channel) named "adapter", to receive poll this channel which is same as above
<int-amqp:inbound-channel-adapter id="adapter" listener-container="amqpListenerContainer" /> 

<bean id="amqpListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="durableQ" />
    <property name="acknowledgeMode" value="MANUAL" />

// messageListener not allowed when using with adapter, so no way of having own ChannelAwareMessageListener, so no channel exposed onMessage, hence no way to ack
    <property name="messageListener" ref="listener"/>
</bean>
<bean id="listener" class="com.sd.springint.rmq.MsgListener"/>

由于不允许使用messageListener属性,因此上述配置会引发错误,请参见对标记的内联注释.因此,使用listner-container的目的就失败了(用于通过ChannelAwareMessageListener公开频道).

对我来说,spring-integration不能用于手动确认(我知道,这很难说!),有人可以帮助我进行验证吗?还是我缺少任何特定的方法/配置?

解决方法:

问题是因为您使用的是使用QueueChannel的异步切换.通常最好控制容器中的并发(concurrent-consumers =“ 2”),并且不要在流中进行任何异步切换(使用DirectChannels).这样,AUTO ACK就能正常工作.而不是从PollableChannel接收,而是将新的MessageHandler()订阅到SubscribableChannel.

更新:

您通常不需要在SI应用程序中处理Messages,但是与DirectChannel进行的测试等效的是…

    SubscribableChannel channel = context.getBean("fromRabbit", SubscribableChannel.class);

    channel.subscribe(new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println("Got " + message);
        }
    });

标签:spring-amqp,rabbitmq,spring-rabbit,java,spring-integration
来源: https://codeday.me/bug/20191122/2058390.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有