来自python的asyncio是否支持用于UDP网络的基于协程的API?

2022-03-25 00:00:00 python python-asyncio

问题描述

今天晚上我浏览了pythonasyncio模块文档,为我的一个课程项目寻找一些想法,但我很快发现Python的标准aysncio模块可能缺少功能。

如果您仔细阅读文档,您会发现有一个基于回调的API和一个基于协程的API。回调API可以同时用于构建UDP和TCP应用程序,而协程API看起来只能用于构建TCP应用程序,因为它使用了流样式API。

这给我带来了相当大的问题,因为我正在寻找用于UDP网络的基于协程的API,尽管我确实发现asyncio支持sock_recvsock_sendall等基于协程的低级套接字方法,但是UDP网络的关键APIrecvfromsendto并不存在。

我想做的是编写一些代码,如:

async def handle_income_packet(sock):
    await data, addr = sock.recvfrom(4096)
    # data handling here...
    await sock.sendto(addr, response)

我知道这可以使用回调API等效地实现,但这里的问题是回调不是协程而是常规函数,因此您不能在其中将控制权交还给事件循环并保留函数执行状态。

看看上面的代码,如果我们需要在数据处理部分做一些阻塞-IO操作,只要我们的IO操作也是在协程中完成的,协程版本就不会有问题:

async def handle_income_packet(sock):
    await data, addr = sock.recvfrom(4096)
    async with aiohttp.ClientSession() as session:
        info = await session.get(...)
    response = generate_response_from_info(info)
    await sock.sendto(addr, response)

只要我们使用await,事件循环就会从该点获取控制流来处理其他事情,直到IO完成。但遗憾的是,这些代码目前不可用,因为我们在asyncio中没有socket.sendtosocket.recvfrom的协同版本。

我们可以使用传输协议回调API实现此功能:

class EchoServerClientProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        self.transport = transport

    def data_received(self, data):
        info = requests.get(...)
        response = generate_response_from_info(info)
        self.transport.write(response)
        self.transport.close()

我们不能在那里执行协程await,因为回调不是协程,使用上面这样的阻塞IO调用会使回调中的控制流停止,并阻止循环处理任何其他事件,直到IO完成

推荐的另一个实现思想是在data_received函数中创建Future对象,将其添加到事件循环中,并将所需的任何状态变量存储在协议类中,然后显式地将控制权返回给循环。虽然这可以工作,但它确实创建了许多复杂的代码,而在协程版本中,它们以任何方式都不需要。

还有here我们有一个使用非阻塞套接字和add_reader处理UDP套接字的示例。但与协程版本的几行代码相比,代码看起来仍然很复杂。

我想指出的一点是,协程是一个非常好的设计,它可以在一个线程中利用并发的力量,同时也有一个非常简单的设计模式,可以节省脑力和不必要的代码行,但是让它在UDP网络中工作的关键部分在我们的asyncio标准库中确实缺乏。

你们觉得这件事怎么样?

此外,如果有任何关于第三方库支持这种用于UDP网络的API的建议,为了我的课程项目,我将非常感激。我发现Bluelet很像这样的事情,但它似乎没有被主动维护。

编辑:

似乎PR确实实现了此功能,但被asyncio开发人员拒绝。开发人员声称,所有功能都可以使用create_datagram_endpoint()协议传输接口实现。但就像我在上面讨论的那样,与在许多用例中使用回调API相比,协程API具有简单性的力量,非常不幸的是,我们在UDP中没有这些功能。

udp

之所以没有提供流型推荐答案,是因为流在回调之上提供排序,而udp通信本身就是无序的,所以两者根本不兼容。

但这并不意味着您不能从回调中调用协程-事实上,这非常简单!从EchoServerProtocol example开始,您可以这样做:

def datagram_received(self, data, addr):
    loop = asyncio.get_event_loop()
    loop.create_task(self.handle_income_packet(data, addr))

async def handle_income_packet(self, data, addr):
    # echo back the message, but 2 seconds later
    await asyncio.sleep(2)
    self.transport.sendto(data, addr)
这里datagram_received启动您的handle_income_packet协同例程,它可以自由地等待任意数量的协同例程。由于协程在"后台"运行,因此事件循环在任何时候都不会被阻塞,datagram_received会立即返回,就像预期的那样。

相关文章