ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Java架构师成长之道之RabbitMQ开发与运维入门-基础篇

2019-08-09 23:54:28  阅读:111  来源: 互联网

标签:java 运维 Exchange rabbitmq ittimeline RabbitMQ Java 消息



Java架构师成长之道之RabbitMQ开发与运维入门-基础篇

Java架构师成长之道

主流消息中间件介绍

MQ衡量指标

  • 服务性能
  • 数据存储
  • 集群架构

ActiveMQ

  • Apache出品
  • 完全支持JMS规范的消息中间件
  • 丰富的API、多种集群构建(Master-Slave,NetWork)模式
  • 广泛运用于中小型企业
  • 性能一般,不适用于高并发、大数据的业务场景

ActiveMQ 集群模式
ActiveMQ 集群模式

Kafka

  • LinkedIn开源的分布式发布-订阅消息系统,后来贡献给Apache,成为顶级开源项目
  • 基于Pull模式处理消费消息
  • 追求高吞吐量,一开始的目的就是用于日志收集和传输,适合大数据的互联网服务的数据收集业务
  • 0.8版本开始支持复制
  • 不支持事务,对于消息的重复、丢失、错误没有严格要求
  • 高性能读写是基于操作系统底层的Page Cache实现,使用内存存储。

基于内存、高性能,节点之间相互复制
Kafka集群模式

RocketMQ

  • 阿里开源的消息中间件,后来贡献给Apache,成为顶级开源项目
  • 使用Java开发,具有高吞吐量、高可用性、适用大规模分布式系统的特点
  • RocketMQ思路源于Kakfa,对消息的可靠传输以及事务性做了优化
  • 目前被阿里集团广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景
  • 商业版收费

RocketMQ集群拓扑图
RocketMQ集群拓扑图

RabbitMQ

  • RabbitMQ是Erlang语言开发的开源消息队列系统,基于AMQP协议实现,用来通过普通协议在完全不同的应用之间共享数据
  • AMQP的主要特征是面向消息、队列、路由(包括点对点发布订阅)、可靠性、安全
  • AMQP协议更多用在企业系统内部,对数据一致性、稳定性、可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
  • 适用在金融交易场景

基于镜像队列实现高可用
RabbitMQ集群架构

RabbitMQ核心概念与AMQP协议

互联网大厂为什么会选择RabbitMQ

RabbitMQ凭借着以下特性在美团、滴滴、去哪儿、头条、艺龙等一线互联网大厂广泛使用

  • 开源、性能优秀、稳定
  • 提供可靠性消息投递模式,返回模式,
  • 与Spring AMPQ完美的整合,API丰富。
  • 集群模式丰富、支持表达式配置、HA模式、镜像队列模型。
  • 保证数据不丢失的前提下做到高可靠性、可用性。

RabbitMQ的高性能之道是如何做到的

RabbitMQ是使用Erlang语言开发,Erlang语言广泛用于网络交换机领域,Erlang有着与原生Socket一样的延迟,这样使得RabbitMQ在Broker之间进行数据交互的性能非常优秀。

什么是AMQP高级协议

AMQP协议的全称是:Advanced Message Queuing Protocol(高级消息队列协议)。目前AMQP协议的版本为 Version 1.0,这个协议标准在2014年通过了国际标准组织 (ISO) 和国际电工委员会 (IEC) 的投票,成为了新的 ISO 和 IEC 国际化标准。目前基于AMQP协议实现的产品包括:

  • Apache Qpid, an Apache project
  • Fedora Linux AMQP Infrastructure
  • IIT Software's SwiftMQ is a enterprise grade JMS messaging product with full support for AMQP 1.0
  • INETCO's AMQP protocol analyzer
  • JORAM: open reliable asynchronous messaging, 100% pure Java implementation of JMS
  • Kaazing's AMQP Web Client
  • Microsoft's Windows Azure Service Bus
  • JBoss A-MQ by Red Hat built from Qpid
  • StormMQ a cloud hosted messaging service based on AMQP
  • VMware Inc RabbitMQ; also supported by SpringSource
  • MQlight by IBM

关于AMPQ协议的完整介绍可以参考官方文档

AMQP核心概念

AMQP协议模型
AMQP协议模型
AMQP协议的原理
AMQP协议的原理

  • AMQP协议中的元素包括:Message(消息体)、Producer(消息生产者)、Consumer(消息消费者)、Virtual Host(虚拟节点)、Exchange(交换机)、Queue(队列)等

  • 由Producer(消息生产者)和Consumer(消息消费者)构成了AMQP的客户端,他们是发送消息和接收消息的主体。AMQP服务端称为Broker,一个Broker中一定包含完整的Virtual Host(虚拟主机)、 Exchange(交换机)、Queue(队列)定义。

  • 一个Broker可以创建多个Virtual Host(虚拟主机),我们将讨论的Exchange和Queue都是虚拟机中的工作元素(还有User元素)。注意,如果AMQP是由多个Broker构成的集群提供服务,那么一个Virtual Host也可以由多个Broker共同构成。

  • Connection是由Producer(消息生产者)和Consumer(消息消费者)创建的连接,连接到Broker物理节点上。但是有了Connection后客户端还不能和服务器通信,在Connection之上客户端会创建Channel,连接到Virtual Host或者Queue上,这样客户端才能向Exchange发送消息或者从Queue接受消息。一个Connection上允许存在多个Channel,只有Channel中能够发送/接受消息。

  • Exchange元素是AMQP协议中的交换机,Exchange可以绑定多个Queue也可以同时绑定其他Exchange。消息通过Exchange时,会按照Exchange中设置的Routing(路由)规则,将消息发送到符合的Queue或者Exchange中。

  • Binding-绑定:Exchange和Exchange、Queue之间的连接关系,绑定中可以包含routing key或者参数。

  • Queue:消息队列,实际存储消息数据,常用的属性有Durability:是否持久化,Durable:是,Transient:否,Auto delete:如果选yes,代表最后一个监听被移除之后,该Queue会自动删除。

  • Message:服务器和应用程序之间传送个的数据,本质上就是一段数据,由Properties和Payload(Body)组成,消息中常用的属性有 delivery mode,headers(自定义属性),content_type,content_encoding,priority,correlation_id,reply_to,expiration,message_id,timestamp,type,user_id,app_id,cluster_id

  • Virtual Host-虚拟主机:虚拟地址,用于进行逻辑隔离,最上层的消息路由,同一个Virtual Host里面不能有同名的Exchange或Queue。

RabbitMQ整体架构模型

RabbitMQ整体架构模型
RabbitMQ整体架构模型

RabbitMQ消息是如何流转

AMPQ消息的流转流程
AMPQ消息流转流程
1.在Producer(消息生产者)客户端建立了Channel后,就建立了到Broker上Virtual Host的连接。接下来Producer就可以向这个Virtual Host中的Exchange发送消息了。

2.Exchange(交换机)能够处理消息的前提是:它至少已经和某个Queue或者另外的Exchange形成了绑定关系,并设置好了到这些Queue和Excahnge的Routing(路由规则)。Excahnge中的Routing有三种模式,我们随后会讲到。在Exchange收到消息后,会根据设置的Routing(路由规则),将消息发送到符合要求的Queue或者Exchange中(路由规则还会和Message中的Routing Key属性配合使用)。

3.Queue收到消息后,可能会进行如下的处理:如果当前没有Consumer的Channel连接到这个Queue,那么Queue将会把这条消息进行存储直到有Channel被创建(AMQP协议的不同实现产品中,存储方式又不尽相同);如果已经有Channel连接到这个Queue,那么消息将会按顺序被发送给这个Channel。

4.Consumer收到消息后,就可以进行消息的处理了。但是整个消息传递的过程还没有完成:视设置情况,Consumer在完成某一条消息的处理后,将需要手动的发送一条ACK消息给对应的Queue(当然您可以设置为自动发送,或者无需发送)。Queue在收到这条ACK信息后,才会认为这条消息处理成功,并将这条消息从Queue中移除;如果在对应的Channel断开后,Queue都没有这条消息的ACK信息,这条消息将会重新被发送给另外的Channel。当然,您还可以发送NACK信息,这样这条消息将会立即归队,并发送给另外的Channel。

RabbitMQ安装和基本使用

RabbitMQ安装

  1. 系统基础组件和配置

1.1 查看服务器版本
服务端操作系统基于CentOS7.3搭建,可以使用cat /etc/centos-release查看系统版本

[root@ittimeline ~]# cat /etc/centos-release
CentOS Linux release 7.3.1611 (Core) 

1.2 配置hostname
然后使用vim /etc/hostname修改host配置,因为RabbitMQ Server日志的文件名会依赖hostname
vim编辑器的基本使用:
首先使用vim修改文件时进入的浏览模式,需要使用i键盘进入编辑模式
修改完成文件之后,需要使用esc退出编辑模式,然后使用:wq退出vim并保存修改的内容

[root@ittimeline ~]# vim  /etc/hostname 

查看修改的内容

[root@ittimeline ~]# cat /etc/hostname                      
ittimeline.net

1.3 关闭防火墙
CentOS7的防火墙默认是开机启动的,这里为了方便使用RabbitMQ基于BS的管控台,需要关闭防火墙。

[root@ittimeline ~]# systemctl stop firewalld.service

还可以使用如下命令开机禁用防火墙

[root@ittimeline ~]# systemctl disable  firewalld.service

1.4 网卡开机启动

[root@ittimeline ~]# cd /etc/sysconfig/network-scripts/
 [root@ittimeline network-scripts]# cat ifcfg-eth0 
# Generated by parse-kickstart
DEVICE="eth0"
IPV6INIT="yes"
BOOTPROTO="dhcp"
UUID="c79432fa-c801-4f60-b4c1-a9cb34eeae2c"
ONBOOT="yes"
TYPE=Ethernet
DEFROUTE=yes
PEERDNS=yes
PEERROUTES=yes
IPV4_FAILURE_FATAL=no
IPV6_AUTOCONF=yes
IPV6_DEFROUTE=yes
IPV6_PEERDNS=yes
IPV6_PEERROUTES=yes
IPV6_FAILURE_FATAL=no
IPV6_PRIVACY=no
NAME="System eth0" 

将属性ONBOOT的值改为yes即可,然后重启系统,就可以实现开机启动网络连接。

1.5 安装系统基础组件

在安装rabbitMQ之前使用yum install安装以下组件,否则rabbitMQ会安装失败。

[root@ittimeline ~]# yum install -y build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m3 ncurse-devel tk tc xz
  1. erlang、socat、rabbitmq下载
    首先根据官网给出的Erlang和RabbitMQ版本对应关系
    选择Erlang 22.0.X +RabbitMQ 3.7.17,版本确定以后,可以从官网下载erlang和rabbitmq安装包。

erlang针对CentOS7的rpm包
erlang针对CentOS7的rpm包
abbitmq针对CentOS7的rpm包
rabbitmq针对CentOS7的rpm包

下载erlang和rabbitmq只需要使用wget命令然后加上下载的地址即可
由于rabbitmq server依赖socat,还需要从阿里云镜像站上下载socat,注意它们的下载顺序和安装顺序一致

[root@ittimeline ~]# cd /home/liuguanglei/Downloads/
root@ittimeline Downloads]# wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.0.7-1.el7.x86_64.rpm/download.rpm

[root@ittimeline Downloads]# wget https://mirrors.aliyun.com/centos/7.6.1810/os/x86_64/Packages/socat-1.7.3.2-2.el7.x86_64.rpm

[root@ittimeline Downloads]# wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.7.17-1.el7.noarch.rpm/download.rpm

查看下载的软件版本

[root@ittimeline Downloads]# ls -al
total 29428
drwxr-xr-x.  2 liuguanglei liuguanglei     4096 Aug  6 11:50 .
drwx------. 16 liuguanglei liuguanglei     4096 Aug  6 10:19 ..
-rw-r--r--.  1 root        root        19384788 Jul 12 01:31 erlang-22.0.7-1.el7.x86_64.rpm
-rw-r--r--.  1 root        root        10438300 Jul 29 12:07 rabbitmq-server-3.7.17-1.el7.noarch.rpm
-rw-r--r--.  1 root        root          296632 Aug 11  2017 socat-1.7.3.2-2.el7.x86_64.rpm
  1. erlang、socat、rabbitmq安装
    rpm的安装只需要使用rpm -ivh命令安装即可。
[root@ittimeline Downloads]# rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm 
[root@ittimeline Downloads]# rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm 
[root@ittimeline Downloads]# rpm -ivh rabbitmq-server-3.7.17-1.el7.noarch.rpm 

4.修改配置
使用rpm安装方式安装的rabbitmq的配置默认路径是/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.17/ebin/rabbit.app
这里暂时只是修改loopback_users,让guest用户能够使用ip登录,如果不修改的话,登录RabbitMQ 管控台会看到如下提示
用户只能使用localhost登录

[root@ittimeline Downloads]# vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.17/ebin/rabbit.app 

修改之前
loopback_users
修改之后
loopback_users
就是把loopback_users的["<<guest>>"]修改为[guest]即可

RabbitMQ基本使用

  1. 启动rabbitmq-server
    使用rabbitmq-server start & 启动rabbitmq服务
[root@ittimeline Downloads]# rabbitmq-server start &
[1] 25834
[root@ittimeline Downloads]# 
  ##  ##
  ##  ##      RabbitMQ 3.7.17. Copyright (C) 2007-2019 Pivotal Software, Inc.
  ##########  Licensed under the MPL.  See https://www.rabbitmq.com/
  ######  ##
  ##########  Logs: /var/log/rabbitmq/rabbit@ittimeline.log
                    /var/log/rabbitmq/rabbit@ittimeline_upgrade.log

              Starting broker...
 completed with 0 plugins.

RabbitMQ服务启动后会输出版本信息以及日志路径,其日志路径
Logs: /var/log/rabbitmq/rabbit@ittimeline.log
/var/log/rabbitmq/rabbit@ittimeline_upgrade.log

如果想要在终端上输入其他命令,回车即可,此时RabbitMQ服务进入后台运行,可以使用ps命令查看rabbitmq进程信息

Last login: Tue Aug  6 10:20:39 2019 from 172.16.237.111
[root@ittimeline ~]# ps -ef|grep rabbitmq
root     25834  4615  0 12:12 pts/1    00:00:00 /sbin/runuser -u rabbitmq -- /usr/lib/rabbitmq/bin/rabbitmq-server start
rabbitmq 25843 25834  0 12:12 pts/1    00:00:00 /bin/sh /usr/lib/rabbitmq/bin/rabbitmq-server start
rabbitmq 25940     1  0 12:12 ?        00:00:00 /usr/lib64/erlang/erts-10.4.4/bin/epmd -daemon
rabbitmq 25995 25843  0 12:12 pts/1    00:00:04 /usr/lib64/erlang/erts-10.4.4/bin/beam.smp -W w -A 64 -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -K true -B i -- -root /usr/lib64/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.17/ebin  -noshell -noinput -s rabbit boot -sname rabbit@ittimeline -boot start_sasl -kernel inet_default_connect_options [{nodelay,true}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit lager_log_root "/var/log/rabbitmq" -rabbit lager_default_file "/var/log/rabbitmq/rabbit@ittimeline.log" -rabbit lager_upgrade_file "/var/log/rabbitmq/rabbit@ittimeline_upgrade.log" -rabbit feature_flags_file "/var/lib/rabbitmq/mnesia/rabbit@ittimeline-feature_flags" -rabbit enabled_plugins_file "/etc/rabbitmq/enabled_plugins" -rabbit plugins_dir "/usr/lib/rabbitmq/plugins:/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.17/plugins" -rabbit plugins_expand_dir "/var/lib/rabbitmq/mnesia/rabbit@ittimeline-plugins-expand" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/var/lib/rabbitmq/mnesia/rabbit@ittimeline" -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672 start
rabbitmq 26097 25995  0 12:12 ?        00:00:00 erl_child_setup 1024
rabbitmq 26127 26097  0 12:12 ?        00:00:00 inet_gethost 4
rabbitmq 26128 26127  0 12:12 ?        00:00:00 inet_gethost 4
root     28310 28242  0 12:21 pts/0    00:00:00 grep --color=auto rabbitmq

使用lsof(list open files)查看进程进程打开的文件,进程打开的端口,打开文件的进程。。

[root@ittimeline ~]# lsof -i:5672
COMMAND    PID     USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
beam.smp 25995 rabbitmq   77u  IPv6 148008      0t0  TCP *:amqp (LISTEN)

2.关闭rabbitmq server

[root@ittimeline Downloads]# rabbitmqctl stop
Stopping and halting node rabbit@ittimeline ...
Gracefully halting Erlang VM

3.启用 RabbitMQ Server Web管控台

使用rabbitmq-plugins enable rabbitmq_management启用RabbitMQ Server的Web管理界面

[root@ittimeline Downloads]# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@ittimeline:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@ittimeline...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch

set 3 plugins.
Offline change; changes will take effect at broker restart.

然后就可以使用浏览器访问地址http://172.16.238.193:15672/
然后输入guest/guest后点击Login按钮登录RabbitMQ管控台
登录RabbitMQ管控台
RabbitMQ管控台预览页面
RabbitMQ管控台预览页面

其中172.16.238.193是CentOS的ip地址,可以使用ifconfig命令查看

[root@ittimeline Downloads]# ifconfig
eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        inet 172.16.238.193  netmask 255.255.252.0  broadcast 172.16.239.255
        inet6 fe80::21c:42ff:feb5:d22e  prefixlen 64  scopeid 0x20<link>
        ether 00:1c:42:b5:d2:2e  txqueuelen 1000  (Ethernet)
        RX packets 1110406  bytes 326972310 (311.8 MiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 39752  bytes 5885101 (5.6 MiB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

lo: flags=73<UP,LOOPBACK,RUNNING>  mtu 65536
        inet 127.0.0.1  netmask 255.0.0.0
        inet6 ::1  prefixlen 128  scopeid 0x10<host>
        loop  txqueuelen 1  (Local Loopback)
        RX packets 1616  bytes 105899 (103.4 KiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 1616  bytes 105899 (103.4 KiB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

virbr0: flags=4099<UP,BROADCAST,MULTICAST>  mtu 1500
        inet 192.168.122.1  netmask 255.255.255.0  broadcast 192.168.122.255
        ether 52:54:00:16:27:85  txqueuelen 1000  (Ethernet)
        RX packets 0  bytes 0 (0.0 B)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 0  bytes 0 (0.0 B)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

其中eth0表示第一块网卡。
而15672是RabbitMQ管控台的访问端口,常用的端口还有客户端程序(Java,Python)通讯的5672端口和集群的25672端口。

RabbitMQ管理

RabbitMQ命令使用

RabbitMQ提供了rabbitmqctl工具用于管理RabbitMQ,具体的使用可以使用rabbitmqctl --helpman rabbitmqctl查看对应的帮助以及手册,因为命令提供的功能在管控台中已经实现,而基于管控台的操作更加便捷和直观。

  • rabbitmqctl 应用管理

    • rabbitmqctl stop_app:关闭应用
    • rabbitmqctl start_app:启动应用
    • rabbitmqctl status:节点状态
  • rabbitmqctl用户管理

    • rabbitmqctl add_user username password :添加用户
    • rabbitmqctl list_users:列出所有用户
    • rabbitmqctl delete_user username:删除用户
    • rabbitmqctl clear_permissions -p vhostpath username:清除权限
    • rabbitmqctl list_user_permissions username:列出用户权限
    • rabbitmqctl change_password username newpassword:修改密码
    • rabbitmqctl set_permissions -p vhostpath username "".""."*":设置用户权限
  • rabbitmqctl 虚拟主机管理

    • rabbitmqctl add_vhost vhostpath:创建虚拟主机
    • rabbitmqctl list_vhosts:列出所有虚拟主机
    • rabbitmqctl list_permissions -p vhostpath:列出虚拟主机上所有的权限
    • rabbitmqctl delete_vhost vhostpath :删除虚拟主机
  • rabbitmqctl 队列管理

    • rabbitmqctl list_queues:查看所有队列消息
    • rabbitmqctl -p vhostpath purge_queue blue:清除队列的消息
  • rabbitmqctl 高级操作(集群)

    • rabbitmqctl reset:移除所有数据,要在rabbitmqctl stop_app之后使用
    • rabbitmqctl join_cluster [--ram]:组成集群命令
    • rabbitmqctl cluster_status:查看集群状态
    • rabbitmqctl change_cluster_node_type disc|ram:修改集群节点的存储形式
    • rabbitmqctl forget_cluster_node [--offline]:忘记节点(摘除节点)
    • rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2][newnode2]:修改节点名称

RabbitMQ管控台使用

浏览器中输入地址 http://172.16.238.193:15672/ 后,并输入guest/guest账号访问RabbitMQ
登录成功之后默认进入的是预览页面,预览页面给出了erlang和rabbit server的版本信息
以及Connetions,Channels,Exxhanges,Queues、Consumers信息以及当前节点的信息。
RabbitMQ管控台预览页面

RabbitMQ 消息生产与消费

基于Java-Client API实现

首先实现基于RabbitMQ java-client实现的获取RabbitMQ Server的连接

在java-client-utils的src/main/resources目录下定义rabbitmq.properties
同学可以根据自己的RabbitMQ Server配置作相应的修改

#rabbitmq.host=192.168.0.102
rabbitmq.host=172.16.238.193
rabbitmq.port=5672
rabbitmq.virtualhost=/
rabbitmq.username=guest
rabbitmq.password=guest

然后定义RabbitMQUtils类,主要用于读取rabbitmq.properties,然后借助RabbitMQ Java Client的ConnectionFactory创建连接以及关闭连接。

 package net.ittimeline.java.middleware.rabbitmq.java.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.TimeoutException;

/**
 * Rabbit Message Queue 工具类
 * 封装获取连接和关闭连接方法
 *
 * @author liuguanglei 18601767221@163.com
 * @create 2019-08-06 19:53
 * @website www.ittimeline.net
 * @since JDK11.03
 * @see https://www.rabbitmq.com/java-client.html
 */
@Slf4j
public class RabbitMQUtils {


    /**
     * 基于rabbitmq java client构建客户端与rabbitmq server连接
     * @return
     */
    public static Connection buildConnection(){
        Connection connection=null;
        try(
                //读取src/main/resources目录下的rabbitmq.properties
                InputStream inputStream= RabbitMQUtils.class.getClassLoader().getResourceAsStream("rabbitmq.properties")
        ){
            ConnectionFactory connectionFactory=new ConnectionFactory();
            Properties properties = new Properties();
            properties.load(inputStream);
            connectionFactory.setHost(properties.getProperty("rabbitmq.host"));
            connectionFactory.setPort(Integer.valueOf(properties.getProperty("rabbitmq.port")));

            connectionFactory.setVirtualHost(properties.getProperty("rabbitmq.virtualhost"));
            connectionFactory.setUsername(properties.getProperty("rabbitmq.username"));
            connectionFactory.setPassword(properties.getProperty("rabbitmq.password"));
            connection=connectionFactory.newConnection();
        }catch (IOException | TimeoutException e){
            e.printStackTrace();
            log.error("创建ConnectionFactory失败",e);

        }

        return connection;
    }


    /**
     * 关闭Rabbit Server 连接
     * @param connection
     */
    public static void close(Connection connection){
        if(connection!=null){
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
                log.error("关闭RabbitMQ连接失败",e);
            }
        }
    }

}

基于RabbitMQ Java Client生产消息实现

package net.ittimeline.java.middleware.rabbitmq.java.client.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import lombok.extern.slf4j.Slf4j;
import net.ittimeline.java.middleware.rabbitmq.java.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产消息
 *
 * @author liuguanglei 18601767221@163.com
 * @create 2019-08-06 17:50
 * @website www.ittimeline.net
 * @since JDK11.03
 */
@Slf4j
public class ProducerMessage {


    public static void main(String[] args) {

        Connection connection = RabbitMQUtils.buildConnection();
        /**
         * 交换器的名称
         * 指明消息需要发送到哪个交换器中
         * 如果设置为空字符串,则消息会发送到RabbitMQ默认的交换器中 AMQP Exchange
         * 默认交换隐式绑定到每个队列,路由密钥等于队列名称。无法显式绑定到默认交换或从绑定取消绑定。它也无法删除。
         */
        String exchangeName = "";
        /**
         * 路由键
         * 交换器根据路由键将消息存储到相应的队列中
         * routingKey的值和QueueName相同
         */
        String routingKey = "ittimeline";


        String message = "Hello,RabbitMQ";
        /**
         * 消息体,真正需要发送消息
         */
        byte[] messageBody = message.getBytes();
        try (Channel channel = connection.createChannel()) {
            channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBody);
            log.info("发送RabbitMQ消息成功 ,消息的内容是{}", message);

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            log.error("创建Channel失败", e);
        } finally {
           // RabbitMQUtils.close(connection);
        }

    }
}

生产消息管控台界面
生产消息

基于RabbitMQ Java Client消费消息

package net.ittimeline.java.middleware.rabbitmq.java.client.consumer;

import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import net.ittimeline.java.middleware.rabbitmq.java.utils.RabbitMQUtils;

import java.io.IOException;

/**
 * 消费消息
 *
 * @author liuguanglei 18601767221@163.com
 * @create 2019-08-06 17:50
 * @website www.ittimeline.net
 * @since JDK11.03
 */
@Slf4j
public class ConsumerMessage {

    public static void main(String[] args) {


        Connection connection= RabbitMQUtils.buildConnection();
        try {
            Channel channel=connection.createChannel();
            boolean autoAck=false;
            String queueName="ittimeline";
            String consumerTag="customConsumerTag";
            //声明(创建)一个队列  启动时先启动Consumer,因为必须要先有队列,生产者才能将消息通过Exchange发送到队列上
            channel.queueDeclare(queueName,true,false,false,null);
            //创建消费者
            channel.basicConsume(queueName,autoAck,consumerTag,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String routingKey=envelope.getRoutingKey();
                    String contentType=properties.getContentType();
                    String message=new String(body);
                    long deliveryTag=envelope.getDeliveryTag();
                    log.info("routingKey {} contentType {},message{},deliveryTag {}",routingKey,contentType,message,deliveryTag);

                    channel.basicAck(deliveryTag,false);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
        finally{
           // RabbitMQUtils.close(connection);
        }


    }
}

消费消息管控台
消费消息

基于Spring AMQP实现

基于SpringBoot 实现

基于SpringCloud实现

RabbitMQ交换机详解

Exchange:接收客户端发送的消息,并根据消息的routing key转发消息至所绑定的队列
Exchange

交换机属性

  • Name:交换机名称
  • Type:direct,topic,fanout,headers
  • Durability:是否持久化,true为持久化
  • Auto Delete :当最后一个绑定到Exchange上的队列删除之后,自动删除该Exchange
  • Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false
  • Arguments:扩展参数,用于扩展AMQP协议定制化使用

RabbitMQ常用的Exchange

  • Direct Exchange
    Direct Exchange

所有发送到Direct Exchange的消息被转发到Routing key中指定的Queue

Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(bingding)操作,消息传递时,Routing key必须完全匹配才会被队列接收,否则该消息会被抛弃。

基于Direct Exchange 生产消息

package net.ittimeline.java.middleware.rabbitmq.java.client.producer.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import net.ittimeline.java.middleware.rabbitmq.java.utils.RabbitMQUtils;

import java.io.IOException;

/**
 * 基于Direct Exchange 生产消息
 * Producer和Consumer的routing key必须完全一致
 * @author liuguanglei 18601767221@163.com
 * @create 2019-08-07 14:00
 * @website www.ittimeline.net
 * @since JDK11.03
 */

@Slf4j
public class DirectExchangeProducerMessage {


    public static void main(String[] args) {


        try {
            Connection connection= RabbitMQUtils.buildConnection();
            final Channel channel=connection.createChannel();

            String exchangeName="test_direct_exchange";
            String routingKey="test.direct";
            String message="Hello,RabbitMQ With direct Exchange qqq ";

            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
            log.info("发送RabbitMQ消息成功,消息的内容是{}",message);

        } catch (IOException e) {
            e.printStackTrace();
            log.error("创建Channel失败,原因是",e);

        }


    }
}

基于direct exchange消费消息

package net.ittimeline.java.middleware.rabbitmq.java.client.consumer.exchange;

import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import net.ittimeline.java.middleware.rabbitmq.java.utils.RabbitMQUtils;

import java.io.IOException;

/**
 * 基于direct exchange消费消息
 *Producer和Consumer的routing key必须完全一致
 * @author liuguanglei 18601767221@163.com
 * @create 2019-08-07 14:05
 * @website www.ittimeline.net
 * @since JDK11.03
 */
@Slf4j
public class DirectExchangeConsumerMessage {


    public static void main(String[] args) {

        Connection connection= RabbitMQUtils.buildConnection();
        try {
            final Channel channel=connection.createChannel();

            boolean autoAck=false;

            String exchangeName="test_direct_exchange";
            String exchangeType="direct";
            String queueName="test_direct_queue";
            String routingKey="test.direct";

            //声明exchange
            channel.exchangeDeclare(exchangeName,exchangeType,false,false,false,null);

            //声明队列
            channel.queueDeclare(queueName,true,false,false,null);
            //根据routing key 绑定exchange和队列
            channel.queueBind(queueName,exchangeName,routingKey);

            channel.basicConsume(queueName,autoAck,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                    log.info("收到消息的内容是{} ",new String(body));
                }
            });

        } catch (IOException e) {
            e.printStackTrace();
            log.error("创建Channel失败,异常是",e);
        }

    }
}

Topic Exchange

所有发送到Topic Exchange的消息被转发到所关心Routing Key中所指定Topic的Queue上。
Exchange将Routing key和某个Topic进行匹配,此时队列需要绑定一个Topic。
可以使用通配符进行模糊匹配

  • 符号"#"匹配一个或者多个词
  • 符号 "*"匹配一个词
    例如log.#能够匹配到log.info.oa
    log.*只会匹配到log.error

TopIic Exchange的路由规则
Topic Exchange

基于topic exchange实现的消息生产端

package net.ittimeline.java.middleware.rabbitmq.java.client.producer.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import net.ittimeline.java.middleware.rabbitmq.java.utils.RabbitMQUtils;

import java.io.IOException;

/**
 * 基于topic Exchange生产消息
 * 消息消费端配置account.#,消息生产端配置account.get或者account.list.info都是可以收到消息
 * 消息消费端配置account.*,消息生产端端配置account.get可以收到,但是account.list.info就收不到了,因为account.*只能匹配一个词
 *
 * @author liuguanglei 18601767221@163.com
 * @create 2019-08-07 14:48
 * @website www.ittimeline.net
 * @since JDK11.03
 */
@Slf4j
public class TopicExchangeProducerMessage {

    public static void main(String[] args) {

        Connection connection= RabbitMQUtils.buildConnection();
        try {
            Channel channel=connection.createChannel();

            String exchangeName="test_topic_exchange";

            String routingKey="account.get";
            String message="Hello RabbitMQ With Topic Exchange ..."+routingKey;
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
            log.info("发送RabbitMQ消息成功,消息的内容是{}",message);

            routingKey="account.save";
            message="Hello RabbitMQ With Topic Exchange ..."+routingKey;
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
            log.info("发送RabbitMQ消息成功,消息的内容是{}",message);

            routingKey="account.update";
            message="Hello RabbitMQ With Topic Exchange ..."+routingKey;
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
            log.info("发送RabbitMQ消息成功,消息的内容是{}",message);

            routingKey="account.delete";
            message="Hello RabbitMQ With Topic Exchange ..."+routingKey;
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
            log.info("发送RabbitMQ消息成功,消息的内容是{}",message);


            routingKey="account.list.info";
            message="Hello RabbitMQ With Topic Exchange ..."+routingKey;
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
            log.info("发送RabbitMQ消息成功,消息的内容是{}",message);

        } catch (IOException e) {
            e.printStackTrace();
            log.error("创建channel失败,原因是",e);
        }

    }
}

基于topic exchange实现的消息消费端

package net.ittimeline.java.middleware.rabbitmq.java.client.consumer.exchange;

import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import net.ittimeline.java.middleware.rabbitmq.java.utils.RabbitMQUtils;

import java.io.IOException;

/**
 * 基于topic exchange 消费消息
 * 消息消费端配置account.#,消息生产端配置account.get或者account.list.info都是可以收到消息
 * 消息消费端配置account.*,消息生产端端配置account.get可以收到,但是account.list.info就收不到了,因为account.*只能匹配一个词
 *
 * @author liuguanglei 18601767221@163.com
 * @create 2019-08-07 15:03
 * @website www.ittimeline.net
 * @since JDK11.03
 */
@Slf4j
public class TopicExchangeConsumerMessage {

    public static void main(String[] args) {

        Connection connection= RabbitMQUtils.buildConnection();
        try {
            Channel channel=connection.createChannel();

            String exchangeName="test_topic_exchange";
            String exchangeType="topic";

            String queueName="test_topic_queue";
            //String routingKey="account.#";
            //如果routingKey换成account.* 那么应该是收不到生产消息的routingKey为 account.list.info,因为*只能匹配一个词
            String routingKey="account.*";

            //声明exchange
            channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
            //声明queue
            channel.queueDeclare(queueName,true,false,false,null);
            //根据routingKey绑定exchange和queue
            channel.queueBind(queueName,exchangeName,routingKey);

            channel.basicConsume(queueName,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                    log.info("收到消息的内容是{}",new String(body));
                }
            });


        } catch (IOException e) {
            e.printStackTrace();
            log.error("创建Channel失败,失败的原因是",e);
        }

    }
}

在测试基于Topic Exchange的消息发送和接收时,如果发现Exchange和Queue有俩routing key时,可以使用Rabbit Server管控台实现解绑。

解绑Exchange和队列
解绑Exchange和队列

如果不解绑的话,测试程序可能会得到错误的结果
如下图所示,因为消息消费端的routing key是account.*,由于没有解绑之前的account.#,因此收到了routing key 为account.list.info的消息。
基于account.*匹配的消息发送

基于account.*匹配的消息接收
基于account.*匹配的消息接收

Fanout Exchange

Facount Exchange不需要路由键,只需要简单的将队列绑定到交换机上,发送到该交换机的消息都会被转发到与该交换机绑定的队列上。Fanout交换机转发消息是最快的。
Fanout Exchange

基于fanout exchange实现的消息生产端

package net.ittimeline.java.middleware.rabbitmq.java.client.producer.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import net.ittimeline.java.middleware.rabbitmq.java.utils.RabbitMQUtils;

import java.io.IOException;

/**
 * 基于fanout exchange的消息生产端
 * @author liuguanglei 18601767221@163.com
 * @create 2019-08-07 16:53
 * @website www.ittimeline.net
 * @since JDK11.03
 */
@Slf4j
public class FanoutExchangeProducerMessage {

    public static void main(String[] args) {

        Connection connection= RabbitMQUtils.buildConnection();
        try {
            Channel channel=connection.createChannel();

            String exchangeName="test_fanout_exchange";
            //不设置路由键
            //String routingKey="";
            //或者设置任意的路由键
            //消息消费方都可以收到
            String routingKey="any";
            String message="Hello RabbitMQ With Fanout Exchange";
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
            log.info("RabbitMQ消息发送成功,发送的内容是{}",message);

        } catch (IOException e) {
           log.error("创建Channel失败,原因是",e);
        }

    }
}

基于fanout exchange实现的消息消费端

package net.ittimeline.java.middleware.rabbitmq.java.client.consumer.exchange;

import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import net.ittimeline.java.middleware.rabbitmq.java.utils.RabbitMQUtils;

import java.io.IOException;

/**
 * 基于fanout exchange的消息消费端
 * @author liuguanglei 18601767221@163.com
 * @create 2019-08-07 16:54
 * @website www.ittimeline.net
 * @since JDK11.03
 */
@Slf4j
public class FanoutExchangeConsumerMessage {

    public static void main(String[] args) {
        try {
            Connection connection= RabbitMQUtils.buildConnection();

            Channel channel=connection.createChannel();
            String exchangeName="test_fanout_exchange";
            String exchangeType="fanout";
            //不设置路由键
            String routingKey="";
            String queueName="test_fanout_queue";

            channel.exchangeDeclare(exchangeName,exchangeType,false,false,null);
            channel.queueDeclare(queueName,false,false,false,null);
            channel.queueBind(queueName,exchangeName,routingKey);

            channel.basicConsume(queueName,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    log.info("收到消息的内容是{}",new String(body));
                }
            });


        } catch (IOException e) {
            log.error("创建Channel失败,原因是",e);
        }

    }
}

helloworld

enter code here


标签:java,运维,Exchange,rabbitmq,ittimeline,RabbitMQ,Java,消息

专注分享技术,共同学习,共同进步。

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有