ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ优先级队列机制(八)

2022-01-29 21:04:39  阅读:147  来源: 互联网

标签:优先级 队列 RabbitMQ priority rabbitmq import com channel


一、什么是优先级队列  

    在服务级级别的测试中需要考虑被执行任务的优先级机制,也就是通过线程优先级来进行,设置优先级的目的

是在资源非常紧张的情况下,让优先级高的任务优先执行,而优先级低的任务排后执行,当然这样的一种设置机制

只能是异步的模式下执行,如果是设计在同步的模式下执行,那这个设计从系统上来说就缺少宏观维度的思考。在

RabbitMQ的机制中也是提供了队列的优先级机制,这样设计的目的也是在在生产者生产过快,而消费者消费不过

来的情况下,也就是资源在紧张或者说是在有限的情况下,设置的队列优先级高的任务它的消息优先进行消费,而

优先级低的消息排后消费。当然,如果是在资源不紧张的情况下,设置优先级其实没多大的意义,因为这个时候优

先过来的消息先进行消费,也谈不上排队的机制和优先级的机制。

二、优先级的实现机制

     针对优先级的设置,在消费者端进行设置,参数具体是x-max-priority,涉及的代码具体如下:

            //设置优先级
            Map<String,Object>  arguments =new HashMap<String,Object>();
            arguments.put("x-max-prioroty",10);
            channel.queueDeclare(queueName,true,false,false,arguments);

这样消费者的代码执行后,在RabbitMQ的WEB控制台,就可以看到该消息队列显示设置的优先级,具体如下所

示:

如上,我们演示了配置一个队列的最大优先级,其实核心的是需要在生产者发送消息的时候设置当前发送任务的优先级

涉及代码如下:

               AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF_8")
                        .headers(headers)
                        .priority(8)
                        .build();

                String msg = "Hello RabbitMQ priority Message"+i;
                channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());

在如上中,我们设置的发送任务的优先级是8。

三、优先级队列实战代码

3.1、生产者代码

package com.example.rabbitmq.priority;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class ProducerPriority
{
    private  static  final  String exchangeName="test_priority_exchange";
    private  static  final  String routyKey="priority.test";

    public static void main(String[] args) throws  Exception
    {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("101.**.***.84");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wuya");
        connectionFactory.setPassword("java");
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        for(int i=0;i<3;i++)
        {
            Map<String,Object>  headers=new HashMap<String,Object>();
            headers.put("num",i);

            if (i==0)
            {
                AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF_8")
                        .headers(headers)
                        .priority(8)
                        .build();

                String msg = "Hello RabbitMQ priority Message"+i;
                channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());
            }
            else if (i==1)
            {
                AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF_8")
                        .headers(headers)
                        .priority(3)
                        .build();

                String msg = "Hello RabbitMQ priority Message"+i;
                channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());
            }
            else
            {
                AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF_8")
                        .headers(headers)
                        .priority(9)
                        .build();

                String msg = "Hello RabbitMQ priority Message"+i;
                channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());
            }


        }

    }
}
在如上中,我们针对发送的任务依据编号进行了优先级的设置。

3.2、消费者代码

package com.example.rabbitmq.priority;

import com.example.rabbitmq.MyConsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class ConsumerPriority
{
    private static final String exchangeName = "test_priority_exchange";
    private  static  final String queueName="test_priority_queue";
    private  static  final  String routingKey="priority.#";

    public static void main(String[] args) throws  Exception
    {
        try{
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost("101.**.***.84");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wuya");
            connectionFactory.setPassword("java");
            connectionFactory.setVirtualHost("/");

            Connection connection=connectionFactory.newConnection();
            Channel channel=connection.createChannel();

            //设置优先级
            Map<String,Object>  arguments =new HashMap<String,Object>();
            arguments.put("x-max-prioroty",10);

            channel.exchangeDeclare(exchangeName,"topic",true,false,null);
            channel.queueDeclare(queueName,true,false,false,arguments);
            channel.queueBind(queueName,exchangeName,routingKey);

            channel.basicConsume(queueName,true,new MyConsumer(channel=channel));
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

3.3、自定义消费者代码

package com.example.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer
{
    //开启限流,所以需要设置channel
    private  Channel channel;

    /**
     * Constructs a new instance and records its association to the passed-in channel.
     *
     * @param channel the channel to which this consumer is attached
     */
    public MyConsumer(Channel channel)
    {
        super(channel);
        this.channel=channel;
    }

    @Override
    public void handleDelivery(
            String consumerTag,
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body) throws IOException
    {
        System.err.println("---------------consumer---------------\n");
        System.err.println("the message received:"+new String(body));
        System.err.println("message priority:"+properties.getPriority());

    }
}

3.4、执行结果信息

如上的代码执行后,当然当前的资源不存在紧张的情况,那么就会按正常的顺序消费,具体输出结果如下:

     如上,主要总结了消息队列优先级这部分的总结和它的案例应用。感谢您的阅读!

标签:优先级,队列,RabbitMQ,priority,rabbitmq,import,com,channel
来源: https://www.cnblogs.com/weke/p/15855792.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有