I have the following code, which reads and then writes the data for each id sequentially:
```python
async def main():
    id = 0
    while id < 1000:
        data = await read_async(id)
        await data.write_async(f'{id}.csv')
        id += 1
```
read_async() takes several minutes and write_async() takes less than one minute to run. Now I want to run the reads in parallel, since they dominate the runtime, while still writing each id's file in order.
You could use a queue and a fixed number of tasks for reading, and write from the main task. The main task can use an event to find out that new data is available from the readers, and a shared dict to get it from them. For example (untested):
```python
import asyncio

async def reader(q, id_to_data, data_ready):
    while True:
        id = await q.get()
        data = await read_async(id)
        id_to_data[id] = data
        data_ready.set()

async def main():
    q = asyncio.Queue()
    for id in range(1000):
        await q.put(id)
    id_to_data = {}
    data_ready = asyncio.Event()
    readers = [asyncio.create_task(reader(q, id_to_data, data_ready))
               for _ in range(3)]
    for id in range(1000):
        while True:
            # wait for the current id to appear before writing
            if id in id_to_data:
                data = id_to_data.pop(id)
                await data.write_async(f'{id}.csv')
                break  # move on to the next id
            else:
                # wait for new data and try again
                await data_ready.wait()
                data_ready.clear()
    for r in readers:
        r.cancel()
```
Using a separate queue for results instead of the event wouldn't work, because the results arrive in completion order rather than id order. A priority queue would fix the ordering, but it would still immediately return the lowest id currently available, whereas the writer needs to wait for the *next* id in order to process all ids in order.
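For completeness, here is a sketch of the peek-based variant of that idea: keep a plain `heapq` list as the shared buffer, and let the writer pop only when the smallest buffered id is exactly the next one it needs. The `read_async`/`write_async` below are hypothetical stubs (with staggered sleeps so reads finish out of id order) just to make the sketch self-contained; swap in the real coroutines.

```python
import asyncio
import heapq

written = []  # records write order so the sketch is checkable

# Hypothetical stand-ins for the real coroutines. The varying sleep
# makes reads complete out of id order, which the heap must handle.
async def read_async(id):
    await asyncio.sleep((id % 3) * 0.005)
    return f"data-{id}"

async def write_async(data, path):
    await asyncio.sleep(0.001)
    written.append(path)

async def reader(q, heap, data_ready):
    while True:
        id = await q.get()
        data = await read_async(id)
        heapq.heappush(heap, (id, data))  # min-heap keyed on id
        data_ready.set()

async def main(n=20, n_readers=3):
    q = asyncio.Queue()
    for id in range(n):
        await q.put(id)
    heap = []
    data_ready = asyncio.Event()
    readers = [asyncio.create_task(reader(q, heap, data_ready))
               for _ in range(n_readers)]
    for id in range(n):
        # peek: only pop once the smallest buffered id is the one we need
        while not heap or heap[0][0] != id:
            await data_ready.wait()
            data_ready.clear()
        _, data = heapq.heappop(heap)
        await write_async(data, f'{id}.csv')
    for r in readers:
        r.cancel()

asyncio.run(main())
```

This is essentially the same pattern as the dict version above, just with a heap instead of a dict; the important part either way is that the writer checks for the next id itself instead of trusting whatever the queue hands it first.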