ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

java – 配置Spring Integration聚合器以组合RabbitMq扇出交换的响应

2019-07-04 03:47:46  阅读:322  来源: 互联网

标签:java rabbitmq spring-integration spring-amqp


我试图使用Spring Integration配置以下内容:

>向频道发送消息.
>将此消息发布到与n个消费者的兔子扇出(pub / sub)交换.
>每个消费者都提供响应消息.
>让Spring Integration在将它们返回到原始客户端之前聚合这些响应.

到目前为止,我有一些问题……

>我正在使用发布 – 订阅 – 通道来设置apply-sequence =“true”属性,以便correlationId,sequenceSize& sequenceNumber属性已设置. DefaultAmqpHeaderMapper抛弃了这些属性. DEBUG headerName = [correlationId]将不会被映射
>即使在扇出交换中注册了2个队列,sequenceSize属性也只被设置为1.据推测,这意味着消息将过早地从聚合器中释放.我希望这是因为我滥用发布 – 订阅 – 频道才能使用apply-sequence =“true”,而且正确地说只有一个订户,即int-amqp:outbound-gateway.

我的出站Spring配置如下:

<int:publish-subscribe-channel id="output" apply-sequence="true"/>

<int:channel id="reply">
    <int:interceptors>
        <int:wire-tap channel="logger"/>
    </int:interceptors>
</int:channel>

<int:aggregator input-channel="reply" method="combine">
    <bean class="example.SimpleAggregator"/>
</int:aggregator>

<int:logging-channel-adapter id="logger" level="INFO"/>

<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-channel="reply"/>

<int-amqp:outbound-gateway request-channel="output"
                                   amqp-template="amqpTemplate" exchange-name="fanout-exchange"
                                   reply-channel="reply"/>

我的rabbitMQ配置如下:

<rabbit:connection-factory id="connectionFactory" />

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" reply-timeout="-1" />

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:queue name="a-queue"/>
<rabbit:queue name="b-queue"/>

<rabbit:fanout-exchange name="fanout-exchange">
    <rabbit:bindings>
        <rabbit:binding queue="a-queue" />
        <rabbit:binding queue="b-queue" />
    </rabbit:bindings>
</rabbit:fanout-exchange>

消费者看起来像这样:

<int:channel id="input"/>

<int-amqp:inbound-gateway request-channel="input" queue-names="a-queue" connection-factory="connectionFactory" concurrent-consumers="1"/>

<bean id="listenerService" class="example.ListenerService"/>

<int:service-activator input-channel="input" ref="listenerService" method="receiveMessage"/>

任何建议都会很棒,我怀疑我的某个地方有错误的结尾……

基于Gary的评论新的出站弹簧配置:

<int:channel id="output"/>

<int:header-enricher input-channel="output" output-channel="output">
    <int:correlation-id expression="headers['id']" />
</int:header-enricher>

<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-timeout="5000" default-reply-channel="reply" />

<int-amqp:outbound-gateway request-channel="output"
                                   amqp-template="amqpTemplate" exchange-name="fanout-exchange"
                                   reply-channel="reply"
                                   mapped-reply-headers="amqp*,correlationId" mapped-request-headers="amqp*,correlationId"/>

<int:channel id="reply"/>

<int:aggregator input-channel="reply" output-channel="reply" method="combine" release-strategy-expression="size() == 2">
    <bean class="example.SimpleAggregator"/>
</int:aggregator>

解决方法:

问题是S.I.不知道扇出交换的拓扑结构.

最简单的方法是使用自定义发布策略

release-strategy-expression="size() == 2"

在聚合器上(假设扇出为2).所以,你不需要序列大小;你可以避免使用标题扩充器“滥用”发布/订阅者频道…

    <int:header-enricher input-channel="foo" output-channel="bar">
        <int:correlation-id expression="T(java.util.UUID).randomUUID().toString()" />
    </int:header-enricher>

您可以使用消息ID避免创建新的UUID,消息ID已经是唯一的…

<int:correlation-id expression="headers['id']" />

最后,您可以通过添加将correlationId标头传递给AMQP

mapped-request-headers="correlationId"

到你的amqp端点.

标签:java,rabbitmq,spring-integration,spring-amqp
来源: https://codeday.me/bug/20190704/1373378.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有