Calling coroutines in asyncio.Protocol.data_received
Problem description
I am having trouble doing asynchronous work in the asyncio.Protocol.data_received callback of the new Python asyncio module.
Consider the following server:
import asyncio
import json
import math


class MathServer(asyncio.Protocol):

    @asyncio.coroutine
    def slow_sqrt(self, x):
        yield from asyncio.sleep(1)
        return math.sqrt(x)

    def fast_sqrt(self, x):
        return math.sqrt(x)

    def connection_made(self, transport):
        self.transport = transport

    #@asyncio.coroutine
    def data_received(self, data):
        print('data received: {}'.format(data.decode()))
        x = json.loads(data.decode())
        #res = self.fast_sqrt(x)
        res = yield from self.slow_sqrt(x)
        self.transport.write(json.dumps(res).encode('utf8'))
        self.transport.close()
used with the following client:
import asyncio
import json


class MathClient(asyncio.Protocol):

    def connection_made(self, transport):
        transport.write(json.dumps(2.).encode('utf8'))

    def data_received(self, data):
        print('data received: {}'.format(data.decode()))

    def connection_lost(self, exc):
        asyncio.get_event_loop().stop()
With self.fast_sqrt being called, everything works as expected.
With self.slow_sqrt, it does not work.
It also does not work with self.fast_sqrt and the @asyncio.coroutine decorator on data_received.
I feel I am missing something fundamental here.
Complete code is here:
- Server
- Client
Tested on:
- Python 3.4.0b1 (Windows)
- Python 3.3.3 + asyncio-0.2.1 (FreeBSD)
The issue is the same on both: with slow_sqrt, the client and server just hang, doing nothing.
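A likely explanation for the hang can be sketched with a plain generator, no asyncio required. The event loop calls data_received as an ordinary function and ignores its return value. Once the body contains yield from (or the method is wrapped with @asyncio.coroutine), that call only creates a generator object and none of the body runs. The function and the calls list below are hypothetical stand-ins for illustration, not part of the original server.

```python
import inspect

calls = []  # records whether the callback body ever executes


def data_received(data):
    # Hypothetical stand-in for the protocol callback: the `yield` below
    # turns this into a generator function.
    calls.append(data)
    result = yield data
    return result


# The event loop invokes protocol callbacks as plain calls and discards
# whatever they return, roughly like this:
returned = data_received(b"2.0")

is_generator = inspect.isgenerator(returned)  # a generator object came back
body_ran = bool(calls)                        # the body never started
print(is_generator, body_ran)
```

Nothing ever iterates the returned generator, so the transport is never written to or closed, which would match the client and server both waiting forever.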
Solution
It seems this needs to be decoupled via a Future, though I am still not sure whether this is the right way.
import asyncio
import inspect
import json
import math
from collections import deque


class MathServer(asyncio.Protocol):

    @asyncio.coroutine
    def slow_sqrt(self, x):
        yield from asyncio.sleep(2)
        return math.sqrt(x)

    def fast_sqrt(self, x):
        return math.sqrt(x)

    def consume(self):
        while True:
            # Park here until data_received() signals that data is queued.
            self.waiter = asyncio.Future()
            yield from self.waiter
            while len(self.receive_queue):
                data = self.receive_queue.popleft()
                if self.transport:
                    try:
                        res = self.process(data)
                        # process() may return a plain value, a Future or a generator.
                        if (isinstance(res, asyncio.Future) or
                                inspect.isgenerator(res)):
                            res = yield from res
                    except Exception as e:
                        print(e)

    def connection_made(self, transport):
        self.transport = transport
        self.receive_queue = deque()
        asyncio.Task(self.consume())

    def data_received(self, data):
        # Plain callback: queue the data and wake the consumer task.
        self.receive_queue.append(data)
        if not self.waiter.done():
            self.waiter.set_result(None)
        print("data_received {} {}".format(len(data), len(self.receive_queue)))

    def process(self, data):
        x = json.loads(data.decode())
        #res = self.fast_sqrt(x)
        res = yield from self.slow_sqrt(x)
        self.transport.write(json.dumps(res).encode('utf8'))
        #self.transport.close()

    def connection_lost(self, exc):
        self.transport = None
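The same decoupling can be sketched more compactly with asyncio.Queue, which plays the role of both the deque and the waiter Future. This is a minimal sketch, assuming a current Python where async def and await replace @asyncio.coroutine and yield from. QueueMathServer, FakeTransport and demo are hypothetical names used only for illustration, and the sleep is shortened so the example finishes quickly.

```python
import asyncio
import json
import math


class FakeTransport:
    """Hypothetical stand-in for a real transport; it only records writes."""

    def __init__(self):
        self.written = []

    def write(self, data):
        self.written.append(data)


class QueueMathServer(asyncio.Protocol):

    def connection_made(self, transport):
        self.transport = transport
        self.queue = asyncio.Queue()
        # One consumer task per connection processes messages in order.
        self.consumer = asyncio.ensure_future(self.consume())

    def data_received(self, data):
        # Plain callback: hand the data over and return immediately.
        self.queue.put_nowait(data)

    def connection_lost(self, exc):
        self.consumer.cancel()

    async def slow_sqrt(self, x):
        await asyncio.sleep(0.01)
        return math.sqrt(x)

    async def consume(self):
        while True:
            data = await self.queue.get()
            res = await self.slow_sqrt(json.loads(data.decode()))
            self.transport.write(json.dumps(res).encode('utf8'))
            self.queue.task_done()


async def demo():
    # Drive the protocol by hand instead of opening a socket.
    server = QueueMathServer()
    transport = FakeTransport()
    server.connection_made(transport)
    server.data_received(b"4.0")
    server.data_received(b"9.0")
    await server.queue.join()  # wait until both messages are processed
    server.connection_lost(None)
    return transport.written


written = asyncio.run(demo())
print(written)
```

Because a single task drains the queue, replies are written in the order the data arrived. The sketch does not handle malformed input; an exception inside consume would end the consumer task.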
Here is an answer by Guido van Rossum:
The solution is simple: write that logic as a separate method marked with @coroutine, and fire it off in data_received() using async() (== Task(), in this case). The reason why this isn't built into the protocol is that if it was, it would require alternate event loop implementations to deal with coroutines.
Note that asyncio.async() was later renamed to asyncio.ensure_future(), which is what the snippet here uses:
def data_received(self, data):
    asyncio.ensure_future(self.process_data(data))

@asyncio.coroutine
def process_data(self, data):
    # ...stuff using yield from...
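For reference, the same pattern can be written as a runnable sketch on a current Python, where @asyncio.coroutine is no longer available and async def with await takes its place. TaskMathServer, RecordingTransport and demo are hypothetical names for illustration; the square-root logic is borrowed from the question, with the sleep shortened.

```python
import asyncio
import json
import math


class RecordingTransport:
    """Hypothetical stand-in for a real transport; it only records calls."""

    def __init__(self):
        self.written = []
        self.closed = False

    def write(self, data):
        self.written.append(data)

    def close(self):
        self.closed = True


class TaskMathServer(asyncio.Protocol):

    def connection_made(self, transport):
        self.transport = transport
        self.tasks = set()

    def data_received(self, data):
        # Stay a plain callback: schedule the coroutine and return at once.
        task = asyncio.ensure_future(self.process_data(data))
        # Keep a reference so the task is not garbage-collected mid-flight.
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)

    async def process_data(self, data):
        x = json.loads(data.decode())
        await asyncio.sleep(0.01)  # stands in for the slow operation
        res = math.sqrt(x)
        self.transport.write(json.dumps(res).encode('utf8'))
        self.transport.close()


async def demo():
    # Drive the protocol by hand instead of opening a socket.
    server = TaskMathServer()
    transport = RecordingTransport()
    server.connection_made(transport)
    server.data_received(b"2.0")
    scheduled = len(server.tasks)  # the work is scheduled, not yet finished
    await asyncio.gather(*server.tasks)
    return transport, scheduled


transport, scheduled = asyncio.run(demo())
print(transport.written, transport.closed)
```

Unlike a single consumer task, this starts one task per data_received call, so slow requests on the same connection may overlap and finish out of order.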
Full code is here:
- Client
- Server