ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

python获取mq队列数据报Queue.declare: (406) PRECONDITION_FAILED - inequivalent arg ‘x-max-priority‘

2021-11-26 17:33:58  阅读:318  来源: 互联网

标签:queue task name exchange python priority 406 kombu example


缘由

使用kombu读取队列数据的时候报如下错误

amqp.exceptions.PreconditionFailed: Queue.declare: (406) PRECONDITION_FAILED - inequivalent arg ‘x-max-priority’ for queue ‘douyin.pg.logo.ready’ in vhost ‘douyin_pggolden’: received none but current is the value ‘5’ of type ‘signedint’

错误原因

从队列中读取消息的代码如下

from kombu import Connection
rabitmq_user = "admin"
rabitmq_pwd = "admin"
rabitmq_ip = "8.8.8.8"
queue_name = "example.queue"
virtual_host = "example"
src_rabitmq_host = "amqp://%s:%s@%s:5672/%s"%(rabitmq_user,rabitmq_pwd,rabitmq_ip,virtual_host)
conn = Connection(src_rabitmq_host)
rec = conn.SimpleQueue(queue_name)
while True:
    msg = rec.get(block=True, timeout=1)
    msg.ack()
    #从队列中获取消息
    print(msg.payload)
    break

因为rabitmq的队列中设置了x-max-priority
在这里插入图片描述

解决办法

我们需要将SimpleQueue改成Queue即可

  • 使用kombu
from kombu.log import get_logger
from kombu.mixins import ConsumerMixin
from kombu.utils.functional import reprcall
from kombu import Exchange, Queue

logger = get_logger(__name__)
task_exchange = Exchange('tasks', type='direct')
task_queues = [Queue('hipri', task_exchange, routing_key='hipri',queue_arguments={'x-queue-mode': 'lazy', 'x-max-priority': 10},max_priority=5)]


class Worker(ConsumerMixin):

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=task_queues,
                         accept=['pickle', 'json'],
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        fun = body['fun']
        args = body['args']
        kwargs = body['kwargs']
        logger.info('Got task: %s', reprcall(fun.__name__, args, kwargs))
        try:
            fun(*args, **kwargs)
        except Exception as exc:
            logger.error('task raised exception: %r', exc)
        message.ack()


if __name__ == '__main__':
    from kombu import Connection
    from kombu.utils.debug import setup_logging

    # setup root logger
    setup_logging(loglevel='INFO', loggers=[''])

    with Connection('amqp://guest:guest@localhost:5672//') as conn:
        try:
            worker = Worker(conn)
            worker.run()
        except KeyboardInterrupt:
            print('bye bye')
  • 使用celery和kombu
from celery import Celery
from kombu import Exchange, Queue, binding

def route_task(exchange_name, exchange_type):
    def wrapper(name, args, kwargs, options, task=None, **kw):
        return {
            'exchange': exchange_name,
            'exchange_type': exchange_type,
            'routing_key': name
        }

    return wrapper

def create_task_queues(binding_list, exchange):
    binding_map = {}
    queues = []

    for routing_key, queue_name in binding_list:
        binding_map.setdefault(queue_name, [])
        binding_map[queue_name].append(routing_key)

    for queue_name, routing_keys in binding_map.items():
        queues.append(
            Queue(
                queue_name,
                [binding(exchange, routing_key=routing_key) for routing_key in routing_keys],
                queue_arguments={'x-queue-mode': 'lazy', 'x-max-priority': 10},max_priority=5
            )
        )
    return queues

class ExampleConfig(object):
    broker_url = 'amqp://guest:guest@localhost:5672//'
    task_default_exchange = 'example'
    task_default_exchange_type = 'topic'
    bindings = [
        ('example.queue', 'example.queue')
    ]
    task_queues = create_task_queues(bindings, Exchange(task_default_exchange, type=task_default_exchange_type))
    task_routes = (route_task(task_default_exchange, task_default_exchange_type),)


app_example= Celery("example")
app_example.config_from_object(ExampleConfig)

@app_example.task(name="example.queue")
def pg_logo_recon(**kwargs):
	pass

标签:queue,task,name,exchange,python,priority,406,kombu,example
来源: https://blog.csdn.net/sinat_29957455/article/details/121560808

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有