Python multiprocessing with a generator
Problem Description
I'm trying to process a file in which every line is a JSON document. The file can range from hundreds of MBs to a few GBs, so I wrote a generator to fetch each document from the file line by line:
import codecs
import json

def jl_file_iterator(file):
    with codecs.open(file, 'r', 'utf-8') as f:
        for line in f:
            document = json.loads(line)
            yield document
My system has 4 cores, so I would like to process 4 lines of the file in parallel. Currently I have this code, which takes 4 lines at a time and calls the code for parallel processing:
threads = 4
files, i = [], 1
for jl in jl_file_iterator(input_path):
    files.append(jl)
    if i % threads == 0:
        # pool.map(processFile, files)
        parallelProcess(files, o)
        files = []
    i += 1
if files:
    parallelProcess(files, o)
    files = []
This is the code where the actual processing happens:
from multiprocessing import Process

def parallelProcess(files, outfile):
    processes = []
    for i in range(len(files)):
        p = Process(target=processFile, args=(files[i],))
        processes.append(p)
        p.start()
    for i in range(len(files)):
        processes[i].join()

def processFile(doc):
    extractors = {}
    # ... do some processing on doc
    o.write(json.dumps(doc) + '\n')
As you can see, I wait for all 4 lines to finish processing before I send the next 4 documents for processing. What I would like instead is to assign the next line to a processor as soon as one process finishes and frees it up. How do I do that?
PS: The problem is that since it's a generator, I cannot load all the documents up front and use something like map to run the processes.
Thanks for the help.
Solution
As @pvg said in a comment, a (bounded) queue is the natural way to mediate between a producer and consumers running at different speeds, ensuring they all stay as busy as possible without letting the producer get far ahead.
Here's a self-contained, executable example. The queue is restricted to a maximum size equal to the number of worker processes. If the consumers run much faster than the producer, it could make good sense to let the queue get bigger than that.
In your specific case, it would probably make sense to pass lines to the consumers and let them do the document = json.loads(line) part in parallel (a sketch of that adaptation follows the example).
import multiprocessing as mp

NCORE = 4

def process(q, iolock):
    from time import sleep
    while True:
        stuff = q.get()
        if stuff is None:
            break
        with iolock:
            print("processing", stuff)
        sleep(stuff)

if __name__ == '__main__':
    q = mp.Queue(maxsize=NCORE)
    iolock = mp.Lock()
    pool = mp.Pool(NCORE, initializer=process, initargs=(q, iolock))
    for stuff in range(20):
        q.put(stuff)  # blocks until q below its max size
        with iolock:
            print("queued", stuff)
    for _ in range(NCORE):  # tell workers we're done
        q.put(None)
    pool.close()
    pool.join()
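To connect this back to the question, here is a minimal sketch of that adaptation, keeping the same bounded-queue pattern; the input.jl / output.jl paths are illustrative assumptions, not names from the question. Raw lines go into the queue, each worker does the json.loads plus processing, and a lock serializes writes so output lines don't interleave:

import json
import multiprocessing as mp

NCORE = 4

def process(q, iolock, outpath):
    # each worker opens the output in append mode; O_APPEND plus the
    # shared lock keeps concurrent writes from interleaving
    with open(outpath, 'a', encoding='utf-8') as o:
        while True:
            line = q.get()
            if line is None:        # sentinel: no more work
                break
            doc = json.loads(line)  # parsing now happens in parallel
            # ... do some processing on doc ...
            with iolock:
                o.write(json.dumps(doc) + '\n')
                o.flush()

if __name__ == '__main__':
    q = mp.Queue(maxsize=NCORE)
    iolock = mp.Lock()
    pool = mp.Pool(NCORE, initializer=process,
                   initargs=(q, iolock, 'output.jl'))
    with open('input.jl', 'r', encoding='utf-8') as f:
        for line in f:
            q.put(line)  # blocks while all workers are busy
    for _ in range(NCORE):  # one sentinel per worker
        q.put(None)
    pool.close()
    pool.join()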
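One aside on the questioner's PS: plain pool.map would indeed materialize the whole generator into a list first, but pool.imap_unordered accepts the generator as-is and yields results in completion order. The caveat is that the pool feeds tasks to workers through an unbounded internal queue, so with slow workers it can still read far ahead of them; the explicit bounded queue above gives firmer backpressure. A minimal sketch, again assuming hypothetical input.jl / output.jl paths and a process_line worker:

import json
import multiprocessing as mp

def process_line(line):
    # hypothetical worker: parse, process, and re-serialize one document
    doc = json.loads(line)
    # ... do some processing on doc ...
    return json.dumps(doc)

if __name__ == '__main__':
    with open('input.jl', 'r', encoding='utf-8') as f, \
         open('output.jl', 'w', encoding='utf-8') as o, \
         mp.Pool(4) as pool:
        # results arrive as workers finish, so no worker sits idle
        # waiting for a slow batch the way the fixed groups of 4 did
        for result in pool.imap_unordered(process_line, f, chunksize=100):
            o.write(result + '\n')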