RabbitMQ consumer connection dies after 90 seconds idle

Updated: 2024-10-26 13:29:22

I have a RabbitMQ task queue and a Pika consumer to consume these tasks (with acks). The problem is that the connection dies after 90 seconds idle, but my tasks often take longer than that. As a result, tasks that are still being computed are returned to the task queue and never acked.

Using RabbitMQ 3.5.3 and Pika 0.9.14 with the channel.basic_consume() method. The connection has a heartbeat_interval of 30 seconds.

Consume code:

import pika
from time import sleep

RABBITMQ_URL = "amqp://user:pass@my-host.com/my_virtual_host?heartbeat_interval=30"
QUEUE_NAME = "my_queue"

def callback(ch, method, properties, body):
    print body
    sleep(91)  # if sleep value < 90 this code works (even 89)
    ch.basic_ack(delivery_tag=method.delivery_tag)

parameters = pika.URLParameters(RABBITMQ_URL)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME, durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=QUEUE_NAME)
channel.start_consuming()

Traceback:

Traceback (most recent call last):
  File "main.py", line 19, in <module>
    channel.basic_consume(callback, queue=QUEUE_NAME)
  File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 221, in basic_consume
    {'consumer_tag': consumer_tag})])
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1143, in _rpc
    self.connection.process_data_events()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 240, in process_data_events
    if self._handle_read():
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 347, in _handle_read
    if self._read_poller.ready():
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 43, in inner
    return f(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 89, in ready
    self.poll_timeout)
select.error: (9, 'Bad file descriptor')

Best answer

The problem here is that because you are sleeping for so long, pika cannot respond to heartbeat requests from RabbitMQ, and when this happens, RabbitMQ will close the connection.

The only way around this is to either disable heartbeats or sleep in smaller intervals and run process_data_events() continuously so that pika can handle the heartbeats.

For example, something like this:

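import time

def amqp_sleep(connection, time_to_sleep=20):
    # Sleep in slices of at most 5 seconds and let pika handle
    # heartbeats between slices.
    remaining = time_to_sleep
    while remaining > 0:
        connection.process_data_events()
        step = min(5, remaining)
        time.sleep(step)
        remaining -= step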

Personally though I would go for a library that automatically handles the heartbeats in the background so you don't have to deal with them, e.g. rabbitpy or my own amqp-storm.
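
If you stay with pika and the real task is a computation rather than a sleep, the same idea can be adapted by running the task in a worker thread while the consumer thread keeps calling process_data_events(). The following is only a sketch under assumptions: run_with_heartbeats and poll_interval are hypothetical names, not part of pika, and the only thing required of connection is a process_data_events() method like the one used above. The ack is still sent from the consumer thread after the helper returns, since pika connections should not be shared across threads.

```python
import threading


def run_with_heartbeats(connection, task, poll_interval=1.0):
    """Run task() in a worker thread while this thread services the connection.

    `connection` is assumed to expose process_data_events(), as the
    BlockingConnection in the question does. Returns whatever task() returns
    and re-raises any exception task() raised.
    """
    outcome = {}

    def worker():
        try:
            outcome["result"] = task()
        except Exception as exc:  # hand the error back to the calling thread
            outcome["error"] = exc

    thread = threading.Thread(target=worker)
    thread.daemon = True
    thread.start()

    while thread.is_alive():
        # Give the connection a chance to do its I/O, including heartbeats.
        connection.process_data_events()
        # Wait for the worker, but wake up at least every poll_interval seconds.
        thread.join(poll_interval)

    if "error" in outcome:
        raise outcome["error"]
    return outcome.get("result")
```

Inside the callback from the question this might be used as run_with_heartbeats(connection, lambda: sleep(91)), followed by ch.basic_ack(delivery_tag=method.delivery_tag). With prefetch_count=1 the broker should not deliver another message until the ack is sent, so the callback should not be re-entered while the task is running.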

Published: 2023-07-26 10:33:00
Link: https://www.elefans.com/category/jswz/34/1274356.html