ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ小记(二)

2020-04-06 19:04:15  阅读:227  来源: 互联网

标签:交换器 String RabbitMQ printStackTrace catch channel 小记


1、RabbitMQ相关介绍

(1)RabbitMQ整体上是一个生产者和消费者模型,主要负责接收、存储、转发消息。RabbitMQ整体结构图如下:

 

 

 

 (2)生产者:发送消息的一方,生产者创建一条消息,发布到RabbitMQ上,消息一般分为两部分:消息体和标签,消息体是带有业务逻辑结构的数据,也可以进一步对消息体进行序列化,标签用来描述这条消息。

     消费者:接收消息的一方,消费者创建一条连接,接到RabbitMQ服务器上的队列上,当消费者消费一条队列上的消息时,只是消费消息体,标签自动丢弃,所以消费者不会知道生产者是谁。

    Broker:消息中间服务节点,一个RabbitMQ Broker可以看作是一个RabbitMQ的实例,也可看作一台rabbitMQ的服务器。

    队列:Queue,RabbitMQ的内部对象,用于存储消息。多个消费者可以订阅一个队列,不支持队列层面的广播消费。

    交换器:Exchange,生产者创建消息,把消息交给交换器,有交换器把消息发送到一个或多个队列上。如果交换器发送队列失败,消息会返回给生产者或者丢弃。RabbitMQ中交换器有四种类型:fanout、direct、topic、headers。

    fanout:四种交换器中其一,会把消息发送到所有与交换器绑定的队列上。

    direct:四种交换器其二,会把消息发送到bindingKey和RoutingKey完全匹配的队列上。

    topic:四种交换器其三,与direct相似,会把消息发送到bindingKey和RoutingKey完全匹配的队列上,但匹配规则不同。

    headers:四种交换器其四,根据消息中的headers的属性来进行匹配,性能差,基本不会使用。

    bindingKey:绑定键,RabbitMQ中通过绑定键把交换器和对列关联起来,与RoutingKey配合使用。

    RoutdingKey:路由键,生产者将消息发送给交换器时会指定一个RoutingKey,当bindingKey和RoutingKey完全匹配时,消息会被放到对应的队列上。

1、SpringBoot整合RabbitMQ

(1)环境配置

项目采用maven构建系统,所以需要在pom.xml的文件中引入RabbitMQ的相关依赖:

<!--引入RabbitMQ的相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
在application.yml中配置RabbitMQ相关信息:
Spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5727
    username: guest
    password: guest

(2)初始化RabbitMQ连接

public class RabbitMQConfig {
Logger logger = LoggerFactory.getLogger(RabbitMQConfig.class);
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
/**
* rabbitmq连接
* */
@Bean
public Connection rabbitMQ_Config(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
Connection conn = null;
try {
//创建连接
conn = factory.newConnection();
} catch (IOException e) {
logger.error("创建MQ连接失败!",e);
e.printStackTrace();
} catch (TimeoutException e) {
logger.error("创建MQ连接失败!",e);
e.printStackTrace();
}
return conn;
}
@Bean
/**
* rabbitmq连接.uri方式
* */
public Connection rabbitMQ_Config_Uri(){
ConnectionFactory factory = new ConnectionFactory();
Connection conn = null;
try {
factory.setUri("amqp://"+username+":"+password+"@"+host+":"+port+"/virtualHost");
try {
conn = factory.newConnection();
} catch (IOException e) {
logger.error("创建MQ连接失败!",e);
e.printStackTrace();
} catch (TimeoutException e) {
logger.error("创建MQ连接失败!",e);
e.printStackTrace();
}
} catch (URISyntaxException e) {
logger.error("创建MQ连接失败!",e);
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
logger.error("创建MQ连接失败!",e);
e.printStackTrace();
} catch (KeyManagementException e) {
logger.error("创建MQ连接失败!",e);
e.printStackTrace();
}
return conn;
}
}

(3)生产者

/**
* 生产者
* */
@Bean
public boolean direct_MQ_Producer(String excahngeName, String exchangeType,String Queue,String Binding,byte[] array) throws Exception{

boolean flag = false;
Connection con = rabbitMQ_Config();
//创建通道
Channel channel = con.createChannel();
try {
//创建交换机
channel.exchangeDeclare(excahngeName,exchangeType,true);
//创建队列
channel.queueDeclare(Queue,true,false,false,null);
//将交换机与队列绑定
channel.queueBind(Queue,excahngeName,Binding);
//发送消息
channel.basicPublish(excahngeName,Binding, MessageProperties.PERSISTENT_TEXT_PLAIN,array);
flag = true;
} catch (IOException e) {
logger.error("发送消息失败!",e);
e.printStackTrace();
}finally {
//关闭通道连接
channel.close();
//关闭连接
con.close();
}
return flag;
}

(4)消费者   

/**
* 消费者
* */
/**
* direct类型交换机消费者
* */
@Bean
public String[] direct_MQ_Consumer(String Queue) throws IOException, TimeoutException {
Connection con = rabbitMQ_Config();
Channel channel = null;
final String[] message = {""};
try {
channel = con.createChannel();
//设置客户端最多接收未被接收的数目
channel.basicQos(64);
Channel finalChannel = channel;
//通过重写DefaultConsumer方法来实现消费者消息
Consumer consumer = new DefaultConsumer(finalChannel){
@Override
public void handleDelivery(String s1, Envelope envelope1, AMQP.BasicProperties basicProperties1, byte[] bytes) throws IOException{
message[0] = new String(bytes);
System.out.println("recv messahe : " + new String(bytes));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
finalChannel.basicAck(envelope1.getDeliveryTag(),false);
}
};
channel.basicConsume(Queue,consumer);
TimeUnit.SECONDS.sleep(5);
} catch (IOException e) {
logger.error("创建通道失败!",e);
e.printStackTrace();
}catch (InterruptedException e) {
e.printStackTrace();
}finally {
//关闭资源
channel.close();
con.close();
}
return message;
}

标签:交换器,String,RabbitMQ,printStackTrace,catch,channel,小记
来源: https://www.cnblogs.com/carblack/p/12639260.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有