How to limit the scope of a multiprocessing process?

2022-01-12 00:00:00 python multiprocessing

Question

Using Python's multiprocessing module, the following contrived example runs with minimal memory requirements:

import multiprocessing 
# completely_unrelated_array = range(2**25)

def foo(x):
    for x in xrange(2**28):pass
    print x**2

P = multiprocessing.Pool()

for x in range(8):
    multiprocessing.Process(target=foo, args=(x,)).start()

Uncomment the creation of completely_unrelated_array and you'll find that each spawned process allocates memory for its own copy of completely_unrelated_array! This is a minimal example of a much larger project, and I can't figure out how to work around it; multiprocessing seems to make a copy of everything that is global. I don't need a shared memory object. I simply need to pass in x and process it without the memory overhead of the entire program.

Side observation: interestingly, print id(completely_unrelated_array) inside foo gives the same value in every process, suggesting that these might somehow not be copies...


Answer

Because of how os.fork() works, every variable in the global namespace of your __main__ module is inherited by the child processes (assuming you're on a POSIX platform), so the children's reported memory usage reflects that as soon as they're created. I'm not sure all of that memory is really being allocated, though: as far as I know, the memory is shared with the parent until you actually try to change it in the child, at which point a new copy is made (copy-on-write). Windows, on the other hand, doesn't use os.fork(). It re-imports the main module in each child and pickles any local variables you want sent to the children. So on Windows you can avoid the large global ending up in the child by defining it only inside an if __name__ == "__main__": guard, because everything inside that guard runs only in the parent process:

import time
import multiprocessing 


def foo(x):
    for x in range(2**28):pass
    print(x**2)

if __name__ == "__main__":
    completely_unrelated_array = list(range(2**25)) # This will only be defined in the parent on Windows
    P = multiprocessing.Pool()

    for x in range(8):
        multiprocessing.Process(target=foo, args=(x,)).start()
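
The inheritance described above, and the asker's observation that id() reports the same value in the child, can be checked with a small sketch. It assumes a POSIX platform where the "fork" start method is available; big_global, report and compare_parent_and_child are hypothetical names used only for illustration. In CPython, id() is the object's address, and a forked child starts with a duplicate of the parent's address space, so an identical id says nothing about whether the underlying physical pages are shared or copied.

```python
import multiprocessing

# Hypothetical module-level global standing in for completely_unrelated_array.
big_global = list(range(1000))


def report(queue):
    # Runs in the child: the global was never passed in, yet it is visible.
    queue.put((id(big_global), len(big_global)))


def compare_parent_and_child():
    # Assumption: "fork" is available, which is only true on POSIX platforms.
    ctx = multiprocessing.get_context("fork")
    queue = ctx.Queue()
    child = ctx.Process(target=report, args=(queue,))
    child.start()
    child_id, child_len = queue.get()  # read the result before joining
    child.join()
    return id(big_global), child_id, child_len


if __name__ == "__main__":
    parent_id, child_id, child_len = compare_parent_and_child()
    print(parent_id == child_id, child_len)
```

With "fork", the child reports the same id and the full length even though only the queue was passed as an argument.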

Now, in Python 2.x, new multiprocessing.Process objects can only be created by forking if you're on a POSIX platform. But on Python 3.4 and later, you can specify how new processes are created by using contexts. So we can request the "spawn" context, which is the one Windows uses, to create our new processes, and use the same trick:

# Note that this is Python 3.4+ only
import time
import multiprocessing 

def foo(x):
    for x in range(2**28):pass
    print(x**2)


if __name__ == "__main__":
    completely_unrelated_array = list(range(2**23))  # Again, this only exists in the parent
    ctx = multiprocessing.get_context("spawn") # Use process spawning instead of fork
    P = ctx.Pool()

    for x in range(8):
        ctx.Process(target=foo, args=(x,)).start()
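
Which start methods exist depends on the platform, so it may help to check before asking for one. The following is a minimal sketch, not part of the original answer; pick_context is a hypothetical helper name, and falling back to the platform default is an assumption about what a caller would want.

```python
import multiprocessing


def pick_context(preferred="spawn"):
    """Return a context for `preferred`, or the platform default if unavailable."""
    available = multiprocessing.get_all_start_methods()
    if preferred in available:
        return multiprocessing.get_context(preferred)
    # Assumption: silently falling back to the default is acceptable here.
    return multiprocessing.get_context()


if __name__ == "__main__":
    print(multiprocessing.get_all_start_methods())
    ctx = pick_context("spawn")
    print(ctx.get_start_method())
```

The returned context offers the same Process and Pool constructors used in the example above, so the rest of the program does not need to change.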

If you need 2.x support, or want to stick with os.fork() for creating new Process objects, I think the best you can do to bring the reported memory usage down is to delete the offending object in the child immediately:

import time
import multiprocessing 
import gc

def foo(x):
    init()
    for x in range(2**28):pass
    print(x**2)

def init():
    global completely_unrelated_array
    completely_unrelated_array = None
    del completely_unrelated_array
    gc.collect()

if __name__ == "__main__":
    completely_unrelated_array = list(range(2**23))
    P = multiprocessing.Pool(initializer=init)

    for x in range(8):
        multiprocessing.Process(target=foo, args=(x,)).start()
    time.sleep(100)
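
A reasonable worry with this approach is whether deleting the global in the child also affects the parent. The sketch below suggests it does not: under "fork" (assumed available, so POSIX only) the child removes only its own binding. The names big_global, drop_global and delete_in_child are hypothetical, and the sketch checks visibility of the name only; it does not measure how much memory the operating system actually reclaims.

```python
import multiprocessing

# Hypothetical stand-in for the large global.
big_global = list(range(1000))


def drop_global(queue):
    # Runs in the child: remove the inherited name, then report what is left.
    global big_global
    del big_global
    queue.put("big_global" in globals())


def delete_in_child():
    # Assumption: "fork" is available, which is only true on POSIX platforms.
    ctx = multiprocessing.get_context("fork")
    queue = ctx.Queue()
    child = ctx.Process(target=drop_global, args=(queue,))
    child.start()
    still_in_child = queue.get()  # read the result before joining
    child.join()
    # The parent's binding is untouched by the child's del.
    return still_in_child, len(big_global)


if __name__ == "__main__":
    print(delete_in_child())
```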
