ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Java架构师成长之道之RabbitMQ开发与运维入门-基础篇

2019-08-09 23:54:28  阅读:607  来源: 互联网

标签:java 运维 Exchange rabbitmq ittimeline RabbitMQ Java 消息


Java架构师成长之道之RabbitMQ开发与运维入门-基础篇

Java架构师成长之道

主流消息中间件介绍

MQ衡量指标

  • 服务性能
  • 数据存储
  • 集群架构

ActiveMQ

  • Apache出品
  • 完全支持JMS规范的消息中间件
  • 丰富的API、多种集群构建(Master-Slave,NetWork)模式
  • 广泛运用于中小型企业
  • 性能一般,不适用于高并发、大数据的业务场景

ActiveMQ 集群模式
ActiveMQ 集群模式

Kafka

  • LinkedIn开源的分布式发布-订阅消息系统,后来贡献给Apache,成为顶级开源项目
  • 基于Pull模式处理消费消息
  • 追求高吞吐量,一开始的目的就是用于日志收集和传输,适合大数据的互联网服务的数据收集业务
  • 0.8版本开始支持复制
  • 不支持事务,对于消息的重复、丢失、错误没有严格要求
  • 高性能读写是基于操作系统底层的Page Cache实现,使用内存存储。

基于内存、高性能,节点之间相互复制
Kafka集群模式

RocketMQ

  • 阿里开源的消息中间件,后来贡献给Apache,成为顶级开源项目
  • 使用Java开发,具有高吞吐量、高可用性、适用大规模分布式系统的特点
  • RocketMQ思路源于Kakfa,对消息的可靠传输以及事务性做了优化
  • 目前被阿里集团广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景
  • 商业版收费

RocketMQ集群拓扑图
RocketMQ集群拓扑图

RabbitMQ

  • RabbitMQ是Erlang语言开发的开源消息队列系统,基于AMQP协议实现,用来通过普通协议在完全不同的应用之间共享数据
  • AMQP的主要特征是面向消息、队列、路由(包括点对点发布订阅)、可靠性、安全
  • AMQP协议更多用在企业系统内部,对数据一致性、稳定性、可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
  • 适用在金融交易场景

基于镜像队列实现高可用
RabbitMQ集群架构

RabbitMQ核心概念与AMQP协议

互联网大厂为什么会选择RabbitMQ

RabbitMQ凭借着以下特性在美团、滴滴、去哪儿、头条、艺龙等一线互联网大厂广泛使用

  • 开源、性能优秀、稳定
  • 提供可靠性消息投递模式,返回模式,
  • 与Spring AMPQ完美的整合,API丰富。
  • 集群模式丰富、支持表达式配置、HA模式、镜像队列模型。
  • 保证数据不丢失的前提下做到高可靠性、可用性。

RabbitMQ的高性能之道是如何做到的

RabbitMQ是使用Erlang语言开发,Erlang语言广泛用于网络交换机领域,Erlang有着与原生Socket一样的延迟,这样使得RabbitMQ在Broker之间进行数据交互的性能非常优秀。

什么是AMQP高级协议

AMQP协议的全称是:Advanced Message Queuing Protocol(高级消息队列协议)。目前AMQP协议的版本为 Version 1.0,这个协议标准在2014年通过了国际标准组织 (ISO) 和国际电工委员会 (IEC) 的投票,成为了新的 ISO 和 IEC 国际化标准。目前基于AMQP协议实现的产品包括:

  • Apache Qpid, an Apache project
  • Fedora Linux AMQP Infrastructure
  • IIT Software's SwiftMQ is a enterprise grade JMS messaging product with full support for AMQP 1.0
  • INETCO's AMQP protocol analyzer
  • JORAM: open reliable asynchronous messaging, 100% pure Java implementation of JMS
  • Kaazing's AMQP Web Client
  • Microsoft's Windows Azure Service Bus
  • JBoss A-MQ by Red Hat built from Qpid
  • StormMQ a cloud hosted messaging service based on AMQP
  • VMware Inc RabbitMQ; also supported by SpringSource
  • MQlight by IBM

关于AMPQ协议的完整介绍可以参考官方文档

AMQP核心概念

AMQP协议模型
AMQP协议模型
AMQP协议的原理
AMQP协议的原理

  • AMQP协议中的元素包括:Message(消息体)、Producer(消息生产者)、Consumer(消息消费者)、Virtual Host(虚拟节点)、Exchange(交换机)、Queue(队列)等

  • 由Producer(消息生产者)和Consumer(消息消费者)构成了AMQP的客户端,他们是发送消息和接收消息的主体。AMQP服务端称为Broker,一个Broker中一定包含完整的Virtual Host(虚拟主机)、 Exchange(交换机)、Queue(队列)定义。

  • 一个Broker可以创建多个Virtual Host(虚拟主机),我们将讨论的Exchange和Queue都是虚拟机中的工作元素(还有User元素)。注意,如果AMQP是由多个Broker构成的集群提供服务,那么一个Virtual Host也可以由多个Broker共同构成。

  • Connection是由Producer(消息生产者)和Consumer(消息消费者)创建的连接,连接到Broker物理节点上。但是有了Connection后客户端还不能和服务器通信,在Connection之上客户端会创建Channel,连接到Virtual Host或者Queue上,这样客户端才能向Exchange发送消息或者从Queue接受消息。一个Connection上允许存在多个Channel,只有Channel中能够发送/接受消息。

  • Exchange元素是AMQP协议中的交换机,Exchange可以绑定多个Queue也可以同时绑定其他Exchange。消息通过Exchange时,会按照Exchange中设置的Routing(路由)规则,将消息发送到符合的Queue或者Exchange中。

  • Binding-绑定:Exchange和Exchange、Queue之间的连接关系,绑定中可以包含routing key或者参数。

  • Queue:消息队列,实际存储消息数据,常用的属性有Durability:是否持久化,Durable:是,Transient:否,Auto delete:如果选yes,代表最后一个监听被移除之后,该Queue会自动删除。

  • Message:服务器和应用程序之间传送个的数据,本质上就是一段数据,由Properties和Payload(Body)组成,消息中常用的属性有 delivery mode,headers(自定义属性),content_type,content_encoding,priority,correlation_id,reply_to,expiration,message_id,timestamp,type,user_id,app_id,cluster_id

  • Virtual Host-虚拟主机:虚拟地址,用于进行逻辑隔离,最上层的消息路由,同一个Virtual Host里面不能有同名的Exchange或Queue。

RabbitMQ整体架构模型

RabbitMQ整体架构模型
RabbitMQ整体架构模型

RabbitMQ消息是如何流转

AMPQ消息的流转流程
AMPQ消息流转流程
1.在Producer(消息生产者)客户端建立了Channel后,就建立了到Broker上Virtual Host的连接。接下来Producer就可以向这个Virtual Host中的Exchange发送消息了。

2.Exchange(交换机)能够处理消息的前提是:它至少已经和某个Queue或者另外的Exchange形成了绑定关系,并设置好了到这些Queue和Excahnge的Routing(路由规则)。Excahnge中的Routing有三种模式,我们随后会讲到。在Exchange收到消息后,会根据设置的Routing(路由规则),将消息发送到符合要求的Queue或者Exchange中(路由规则还会和Message中的Routing Key属性配合使用)。

3.Queue收到消息后,可能会进行如下的处理:如果当前没有Consumer的Channel连接到这个Queue,那么Queue将会把这条消息进行存储直到有Channel被创建(AMQP协议的不同实现产品中,存储方式又不尽相同);如果已经有Channel连接到这个Queue,那么消息将会按顺序被发送给这个Channel。

4.Consumer收到消息后,就可以进行消息的处理了。但是整个消息传递的过程还没有完成:视设置情况,Consumer在完成某一条消息的处理后,将需要手动的发送一条ACK消息给对应的Queue(当然您可以设置为自动发送,或者无需发送)。Queue在收到这条ACK信息后,才会认为这条消息处理成功,并将这条消息从Queue中移除;如果在对应的Channel断开后,Queue都没有这条消息的ACK信息,这条消息将会重新被发送给另外的Channel。当然,您还可以发送NACK信息,这样这条消息将会立即归队,并发送给另外的Channel。

RabbitMQ安装和基本使用

RabbitMQ安装

  1. 系统基础组件和配置

1.1 查看服务器版本
服务端操作系统基于CentOS7.3搭建,可以使用cat /etc/centos-release查看系统版本

[root@ittimeline ~]# cat /etc/centos-release
CentOS Linux release 7.3.1611 (Core) 

1.2 配置hostname
然后使用vim /etc/hostname修改host配置,因为RabbitMQ Server日志的文件名会依赖hostname
vim编辑器的基本使用:
首先使用vim修改文件时进入的浏览模式,需要使用i键盘进入编辑模式
修改完成文件之后,需要使用esc退出编辑模式,然后使用:wq退出vim并保存修改的内容

[root@ittimeline ~]# vim  /etc/hostname 

查看修改的内容

[root@ittimeline ~]# cat /etc/hostname                      
ittimeline.net

1.3 关闭防火墙
CentOS7的防火墙默认是开机启动的,这里为了方便使用RabbitMQ基于BS的管控台,需要关闭防火墙。

[root@ittimeline ~]# systemctl stop firewalld.service

还可以使用如下命令开机禁用防火墙

[root@ittimeline ~]# systemctl disable  firewalld.service

1.4 网卡开机启动

[root@ittimeline ~]# cd /etc/sysconfig/network-scripts/
 [root@ittimeline network-scripts]# cat ifcfg-eth0 
# Generated by parse-kickstart
DEVICE="eth0"
IPV6INIT="yes"
BOOTPROTO="dhcp"
UUID="c79432fa-c801-4f60-b4c1-a9cb34eeae2c"
ONBOOT="yes"
TYPE=Ethernet
DEFROUTE=yes
PEERDNS=yes
PEERROUTES=yes
IPV4_FAILURE_FATAL=no
IPV6_AUTOCONF=yes
IPV6_DEFROUTE=yes
IPV6_PEERDNS=yes
IPV6_PEERROUTES=yes
IPV6_FAILURE_FATAL=no
IPV6_PRIVACY=no
NAME="System eth0" 

将属性ONBOOT的值改为yes即可,然后重启系统,就可以实现开机启动网络连接。

1.5 安装系统基础组件

在安装rabbitMQ之前使用yum install安装以下组件,否则rabbitMQ会安装失败。

[root@ittimeline ~]# yum install -y build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m3 ncurse-devel tk tc xz
  1. erlang、socat、rabbitmq下载
    首先根据官网给出的Erlang和RabbitMQ版本对应关系
    选择Erlang 22.0.X +RabbitMQ 3.7.17,版本确定以后,可以从官网下载erlang和rabbitmq安装包。

erlang针对CentOS7的rpm包
erlang针对CentOS7的rpm包
abbitmq针对CentOS7的rpm包
rabbitmq针对CentOS7的rpm包

下载erlang和rabbitmq只需要使用wget命令然后加上下载的地址即可
由于rabbitmq server依赖socat,还需要从阿里云镜像站上下载socat,注意它们的下载顺序和安装顺序一致

[root@ittimeline ~]# cd /home/liuguanglei/Downloads/
root@ittimeline Downloads]# wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.0.7-1.el7.x86_64.rpm/download.rpm

[root@ittimeline Downloads]# wget https://mirrors.aliyun.com/centos/7.6.1810/os/x86_64/Packages/socat-1.7.3.2-2.el7.x86_64.rpm

[root@ittimeline Downloads]# wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.7.17-1.el7.noarch.rpm/download.rpm

查看下载的软件版本

[root@ittimeline Downloads]# ls -al
total 29428
drwxr-xr-x.  2 liuguanglei liuguanglei     4096 Aug  6 11:50 .
drwx------. 16 liuguanglei liuguanglei     4096 Aug  6 10:19 ..
-rw-r--r--.  1 root        root        19384788 Jul 12 01:31 erlang-22.0.7-1.el7.x86_64.rpm
-rw-r--r--.  1 root        root        10438300 Jul 29 12:07 rabbitmq-server-3.7.17-1.el7.noarch.rpm
-rw-r--r--.  1 root        root          296632 Aug 11  2017 socat-1.7.3.2-2.el7.x86_64.rpm
  1. erlang、socat、rabbitmq安装
    rpm的安装只需要使用rpm -ivh命令安装即可。
[root@ittimeline Downloads]# rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm 
[root@ittimeline Downloads]# rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm 
[root@ittimeline Downloads]# rpm -ivh rabbitmq-server-3.7.17-1.el7.noarch.rpm 

4.修改配置
使用rpm安装方式安装的rabbitmq的配置默认路径是/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.17/ebin/rabbit.app
这里暂时只是修改loopback_users,让guest用户能够使用ip登录,如果不修改的话,登录RabbitMQ 管控台会看到如下提示
用户只能使用localhost登录

[root@ittimeline Downloads]# vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.17/ebin/rabbit.app 

修改之前
loopback_users
修改之后
loopback_users
就是把loopback_users的["<<guest>>"]修改为[guest]即可

RabbitMQ基本使用

  1. 启动rabbitmq-server
    使用rabbitmq-server start & 启动rabbitmq服务
[root@ittimeline Downloads]# rabbitmq-server start &
[1] 25834
[root@ittimeline Downloads]# 
  ##  ##
  ##  ##      RabbitMQ 3.7.17. Copyright (C) 2007-2019 Pivotal Software, Inc.
  ##########  Licensed under the MPL.  See https://www.rabbitmq.com/
  ######  ##
  ##########  Logs: /var/log/rabbitmq/rabbit@ittimeline.log
                    /var/log/rabbitmq/rabbit@ittimeline_upgrade.log

              Starting broker...
 completed with 0 plugins.

RabbitMQ服务启动后会输出版本信息以及日志路径,其日志路径
Logs: /var/log/rabbitmq/rabbit@ittimeline.log
/var/log/rabbitmq/rabbit@ittimeline_upgrade.log

如果想要在终端上输入其他命令,回车即可,此时RabbitMQ服务进入后台运行,可以使用ps命令查看rabbitmq进程信息

Last login: Tue Aug  6 10:20:39 2019 from 172.16.237.111
[root@ittimeline ~]# ps -ef|grep rabbitmq
root     25834  4615  0 12:12 pts/1    00:00:00 /sbin/runuser -u rabbitmq -- /usr/lib/rabbitmq/bin/rabbitmq-server start
rabbitmq 25843 25834  0 12:12 pts/1    00:00:00 /bin/sh /usr/lib/rabbitmq/bin/rabbitmq-server start
rabbitmq 25940     1  0 12:12 ?        00:00:00 /usr/lib64/erlang/erts-10.4.4/bin/epmd -daemon
rabbitmq 25995 25843  0 12:12 pts/1    00:00:04 /usr/lib64/erlang/erts-10.4.4/bin/beam.smp -W w -A 64 -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -K true -B i -- -root /usr/lib64/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.17/ebin  -noshell -noinput -s rabbit boot -sname rabbit@ittimeline -boot start_sasl -kernel inet_default_connect_options [{nodelay,true}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit lager_log_root "/var/log/rabbitmq" -rabbit lager_default_file "/var/log/rabbitmq/rabbit@ittimeline.log" -rabbit lager_upgrade_file "/var/log/rabbitmq/rabbit@ittimeline_upgrade.log" -rabbit feature_flags_file "/var/lib/rabbitmq/mnesia/rabbit@ittimeline-feature_flags" -rabbit enabled_plugins_file "/etc/rabbitmq/enabled_plugins" -rabbit plugins_dir "/usr/lib/rabbitmq/plugins:/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.17/plugins" -rabbit plugins_expand_dir "/var/lib/rabbitmq/mnesia/rabbit@ittimeline-plugins-expand" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/var/lib/rabbitmq/mnesia/rabbit@ittimeline" -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672 start
rabbitmq 26097 25995  0 12:12 ?        00:00:00 erl_child_setup 1024
rabbitmq 26127 26097  0 12:12 ?        00:00:00 inet_gethost 4
rabbitmq 26128 26127  0 12:12 ?        00:00:00 inet_gethost 4
root     28310 28242  0 12:21 pts/0    00:00:00 grep --color=auto rabbitmq

使用lsof(list open files)查看进程进程打开的文件,进程打开的端口,打开文件的进程。。

[root@ittimeline ~]# lsof -i:5672
COMMAND    PID     USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
beam.smp 25995 rabbitmq   77u  IPv6 148008      0t0  TCP *:amqp (LISTEN)

2.关闭rabbitmq server

[root@ittimeline Downloads]# rabbitmqctl stop
Stopping and halting node rabbit@ittimeline ...
Gracefully halting Erlang VM

3.启用 RabbitMQ Server Web管控台

使用rabbitmq-plugins enable rabbitmq_management启用RabbitMQ Server的Web管理界面

[root@ittimeline Downloads]# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@ittimeline:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@ittimeline...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch

set 3 plugins.
Offline change; changes will take effect at broker restart.

然后就可以使用浏览器访问地址http://172.16.238.193:15672/
然后输入guest/guest后点击Login按钮登录RabbitMQ管控台
登录RabbitMQ管控台
RabbitMQ管控台预览页面
RabbitMQ管控台预览页面

其中172.16.238.193是CentOS的ip地址,可以使用ifconfig命令查看

[root@ittimeline Downloads]# ifconfig
eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        inet 172.16.238.193  netmask 255.255.252.0  broadcast 172.16.239.255
        inet6 fe80::21c:42ff:feb5:d22e  prefixlen 64  scopeid 0x20<link>
        ether 00:1c:42:b5:d2:2e  txqueuelen 1000  (Ethernet)
        RX packets 1110406  bytes 326972310 (311.8 MiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 39752  bytes 5885101 (5.6 MiB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

lo: flags=73<UP,LOOPBACK,RUNNING>  mtu 65536
        inet 127.0.0.1  netmask 255.0.0.0
        inet6 ::1  prefixlen 128  scopeid 0x10<host>
        loop  txqueuelen 1  (Local Loopback)
        RX packets 1616  bytes 105899 (103.4 KiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 1616  bytes 105899 (103.4 KiB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

virbr0: flags=4099<UP,BROADCAST,MULTICAST>  mtu 1500
        inet 192.168.122.1  netmask 255.255.255.0  broadcast 192.168.122.255
        ether 52:54:00:16:27:85  txqueuelen 1000  (Ethernet)
        RX packets 0  bytes 0 (0.0 B)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 0  bytes 0 (0.0 B)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

其中eth0表示第一块网卡。
而15672是RabbitMQ管控台的访问端口,常用的端口还有客户端程序(Java,Python)通讯的5672端口和集群的25672端口。

RabbitMQ管理

RabbitMQ命令使用

RabbitMQ提供了rabbitmqctl工具用于管理RabbitMQ,具体的使用可以使用rabbitmqctl --helpman rabbitmqctl查看对应的帮助以及手册,因为命令提供的功能在管控台中已经实现,而基于管控台的操作更加便捷和直观。

  • rabbitmqctl 应用管理

    • rabbitmqctl stop_app:关闭应用
    • rabbitmqctl start_app:启动应用
    • rabbitmqctl status:节点状态
  • rabbitmqctl用户管理

    • rabbitmqctl add_user username password :添加用户
    • rabbitmqctl list_users:列出所有用户
    • rabbitmqctl delete_user username:删除用户
    • rabbitmqctl clear_permissions -p vhostpath username:清除权限
    • rabbitmqctl list_user_permissions username:列出用户权限
    • rabbitmqctl change_password username newpassword:修改密码
    • rabbitmqctl set_permissions -p vhostpath username "".""."*":设置用户权限
  • rabbitmqctl 虚拟主机管理

    • rabbitmqctl add_vhost vhostpath:创建虚拟主机
    • rabbitmqctl list_vhosts:列出所有虚拟主机
    • rabbitmqctl list_permissions -p vhostpath:列出虚拟主机上所有的权限
    • rabbitmqctl delete_vhost vhostpath :删除虚拟主机
  • rabbitmqctl 队列管理

    • rabbitmqctl list_queues:查看所有队列消息
    • rabbitmqctl -p vhostpath purge_queue blue:清除队列的消息
  • rabbitmqctl 高级操作(集群)

    • rabbitmqctl reset:移除所有数据,要在rabbitmqctl stop_app之后使用
    • rabbitmqctl join_cluster [--ram]:组成集群命令
    • rabbitmqctl cluster_status:查看集群状态
    • rabbitmqctl change_cluster_node_type disc|ram:修改集群节点的存储形式
    • rabbitmqctl forget_cluster_node [--offline]:忘记节点(摘除节点)
    • rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2][newnode2]:修改节点名称

RabbitMQ管控台使用

浏览器中输入地址 http://172.16.238.193:15672/ 后,并输入guest/guest账号访问RabbitMQ
登录成功之后默认进入的是预览页面,预览页面给出了erlang和rabbit server的版本信息
以及Connetions,Channels,Exxhanges,Queues、Consumers信息以及当前节点的信息。
RabbitMQ管控台预览页面

RabbitMQ 消息生产与消费

基于Java-Client API实现

首先实现基于RabbitMQ java-client实现的获取RabbitMQ Server的连接

在java-client-utils的src/main/resources目录下定义rabbitmq.properties
同学可以根据自己的RabbitMQ Server配置作相应的修改

#rabbitmq.host=192.168.0.102
rabbitmq.host=172.16.238.193
rabbitmq.port=5672
rabbitmq.virtualhost=/
rabbitmq.username=guest
rabbitmq.password=guest

然后定义RabbitMQUtils类,主要用于读取rabbitmq.properties,然后借助RabbitMQ Java Client的ConnectionFactory创建连接以及关闭连接。

 package net.ittimeline.java.middleware.rabbitmq.java.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.TimeoutException;

/**
 * Rabbit Message Queue 工具类
 * 封装获取连接和关闭连接方法
 *
 * @author liuguanglei 18601767221@163.com
 * @create 2019-08-06 19:53
 * @website www.ittimeline.net
 * @since JDK11.03
 * @see https://www.rabbitmq.com/java-client.html
 */
@Slf4j
public class RabbitMQUtils {


    /**
     * 基于rabbitmq java client构建客户端与rabbitmq server连接
     * @return
     */
    public static Connection buildConnection(){
        Connection connection=null;
        try(
                //读取src/main/resources目录下的rabbitmq.properties
                InputStream inputStream= RabbitMQUtils.class.getClassLoader().getResourceAsStream("rabbitmq.properties")
        ){
            ConnectionFactory connectionFactory=new ConnectionFactory();
            Properties properties = new Properties();
            properties.load(inputStream);
            connectionFactory.setHost(properties.getProperty("rabbitmq.host"));
            connectionFactory.setPort(Integer.valueOf(properties.getProperty("rabbitmq.port")));

            connectionFactory.setVirtualHost(properties.getProperty("rabbitmq.virtualhost"));
            connectionFactory.setUsername(properties.getProperty("rabbitmq.username"));
            connectionFactory.setPassword(properties.getProperty("rabbitmq.password"));
            connection=connectionFactory.newConnection();
        }catch (IOException | TimeoutException e){
            e.printStackTrace();
            log.error("创建ConnectionFactory失败",e);

        }

        return connection;
    }


    /**
     * 关闭Rabbit Server 连接
     * @param connection
     */
    public static void close(Connection connection){
        if(connection!=null){
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
                log.error("关闭RabbitMQ连接失败",e);
            }
        }
    }

}

基于RabbitMQ Java Client生产消息实现

package net.ittimeline.java.middleware.rabbitmq.java.client.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import lombok.extern.slf4j.Slf4j;
import net.ittimeline.java.middleware.rabbitmq.java.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生产消息
 *
 * @author liuguanglei 18601767221@163.com
 * @create 2019-08-06 17:50
 * @website www.ittimeline.net
 * @since JDK11.03
 */
@Slf4j
public class ProducerMessage {


    public static void main(String[] args) {

        Connection connection = RabbitMQUtils.buildConnection();
        /**
         * 交换器的名称
         * 指明消息需要发送到哪个交换器中
         * 如果设置为空字符串,则消息会发送到RabbitMQ默认的交换器中 AMQP Exchange
         * 默认交换隐式绑定到每个队列,路由密钥等于队列名称。无法显式绑定到默认交换或从绑定取消绑定。它也无法删除。
         */
        String exchangeName = "";
        /**
         * 路由键
         * 交换器根据路由键将消息存储到相应的队列中
         * routingKey的值和QueueName相同
         */
        String routingKey = "ittimeline";


        String message = "Hello,RabbitMQ";
        /**
         * 消息体,真正需要发送消息
         */
        byte[] messageBody = message.getBytes();
        try (Channel channel = connection.createChannel()) {
            channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBody);
            log.info("发送RabbitMQ消息成功 ,消息的内容是{}", message);

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            log.error("创建Channel失败", e);
        } finally {
           // RabbitMQUtils.close(connection);
        }

    }
}

生产消息管控台界面
生产消息

基于RabbitMQ Java Client消费消息

package net.ittimeline.java.middleware.rabbitmq.java.client.consumer;

import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import net.ittimeline.java.middleware.rabbitmq.java.utils.RabbitMQUtils;

import java.io.IOException;

/**
 * 消费消息
 *
 * @author liuguanglei 18601767221@163.com
 * @create 2019-08-06 17:50
 * @website www.ittimeline.net
 * @since JDK11.03
 */
@Slf4j
public class ConsumerMessage {

    public static void main(String[] args) {


        Connection connection= RabbitMQUtils.buildConnection();
        try {
            Channel channel=connection.createChannel();
            boolean autoAck=false;
            String queueName="ittimeline";
            String consumerTag="customConsumerTag";
            //声明(创建)一个队列  启动时先启动Consumer,因为必须要先有队列,生产者才能将消息通过Exchange发送到队列上
            channel.queueDeclare(queueName,true,false,false,null);
            //创建消费者
            channel.basicConsume(queueName,autoAck,consumerTag,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String routingKey=envelope.getRoutingKey();
                    String contentType=properties.getContentType();
                    String message=new String(body);
                    long deliveryTag=envelope.getDeliveryTag();
                    log.info("routingKey {} contentType {},message{},deliveryTag {}",routingKey,contentType,message,deliveryTag);

                    channel.basicAck(deliveryTag,false);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
        finally{
           // RabbitMQUtils.close(connection);
        }


    }
}

消费消息管控台
消费消息

基于Spring AMQP实现

基于SpringBoot 实现

基于SpringCloud实现

RabbitMQ交换机详解

Exchange:接收客户端发送的消息,并根据消息的routing key转发消息至所绑定的队列
Exchange

交换机属性

  • Name:交换机名称
  • Type:direct,topic,fanout,headers
  • Durability:是否持久化,true为持久化
  • Auto Delete :当最后一个绑定到Exchange上的队列删除之后,自动删除该Exchange
  • Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false
  • Arguments:扩展参数,用于扩展AMQP协议定制化使用

RabbitMQ常用的Exchange

  • Direct Exchange
    Direct Exchange

所有发送到Direct Exchange的消息被转发到Routing key中指定的Queue

Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(bingding)操作,消息传递时,Routing key必须完全匹配才会被队列接收,否则该消息会被抛弃。

基于Direct Exchange 生产消息

package net.ittimeline.java.middleware.rabbitmq.java.client.producer.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import net.ittimeline.java.middleware.rabbitmq.java.utils.RabbitMQUtils;

import java.io.IOException;

/**
 * 基于Direct Exchange 生产消息
 * Producer和Consumer的routing key必须完全一致
 * @author liuguanglei 18601767221@163.com
 * @create 2019-08-07 14:00
 * @website www.ittimeline.net
 * @since JDK11.03
 */

@Slf4j
public class DirectExchangeProducerMessage {


    public static void main(String[] args) {


        try {
            Connection connection= RabbitMQUtils.buildConnection();
            final Channel channel=connection.createChannel();

            String exchangeName="test_direct_exchange";
            String routingKey="test.direct";
            String message="Hello,RabbitMQ With direct Exchange qqq ";

            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
            log.info("发送RabbitMQ消息成功,消息的内容是{}",message);

        } catch (IOException e) {
            e.printStackTrace();
            log.error("创建Channel失败,原因是",e);

        }


    }
}

基于direct exchange消费消息

package net.ittimeline.java.middleware.rabbitmq.java.client.consumer.exchange;

import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import net.ittimeline.java.middleware.rabbitmq.java.utils.RabbitMQUtils;

import java.io.IOException;

/**
 * 基于direct exchange消费消息
 *Producer和Consumer的routing key必须完全一致
 * @author liuguanglei 18601767221@163.com
 * @create 2019-08-07 14:05
 * @website www.ittimeline.net
 * @since JDK11.03
 */
@Slf4j
public class DirectExchangeConsumerMessage {


    public static void main(String[] args) {

        Connection connection= RabbitMQUtils.buildConnection();
        try {
            final Channel channel=connection.createChannel();

            boolean autoAck=false;

            String exchangeName="test_direct_exchange";
            String exchangeType="direct";
            String queueName="test_direct_queue";
            String routingKey="test.direct";

            //声明exchange
            channel.exchangeDeclare(exchangeName,exchangeType,false,false,false,null);

            //声明队列
            channel.queueDeclare(queueName,true,false,false,null);
            //根据routing key 绑定exchange和队列
            channel.queueBind(queueName,exchangeName,routingKey);

            channel.basicConsume(queueName,autoAck,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                    log.info("收到消息的内容是{} ",new String(body));
                }
            });

        } catch (IOException e) {
            e.printStackTrace();
            log.error("创建Channel失败,异常是",e);
        }

    }
}

Topic Exchange

所有发送到Topic Exchange的消息被转发到所关心Routing Key中所指定Topic的Queue上。
Exchange将Routing key和某个Topic进行匹配,此时队列需要绑定一个Topic。
可以使用通配符进行模糊匹配

  • 符号"#"匹配一个或者多个词
  • 符号 "*"匹配一个词
    例如log.#能够匹配到log.info.oa
    log.*只会匹配到log.error

TopIic Exchange的路由规则
Topic Exchange

基于topic exchange实现的消息生产端

package net.ittimeline.java.middleware.rabbitmq.java.client.producer.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import net.ittimeline.java.middleware.rabbitmq.java.utils.RabbitMQUtils;

import java.io.IOException;

/**
 * 基于topic Exchange生产消息
 * 消息消费端配置account.#,消息生产端配置account.get或者account.list.info都是可以收到消息
 * 消息消费端配置account.*,消息生产端端配置account.get可以收到,但是account.list.info就收不到了,因为account.*只能匹配一个词
 *
 * @author liuguanglei 18601767221@163.com
 * @create 2019-08-07 14:48
 * @website www.ittimeline.net
 * @since JDK11.03
 */
@Slf4j
public class TopicExchangeProducerMessage {

    public static void main(String[] args) {

        Connection connection= RabbitMQUtils.buildConnection();
        try {
            Channel channel=connection.createChannel();

            String exchangeName="test_topic_exchange";

            String routingKey="account.get";
            String message="Hello RabbitMQ With Topic Exchange ..."+routingKey;
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
            log.info("发送RabbitMQ消息成功,消息的内容是{}",message);

            routingKey="account.save";
            message="Hello RabbitMQ With Topic Exchange ..."+routingKey;
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
            log.info("发送RabbitMQ消息成功,消息的内容是{}",message);

            routingKey="account.update";
            message="Hello RabbitMQ With Topic Exchange ..."+routingKey;
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
            log.info("发送RabbitMQ消息成功,消息的内容是{}",message);

            routingKey="account.delete";
            message="Hello RabbitMQ With Topic Exchange ..."+routingKey;
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
            log.info("发送RabbitMQ消息成功,消息的内容是{}",message);


            routingKey="account.list.info";
            message="Hello RabbitMQ With Topic Exchange ..."+routingKey;
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
            log.info("发送RabbitMQ消息成功,消息的内容是{}",message);

        } catch (IOException e) {
            e.printStackTrace();
            log.error("创建channel失败,原因是",e);
        }

    }
}

基于topic exchange实现的消息消费端

package net.ittimeline.java.middleware.rabbitmq.java.client.consumer.exchange;

import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import net.ittimeline.java.middleware.rabbitmq.java.utils.RabbitMQUtils;

import java.io.IOException;

/**
 * 基于topic exchange 消费消息
 * 消息消费端配置account.#,消息生产端配置account.get或者account.list.info都是可以收到消息
 * 消息消费端配置account.*,消息生产端端配置account.get可以收到,但是account.list.info就收不到了,因为account.*只能匹配一个词
 *
 * @author liuguanglei 18601767221@163.com
 * @create 2019-08-07 15:03
 * @website www.ittimeline.net
 * @since JDK11.03
 */
@Slf4j
public class TopicExchangeConsumerMessage {

    public static void main(String[] args) {

        Connection connection= RabbitMQUtils.buildConnection();
        try {
            Channel channel=connection.createChannel();

            String exchangeName="test_topic_exchange";
            String exchangeType="topic";

            String queueName="test_topic_queue";
            //String routingKey="account.#";
            //如果routingKey换成account.* 那么应该是收不到生产消息的routingKey为 account.list.info,因为*只能匹配一个词
            String routingKey="account.*";

            //声明exchange
            channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
            //声明queue
            channel.queueDeclare(queueName,true,false,false,null);
            //根据routingKey绑定exchange和queue
            channel.queueBind(queueName,exchangeName,routingKey);

            channel.basicConsume(queueName,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                    log.info("收到消息的内容是{}",new String(body));
                }
            });


        } catch (IOException e) {
            e.printStackTrace();
            log.error("创建Channel失败,失败的原因是",e);
        }

    }
}

在测试基于Topic Exchange的消息发送和接收时,如果发现Exchange和Queue有俩routing key时,可以使用Rabbit Server管控台实现解绑。

解绑Exchange和队列
解绑Exchange和队列

如果不解绑的话,测试程序可能会得到错误的结果
如下图所示,因为消息消费端的routing key是account.*,由于没有解绑之前的account.#,因此收到了routing key 为account.list.info的消息。
基于account.*匹配的消息发送

基于account.*匹配的消息接收
基于account.*匹配的消息接收

Fanout Exchange

Facount Exchange不需要路由键,只需要简单的将队列绑定到交换机上,发送到该交换机的消息都会被转发到与该交换机绑定的队列上。Fanout交换机转发消息是最快的。
Fanout Exchange

基于fanout exchange实现的消息生产端

package net.ittimeline.java.middleware.rabbitmq.java.client.producer.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import net.ittimeline.java.middleware.rabbitmq.java.utils.RabbitMQUtils;

import java.io.IOException;

/**
 * 基于fanout exchange的消息生产端
 * @author liuguanglei 18601767221@163.com
 * @create 2019-08-07 16:53
 * @website www.ittimeline.net
 * @since JDK11.03
 */
@Slf4j
public class FanoutExchangeProducerMessage {

    public static void main(String[] args) {

        Connection connection= RabbitMQUtils.buildConnection();
        try {
            Channel channel=connection.createChannel();

            String exchangeName="test_fanout_exchange";
            //不设置路由键
            //String routingKey="";
            //或者设置任意的路由键
            //消息消费方都可以收到
            String routingKey="any";
            String message="Hello RabbitMQ With Fanout Exchange";
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
            log.info("RabbitMQ消息发送成功,发送的内容是{}",message);

        } catch (IOException e) {
           log.error("创建Channel失败,原因是",e);
        }

    }
}

基于fanout exchange实现的消息消费端

package net.ittimeline.java.middleware.rabbitmq.java.client.consumer.exchange;

import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import net.ittimeline.java.middleware.rabbitmq.java.utils.RabbitMQUtils;

import java.io.IOException;

/**
 * 基于fanout exchange的消息消费端
 * @author liuguanglei 18601767221@163.com
 * @create 2019-08-07 16:54
 * @website www.ittimeline.net
 * @since JDK11.03
 */
@Slf4j
public class FanoutExchangeConsumerMessage {

    public static void main(String[] args) {
        try {
            Connection connection= RabbitMQUtils.buildConnection();

            Channel channel=connection.createChannel();
            String exchangeName="test_fanout_exchange";
            String exchangeType="fanout";
            //不设置路由键
            String routingKey="";
            String queueName="test_fanout_queue";

            channel.exchangeDeclare(exchangeName,exchangeType,false,false,null);
            channel.queueDeclare(queueName,false,false,false,null);
            channel.queueBind(queueName,exchangeName,routingKey);

            channel.basicConsume(queueName,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    log.info("收到消息的内容是{}",new String(body));
                }
            });


        } catch (IOException e) {
            log.error("创建Channel失败,原因是",e);
        }

    }
}

helloworld

enter code here

标签:java,运维,Exchange,rabbitmq,ittimeline,RabbitMQ,Java,消息
来源: https://www.cnblogs.com/ittimeline/p/11330116.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有