ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMq学习

2021-05-27 11:01:55  阅读:205  来源: 互联网

标签:rabbitTemplate 重试 ack rabbitmq RabbitMq 学习 消息 true


1、概念

amqp:协议
Provider:生产者
Consumer:消费者
Broker:接收和分发消息的应用 RabbitMQ Server
virtual:虚拟机
把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概
念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出
多个vhost,每个用户在自己的vhost创建exchange/queue等
Exchange:交换器
消息交换机,它指定消息按什么规则,路由到哪个队列
常用的交换器:
DirectExchange:交换机在接收到消息后,通过路由键路由到指定的队列
FanoutExchange:交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
TopicExchange:与DirectExchange相似,路由键支持模糊匹配(*,#)
Queue:队列
存放消息的实体

2、配置

spring:
  application:
    name: rabbit
  rabbitmq:
    host: local
    port: 5672
    username: root
    password: 123
    virtual-host: default

3、消息确认以及通知

RabbitMQ引入发送端消息确认机制,主要通过事务和publisher Confirm机制

3.1、事务

/**
 * 配置启用rabbitmq事务
 * @param connectionFactory
 * @return
 */
@Bean("rabbitTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
    return new RabbitTransactionManager(connectionFactory);
}
 
	@PostConstruct
private void init() {
    //启用事务模式,不能开确认回调
	//rabbitTemplate.setConfirmCallback(this);
   //rabbitTemplate.setReturnCallback(this);
	rabbitTemplate.setChannelTransacted(true);
	rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
}

@Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
    public void sendIngateQueue(TradePayModelRes msg) {
	logger.info("进闸消息已发送 {}",msg.getOutTradeNo());
	rabbitTemplate.convertAndSend(exchange,ingateQueue,msg);
}

3.2、Confirm机制

先加上配置,开始confirm机制

spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.template.mandatory=true
@Component
public class BaseRabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    private static final Logger LOGGER = LoggerFactory.getLogger(BaseRabbitConfig.class);

//    @Resource
//    private RedisConfig redisConfig;

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    @Resource
    private MsgLogMapper msgLogMapper;

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
        rabbitTemplate.setConfirmCallback(this);
        //rabbitTemplate.setMandatory(true);
        // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
        rabbitTemplate.setReturnCallback(this);
        return rabbitTemplate;
    }

    //消息发送到Exchange的时候会进行判断
    //这里会有一个问题需要注意,就是confirm回productor可能会有延迟,意思就是可以消费者已近就收到了消息,生产者才收到confirm的消息
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
            if (ack) {
                //成功。。。。。
            } else {
                //失败。。。。。
            }
    }
    
    //只有在exchange路由到queue失败的时候才会调用
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

    }
}

3.3、消费者的ack机制

acknowledge-mode: manual/none/auto 手动/关闭/自动
manual:
根据我们的实际需求,手动进行ack
none:
不进行ack
auto:
消费者监听到消息后马上进行ack

ack有重试机制,默认的重试次数为3次,可以通过配置进行改动,这里需要注意的事,如果异常被catch 了,没有跑出来的话,重试次数和重试时间间隔的配置将会失效,没有处理好的话会出现死循环。(如果又要catch处理又要指定重试次数,可以使用redis)

spring.rabbitmq.listener.simple.retry.enabled = true //是否开启重试
spring.rabbitmq.listener.simple.retry.max-attempts = 3 //重试次数
spring.rabbitmq.listener.simple.retry.initial-interval = 2000 //每次重试的时间间隔

手动ack(下文的返回队列,意味着会进行重试)
注意:开启ack,必须要对消息进行ack不然消息就会阻塞,后果很严重

//消息消费成功,第一个参数为服务给消息的唯一标志,第二个参数为是否批量处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//消息消费失败,第二个参数为是否重新返回队列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
//消息消费失败,第二个参数为是否批量处理,第三个为是否重新返回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

监听方法的定义一般如下

@RabbitListener(queues = {"mail.send.queue"})
public void sendMail(Message message, Channel channel) throws IOException {
    //。。。。。
    }

标签:rabbitTemplate,重试,ack,rabbitmq,RabbitMq,学习,消息,true
来源: https://blog.csdn.net/weixin_45062785/article/details/117323576

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有