I'm trying to parallelize an application using multiprocessing. It takes in a very large CSV file (64MB to 500MB), does some work line by line, and then outputs a small, fixed-size file.
Currently I do a list(file_obj), which unfortunately loads the entire file into memory (I think). I then break that list up into n parts, n being the number of processes I want to run, and do a pool.map() on the broken-up lists.
This seems to have a really, really bad runtime compared to a single-threaded, just-open-the-file-and-iterate-over-it approach. Can someone suggest a better solution?
Additionally, I need to process the rows of the file in groups which preserve the value of a certain column. These groups of rows can themselves be split up, but no group should contain more than one value for this column.
Recommended answer
list(file_obj) can require a lot of memory when file_obj is large. We can reduce that memory requirement by using itertools to pull out chunks of lines as we need them.
In particular, we can use

```python
reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)
```

to split the file into processable chunks, and

```python
groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)]
result = pool.map(worker, groups)
```

to have the multiprocessing pool work on num_chunks chunks at a time.
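To see what the groupby/islice combination produces, here is a minimal sketch that leaves out the pool and runs on a small in-memory CSV. The sample data and the `batches` and `demo` helpers are made up for illustration; the first column is assumed to be the grouping key. Note that itertools.groupby only merges consecutive rows, so this assumes rows with the same key sit next to each other in the file.

```python
import csv
import io
import itertools

# Hypothetical sample data: the first column is the grouping key.
SAMPLE = "a,1\na,2\nb,3\nc,4\nc,5\nd,6\n"


def keyfunc(row):
    # Group rows by the value in the first column.
    return row[0]


def batches(reader, num_chunks):
    """Yield lists of up to num_chunks groups; each group is a list of rows."""
    chunks = itertools.groupby(reader, keyfunc)
    while True:
        # list(chunk) consumes each group before groupby moves on to the next.
        groups = [list(chunk) for _, chunk in itertools.islice(chunks, num_chunks)]
        if not groups:
            return
        yield groups


def demo():
    reader = csv.reader(io.StringIO(SAMPLE))
    # Keep only the key of each row so the grouping is easy to read.
    return [[[row[0] for row in group] for group in batch]
            for batch in batches(reader, 2)]


if __name__ == "__main__":
    for batch in demo():
        print(batch)
```

Each batch holds at most num_chunks groups, and every group contains rows for exactly one key, which is what the worker receives through pool.map.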
By doing so, we need only roughly enough memory to hold a few (num_chunks) chunks at once, instead of the whole file.
```python
import multiprocessing as mp
import itertools
import csv


def worker(chunk):
    # `chunk` will be a list of CSV rows all with the same name column
    # replace this with your real computation
    # print(chunk)
    return len(chunk)


def keyfunc(row):
    # `row` is one row of the CSV file.
    # replace this with the name column.
    return row[0]


def main():
    pool = mp.Pool()
    largefile = 'test.dat'
    num_chunks = 10
    results = []
    # newline='' is what the csv module recommends for files passed to csv.reader
    with open(largefile, newline='') as f:
        reader = csv.reader(f)
        chunks = itertools.groupby(reader, keyfunc)
        while True:
            # make a list of num_chunks chunks
            groups = [list(chunk) for key, chunk in
                      itertools.islice(chunks, num_chunks)]
            if groups:
                result = pool.map(worker, groups)
                results.extend(result)
            else:
                break
    pool.close()
    pool.join()
    print(results)


if __name__ == '__main__':
    main()
```
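The question mentions that groups may themselves be split up. If a single key can cover a very large run of rows, one possible refinement is to cap the size of each piece handed to a worker. The following is only a sketch: `split_group`, `bounded_pieces`, and `max_rows` are hypothetical names that do not appear in the answer above.

```python
import itertools


def split_group(rows, max_rows):
    """Yield lists of at most max_rows rows taken from one group's iterator."""
    rows = iter(rows)
    while True:
        piece = list(itertools.islice(rows, max_rows))
        if not piece:
            return
        yield piece


def bounded_pieces(reader, keyfunc, max_rows):
    """Yield single-key pieces, each holding no more than max_rows rows."""
    for _, chunk in itertools.groupby(reader, keyfunc):
        # Each piece is fully read before groupby advances to the next key.
        yield from split_group(chunk, max_rows)


if __name__ == "__main__":
    rows = [["a", 1], ["a", 2], ["a", 3], ["b", 4]]
    for piece in bounded_pieces(rows, lambda row: row[0], 2):
        print(piece)
```

Every piece still contains a single key value, so the constraint from the question holds. The worker may now see only part of a group, so if the computation needs the whole group, the partial results for the same key have to be combined afterwards.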