ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

注解位置辨析

2019-06-03 20:55:26  阅读:267  来源: 互联网

标签:java amqp 辨析 位置 springframework listener rabbit org 注解


随着对消息队列的应用日益推广,在分布式系统中的使用可以极大的降低对各个组件间的耦合度,从而提高组件的处理效率。因为消息队列的存在,可以使我们对任务进行异步处理,这样可以减少请求响应时间和解耦。同时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦和。
所谓解耦,就是说A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。
注解位置辨析
所谓异步,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms,对于用户而言,其实感觉上就是点个按钮,8ms 以后就直接返回了,爽!网站做得真好,真快!
注解位置辨析
此外使用消息队列还有削峰的优势。所谓削峰,即在某些时刻,用户会大量的对我们的服务发起请求,我们的数据库有时候需要对这些请求进行写入,但是呢,mysql的吞吐量顶破天就5000,剩下的就要慢慢等了,而且当并发量过高的时候,数据库的各种异常也会让人抓狂,但是呢,我们使用消息队列就不一样了,用户的各种请求通通塞入消息队列里面,之后由消息队列返回处理结果,而请求存储在队列里面,一个个按顺序消费,使请求写入不出现高峰低谷
注解位置辨析
基于这些有点,我们开发团队最近在spring boot的开发过程中,由于项目的需要我们进行消息队列的接入改造。
在改造过程中遇到了这样的问题,起初我将注解写在了class上,在运行的过程中会出现异常,以下是异常的详细内容:

2019-05-31 17:42:36.798 WARN 30544 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1506)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1417)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.AmqpException: No method found for class java.util.LinkedHashMap
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:147)
at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:250)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:70)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:120)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
... 8 common frames omitted

至于说开发的源码,我是这么写的,我在class这里进行注解,这个时候我猜测,应该是注解的位置不对

@Component
@RabbitListener(queues = "xx.yy.zz")
public class Receiver {
    @RabbitHandler
    public void process(MSGSTO message) {
        System.out.println("消费消息");
        System.out.println(message.toString());
    }
}

事实上,确实是位置不对,但更加专业的解答方式是,这个listener注解是方法级别上的,而不能用在class上面,我们不妨来看下RabbitListener的源码,从根本上理解这个方法的使用。

package org.springframework.amqp.rabbit.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.messaging.handler.annotation.MessageMapping;
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(RabbitListeners.class)
public @interface RabbitListener {
   String id() default "";

   String containerFactory() default "";

   String[] queues() default {};

   Queue[] queuesToDeclare() default {};

   boolean exclusive() default false;

   String priority() default "";

   String admin() default "";

   QueueBinding[] bindings() default {};

   String group() default "";

   String returnExceptions() default "";

   String errorHandler() default "";

   String concurrency() default "";

   String autoStartup() default "";

}

由于业务需要,我们确实是需要对消息进行异步处理,而异步接收消息的最简单的方法是使用带注解的监听端点基础结构。简而言之,它允许将托管bean的方法公开为Rabbit listener的端点。br/>在这里,使用queues属性时,可以指定关联的容器可以监听多个队列。可以使用@Header注释来创建POJO方法可接收消息的队列名称。
这里我通过queues来指定监听的队列

@Component
public class Receiver {

    @RabbitListener(queues = "xx.yy.zz")
    @RabbitHandler
    public void process(MSGSTO message) {
        System.out.println("消费消息");
        System.out.println(message.toString());

至于说配置方式,我是通过application.yml的形式进行接入配置的,例如

rabbitmq:
  addresses: 127.0.0.1
  port: 5672
  username: guest
  password: guest
  publisher-confirms: true
  publisher-returns: true
  virtual-host: dev
  listener:
      simple:
          concurrency: 10
          max-concurrency: 20

这些属性会被注入到RabbitProperties属性中,如

@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
    …
}

挺有趣的对吧:)

参考资料:

  1. 为什么使用消息队列,https://www.javazhiyin.com/22897.html

标签:java,amqp,辨析,位置,springframework,listener,rabbit,org,注解
来源: https://blog.51cto.com/yerikyu/2404468

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有