异步队列消费者协程

编程入门 行业动态 更新时间:2024-10-25 05:19:37
本文介绍了异步队列消费者协程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我有一个 asyncio.Protocol 子类,它从服务器接收数据。 我将此数据(每行,因为数据是文本)存储在 asyncio.Queue 中。

I have a asyncio.Protocol subclass receiving data from a server. I am storing this data (each line, because the data is text) in a asyncio.Queue.

import asyncio q = asyncio.Queue() class StreamProtocol(asyncio.Protocol): def __init__(self, loop): self.loop = loop self.transport = None def connection_made(self, transport): self.transport = transport def data_received(self, data): for message in data.decode().splitlines(): yield q.put(message.rstrip()) def connection_lost(self, exc): self.loop.stop() loop = asyncio.get_event_loop() coro = loop.create_connection(lambda: StreamProtocol(loop), '127.0.0.1', '42') loop.run_until_complete(coro) loop.run_forever() loop.close()

我想让另一个协程负责消耗队列中的数据,

I want to have another coroutine responsible for consuming the data in the queue and processing it.

  • 这应该是 asyncio.Task 吗?
  • 如果由于几秒钟没有收到数据而导致队列变空怎么办?我如何确保我的使用者不会停止( run_until_complete )?
  • 有没有一种比使用全局变量更干净的方法我的队列?
  • Should this be a asyncio.Task?
  • What if the queue becomes empty because for a few seconds no data is received? How can I make sure my consumer doesn't stop (run_until_complete)?
  • Is there a cleaner way than using a global variable for my queue?
推荐答案

这应该是asyncio.Task ?

Should this be a asyncio.Task?

是的,使用 asyncio.ensure_future 或 loop.create_task 。

如果队列由于几秒钟而变空怎么办

What if the queue becomes empty because for a few seconds no data is received?

只需使用 queue.get 等到某个商品可用:

Simply use queue.get to wait until an item is available:

async def consume(queue): while True: item = await queue.get() print(item)

是否有比在队列中使用全局变量更清洁的方法?

Is there a cleaner way than using a global variable for my queue?

是的,只需将其作为参数传递给消费者协程,然后流协议:

Yes, simply pass it as argument to the consumer coroutine and stream protocol:

class StreamProtocol(asyncio.Protocol): def __init__(self, loop, queue): self.loop = loop self.queue = queue def data_received(self, data): for message in data.decode().splitlines(): self.queue.put_nowait(message.rstrip()) def connection_lost(self, exc): self.loop.stop()

如何确保我的消费者不会停止(run_until_complete)?

How can I make sure my consumer doesn't stop (run_until_complete)?

一旦关闭连接,请使用 queue.join 到

Once the connection is closed, use queue.join to wait until the queue is empty.

完整示例:

loop = asyncio.get_event_loop() queue = asyncio.Queue() # Connection coroutine factory = lambda: StreamProtocol(loop, queue) connection = loop.create_connection(factory, '127.0.0.1', '42') # Consumer task consumer = asyncio.ensure_future(consume(queue)) # Set up connection loop.run_until_complete(connection) # Wait until the connection is closed loop.run_forever() # Wait until the queue is empty loop.run_until_complete(queue.join()) # Cancel the consumer consumer.cancel() # Let the consumer terminate loop.run_until_complete(consumer) # Close the loop loop.close()

或者,您也可以使用流:

async def tcp_client(host, port, loop=None): reader, writer = await asyncio.open_connection(host, port, loop=loop) async for line in reader: print(line.rstrip()) writer.close() loop = asyncio.get_event_loop() loop.run_until_complete(tcp_client('127.0.0.1', 42, loop)) loop.close()

更多推荐

异步队列消费者协程

本文发布于:2023-11-23 09:59:18,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1621032.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:队列   消费者

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!