一旦任何一个进程在python中找到匹配项,如何让所有pool.apply_async进程停止
问题描述
我有以下代码,它利用多处理来遍历一个大列表并找到匹配项.一旦在任何一个进程中找到匹配项,如何让所有进程停止?我看过一些例子,但我似乎都不适合我在这里所做的事情.
I have the following code that is leveraging multiprocessing to iterate through a large list and find a match. How can I get all processes to stop once a match is found in any one processes? I have seen examples but I none of them seem to fit into what I am doing here.
#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools
alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts
def do_job(first_bits):
for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
# CHECK FOR MATCH HERE
print(''.join(x))
# EXIT ALL PROCESSES IF MATCH FOUND
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
results = []
for i in range(num_parts):
if i == num_parts - 1:
first_bit = alphabet[part_size * i :]
else:
first_bit = alphabet[part_size * i : part_size * (i+1)]
pool.apply_async(do_job, (first_bit,))
pool.close()
pool.join()
感谢您的宝贵时间.
更新 1:
我已经实现了@ShadowRanger 在伟大方法中建议的更改,它几乎按照我想要的方式工作.因此,我添加了一些日志记录以指示进度,并在其中放置了一个测试"键以进行匹配.我希望能够独立于 num_parts 增加/减少 iNumberOfProcessors.在这个阶段,当我将它们都设置为 4 时,一切都按预期工作,启动了 4 个进程(控制台额外运行了一个).当我更改 iNumberOfProcessors = 6 时,有 6 个进程启动,但只有其中一个进程有任何 CPU 使用率.所以看起来2是空闲的.就像我之前的解决方案一样,我能够在不增加 num_parts 的情况下将核心数量设置得更高,并且所有进程都会被使用.
I have implemented the changes suggested in the great approach by @ShadowRanger and it is nearly working the way I want it to. So I have added some logging to give an indication of progress and put a 'test' key in there to match. I want to be able to increase/decrease the iNumberOfProcessors independently of the num_parts. At this stage when I have them both at 4 everything works as expected, 4 processes spin up (one extra for the console). When I change the iNumberOfProcessors = 6, 6 processes spin up but only for of them have any CPU usage. So it appears 2 are idle. Where as my previous solution above, I was able to set the number of cores higher without increasing the num_parts, and all of the processes would get used.
我不确定如何重构这种新方法以提供相同的功能.您能否看一下并给我一些重构的方向,以便能够相互独立地设置 iNumberOfProcessors 和 num_parts 并且仍然使用所有进程?
I am not sure about how to refactor this new approach to give me the same functionality. Can you have a look and give me some direction with the refactoring needed to be able to set iNumberOfProcessors and num_parts independently from each other and still have all processes used?
这是更新后的代码:
#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools
alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts
iProgressInterval = 10000
iNumberOfProcessors = 6
def do_job(first_bits):
iAttemptNumber = 0
iLastProgressUpdate = 0
for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
sKey = ''.join(x)
iAttemptNumber = iAttemptNumber + 1
if iLastProgressUpdate + iProgressInterval <= iAttemptNumber:
iLastProgressUpdate = iLastProgressUpdate + iProgressInterval
print("Attempt#:", iAttemptNumber, "Key:", sKey)
if sKey == 'test':
print("KEY FOUND!! Attempt#:", iAttemptNumber, "Key:", sKey)
return True
def get_part(i):
if i == num_parts - 1:
first_bit = alphabet[part_size * i :]
else:
first_bit = alphabet[part_size * i : part_size * (i+1)]
return first_bit
if __name__ == '__main__':
# with statement with Py3 multiprocessing.Pool terminates when block exits
with multiprocessing.Pool(processes = iNumberOfProcessors) as pool:
# Don't need special case for final block; slices can
for gotmatch in pool.imap_unordered(do_job, map(get_part, range(num_parts))):
if gotmatch:
break
else:
print("No matches found")
更新 2:
好的,这是我尝试@noxdafox 建议的尝试.我根据他的建议提供的链接整理了以下内容.不幸的是,当我运行它时,我得到了错误:
Ok here is my attempt at trying @noxdafox suggestion. I have put together the following based on the link he provided with his suggestion. Unfortunately when I run it I get the error:
... 第 322 行,在 apply_async 中raise ValueError("池未运行")ValueError: 池未运行
... line 322, in apply_async raise ValueError("Pool not running") ValueError: Pool not running
谁能给我一些关于如何让它工作的指导.
Can anyone give me some direction on how to get this working.
基本上问题是我第一次尝试进行多处理,但不支持在找到匹配项后取消所有进程.
Basically the issue is that my first attempt did multiprocessing but did not support canceling all processes once a match was found.
我的第二次尝试(基于@ShadowRanger 的建议)解决了这个问题,但破坏了能够独立扩展进程数量和 num_parts 大小的功能,这是我第一次尝试可以做到的.
My second attempt (based on @ShadowRanger suggestion) solved that problem, but broke the functionality of being able to scale the number of processes and num_parts size independently, which is something my first attempt could do.
我的第三次尝试(基于@noxdafox 的建议)抛出了上述错误.
My third attempt (based on @noxdafox suggestion), throws the error outlined above.
如果有人能给我一些关于如何维护我第一次尝试的功能的指导(能够独立缩放进程数量和 num_parts 大小),并添加一旦找到匹配项就取消所有进程的功能不胜感激.
If anyone can give me some direction on how to maintain the functionality of my first attempt (being able to scale the number of processes and num_parts size independently), and add the functionality of canceling all processes once a match was found it would be much appreciated.
感谢您的宝贵时间.
这是我根据@noxdafox 建议第三次尝试的代码:
Here is the code from my third attempt based on @noxdafox suggestion:
#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools
alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts
iProgressInterval = 10000
iNumberOfProcessors = 4
def find_match(first_bits):
iAttemptNumber = 0
iLastProgressUpdate = 0
for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
sKey = ''.join(x)
iAttemptNumber = iAttemptNumber + 1
if iLastProgressUpdate + iProgressInterval <= iAttemptNumber:
iLastProgressUpdate = iLastProgressUpdate + iProgressInterval
print("Attempt#:", iAttemptNumber, "Key:", sKey)
if sKey == 'test':
print("KEY FOUND!! Attempt#:", iAttemptNumber, "Key:", sKey)
return True
def get_part(i):
if i == num_parts - 1:
first_bit = alphabet[part_size * i :]
else:
first_bit = alphabet[part_size * i : part_size * (i+1)]
return first_bit
def grouper(iterable, n, fillvalue=None):
args = [iter(iterable)] * n
return itertools.zip_longest(*args, fillvalue=fillvalue)
class Worker():
def __init__(self, workers):
self.workers = workers
def callback(self, result):
if result:
self.pool.terminate()
def do_job(self):
print(self.workers)
pool = multiprocessing.Pool(processes=self.workers)
for part in grouper(alphabet, part_size):
pool.apply_async(do_job, (part,), callback=self.callback)
pool.close()
pool.join()
print("All Jobs Queued")
if __name__ == '__main__':
w = Worker(4)
w.do_job()
解决方案
可以查看this question 查看解决您问题的实现示例.
You can check this question to see an implementation example solving your problem.
这也适用于 concurrent.futures 池.
This works also with concurrent.futures pool.
只需将 map
方法替换为 apply_async
并从调用方迭代您的列表.
Just replace the map
method with apply_async
and iterated over your list from the caller.
类似的东西.
for part in grouper(alphabet, part_size):
pool.apply_async(do_job, part, callback=self.callback)
石斑鱼食谱
相关文章