我有一个 asyncio.Protocol 子类,它从服务器接收数据。 我将此数据(每行,因为数据是文本)存储在 asyncio.Queue 中。
I have a asyncio.Protocol subclass receiving data from a server. I am storing this data (each line, because the data is text) in a asyncio.Queue.
import asyncio q = asyncio.Queue() class StreamProtocol(asyncio.Protocol): def __init__(self, loop): self.loop = loop self.transport = None def connection_made(self, transport): self.transport = transport def data_received(self, data): for message in data.decode().splitlines(): yield q.put(message.rstrip()) def connection_lost(self, exc): self.loop.stop() loop = asyncio.get_event_loop() coro = loop.create_connection(lambda: StreamProtocol(loop), '127.0.0.1', '42') loop.run_until_complete(coro) loop.run_forever() loop.close()我想让另一个协程负责消耗队列中的数据,
I want to have another coroutine responsible for consuming the data in the queue and processing it.
- 这应该是 asyncio.Task 吗?
- 如果由于几秒钟没有收到数据而导致队列变空怎么办?我如何确保我的使用者不会停止( run_until_complete )?
- 有没有一种比使用全局变量更干净的方法我的队列?
- Should this be a asyncio.Task?
- What if the queue becomes empty because for a few seconds no data is received? How can I make sure my consumer doesn't stop (run_until_complete)?
- Is there a cleaner way than using a global variable for my queue?
这应该是asyncio.Task ?
Should this be a asyncio.Task?
是的,使用 asyncio.ensure_future 或 loop.create_task 。
如果队列由于几秒钟而变空怎么办
What if the queue becomes empty because for a few seconds no data is received?
只需使用 queue.get 等到某个商品可用:
Simply use queue.get to wait until an item is available:
async def consume(queue): while True: item = await queue.get() print(item)
是否有比在队列中使用全局变量更清洁的方法?
Is there a cleaner way than using a global variable for my queue?
是的,只需将其作为参数传递给消费者协程,然后流协议:
Yes, simply pass it as argument to the consumer coroutine and stream protocol:
class StreamProtocol(asyncio.Protocol): def __init__(self, loop, queue): self.loop = loop self.queue = queue def data_received(self, data): for message in data.decode().splitlines(): self.queue.put_nowait(message.rstrip()) def connection_lost(self, exc): self.loop.stop()
如何确保我的消费者不会停止(run_until_complete)?
How can I make sure my consumer doesn't stop (run_until_complete)?
一旦关闭连接,请使用 queue.join 到
Once the connection is closed, use queue.join to wait until the queue is empty.
完整示例:
loop = asyncio.get_event_loop() queue = asyncio.Queue() # Connection coroutine factory = lambda: StreamProtocol(loop, queue) connection = loop.create_connection(factory, '127.0.0.1', '42') # Consumer task consumer = asyncio.ensure_future(consume(queue)) # Set up connection loop.run_until_complete(connection) # Wait until the connection is closed loop.run_forever() # Wait until the queue is empty loop.run_until_complete(queue.join()) # Cancel the consumer consumer.cancel() # Let the consumer terminate loop.run_until_complete(consumer) # Close the loop loop.close()或者,您也可以使用流:
async def tcp_client(host, port, loop=None): reader, writer = await asyncio.open_connection(host, port, loop=loop) async for line in reader: print(line.rstrip()) writer.close() loop = asyncio.get_event_loop() loop.run_until_complete(tcp_client('127.0.0.1', 42, loop)) loop.close()更多推荐
异步队列消费者协程
发布评论