芹菜中如何向特定队列发送周期性任务

2022-02-27 00:00:00 python celery django-celery

问题描述

默认情况下,芹菜将所有任务发送到‘celery’队列,但您可以通过添加额外参数来更改此行为:

@task(queue='celery_periodic')
def recalc_last_hour():
    log.debug('sending new task')
    recalc_hour.delay(datetime(2013, 1, 1, 2)) # for example

日程安排程序设置:

CELERYBEAT_SCHEDULE = {
   'installer_recalc_hour': {
        'task': 'stats.installer.tasks.recalc_last_hour',
        'schedule': 15  # every 15 sec for test
    },
}
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"

运行工作进程:

python manage.py celery worker -c 1 -Q celery_periodic -B -E

此方案未按预期工作:此工作进程将定期任务发送到"celery"队列,而不是"celery_periical"。我如何修复它?

P.S.芹菜==3.0.16


解决方案

我找到了此问题的解决方案:

1)首先,我更改了周期性任务的配置方式。我使用了这样的@PERIONIC_TASK修饰符:

@periodic_task(run_every=crontab(minute='5'),
               queue='celery_periodic',
               options={'queue': 'celery_periodic'})
def recalc_last_hour():
    dt = datetime.utcnow()
    prev_hour = datetime(dt.year, dt.month, dt.day, dt.hour) 
                - timedelta(hours=1)
    log.debug('Generating task for hour %s', str(prev_hour))
    recalc_hour.delay(prev_hour)

2)我在@periical_task的参数中写了两次celery_periical:

  • queue=‘celery_periodic’选项在您从代码(.Delay或.Apply_Async)调用任务时使用

  • options={‘queue’:‘celery_periical’}选项在芹菜节拍调用时使用。

我确信,如果您使用CELERYBEAT_SCHEDUE变量配置定期任务,同样的事情也是可能的。

更新。此解决方案适用于CELERYBEAT_SCHEDULER的基于数据库和基于文件的存储。

相关文章