在一个单独的线程中为所有连接无限期地生成内容?

2022-04-18 00:00:00 python twisted

问题描述

我有一个Twisted项目,该项目寻求在JSON中通过TCP重新广播收集的数据。我实际上有一个USB库,我需要订阅它并在While循环中无限期地同步读取,如下所示:

while True:
    for line in usbDevice.streamData():
        data = MyBrandSpankingNewUSBDeviceData(line)
        # parse the data, convert to JSON
        output = convertDataToJSON(data)
        # broadcast the data
        ...
当然,问题在于...。本质上,我需要在服务器启动时立即启动该进程,并在服务器结束时结束它(Protocol.doStartProtocol.doStop),并使其不断运行并向每个连接的传输广播output

如何在Twisted中执行此操作?显然,我需要让While循环在它自己的线程中运行,但是我如何"订阅"客户端来监听输出呢?USB数据收集只运行一次也很重要,因为它多次运行可能会严重扰乱事情。

简而言之,我的架构如下:

  1. 服务器有一个USB集线器,该集线器一直在传输数据。服务器不断订阅此USB集线器并不断读取数据。
  2. 客户端来来去去,可以随意连接和断开。
我们希望只要数据可用,就向所有连接的客户端发送数据。如何在Twisted中执行此操作?


解决方案

您可能想要做的一件事是尝试扩展公共协议/传输独立性。即使您需要一个具有长时间运行循环的线程,您也可以对协议隐藏这一点。好处与往常一样:协议变得更易于测试,如果您曾经设法拥有读取USB事件的非线程实现,您只需更改传输,而无需更改协议。

from threading import Thread

class USBThingy(Thread):
    def __init__(self, reactor, device, protocol):
        self._reactor = reactor
        self._device = device
        self._protocol = protocol

    def run(self):
        while True:
            for line in self._device.streamData():
                self._reactor.callFromThread(self._protocol.usbStreamLineReceived, line)
<2-4]>的使用是此解决方案可用的一部分。它确保usbStreamLineReceived方法在反应线程中调用,而不是在从USB设备读取的线程中调用。因此,从协议对象的角度来看,线程化并没有什么特别之处:它只是在有数据要处理时偶尔调用它的方法。

然后,您的协议只需以某种方式实现usbStreamLineReceived,并实现其他特定于应用程序的逻辑,如保存观察者列表:

class SomeUSBProtocol(object):
    def __init__(self):
        self.observers = []

    def usbStreamLineReceived(self, line):
        data = MyBrandSpankingNewUSBDeviceData(line)
        # broadcast the data
        for obs in self.observers[:]:
            obs(output)

然后观察者可以向此类的实例注册自己,并对数据执行任何他们想要的操作:

class USBObserverThing(Protocol):
    def connectionMade(self):
        self.factory.usbProto.observers.append(self.emit)

    def connectionLost(self):
        self.factory.usbProto.observers.remove(self.emit)

    def emit(self, output):
        # parse the data, convert to JSON
        output = convertDataToJSON(data)
        self.transport.write(output)

全部连接在一起:

usbDevice = ...
usbProto = SomeUSBProtocol()
thingy = USBThingy(reactor, usbDevice, usbProto)
thingy.start()

factory = ServerFactory()
factory.protocol = USBObserverThing
factory.usbProto = usbProto
reactor.listenTCP(12345, factory)
reactor.run()

您可以想象一个更好的观察者注册/注销API(就像使用实际方法而不是直接访问该列表)。您还可以想象为USBThingy提供一个关闭方法,以便SomeUSBProtocol可以控制它何时停止运行(这样您的进程实际上将能够退出)。

相关文章