如何使用初始化程序来设置我的多进程池?

2022-01-12 00:00:00 python multiprocessing

问题描述

我正在尝试使用多进程池对象.我希望每个进程在启动时打开一个数据库连接,然后使用该连接来处理传入的数据.(而不是为每一位数据打开和关闭连接.)这似乎是初始化程序对于,但我无法理解工作人员和初始化程序是如何通信的.所以我有这样的事情:

I'm trying to use the multiprocess Pool object. I'd like each process to open a database connection when it starts, then use that connection to process the data that is passed in. (Rather than opening and closing the connection for each bit of data.) This seems like what the initializer is for, but I can't wrap my head around how the worker and the initializer communicate. So I have something like this:

def get_cursor():
  return psycopg2.connect(...).cursor()

def process_data(data):
   # here I'd like to have the cursor so that I can do things with the data

if __name__ == "__main__":
  pool = Pool(initializer=get_cursor, initargs=())
  pool.map(process_data, get_some_data_iterator())

我如何(或我如何)将光标从 get_cursor() 取回 process_data()?

how do I (or do I) get the cursor back from get_cursor() into the process_data()?


解决方案

初始化函数是这样调用的:

The initialize function is called thus:

def worker(...):
    ...
    if initializer is not None:
        initializer(*args)

所以在任何地方都没有保存返回值.你可能认为这注定了你的命运,但不是!每个工人都在一个单独的进程中.因此,您可以使用普通的 global 变量.

so there is no return value saved anywhere. You might think this dooms you, but no! Each worker is in a separate process. Thus, you can use an ordinary global variable.

这不是很漂亮,但它确实有效:

This is not exactly pretty, but it works:

cursor = None
def set_global_cursor(...):
    global cursor
    cursor = ...

现在您可以在 process_data 函数中使用 cursor.每个独立进程内的cursor变量与所有其他进程是分开的,所以它们不会互相踩踏.

Now you can just use cursor in your process_data function. The cursor variable inside each separate process is separate from all the other processes, so they do not step on each other.

(我不知道 psycopg2 是否有不同的方法来处理这个问题,首先不涉及使用 multiprocessing;这是一个一般性的答案multiprocessing 模块的一般问题.)

(I have no idea whether psycopg2 has a different way to deal with this that does not involve using multiprocessing in the first place; this is meant as a general answer to a general problem with the multiprocessing module.)

相关文章