定期调用deferToThread

2022-04-18 00:00:00 python twisted

问题描述

我有一个字符串列表,我希望定期处理这些字符串。

开始处理新字符串的时间为1秒,处理一个字符串需要3秒。

我希望看到的是,从第三秒开始,我将每秒看到一个新结果,直到处理完所有字符串。

然而,我实际看到的是,当生成所有结果时,所有结果都一起出现。 因此,问题是,如何修改代码以实现我期望看到的结果?

from twisted.internet import reactor, threads
import json
import time


def process(string):
    print "Processing " + string + "
"
    time.sleep(3)  # simulate computation time

    # write result to file; result is mocked by string*3
    file_name = string + ".txt"
    with open(file_name, "w") as fp:
        json.dump(string*3, fp)

    print string + " processed
"

string_list = ["AAAA", "BBBB", "CCCC", "XXXX", "YYYY", "ZZZZ"]

for s in string_list:
    # start a new thread every second
    time.sleep(1)
    threads.deferToThread(process, s)

reactor.run()
同时,看起来生成结果的顺序与处理字符串的顺序不一致。我猜它只是印得乱七八糟,但实际上它们是按顺序处理的。如何验证我的猜测?

我注意到的另一件小事是Processing YYYY没有打印在正确的位置。为什么会这样呢?(在它和上一个结果之间应该有一个空行。)

Processing AAAA

Processing BBBB

Processing CCCC

Processing XXXX
Processing YYYY


Processing ZZZZ

YYYY processed

CCCC processed

AAAA processed

BBBB processed

XXXX processed

ZZZZ processed

解决方案

这部分代码的作用:

for s in string_list:
    # start a new thread every second
    time.sleep(1)
    threads.deferToThread(process, s)

reactor.run()
在每个调度操作之间以一秒延迟来调度每个工作块。然后,最后,它启动反应堆,允许处理开始。在reactor.run()之前没有处理。

使用time.sleep(1)还意味着您的延迟被阻止,一旦您解决了上述问题,这将是一个问题。

一种解决方案是将for循环和time.sleep(1)替换为LoopingCall

from twisted.internet.task import LoopingCall, react

string_list = [...]
def process(string):
    ...

def process_strings(the_strings, f):
    def dispatch(s):
        d = deferToThread(f, s)
        # Add callback / errback to d here to process the
        # result or report any problems.
        # Do _not_ return `d` though.  LoopingCall will
        # wait on it before running the next iteration if
        # we do.

    string_iter = iter(the_strings)
    c = LoopingCall(lambda: dispatch(next(string_iter)))
    d = c.start(1)
    d.addErrback(lambda err: err.trap(StopIteration))
    return d

def main(reactor):
    return process_strings(string_list, process)

react(main, [])

此代码使用react启动和停止反应器(当main返回的Deferred触发时停止)。它使用LoopingCall以句点1开始在线程池中运行f(next(string_iter)),直到遇到StopIteration(或某些其他错误)。

(LoopingCalldeferToThread都接受*args**kwargs传递给它们的可调用对象,因此如果您愿意(这是一个风格问题),也可以将该表达式编写为LoopingCall(lambda: deferToThread(f, next(string_iter)))。您不能"展开"剩余的lambda,因为这将导致LoopingCall(deferToThread, f, next(string_iter))在调用LoopingCall时只计算一次next(string_iter),因此您将永远处理第一个字符串。)

也有其他可能的调度方法。例如,您可以使用cooperate一次只运行3个处理线程-在旧线程完成后立即启动一个新线程。

from twisted.internet.defer import gatherResults
from twisted.internet.task import cooperate

def process_strings(the_strings, f):
    # Define a generator of all of the jobs to be accomplished.
    work_iter = (
        deferToThread(lambda: f(a_string))
        for a_string
        in the_strings
    )
    # Consume jobs from the generator in parallel until done.
    tasks = list(cooperate(work_iter) for i in range(3))

    # Return a Deferred that fires when all three tasks have
    # finished consuming all available jobs.
    return gatherResults(list(
        t.whenDone()
        for t
        in tasks
    ))

在这两种情况下,请注意不使用time.sleep

相关文章