具有多处理工作人员的扭曲网络客户端?
问题描述
所以,我有一个使用 Twisted + Stomper 作为 STOMP 客户端的应用程序,它将工作分配给 multiprocessing.Pool 的工作人员.
So, I've got an application that uses Twisted + Stomper as a STOMP client which farms out work to a multiprocessing.Pool of workers.
当我只使用 python 脚本启动它时,这似乎工作正常,它(简化)看起来像这样:
This appears to work ok when I just use a python script to fire this up, which (simplified) looks something like this:
# stompclient.py
logging.config.fileConfig(config_path)
logger = logging.getLogger(__name__)
# Add observer to make Twisted log via python
twisted.python.log.PythonLoggingObserver().start()
# initialize the process pool. (child processes get forked off immediately)
pool = multiprocessing.Pool(processes=processes)
StompClientFactory.username = username
StompClientFactory.password = password
StompClientFactory.destination = destination
reactor.connectTCP(host, port, StompClientFactory())
reactor.run()
当它被打包用于部署时,我想我会利用 twistd 脚本并从 tac 文件运行它.
As this gets packaged for deployment, I thought I would take advantage of the twistd script and run this from a tac file.
这是我看起来非常相似的 tac 文件:
Here's my very-similar-looking tac file:
# stompclient.tac
logging.config.fileConfig(config_path)
logger = logging.getLogger(__name__)
# Add observer to make Twisted log via python
twisted.python.log.PythonLoggingObserver().start()
# initialize the process pool. (child processes get forked off immediately)
pool = multiprocessing.Pool(processes=processes)
StompClientFactory.username = username
StompClientFactory.password = password
StompClientFactory.destination = destination
application = service.Application('myapp')
service = internet.TCPClient(host, port, StompClientFactory())
service.setServiceParent(application)
为了便于说明,我已经折叠或更改了一些细节;希望它们不是问题的本质.例如,我的应用程序有一个插件系统,池由一个单独的方法初始化,然后使用 pool.apply_async() 将工作委托给池,传递我的插件的一个 process() 方法.
For the sake of illustration, I have collapsed or changed a few details; hopefully they were not the essence of the problem. For example, my app has a plugin system, the pool is initialized by a separate method, and then work is delegated to the pool using pool.apply_async() passing one of my plugin's process() methods.
所以,如果我运行脚本 (stompclient.py),一切都会按预期运行.
So, if I run the script (stompclient.py), everything works as expected.
如果我在非守护程序模式 (-n) 下运行 twist,它似乎也可以正常工作:
It also appears to work OK if I run twist in non-daemon mode (-n):
twistd -noy stompclient.tac
但是,当我在守护程序模式下运行时,它确实不工作:
however, it does not work when I run in daemon mode:
twistd -oy stompclient.tac
应用程序似乎可以正常启动,但是当它尝试分叉工作时,它只是挂起.通过挂起",我的意思是似乎从未要求子进程执行任何操作,而父进程(称为 pool.apply_async())只是坐在那里等待响应返回.
The application appears to start up OK, but when it attempts to fork off work, it just hangs. By "hangs", I mean that it appears that the child process is never asked to do anything and the parent (that called pool.apply_async()) just sits there waiting for the response to return.
我确信我在使用 Twisted + 多处理做一些愚蠢的事情,但我真的希望有人能向我解释我的方法中的缺陷.
I'm sure that I'm doing something stupid with Twisted + multiprocessing, but I'm really hoping that someone can explain to my the flaw in my approach.
提前致谢!
解决方案
由于您的工作调用和非工作调用之间的区别只是-n"选项,因此问题很可能是由守护进程(-n"阻止发生).
Since the difference between your working invocation and your non-working invocation is only the "-n" option, it seems most likely that the problem is caused by the daemonization process (which "-n" prevents from happening).
在 POSIX 上,守护进程涉及的步骤之一是分叉并让父级退出.其中,这会导致您的代码在与评估 .tac 文件的进程不同的进程中运行.这也重新安排了在 .tac 文件中启动的进程的子/父关系 - 就像您的多处理进程池一样.
On POSIX, one of the steps involved in daemonization is forking and having the parent exit. Among of things, this has the consequence of having your code run in a different process than the one in which the .tac file was evaluated. This also re-arranges the child/parent relationship of processes which were started in the .tac file - as your pool of multiprocessing processes were.
多处理池的进程从您启动的 twistd 进程的父进程开始.但是,当该进程作为守护进程的一部分退出时,它们的父进程将成为系统初始化进程.这可能会导致一些问题,尽管可能不是您描述的悬挂问题.可能还有其他类似的低级实现细节,它们通常允许多处理模块工作,但会被守护进程中断.
The multiprocessing pool's processes start off with a parent of the twistd process you start. However, when that process exits as part of daemonization, their parent becomes the system init process. This may cause some problems, although probably not the hanging problem you described. There are probably other similarly low-level implementation details which normally allow the multiprocessing module to work but which are disrupted by the daemonization process.
幸运的是,避免这种奇怪的交互应该很简单.Twisted 的服务 API 允许您在守护进程完成后运行代码.如果您使用这些 API,那么您可以将多处理模块的进程池的初始化延迟到守护进程之后,并有望避免该问题.下面是一个可能看起来像的示例:
Fortunately, avoiding this strange interaction should be straightforward. Twisted's service APIs allow you to run code after daemonization has completed. If you use these APIs, then you can delay the initialization of the multiprocessing module's process pool until after daemonization and hopefully avoid the problem. Here's an example of what that might look like:
from twisted.application.service import Service
class MultiprocessingService(Service):
def startService(self):
self.pool = multiprocessing.Pool(processes=processes)
MultiprocessingService().setServiceParent(application)
现在,另外,您可能还会遇到与清理多处理模块的子进程相关的问题,或者可能与使用 Twisted 的进程创建 API、reactor.spawnProcess 创建的进程有关的问题.这是因为正确处理子进程的一部分通常涉及处理 SIGCHLD 信号.但是,Twisted 和 multiprocessing 在这方面不会合作,因此其中一个将收到所有孩子退出的通知,而另一个永远不会收到通知.如果您根本不使用 Twisted 的 API 来创建子进程,那么这对您来说可能没问题 - 但您可能需要检查以确保多处理模块尝试安装的任何信号处理程序实际上获胜"并且没有得到替换为 Twisted 自己的处理程序.
Now, separately, you may also run into problems relating to clean up of the multiprocessing module's child processes, or possibly issues with processes created with Twisted's process creation API, reactor.spawnProcess. This is because part of dealing with child processes correctly generally involves handling the SIGCHLD signal. Twisted and multiprocessing aren't going to be cooperating in this regard, though, so one of them is going to get notified of all children exiting and the other will never be notified. If you don't use Twisted's API for creating child processes at all, then this may be okay for you - but you might want to check to make sure any signal handler the multiprocessing module tries to install actually "wins" and doesn't get replaced by Twisted's own handler.
相关文章