来自python的asyncio是否支持用于UDP网络的基于协程的API?
问题描述
今天晚上我浏览了pythonasyncio
模块文档,为我的一个课程项目寻找一些想法,但我很快发现Python的标准aysncio
模块可能缺少功能。
如果您仔细阅读文档,您会发现有一个基于回调的API和一个基于协程的API。回调API可以同时用于构建UDP和TCP应用程序,而协程API看起来只能用于构建TCP应用程序,因为它使用了流样式API。
这给我带来了相当大的问题,因为我正在寻找用于UDP网络的基于协程的API,尽管我确实发现asyncio
支持sock_recv
和sock_sendall
等基于协程的低级套接字方法,但是UDP网络的关键APIrecvfrom
和sendto
并不存在。
我想做的是编写一些代码,如:
async def handle_income_packet(sock):
await data, addr = sock.recvfrom(4096)
# data handling here...
await sock.sendto(addr, response)
我知道这可以使用回调API等效地实现,但这里的问题是回调不是协程而是常规函数,因此您不能在其中将控制权交还给事件循环并保留函数执行状态。
看看上面的代码,如果我们需要在数据处理部分做一些阻塞-IO操作,只要我们的IO操作也是在协程中完成的,协程版本就不会有问题:
async def handle_income_packet(sock):
await data, addr = sock.recvfrom(4096)
async with aiohttp.ClientSession() as session:
info = await session.get(...)
response = generate_response_from_info(info)
await sock.sendto(addr, response)
只要我们使用await
,事件循环就会从该点获取控制流来处理其他事情,直到IO完成。但遗憾的是,这些代码目前不可用,因为我们在asyncio
中没有socket.sendto
和socket.recvfrom
的协同版本。
我们可以使用传输协议回调API实现此功能:
class EchoServerClientProtocol(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
self.transport = transport
def data_received(self, data):
info = requests.get(...)
response = generate_response_from_info(info)
self.transport.write(response)
self.transport.close()
我们不能在那里执行协程await
,因为回调不是协程,使用上面这样的阻塞IO调用会使回调中的控制流停止,并阻止循环处理任何其他事件,直到IO完成
data_received
函数中创建Future
对象,将其添加到事件循环中,并将所需的任何状态变量存储在协议类中,然后显式地将控制权返回给循环。虽然这可以工作,但它确实创建了许多复杂的代码,而在协程版本中,它们以任何方式都不需要。
还有here我们有一个使用非阻塞套接字和add_reader
处理UDP套接字的示例。但与协程版本的几行代码相比,代码看起来仍然很复杂。
我想指出的一点是,协程是一个非常好的设计,它可以在一个线程中利用并发的力量,同时也有一个非常简单的设计模式,可以节省脑力和不必要的代码行,但是让它在UDP网络中工作的关键部分在我们的asyncio
标准库中确实缺乏。
你们觉得这件事怎么样?
此外,如果有任何关于第三方库支持这种用于UDP网络的API的建议,为了我的课程项目,我将非常感激。我发现Bluelet很像这样的事情,但它似乎没有被主动维护。
编辑:
似乎PR确实实现了此功能,但被asyncio
开发人员拒绝。开发人员声称,所有功能都可以使用create_datagram_endpoint()
协议传输接口实现。但就像我在上面讨论的那样,与在许多用例中使用回调API相比,协程API具有简单性的力量,非常不幸的是,我们在UDP中没有这些功能。
udp
之所以没有提供流型推荐答案,是因为流在回调之上提供排序,而udp通信本身就是无序的,所以两者根本不兼容。
但这并不意味着您不能从回调中调用协程-事实上,这非常简单!从EchoServerProtocol
example开始,您可以这样做:
def datagram_received(self, data, addr):
loop = asyncio.get_event_loop()
loop.create_task(self.handle_income_packet(data, addr))
async def handle_income_packet(self, data, addr):
# echo back the message, but 2 seconds later
await asyncio.sleep(2)
self.transport.sendto(data, addr)
这里datagram_received
启动您的handle_income_packet
协同例程,它可以自由地等待任意数量的协同例程。由于协程在"后台"运行,因此事件循环在任何时候都不会被阻塞,datagram_received
会立即返回,就像预期的那样。
相关文章