一种带阻塞功能的扭曲环路呼叫

2022-04-18 00:00:00 python multithreading twisted

问题描述

我有一个应用程序,它需要轮询数据库以获取可能的配置更改。该应用程序是一个使用Twisted的简单xmlrpc服务器。我尝试使用Twisted的LoopingCall执行轮询,但因为LoopingCall在主线程上运行,所以对db的调用被阻塞。因此,如果数据库调用由于某种原因而变慢,则对xmlrpc服务器的请求必须等待。因此,我尝试在线程中运行LoopingCall,但无法真正使其工作。我的问题是,我应该在线程中运行它吗?如果是,如何?

from twisted.web import xmlrpc, server
from twisted.internet.threads import deferToThread
from twisted.internet import reactor, task
import platform
from time import sleep

r = reactor

class Agent(xmlrpc.XMLRPC):
    self.routine()
    xmlrpc.XMLRPC.__init__(self)

    def xmlrpc_echo(self, x):
        """
        Return arg as a simple test that the server is running
        """
        return x

    def register(self):
        """
        Register Agent with db and pick up config
        """
        sleep(3)  # simulate slow db call
        print 'registered with db'

    def routine(self):
        looping_register = task.LoopingCall(self.register)
        looping_register.start(7.0, True)

if __name__ == '__main__':
    r.listenTCP(7081, server.Site(Agent()))
    print 'Agent is running on "%s"' % platform.node()
    r.run()

解决方案

您应该使用twisted.enterprise.adbapi模块。它将为所有兼容DBAPI 2.0的客户端提供非阻塞API,方法是在线程池中运行它们并向您返回标准延迟:

from twisted.enterprise import adbapi


dbpool = adbapi.ConnectionPool('psycopg2', 'mydb', 'andrew', 'password') # replace psycopg2 with your db client name.

def getAge(user):
    return dbpool.runQuery('SELECT age FROM users WHERE name = ?', user)

def printResult(l):
    if l:
        print l[0][0], "years old"
    else:
        print "No such user"

getAge("joe").addCallback(printResult)

有关更多信息和示例,请访问官方文档:https://twistedmatrix.com/documents/14.0.0/core/howto/rdbms.html

相关文章