定期调用deferToThread
问题描述
我有一个字符串列表,我希望定期处理这些字符串。
开始处理新字符串的时间为1秒,处理一个字符串需要3秒。
我希望看到的是,从第三秒开始,我将每秒看到一个新结果,直到处理完所有字符串。
然而,我实际看到的是,当生成所有结果时,所有结果都一起出现。 因此,问题是,如何修改代码以实现我期望看到的结果?
from twisted.internet import reactor, threads
import json
import time
def process(string):
print "Processing " + string + "
"
time.sleep(3) # simulate computation time
# write result to file; result is mocked by string*3
file_name = string + ".txt"
with open(file_name, "w") as fp:
json.dump(string*3, fp)
print string + " processed
"
string_list = ["AAAA", "BBBB", "CCCC", "XXXX", "YYYY", "ZZZZ"]
for s in string_list:
# start a new thread every second
time.sleep(1)
threads.deferToThread(process, s)
reactor.run()
同时,看起来生成结果的顺序与处理字符串的顺序不一致。我猜它只是印得乱七八糟,但实际上它们是按顺序处理的。如何验证我的猜测?
我注意到的另一件小事是Processing YYYY
没有打印在正确的位置。为什么会这样呢?(在它和上一个结果之间应该有一个空行。)
Processing AAAA
Processing BBBB
Processing CCCC
Processing XXXX
Processing YYYY
Processing ZZZZ
YYYY processed
CCCC processed
AAAA processed
BBBB processed
XXXX processed
ZZZZ processed
解决方案
这部分代码的作用:
for s in string_list:
# start a new thread every second
time.sleep(1)
threads.deferToThread(process, s)
reactor.run()
在每个调度操作之间以一秒延迟来调度每个工作块。然后,最后,它启动反应堆,允许处理开始。在reactor.run()
之前没有处理。
使用time.sleep(1)
还意味着您的延迟被阻止,一旦您解决了上述问题,这将是一个问题。
一种解决方案是将for
循环和time.sleep(1)
替换为LoopingCall
。
from twisted.internet.task import LoopingCall, react
string_list = [...]
def process(string):
...
def process_strings(the_strings, f):
def dispatch(s):
d = deferToThread(f, s)
# Add callback / errback to d here to process the
# result or report any problems.
# Do _not_ return `d` though. LoopingCall will
# wait on it before running the next iteration if
# we do.
string_iter = iter(the_strings)
c = LoopingCall(lambda: dispatch(next(string_iter)))
d = c.start(1)
d.addErrback(lambda err: err.trap(StopIteration))
return d
def main(reactor):
return process_strings(string_list, process)
react(main, [])
此代码使用react
启动和停止反应器(当main
返回的Deferred
触发时停止)。它使用LoopingCall
以句点1开始在线程池中运行f(next(string_iter))
,直到遇到StopIteration
(或某些其他错误)。
(LoopingCall
和deferToThread
都接受*args
和**kwargs
传递给它们的可调用对象,因此如果您愿意(这是一个风格问题),也可以将该表达式编写为LoopingCall(lambda: deferToThread(f, next(string_iter)))
。您不能"展开"剩余的lambda,因为这将导致LoopingCall(deferToThread, f, next(string_iter))
在调用LoopingCall
时只计算一次next(string_iter)
,因此您将永远处理第一个字符串。)
也有其他可能的调度方法。例如,您可以使用cooperate
一次只运行3个处理线程-在旧线程完成后立即启动一个新线程。
from twisted.internet.defer import gatherResults
from twisted.internet.task import cooperate
def process_strings(the_strings, f):
# Define a generator of all of the jobs to be accomplished.
work_iter = (
deferToThread(lambda: f(a_string))
for a_string
in the_strings
)
# Consume jobs from the generator in parallel until done.
tasks = list(cooperate(work_iter) for i in range(3))
# Return a Deferred that fires when all three tasks have
# finished consuming all available jobs.
return gatherResults(list(
t.whenDone()
for t
in tasks
))
在这两种情况下,请注意不使用time.sleep
。
相关文章