多进程池:确定进程名称(无法终止其进程)
问题描述
我有一些代码,我尝试在Pool
中创建4个进程。
一旦我得到任何异常(例如,它尝试连接到的数据库已关闭),我想要终止池,休眠10秒,然后创建一个包含4个进程的新池。
但是,池似乎永远不会被终止,因为进程名称每次都会递增。 池是否有用于保存名称计数的缓存?def connect_db()
pass
while True:
p = Pool(4)
for process in multiprocessing.active_children():
print(process.name) #why is the name incremented by 1 each time while loop iterates?
try:
r = p.map(connect_db, ())
except Exception as e:
pool.close()
pool.join()
time.sleep(10)
前四个进程是SpawnPoolWorker-1到4,接下来4个是SpawnPoolWorker-5到8。它怎么知道我之前已经创建了4个进程?我每次都在创建Pool
的新实例,还是我做错了什么?
解决方案
您没有看到预期的内容的主要原因是以下代码行:
r = p.map(connect_db, ())
您正在调用multiprocess.map
时使用的是一个空的迭代器,因此connect_db
根本没有被调用,并且您没有到达代码的except
部分,没有关闭池等。
这里是一个可以工作的框架,其中包含一组print
用于调试的语句。我附上了下面的输出,如您所见,每轮恰好有四个子进程。
import multiprocessing
import time
import random
def connect_db(i):
print(f"Trying to connect {i}")
time.sleep(random.random() * 2)
raise Exception("Failed to connect")
while True:
p = multiprocessing.Pool(4)
print("active children are:")
for idx, process in enumerate(multiprocessing.active_children()):
print(f"Child number {idx} is {process.name}") #why is the name incremented by 1 each time while loop iterates?
try:
print("About to create a pool")
r = p.map(connect_db, range(4))
print("Created a pool")
except Exception as e:
print(e)
print("terminating threads")
p.terminate()
p.close()
p.join()
time.sleep(5)
输出:
active children are:
Child number 0 is ForkPoolWorker-2
Child number 1 is ForkPoolWorker-1
Child number 2 is ForkPoolWorker-4
Child number 3 is ForkPoolWorker-3
About to create a pool
Trying to connect 0
Trying to connect 1
Trying to connect 2
Trying to connect 3
Failed to connect
terminating threads
active children are:
Child number 0 is ForkPoolWorker-5
Child number 1 is ForkPoolWorker-6
Child number 2 is ForkPoolWorker-8
Child number 3 is ForkPoolWorker-7
About to create a pool
Trying to connect 0
Trying to connect 1
...
最后要注意的是,如果用例确实是数据库连接,则有现成的连接池,您可能应该使用其中一个。此外,我不确定是否可以跨进程共享数据库连接。
控制池中的进程名称
如果出于某种原因,您希望控制池中的进程名称,可以通过创建您自己的池上下文来实现:
import multiprocessing
from multiprocessing import context
import time
import random
process_counter = 0
class MyForkProcess(multiprocessing.context.ForkProcess):
def __init__(self, *args, **kwargs):
global process_counter
name = f"MyForkProcess-{process_counter}"
process_counter += 1
super(MyForkProcess, self).__init__(*args, name = name, **kwargs)
class MyContext(multiprocessing.context.ForkContext):
_name = 'MyForkContext'
Process = MyForkProcess
def connect_db(i):
print(f"Trying to connect {i}")
cp = multiprocessing.current_process()
print(f"The name of the child process is {cp.name}")
time.sleep(random.random() * 2)
raise Exception("Failed to connect")
context = MyContext()
while True:
p = context.Pool(4)
print("active children are:")
for idx, process in enumerate(multiprocessing.active_children()):
print(f"Child number {idx} is {process.name}") #why is the name incremented by 1 each time while loop iterates?
try:
print("About to create a pool")
r = p.map(connect_db, range(4))
print("Created a pool")
except Exception as e:
print(e)
print("terminating threads")
p.terminate()
process_counter = 0
p.close()
p.join()
time.sleep(5)
现在的输出是:
active children are:
Child number 0 is MyForkPoolWorker-2
Child number 1 is MyForkPoolWorker-0
Child number 2 is MyForkPoolWorker-3
Child number 3 is MyForkPoolWorker-1
About to create a pool
Trying to connect 0
The name of the child process is MyForkPoolWorker-0
Trying to connect 1
The name of the child process is MyForkPoolWorker-1
Trying to connect 2
The name of the child process is MyForkPoolWorker-2
Trying to connect 3
The name of the child process is MyForkPoolWorker-3
Failed to connect
terminating threads
active children are:
Child number 0 is MyForkPoolWorker-2
Child number 1 is MyForkPoolWorker-0
Child number 2 is MyForkPoolWorker-1
Child number 3 is MyForkPoolWorker-3
About to create a pool
...
相关文章