Can I somehow share an asynchronous queue with a subprocess?
Problem description
I would like to use a queue for passing data from a parent to a child process which is launched via multiprocessing.Process. However, since the parent process uses Python's new asyncio library, the queue methods need to be non-blocking. As far as I understand, asyncio.Queue is made for inter-task communication and cannot be used for inter-process communication. Also, I know that multiprocessing.Queue has the put_nowait() and get_nowait() methods, but I actually need coroutines that would still block the current task (but not the whole process). Is there some way to create coroutines that wrap put_nowait()/get_nowait()? On another note, are the threads that multiprocessing.Queue uses internally compatible after all with an event loop running in the same process?
If not, what other options do I have? I know I could implement such a queue myself by making use of asynchronous sockets but I hoped I could avoid that…
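For reference, the kind of coroutine wrapper the question asks about can be sketched with run_in_executor. This is a minimal, hypothetical sketch (the names coro_put, coro_get, and demo are made up, and it uses the modern async/await syntax rather than the generator-based coroutines available at the time of the question):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Queue

# Wrap the blocking put()/get() of a multiprocessing.Queue in coroutines
# via run_in_executor, so only the awaiting task is suspended, not the
# whole event loop.
_executor = ThreadPoolExecutor(max_workers=2)

async def coro_put(queue, item):
    loop = asyncio.get_running_loop()
    # put() blocks a worker thread; the coroutine merely awaits its result.
    await loop.run_in_executor(_executor, queue.put, item)

async def coro_get(queue):
    loop = asyncio.get_running_loop()
    # get() blocks a worker thread until an item is available.
    return await loop.run_in_executor(_executor, queue.get)

async def demo():
    q = Queue()
    await coro_put(q, 42)
    return await coro_get(q)

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

This is essentially the idea the accepted answer below builds on.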
I also considered using pipes instead of sockets, but it seems asyncio is not compatible with multiprocessing.Pipe(). More precisely, Pipe() returns a tuple of Connection objects, which are not file-like objects. However, asyncio.BaseEventLoop's add_reader()/add_writer() and connect_read_pipe()/connect_write_pipe() methods all expect file-like objects, so it is impossible to asynchronously read from/write to such a Connection. In contrast, the usual file-like objects that the subprocess package uses as pipes pose no problem at all and can easily be used in combination with asyncio.
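As an aside, on a Unix selector event loop one could in principle watch a Connection through its raw descriptor, since add_reader() also accepts a plain file descriptor. The following is a speculative, Unix-only sketch, not from the original post (recv_async and demo are made-up names, and conn.recv() is assumed not to block because the descriptor already polled readable, which holds for small, complete messages):

```python
import asyncio
from multiprocessing import Pipe

async def recv_async(conn):
    # Register the Connection's raw file descriptor with the event loop
    # and resolve a future once it becomes readable.
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def on_readable():
        loop.remove_reader(conn.fileno())
        fut.set_result(conn.recv())  # data is ready, so recv() returns promptly

    loop.add_reader(conn.fileno(), on_readable)
    return await fut

async def demo():
    parent_conn, child_conn = Pipe()
    child_conn.send("hello")  # in real use this would happen in the child process
    try:
        return await recv_async(parent_conn)
    finally:
        parent_conn.close()
        child_conn.close()
```

This does not work with the Windows proactor event loop, which matches the portability concerns raised below.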
UPDATE:
I decided to explore the pipe approach a bit further: I converted the Connection objects returned by multiprocessing.Pipe() into file-like objects by retrieving the file descriptor via fileno() and passing it to os.fdopen(). Finally, I passed the resulting file-like object to the event loop's connect_read_pipe()/connect_write_pipe(). (There is some mailing list discussion on a related issue if someone is interested in the exact code.) However, read()ing the stream gave me an OSError: [Errno 9] Bad file descriptor and I didn't manage to fix this. Also considering the missing support for Windows, I will not pursue this any further.
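One plausible cause of that EBADF, offered here as a hypothesis rather than a confirmed diagnosis: os.fdopen() wraps the same descriptor the Connection owns, so when the Connection is closed (or garbage collected), the file object's descriptor goes stale. A small sketch of that failure mode (demonstrate_stale_fd is a made-up name):

```python
import os
from multiprocessing import Pipe

def demonstrate_stale_fd():
    # fdopen() wraps the SAME descriptor the Connection owns; it does not
    # duplicate it, so the Connection closing the fd invalidates the file
    # object too.
    recv_conn, send_conn = Pipe(duplex=False)
    f = os.fdopen(recv_conn.fileno(), "rb", buffering=0)
    recv_conn.close()          # the Connection closes the shared fd
    try:
        f.read(1)              # the wrapped fd is now stale
        return None
    except OSError as e:
        return e.errno         # errno.EBADF == 9, "Bad file descriptor"
    finally:
        send_conn.close()
        try:
            f.close()
        except OSError:
            pass               # closing the stale fd fails too
```

If this is indeed what happened, duplicating the descriptor with os.dup() before fdopen() would sidestep it, though the Windows limitation would remain.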
Solution
Here is an implementation of a multiprocessing.Queue object that can be used with asyncio. It provides the entire multiprocessing.Queue interface, with the addition of coro_get and coro_put methods, which are asyncio.coroutines that can be used to asynchronously get/put from/into the queue. The implementation details are essentially the same as in the second example of my other answer: a ThreadPoolExecutor is used to make the get/put asynchronous, and a multiprocessing.managers.SyncManager.Queue is used to share the queue between processes. The only additional trick is implementing __getstate__ to keep the object picklable despite using a non-picklable ThreadPoolExecutor as an instance variable.
import asyncio
from multiprocessing import Manager, cpu_count
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def AsyncProcessQueue(maxsize=0):
    m = Manager()
    q = m.Queue(maxsize=maxsize)
    return _ProcQueue(q)

class _ProcQueue(object):
    def __init__(self, q):
        self._queue = q
        self._real_executor = None
        self._cancelled_join = False

    @property
    def _executor(self):
        if not self._real_executor:
            self._real_executor = ThreadPoolExecutor(max_workers=cpu_count())
        return self._real_executor

    def __getstate__(self):
        self_dict = self.__dict__.copy()  # copy, so pickling doesn't clobber the live object
        self_dict['_real_executor'] = None
        return self_dict

    def __getattr__(self, name):
        if name in ['qsize', 'empty', 'full', 'put', 'put_nowait',
                    'get', 'get_nowait', 'close']:
            return getattr(self._queue, name)
        else:
            raise AttributeError("'%s' object has no attribute '%s'" %
                                 (self.__class__.__name__, name))

    @asyncio.coroutine
    def coro_put(self, item):
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor, self.put, item))

    @asyncio.coroutine
    def coro_get(self):
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor, self.get))

    def cancel_join_thread(self):
        self._cancelled_join = True
        self._queue.cancel_join_thread()

    def join_thread(self):
        self._queue.join_thread()
        if self._real_executor and not self._cancelled_join:
            self._real_executor.shutdown()

@asyncio.coroutine
def _do_coro_proc_work(q, stuff, stuff2):
    ok = stuff + stuff2
    print("Passing %s to parent" % ok)
    yield from q.coro_put(ok)  # Non-blocking
    item = q.get()  # Can be used with the normal blocking API, too
    print("got %s back from parent" % item)

def do_coro_proc_work(q, stuff, stuff2):
    loop = asyncio.get_event_loop()
    loop.run_until_complete(_do_coro_proc_work(q, stuff, stuff2))

@asyncio.coroutine
def do_work(q):
    loop = asyncio.get_event_loop()
    loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
                         do_coro_proc_work, q, 1, 2)
    item = yield from q.coro_get()
    print("Got %s from worker" % item)
    item = item + 25
    q.put(item)

if __name__ == "__main__":
    q = AsyncProcessQueue()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_work(q))
Output:
Passing 3 to parent
Got 3 from worker
got 28 back from parent
As you can see, you can use the AsyncProcessQueue both synchronously and asynchronously, from either the parent or the child process. It doesn't require any global state, and by encapsulating most of the complexity in a class, it is more elegant to use than my original answer.
You'll probably be able to get better performance using sockets directly, but getting that working in a cross-platform way seems to be pretty tricky. This approach also has the advantage of being usable across multiple workers, doesn't require you to pickle/unpickle data yourself, etc.