How to best perform multiprocessing within requests with the Python Tornado server?
Problem Description
I am using the I/O non-blocking Python server Tornado. I have a class of GET requests which may take a significant amount of time to complete (think in the range of 5-10 seconds). The problem is that Tornado blocks on these requests, so that subsequent fast requests are held up until the slow request completes.
I looked at https://github.com/facebook/tornado/wiki/Threading-and-concurrency and came to the conclusion that I wanted some combination of #3 (other processes) and #4 (other threads). #4 on its own had issues: I was unable to get reliable control back to the ioloop when there was another thread doing the "heavy_lifting". (I assume this is due to the GIL and the fact that the heavy_lifting task has a high CPU load and keeps pulling control away from the main ioloop, but that's a guess.)
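For illustration, here is a minimal sketch of roughly what that thread-only variant (#4) might look like (a hypothetical reconstruction; the handler name and details are assumptions, not the original code):

import math
import threading

from tornado.web import RequestHandler, asynchronous
from tornado.ioloop import IOLoop


class ThreadOnlySlowHandler(RequestHandler):
    @asynchronous
    def get(self):
        def work():
            # CPU-bound loop: the GIL is held almost continuously,
            # so the main ioloop thread is starved while this runs
            for k in range(2000):
                math.factorial(k)
            # add_callback is the thread-safe way to re-enter the ioloop
            IOLoop.instance().add_callback(self.finish)
        threading.Thread(target=work).start()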
So I have been prototyping how to solve this by doing the "heavy lifting" tasks within these slow GET requests in a separate process, and then placing a callback back into the Tornado ioloop when the process is done, to finish the request. This frees up the ioloop to handle other requests.
I have created a simple example demonstrating a possible solution, but am curious to get feedback from the community on it.
My question is two-fold: how can this current approach be simplified, and what pitfalls potentially exist with it? My approach:
- Utilize Tornado's builtin asynchronous decorator, which allows a request to stay open and the ioloop to continue.
- Spawn a separate process for "heavy lifting" tasks using Python's multiprocessing module. I first attempted to use the threading module but was unable to get any reliable relinquishing of control back to the ioloop. It also appears that multiprocessing would take advantage of multiple cores.
- Start a 'watcher' thread in the main ioloop process using the threading module, whose job is to watch a multiprocessing.Queue for the result of the "heavy lifting" task when it completes. This was needed because I needed a way to know that the heavy_lifting task had completed while still being able to notify the ioloop that this request was now finished.
- Be sure that the 'watcher' thread relinquishes control to the main ioloop often, via time.sleep(0) calls, so that other requests continue to get readily processed. (A blocking alternative to this polling is sketched just after this list.)
- When there is a result in the queue, add a callback from the 'watcher' thread using tornado.ioloop.IOLoop.instance().add_callback(), which is documented to be the only safe way to call ioloop instances from other threads.
- Be sure to then call finish() in the callback to complete the request and hand over a reply.
Below is some sample code showing this approach. multi_tornado.py is the server implementing the above outline, and call_multi.py is a sample script that calls the server in two different ways to test it. Both tests call the server with 3 slow GET requests followed by 20 fast GET requests. The results are shown both with and without the threading turned on.
In the case of running it with "no threading", the 3 slow requests block (each taking a little over a second to complete). A few of the 20 fast requests squeeze through in between some of the slow requests within the ioloop (not totally sure how that occurs, but it could be an artifact of running both the server and the client test script on the same machine). The point here is that all of the fast requests are held up to varying degrees.
In the case of running it with threading enabled, the 20 fast requests all complete first, immediately, and the three slow requests complete at about the same time afterwards, as they have each been running in parallel. This is the desired behavior. The three slow requests take 2.5 seconds to complete in parallel, whereas in the non-threaded case they take about 3.5 seconds in total. So there is about a 35% speedup overall (I assume due to multicore sharing). But more importantly, the fast requests are handled immediately, rather than being stuck behind the slow ones.
I do not have a lot of experience with multithreaded programming, so while this seemingly works here, I am curious to learn:
Is there a simpler way to accomplish this? What monsters may be lurking within this approach?
(Note: a future tradeoff may be to just run more instances of Tornado with a reverse proxy like nginx doing load balancing. No matter what, I will be running multiple instances with a load balancer, but I am concerned about just throwing hardware at this problem, since the hardware seems so directly coupled to the problem in terms of the blocking.)
multi_tornado.py (sample server):
import time
import threading
import multiprocessing
import math

from tornado.web import RequestHandler, Application, asynchronous
from tornado.ioloop import IOLoop


# run in some other process - put result in q
def heavy_lifting(q):
    t0 = time.time()
    for k in range(2000):
        math.factorial(k)
    t = time.time()
    q.put(t - t0)  # report time to compute in queue


class FastHandler(RequestHandler):
    def get(self):
        res = 'fast result ' + self.get_argument('id')
        print res
        self.write(res)
        self.flush()


class MultiThreadedHandler(RequestHandler):
    # Note: This handler can be called with threaded = True or False
    def initialize(self, threaded=True):
        self._threaded = threaded
        self._q = multiprocessing.Queue()

    def start_process(self, worker, callback):
        # method to start process and watcher thread
        self._callback = callback

        if self._threaded:
            # launch process
            multiprocessing.Process(target=worker, args=(self._q,)).start()
            # start watching for process to finish
            threading.Thread(target=self._watcher).start()
        else:
            # threaded = False just call directly and block
            worker(self._q)
            self._watcher()

    def _watcher(self):
        # watches the queue for process result
        while self._q.empty():
            time.sleep(0)  # relinquish control if not ready

        # put callback back into the ioloop so we can finish request
        response = self._q.get(False)
        IOLoop.instance().add_callback(lambda: self._callback(response))


class SlowHandler(MultiThreadedHandler):
    @asynchronous
    def get(self):
        # start the process and a thread to watch for the result
        self.start_process(heavy_lifting, self._on_response)

    def _on_response(self, delta):
        _id = self.get_argument('id')
        res = 'slow result {} <--- {:0.3f} s'.format(_id, delta)
        print res
        self.write(res)
        self.flush()
        self.finish()  # be sure to finish request


application = Application([
    (r"/fast", FastHandler),
    (r"/slow", SlowHandler, dict(threaded=False)),
    (r"/slow_threaded", SlowHandler, dict(threaded=True)),
])


if __name__ == "__main__":
    application.listen(8888)
    IOLoop.instance().start()
call_multi.py (client tester):
import sys

from tornado.ioloop import IOLoop
from tornado import httpclient


def run(slow):
    def show_response(res):
        print res.body

    # make 3 "slow" requests on server
    requests = []
    for k in xrange(3):
        uri = 'http://localhost:8888/{}?id={}'
        requests.append(uri.format(slow, str(k + 1)))

    # followed by 20 "fast" requests
    for k in xrange(20):
        uri = 'http://localhost:8888/fast?id={}'
        requests.append(uri.format(k + 1))

    # show results as they return
    http_client = httpclient.AsyncHTTPClient()

    print 'Scheduling Get Requests:'
    print '------------------------'
    for req in requests:
        print req
        http_client.fetch(req, show_response)

    # execute requests on server
    print '\nStart sending requests....'
    IOLoop.instance().start()


if __name__ == '__main__':
    scenario = sys.argv[1]

    if scenario == 'slow' or scenario == 'slow_threaded':
        run(scenario)
Test Results
By running python call_multi.py slow (the blocking behavior):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow?id=1
http://localhost:8888/slow?id=2
http://localhost:8888/slow?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
slow result 1 <--- 1.338 s
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
slow result 2 <--- 1.169 s
slow result 3 <--- 1.130 s
fast result 8
fast result 9
fast result 10
fast result 11
fast result 13
fast result 12
fast result 14
fast result 15
fast result 16
fast result 18
fast result 17
fast result 19
fast result 20
By running python call_multi.py slow_threaded (the desired behavior):
Scheduling Get Requests:
------------------------
http://localhost:8888/slow_threaded?id=1
http://localhost:8888/slow_threaded?id=2
http://localhost:8888/slow_threaded?id=3
http://localhost:8888/fast?id=1
http://localhost:8888/fast?id=2
http://localhost:8888/fast?id=3
http://localhost:8888/fast?id=4
http://localhost:8888/fast?id=5
http://localhost:8888/fast?id=6
http://localhost:8888/fast?id=7
http://localhost:8888/fast?id=8
http://localhost:8888/fast?id=9
http://localhost:8888/fast?id=10
http://localhost:8888/fast?id=11
http://localhost:8888/fast?id=12
http://localhost:8888/fast?id=13
http://localhost:8888/fast?id=14
http://localhost:8888/fast?id=15
http://localhost:8888/fast?id=16
http://localhost:8888/fast?id=17
http://localhost:8888/fast?id=18
http://localhost:8888/fast?id=19
http://localhost:8888/fast?id=20
Start sending requests....
fast result 1
fast result 2
fast result 3
fast result 4
fast result 5
fast result 6
fast result 7
fast result 8
fast result 9
fast result 10
fast result 11
fast result 12
fast result 13
fast result 14
fast result 15
fast result 19
fast result 20
fast result 17
fast result 16
fast result 18
slow result 2 <--- 2.485 s
slow result 3 <--- 2.491 s
slow result 1 <--- 2.517 s
Solution
If you're willing to use concurrent.futures.ProcessPoolExecutor instead of multiprocessing, this is actually very simple. Tornado's ioloop already supports concurrent.futures.Future, so they'll play nicely together out of the box. concurrent.futures is included in Python 3.2+, and has been backported to Python 2.x.
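(On Python 2.x, that backport is available on PyPI as the futures package; pip install futures should make concurrent.futures importable.)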
Here's an example:
import time
from concurrent.futures import ProcessPoolExecutor

from tornado.ioloop import IOLoop
from tornado import gen


def f(a, b, c, blah=None):
    print "got %s %s %s and %s" % (a, b, c, blah)
    time.sleep(5)
    return "hey there"


@gen.coroutine
def test_it():
    pool = ProcessPoolExecutor(max_workers=1)
    fut = pool.submit(f, 1, 2, 3, blah="ok")  # This returns a concurrent.futures.Future
    print("running it asynchronously")
    ret = yield fut
    print("it returned %s" % ret)
    pool.shutdown()


IOLoop.instance().run_sync(test_it)
Output:
running it asynchronously
got 1 2 3 and ok
it returned hey there
ProcessPoolExecutor has a more limited API than multiprocessing.Pool, but if you don't need the more advanced features of multiprocessing.Pool, it's worth using because the integration is so much simpler.
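To tie this back to the question: with this approach, the process-plus-watcher-thread machinery in SlowHandler could plausibly collapse to a few lines. A hypothetical, untested sketch (it assumes Tornado 3.1+, where @gen.coroutine handlers finish automatically, and a heavy_lifting variant that returns its result instead of writing to a queue):

import math
import time
from concurrent.futures import ProcessPoolExecutor

from tornado import gen
from tornado.web import RequestHandler


def heavy_lifting():
    # same work as in multi_tornado.py, but returning the elapsed
    # time rather than putting it on a multiprocessing.Queue
    t0 = time.time()
    for k in range(2000):
        math.factorial(k)
    return time.time() - t0


pool = ProcessPoolExecutor(max_workers=4)  # one shared, module-level pool


class SlowHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        # yielding the concurrent.futures.Future suspends this handler
        # while the worker process runs; the ioloop stays free
        delta = yield pool.submit(heavy_lifting)
        self.write('slow result {} <--- {:0.3f} s'.format(
            self.get_argument('id'), delta))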