Python多处理-进程数
问题描述
我正在执行下面的代码,它工作得很好,但它不会产生不同的进程,相反,有时所有的都运行在同一个进程中,有时一个进程中有两个。我使用的是一台4 CPU的机器。此代码有什么问题?
def f(values):
print(multiprocessing.current_process())
return values
def main():
p = Pool(4) #number of processes = number of CPUs
keys, values= zip(*data.items()) #ordered keys and values
processed_values= p.map( f, values )
result= dict( zip(keys, processed_values ) )
p.close() # no more tasks
p.join() # wrap up current tasks
,结果为
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
有时像这样
<SpawnProcess(SpawnPoolWorker-3, started daemon)>
<SpawnProcess(SpawnPoolWorker-2, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-3, started daemon)>
有时,
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-4, started daemon)>
<SpawnProcess(SpawnPoolWorker-2, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
我的问题是,它是根据什么将职能分配给工人的?我编写代码的方式是,它根据我的字典中的键的数量来决定进程的数量(考虑到我的数据的键总是少于我的CPU)。我的代码将开始类似-主代码读取文件并使用单个进程创建字典,应该将其分支到并发进程数并等待它们处理数据(我使用pool.map来处理数据),然后一旦获得子进程的结果,它就开始处理它们。如何实现此父进程的等待子进程步骤?
解决方案
您的代码没有任何错误。您的工作项非常快--如此之快,以至于同一个工作进程可以运行函数,返回结果,然后赢得使用multiprocessing.Pool
用于分发工作的内部队列中的下一个任务的竞赛。当您调用map
时,工作项被分成批并放置到Queue
中。以下是pool.map
实现的一部分,它将可迭代项分块并将它们放入队列:
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = MapResult(self._cache, chunksize, len(iterable), callback)
self._taskqueue.put((((result._job, i, mapstar, (x,), {})
for i, x in enumerate(task_batches)), None))
每个工作进程都运行一个函数,该函数有一个无限的While循环,该循环使用队列中的项*:
while maxtasks is None or (maxtasks and completed < maxtasks):
try:
task = get() # Pulls an item off the taskqueue
except (EOFError, IOError):
debug('worker got EOFError or IOError -- exiting')
break
if task is None:
debug('worker got sentinel -- exiting')
break
job, i, func, args, kwds = task
try:
result = (True, func(*args, **kwds)) # Runs the function you passed to map
except Exception, e:
result = (False, e)
try:
put((job, i, result)) # Sends the result back to the parent
except Exception as e:
wrapped = MaybeEncodingError(e, result[1])
debug("Possible encoding error while sending result: %s" % (
wrapped))
很可能是同一个Worker碰巧能够消费一个项目,运行func
,然后消费下一个项目。这个有点奇怪--我无法在运行与您的示例相同的代码的机器上重现它--但让同一个Worker从队列中抓取四个项目中的两个是很正常的。
如果通过插入对time.sleep
:
def f(values):
print(multiprocessing.current_process())
time.sleep(1)
return values
*这实际上并不完全正确-在主进程中运行的一个线程使用taskqueue
,然后将它拉出的内容放到另一个Queue
中,这就是子进程使用的内容)
相关文章