ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Ribbit Mq 实现延迟消息

2022-01-27 21:03:53  阅读:191  来源: 互联网

标签:队列 Ribbit 死信 Mq DEAD LETTER org import 延迟


--------------------好记性不如烂笔头---------------------------

windows 环境,使用 rabbit Mq 需要安装, erl   和  rabbit Mq

1.erl 安装完需要配置环境变量

2.查询 erl 是否安装好,cmd-->erl -version

erl -version
3.MQ 安装目录下
D:\anzhuang\rabbitmq_server-3.8.9\sbin
启动 :cmd-->rabbitmq-server

访问:http://localhost:15672/#/

guest/guest (用户名/密码)

--以上可以登录,进行下面操作,配置文件

spring:


rabbitmq:
host: localhost
port: 5672
virtualHost: /
username : guest
password : guest
listener:
simple:
acknowledge-mode: manual

pom文件
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置类

package com.example.servicebuy.config;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
* Created by qiuzhijie.
* Date: 2019-01-07
* 备注: mq config information
*/
@Configuration
public class RabbitMQConfig {

 

private Logger log = LoggerFactory.getLogger(this.getClass());
@Value("${spring.rabbitmq.host}")
private String HOST;
@Value("${spring.rabbitmq.port}")
private Integer PORT;
@Value("${spring.rabbitmq.virtualHost}")
private String VIRTUALHOST;
@Value("${spring.rabbitmq.username}")
private String USERNAME;
@Value("${spring.rabbitmq.password}")
private String PASSWORD;

@Bean
public CachingConnectionFactory connectionFactory() {
log.info("RabbitMQ链接信息:{},{},{},{}", HOST, PORT, USERNAME, PASSWORD);
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(this.HOST, this.PORT);

connectionFactory.setUsername(this.USERNAME);
connectionFactory.setPassword(this.PASSWORD);
connectionFactory.setVirtualHost(this.VIRTUALHOST);
log.info("RabbitMQ连接成功");

return connectionFactory;


}

 

/**
* 死信队列交换机标识符 属性值不能改,写死
*/
private static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 死信队列交换机绑定键 标识符 属性值不能改,写死
*/
private static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";


/**
* deadLetterExchange(direct类型交换机)
*
* @return
*/
@Bean("deadLetterExchange")
public Exchange deadLetterExchange() {
return ExchangeBuilder.directExchange("DEAD_LETTER_EXCHANGE").durable(true).build();
}

/**
* 声明一个死信队列
* x-dead-letter-exchange 对应 死信交换机
* x-dead-letter-routing-key 对应 死信队列
*/
@Bean("deadLetterQueue")
public Queue deadLetterQueue() {
//应该像个普通队列,里面多设置了两个参数,这个队列没有被消费或者超时 则通过x-dead-letter-exchange 指明重新回到死信交换机 TEST_SIGN_EXCHANGE
//交换机
// 参数
Map<String, Object> args = new HashMap<>(2);
// 出现dead letter之后将dead letter重新发送到指定exchange
args.put(DEAD_LETTER_QUEUE_KEY, "DEAD_LETTER_EXCHANGE");
// 出现dead letter之后将dead letter重新按照指定的routing-key发送
args.put(DEAD_LETTER_ROUTING_KEY, "REDIRECT_KEY");
// name队列名字 durable是否持久化,true保证消息的不丢失, exclusive是否排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除, autoDelete如果该队列没有任何订阅的消费者的话,该队列是否会被自动删除, arguments参数map
return new Queue("DEAD_LETTER_QUEUE", true, false, false, args);
}


/**
* 死信路由通过 DEAD_LETTER_KEY 绑定到死信队列上.
*/
@Bean
public Binding deadLetterBinding() {
return new Binding("DEAD_LETTER_QUEUE", Binding.DestinationType.QUEUE, "DEAD_LETTER_EXCHANGE", "DEAD_LETTER_KEY", null);

}

/**
* 死信路由通过 REDIRECT_KEY 绑定到转发队列上. 这个队列绑定的是当出现死信消息后 重新转发给的队列
*/
@Bean
public Binding redirectBinding() {
return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DEAD_LETTER_EXCHANGE", "REDIRECT_KEY", null);
}
/**
* 定义死信队列转发队列. (和普通队列一样,这个队列是为了原有的消息没有被消费重新转发给一个新的队列)
*/
@Bean("redirectQueue")
public Queue redirectQueue() {
return new Queue("REDIRECT_QUEUE", true, false, false);
}

}

 

2.消息生产者 --》发送消息

 

import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

 

@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 延迟消息 ribbit MQ栗子
*
* @param messAge
* @throws Exception
*/
@ApiOperation("死信发送消息")
@GetMapping("creatMessageDear")
public void creatMessageDear(@RequestParam("messAge") String messAge) throws Exception {
//声明消息处理器 设置消息的编码以及消息的过期时间 时间毫秒值为字符串
MessagePostProcessor messagePostProcessor = message -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString().replaceAll("-", ""));
messageProperties.setContentEncoding("utf-8");
//超时时间10秒 (我这里是延迟10秒,根据业务需要设置时间)
messageProperties.setExpiration(String.valueOf(1000 * 10));
return message;
};
Map<String, Object> dataMap = new HashMap<>();
dataMap.put("msg", messAge);
rabbitTemplate.convertAndSend("DEAD_LETTER_EXCHANGE", "DEAD_LETTER_KEY", dataMap, messagePostProcessor);
}

3.接受死信消息
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class TestMQConsumer {

/**
* 监听转发队列 死信队列重新转发回这里
*
*/
@RabbitListener(queues = {"REDIRECT_QUEUE"})
public void redirect(HashMap<String,Object> dataMap) throws IOException {
System.out.println(dataMap.get("msg"));
System.out.println("我是转发队列,这里执行逻辑业务");
}
}
------------------------------


 

标签:队列,Ribbit,死信,Mq,DEAD,LETTER,org,import,延迟
来源: https://www.cnblogs.com/wanqiang/p/15851204.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有