Python multiprocessing: help exiting on a condition

Question

I'm breaking my teeth on multiprocessing in Python and I'm not having any luck wrapping my head around the subject. Basically, I have a procedure that is time-consuming to run. I need to run it for a range of 1 to 100, but I'd like to abort all processes once the condition I'm looking for has been met. The condition is a return value of 90.

Here is a non-multiprocess chunk of code. Can anyone give me an example of how they would convert it to a multiprocess function where the code exits all processes once the condition of "90" has been met?

def Addsomething(i):
    SumOfSomething = i + 1    
    return SumOfSomething

def RunMyProcess():
    for i in range(100):
        Something = Addsomething(i)
        print Something
    return

if __name__ == "__main__":
    RunMyProcess()

I got this error while testing the 3rd version. Any idea what is causing this?

Exception in thread Thread-3:
Traceback (most recent call last):
  File "C:\Python27\lib\threading.py", line 554, in __bootstrap_inner
    self.run()
  File "C:\Python27\lib\threading.py", line 507, in run
    self.__target(*self.__args, **self.__kwargs)
  File "C:\Python27\lib\multiprocessing\pool.py", line 379, in _handle_results
    cache[job]._set(i, obj)
  File "C:\Python27\lib\multiprocessing\pool.py", line 527, in _set
    self._callback(self._value)
  File "N:PV\_Proposals2013ESS - Clear Sky1-CODEMultiTest3.py", line 20, in check_result
    pool.terminate()
  File "C:\Python27\lib\multiprocessing\pool.py", line 423, in terminate
    self._terminate()
  File "C:\Python27\lib\multiprocessing\util.py", line 200, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "C:\Python27\lib\multiprocessing\pool.py", line 476, in _terminate_pool
    result_handler.join(1e100)
  File "C:\Python27\lib\threading.py", line 657, in join
    raise RuntimeError("cannot join current thread")
RuntimeError: cannot join current thread


Solution

Maybe something like this is what you're looking for? Keep in mind that I wrote it for Python 3. Your print statement above is Python 2 syntax; as a side note, if you stay on Python 2, use xrange instead of range.

from argparse import ArgumentParser
from random import random
from subprocess import Popen
from sys import exit
from time import sleep

def add_something(i):

    # Sleep to simulate the long calculation
    sleep(random() * 30)
    return i + 1

def run_my_process():

    # Start up all of the processes, pass i as command line argument
    # since you have your function in the same file, we'll have to handle that
    # inside 'main' below
    processes = []
    for i in range(100):
        processes.append(Popen(['python', 'thisfile.py', str(i)]))

    # Wait for your desired process result,
    # sleeping briefly between polls so the loop doesn't spin the CPU
    done = False
    while not done:
        for proc in processes:
            returncode = proc.poll()
            if returncode == 90:
                done = True
                break
        sleep(0.1)

    # Kill any process that are still running
    for proc in processes:

        if proc.returncode is None:

            # Might run into a race condition here,
            # so might want to wrap with try block
            proc.kill()

if __name__ == '__main__':

    # Look for optional i argument here
    parser = ArgumentParser()
    parser.add_argument('i', type=int, nargs='?')
    i = parser.parse_args().i

    # If there isn't an i, then run the whole thing
    if i is None:
        run_my_process()

    else:
        # Otherwise, run your expensive calculation and return the result
        returncode = add_something(i)
        print(returncode)
        exit(returncode)
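
The comment in the kill loop above mentions a possible race: a child can exit between the check and the kill() call. Here is a minimal sketch of one way to guard against that. The helper name kill_remaining and the two stand-in child commands are made up for illustration and are not part of the code above.

```python
import subprocess
import sys


def kill_remaining(processes):
    """Kill every process that is still running; return how many were killed."""
    killed = 0
    for proc in processes:
        # poll() refreshes returncode; None means the child is still running.
        if proc.poll() is None:
            try:
                proc.kill()
                killed += 1
            except OSError:
                # The child exited between poll() and kill(); nothing to do.
                pass
        # Reap the child so it does not linger as a zombie.
        proc.wait()
    return killed


if __name__ == '__main__':
    # Hypothetical stand-in children: one exits at once, one sleeps a while.
    quick = subprocess.Popen([sys.executable, '-c', 'pass'])
    slow = subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(30)'])
    quick.wait()
    print(kill_remaining([quick, slow]))
```

Launching the children with sys.executable rather than a bare 'python' is a choice made in this sketch, so that they run under the same interpreter as the parent.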

Here's a somewhat cleaner version that uses the multiprocessing module instead of subprocess:

from random import random
from multiprocessing import Process
from sys import exit
from time import sleep

def add_something(i):

    # Sleep to simulate the long calculation
    sleep(random() * 30)

    exitcode = i + 1
    print(exitcode)
    exit(exitcode)

def run_my_process():

    # Start up all of the processes
    processes = []
    for i in range(100):
        proc = Process(target=add_something, args=[i])
        processes.append(proc)
        proc.start()

    # Wait for the desired process result,
    # sleeping briefly between checks so the loop doesn't spin the CPU
    done = False
    while not done:
        for proc in processes:
            if proc.exitcode == 90:
                done = True
                break
        sleep(0.1)

    # Kill any processes that are still running
    for proc in processes:
        if proc.is_alive():
            proc.terminate()

if __name__ == '__main__':
    run_my_process()
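
Both versions so far pass the result back as an exit code. That is a narrow channel: on POSIX systems only values 0 to 255 survive, so it works here only because the results never exceed 100. Below is a rough sketch of an alternative that reports results through a multiprocessing Queue instead. The extra parameters (results, max_delay, target, count) and the shortened sleep are assumptions made for this illustration.

```python
from multiprocessing import Process, Queue
from random import random
from time import sleep


def add_something(i, results, max_delay=0.5):
    # Short random sleep standing in for the long calculation
    sleep(random() * max_delay)
    # Report the input together with the result instead of using an exit code
    results.put((i, i + 1))


def run_my_process(target=90, count=100):
    results = Queue()
    processes = [Process(target=add_something, args=(i, results))
                 for i in range(count)]
    for proc in processes:
        proc.start()

    # get() blocks until a worker reports, so no polling loop is needed
    found = None
    for _ in range(count):
        i, value = results.get()
        if value == target:
            found = (i, value)
            break

    # Stop whatever is still running, then reap every child
    for proc in processes:
        if proc.is_alive():
            proc.terminate()
    for proc in processes:
        proc.join()
    return found


if __name__ == '__main__':
    print(run_my_process())
```

The queue is not used again after terminate() is called, which matters because terminating a process that is in the middle of using a queue can leave the queue in a broken state.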

Edit 2:

Here's one last version, which I think is much better than the other two:

from random import random
from multiprocessing import Pool
from time import sleep

def add_something(i):

    # Sleep to simulate the long calculation
    sleep(random() * 30)
    return i + 1

def run_my_process():

    # Create a process pool
    pool = Pool(100)

    # Callback function that checks results and kills the pool
    def check_result(result):
        print(result)
        if result == 90:
            pool.terminate()

    # Start up all of the processes
    for i in range(100):
        pool.apply_async(add_something, args=[i], callback=check_result)

    pool.close()
    pool.join()

if __name__ == '__main__':
    run_my_process()
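
About the traceback in the question: it shows pool.terminate() being called from check_result, which runs inside the pool's result-handler thread (_handle_results), and terminate() then tries to join that same thread, hence "cannot join current thread". One possible way around it, sketched below under my own assumptions, is to read the results in the main thread with imap_unordered and terminate the pool from there. The helper first_match, the target parameter, the default pool size and the shortened sleep are all choices made for this illustration.

```python
from multiprocessing import Pool
from random import random
from time import sleep


def add_something(i):
    # Short random sleep standing in for the long calculation
    sleep(random() * 0.05)
    return i + 1


def first_match(results, target):
    """Return the first item of `results` equal to `target`, else None."""
    for result in results:
        if result == target:
            return result
    return None


def run_my_process(target=90):
    # Default pool size (one worker per CPU); the version above used Pool(100)
    pool = Pool()
    try:
        # imap_unordered hands results to the main thread as workers finish
        return first_match(pool.imap_unordered(add_something, range(100)), target)
    finally:
        # terminate() runs in the main thread here, not in the result handler
        pool.terminate()
        pool.join()


if __name__ == '__main__':
    print(run_my_process())
```

Because the loop stops consuming results as soon as the target shows up, any tasks that have not started yet are dropped when the pool is terminated.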
