How to terminate a process using Python's multiprocessing
Question
I have some code that needs to run against several other systems that may hang or have problems outside my control. I would like to use Python's multiprocessing to spawn child processes that run independently of the main program and then terminate them when they hang or have problems, but I am not sure of the best way to go about this.
When terminate() is called it does kill the child process, but the child then becomes a defunct zombie that is not released until the process object is gone. The example code below, where the loop never ends, works to kill it and allows a respawn when run() is called again, but this does not seem like a good way of going about it (i.e. it would be better to create the multiprocessing.Process() in __init__()).
Does anyone have a suggestion?
    import multiprocessing
    import time


    class Process(object):
        def __init__(self):
            self.thing = Thing()
            # Shared integer flag: 1 = running, 0 = paused
            self.running_flag = multiprocessing.Value("i", 1)

        def run(self):
            # A fresh Process object is created on every call, which allows a respawn
            self.process = multiprocessing.Process(target=self.thing.worker, args=(self.running_flag,))
            self.process.start()
            print(self.process.pid)

        def pause_resume(self):
            self.running_flag.value = not self.running_flag.value

        def terminate(self):
            self.process.terminate()


    class Thing(object):
        def __init__(self):
            self.count = 1

        def worker(self, running_flag):
            # Loops forever; only does work while the flag is set
            while True:
                if running_flag.value:
                    self.do_work()

        def do_work(self):
            print("working {0} ...".format(self.count))
            self.count += 1
            time.sleep(1)
Solution
You might run the child processes as daemons in the background.
process.daemon = True
Any errors and hangs (or an infinite loop) in a daemon process will not affect the main process, and it will only be terminated once the main process exits.
This works for simple problems, but once you have many daemon child processes they keep consuming memory and the parent has no explicit control over them.
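As a rough illustration of the daemon approach, here is a minimal sketch (Python 3) in which the parent waits a bounded time for a daemonic child and terminates it if it is still running. The names hang_forever, finish_quickly and run_with_timeout are hypothetical helpers made up for this example, not part of multiprocessing.

```python
import multiprocessing as mp
import time


def hang_forever():
    # Hypothetical stand-in for a call that never returns.
    while True:
        time.sleep(1)


def finish_quickly():
    # Hypothetical stand-in for a call that returns promptly.
    time.sleep(0.1)


def run_with_timeout(target, timeout):
    """Run target in a daemonic child; return True if it finished in time."""
    proc = mp.Process(target=target)
    proc.daemon = True       # must be set before start()
    proc.start()
    proc.join(timeout)       # wait at most `timeout` seconds
    if proc.is_alive():      # still running, so treat it as hung
        proc.terminate()
        proc.join()          # reap the child so it does not linger as a zombie
        return False
    return True


if __name__ == "__main__":
    print(run_with_timeout(finish_quickly, timeout=5.0))
    print(run_with_timeout(hang_forever, timeout=0.5))
```

The daemon flag only guarantees cleanup when the main process exits; the explicit join(timeout) is what gives the parent control while it is still running.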
The best way is to set up a Queue through which all the child processes communicate with the parent process, so that the parent can join them and clean up nicely. Here is some simple code that checks whether a child process is hanging (simulated with time.sleep(1000)) and sends a message to the queue for the main process to act on:
    import multiprocessing as mp
    import time
    import queue

    running_flag = mp.Value("i", 1)


    def worker(running_flag, q):
        count = 1
        while True:
            if running_flag.value:
                print("working {0} ...".format(count))
                count += 1
                q.put(count)
                time.sleep(1)
                if count > 3:
                    # Simulate hanging with sleep
                    print("hanging...")
                    time.sleep(1000)


    def watchdog(q):
        """
        Checks the queue for updates and puts a message on it
        when the child process has not sent anything for too long.
        """
        # Note: main() reads from the same queue, so each message
        # is seen by only one of the two readers.
        while True:
            try:
                q.get(timeout=10.0)
            except queue.Empty:
                print("[WATCHDOG]: Maybe WORKER is slacking")
                q.put("KILL WORKER")


    def main():
        """The main process"""
        q = mp.Queue()
        workr = mp.Process(target=worker, args=(running_flag, q))
        wdog = mp.Process(target=watchdog, args=(q,))
        # Run the watchdog as a daemon so it terminates with the main process
        wdog.daemon = True
        workr.start()
        print("[MAIN]: starting process P1")
        wdog.start()
        # Poll the queue
        while True:
            msg = q.get()
            if msg == "KILL WORKER":
                print("[MAIN]: Terminating slacking WORKER")
                workr.terminate()
                time.sleep(0.1)
                if not workr.is_alive():
                    print("[MAIN]: WORKER is a goner")
                    workr.join(timeout=1.0)
                    print("[MAIN]: Joined WORKER successfully!")
                    q.close()
                    break  # the daemonic watchdog is terminated when main exits


    if __name__ == '__main__':
        main()
Without terminating the worker first, an attempt to join() it from the main process would block forever, since the worker never finishes.
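The terminate-then-join sequence can also be driven directly from the parent. The following is a minimal sketch (Python 3), not the answer's own code: the parent is the only reader of a heartbeat queue, treats a missed heartbeat as a hang, terminates the worker, and joins it so the child is reaped instead of remaining a zombie. The supervise function, its parameters, and the short timings are assumptions chosen for illustration.

```python
import multiprocessing as mp
import queue
import time


def worker(heartbeats, beats_before_hang):
    """Send one heartbeat per unit of work, then simulate a hang."""
    for count in range(1, beats_before_hang + 1):
        heartbeats.put(count)
        time.sleep(0.1)
    time.sleep(1000)  # simulated hang


def supervise(heartbeat_timeout=2.0, beats_before_hang=3):
    """Run the worker until heartbeats stop; return (last_beat, exitcode)."""
    heartbeats = mp.Queue()
    proc = mp.Process(target=worker, args=(heartbeats, beats_before_hang))
    proc.start()
    last_beat = None
    while True:
        try:
            # The parent is the only reader, so no heartbeat is lost
            last_beat = heartbeats.get(timeout=heartbeat_timeout)
        except queue.Empty:
            break  # no heartbeat arrived in time: assume the worker hangs
    proc.terminate()
    proc.join(timeout=5.0)   # join() reaps the terminated child
    if proc.is_alive():      # terminate() was not enough
        proc.kill()          # available since Python 3.7
        proc.join()
    return last_beat, proc.exitcode


if __name__ == "__main__":
    print(supervise())
```

One caveat from the multiprocessing documentation: terminating a process while it is using a pipe or queue may corrupt that pipe or queue, so this sketch stops using the queue once the worker has been terminated. To respawn, a new Process object has to be created, because a Process can be started only once.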