如何跟踪形成芹菜和弦首部的组内单个任务的进度?
问题描述
import celery
def temptask(n):
header=list(tempsubtask.si(i) for i in range(n))
callback=templink.si('printed at last?')
r = celery.chord(celery.group(header))(callback)
return r
@task()
def tempsubtask(i):
print i
for x in range(i):
time.sleep(2)
current_task.update_state(
state='PROGRESS', meta={'completed': x, 'total': i })
@task()
def templink(x):
print 'this should be run at last %s'%x
#executing temptask
r = temptask(100)
我希望访问由tempsubtask更新的进度状态。我如何着手实现它?
解决方案
搜索了几个小时后,我偶然发现了http://www.manasupo.com/2012/03/chord-progress-in-celery.html。虽然那里的解决方案对我来说不是开箱即用的,但它确实激励了我尝试类似的东西。
from celery.utils import uuid
from celery import chord
class ProgressChord(chord):
def __call__(self, body=None, **kwargs):
_chord = self.type
body = (body or self.kwargs['body']).clone()
kwargs = dict(self.kwargs, body=body, **kwargs)
if _chord.app.conf.CELERY_ALWAYS_EAGER:
return self.apply((), kwargs)
callback_id = body.options.setdefault('task_id', uuid())
r= _chord(**kwargs)
return _chord.AsyncResult(callback_id), r
我使用的不是celery.chord,而是ProgressChord,如下所示:
def temptask(n):
header=list(tempsubtask.si(i) for i in range(n))
callback=templink.si('printed at last?')
r = celery.Progresschord(celery.group(header))(callback)
return r
r的返回值包含同时具有回调异步结果和组结果的元组。所以成功看起来是这样的:
In [3]: r
Out[3]:
(<AsyncResult: bf87507c-14cb-4ac4-8070-d32e4ff326a6>,
<GroupResult: af69e131-5a93-492d-b985-267484651d95 [4672cbbb-8ec3-4a9e-971a-275807124fae, a236e55f-b312-485c-a816-499d39d7de41, e825a072-b23c-43f2-b920-350413fd5c9e, e3f8378d-fd02-4a34-934b-39a5a735871d, c4f7093b-9f1a-4e5e-b90d-66f83b9c97c4, d5c7dc2c-4e10-4e71-ba2b-055a33e15f02, 07b1c6f7-fe95-4c1f-b0ba-6bc82bceaa4e, 00966cb8-41c2-4e95-b5e7-d8604c000927, e039c78e-6647-4c8d-b59b-e9baf73171a0, 6cfdef0a-25a2-4905-a40e-fea9c7940044]>)
我继承并重写了[celery.chord][1]
而不是[celery.task.chords.Chord][2]
,因为我在任何地方都找不到它的源。
相关文章