Python multiprocessing.Pool: kill a *specific* long-running or hung process
Problem description
I need to run a pool of many parallel database connections and queries, using either multiprocessing.Pool or concurrent.futures.ProcessPoolExecutor, on Python 2.7.5.
In some cases a query takes too long or never finishes (a hung or zombie process). I would like to kill the specific process in the multiprocessing.Pool or ProcessPoolExecutor that has timed out.
Below is an example that kills and re-spawns the entire process pool. Ideally I would avoid that CPU thrashing, since I only want to kill the specific long-running process that has not returned data after timeout seconds.
For some reason, the code below does not seem to be able to terminate/join the Pool after all results have been returned. This may be related to killing worker processes when a timeout occurs; however, the Pool does create new workers when they are killed, and the results are as expected.
from multiprocessing import Pool
import time

import numpy as np


def f(x):
    # Stand-in for a query: sleep for x seconds, then return x.
    time.sleep(x)
    return x


if __name__ == '__main__':
    pool = Pool(processes=4, maxtasksperchild=4)

    # Ten tasks with random durations of 0-9 seconds.
    results = [(x, pool.apply_async(f, (x,)))
               for x in np.random.randint(10, size=10).tolist()]

    while results:
        try:
            x, result = results.pop(0)
            start = time.time()
            print result.get(timeout=5), '%d done in %f Seconds!' % (x, time.time()-start)
        except Exception as e:
            print str(e)
            print '%d Timeout Exception! in %f' % (x, time.time()-start)
            # Terminates every worker that is still alive,
            # not only the one running task x.
            for p in pool._pool:
                if p.exitcode is None:
                    p.terminate()

    pool.terminate()
    pool.join()
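For reference, the timeout passed to result.get() only limits how long the caller waits; it does not stop the worker that is running the task. The sketch below illustrates this with a single-worker pool, written in Python 3 syntax. The demo function is a hypothetical name used only for this illustration, and f is the same sleep-based stand-in as above.

```python
import multiprocessing
import time


def f(x):
    # Stand-in for a query: sleep for x seconds, then return x.
    time.sleep(x)
    return x


def demo():
    """Return (timed_out, value) for a 1-second task polled after 0.2 s."""
    pool = multiprocessing.Pool(processes=1)
    result = pool.apply_async(f, (1.0,))
    try:
        result.get(timeout=0.2)
        timed_out = False
    except multiprocessing.TimeoutError:
        # Only the wait was abandoned; the worker keeps running the task.
        timed_out = True
    # Waiting again, with a longer timeout, still returns the task's value.
    value = result.get(timeout=10)
    pool.close()
    pool.join()
    return timed_out, value


if __name__ == '__main__':
    print(demo())
```

This is why a timed-out task has to be stopped by some other means, such as terminating the process that runs it.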
Solution
I do not fully understand your question. You say you want to stop one specific process, but in your exception handler you call terminate on every worker process; I am not sure why. Also, I am fairly sure that relying on the internal variables of multiprocessing.Pool is not safe. That said, I think your question is why this program does not finish when a timeout happens. If that is the problem, the following does the trick:
from multiprocessing import Pool
import time

import numpy as np


def f(x):
    # Stand-in for a query: sleep for x seconds, then return x.
    time.sleep(x)
    return x


if __name__ == '__main__':
    pool = Pool(processes=4, maxtasksperchild=4)

    results = [(x, pool.apply_async(f, (x,)))
               for x in np.random.randint(10, size=10).tolist()]

    result = None
    start = time.time()
    while results:
        try:
            x, result = results.pop(0)
            print result.get(timeout=5), '%d done in %f Seconds!' % (x, time.time()-start)
        except Exception as e:
            print str(e)
            print '%d Timeout Exception! in %f' % (x, time.time()-start)
            # Walk the pool's internal worker list backwards so entries
            # can be deleted while iterating. pool._pool is private.
            for i in reversed(range(len(pool._pool))):
                p = pool._pool[i]
                if p.exitcode is None:
                    p.terminate()
                del pool._pool[i]

    pool.terminate()
    pool.join()
The point is that you need to remove the items from the pool; just calling terminate on them is not enough.
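If the goal is to kill only the task that timed out, one alternative (not part of the answer above, and not using Pool at all) is to give each task its own multiprocessing.Process and terminate just the process whose deadline has passed. The following is a minimal sketch under that assumption, written in Python 3 syntax. The names run_all and _call are hypothetical helpers, f stands in for the query, there is no cap on how many processes run at once, and a child that exits without sending a result is simply reported as unfinished.

```python
import multiprocessing
import time


def f(x):
    """Stand-in for a database query that takes x seconds."""
    time.sleep(x)
    return x


def _call(func, arg, conn):
    # Runs in the child process: send the result back to the parent.
    conn.send(func(arg))
    conn.close()


def run_all(func, args, timeout):
    """Run func(arg) for each arg in its own process.

    Returns a list of (arg, finished, value) tuples. A task that has not
    sent a result by its deadline (start time + timeout) is terminated;
    the other tasks are left alone.
    """
    jobs = []
    for arg in args:
        recv_end, send_end = multiprocessing.Pipe(duplex=False)
        proc = multiprocessing.Process(target=_call,
                                       args=(func, arg, send_end))
        proc.start()
        send_end.close()  # the parent only reads from the pipe
        jobs.append((arg, proc, recv_end, time.time() + timeout))

    results = []
    for arg, proc, recv_end, deadline in jobs:
        remaining = max(0.0, deadline - time.time())
        try:
            if recv_end.poll(remaining):
                results.append((arg, True, recv_end.recv()))
            else:
                proc.terminate()  # kill only this task's process
                results.append((arg, False, None))
        except EOFError:
            # The child exited without sending a result
            # (for example, func raised an exception).
            results.append((arg, False, None))
        proc.join()
        recv_end.close()
    return results


if __name__ == '__main__':
    for arg, finished, value in run_all(f, [0.1, 30, 0.2], timeout=1.0):
        print('%s finished=%s value=%s' % (arg, finished, value))
```

Compared with a Pool, this trades worker reuse for isolation: each task pays the cost of starting a process, but a hung task can be killed without touching the others or relying on Pool internals.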