在Django中配置Celery以处理多个任务队列

2023-04-11 00:00:00 多个 队列 配置
  1. 安装Celery

在Django项目的虚拟环境下运行以下命令安装Celery:

pip install Celery
  1. 配置Celery

在Django项目的settings.py文件中,加入如下配置:

INSTALLED_APPS = [
    # ...
    'django_celery_results',
    'django_celery_beat',
]

CELERY_BROKER_URL = 'redis://localhost:6379/0' # 指定消息代理的地址,这里使用Redis作为消息代理
CELERY_RESULT_BACKEND = 'django-db'

CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

CELERY_BEAT_SCHEDULE = {
    'task1': {
        'task': 'tasks.task1',
        'schedule': timedelta(minutes=5), # 定时任务1,每5分钟执行一次
    },
    'task2': {
        'task': 'tasks.task2',
        'schedule': crontab(minute='*/15'), # 定时任务2,每15分钟执行一次
    },
}

CELERY_IMPORTS = ('tasks',)
  1. 定义任务

在Django项目下创建一个名为tasks.py的文件,定义两个任务task1()和task2():

from celery import shared_task

@shared_task
def task1():
    print('Task1: pidancode.com')

@shared_task
def task2():
    print('Task2: 皮蛋编程')
  1. 运行Celery

在虚拟环境中运行以下命令启动Celery worker和Celery beat:

celery -A proj worker --loglevel=info
celery -A proj beat --loglevel=info

以上命令中,-A参数指定Django项目的名称,worker参数指定启动worker进程,beat参数指定启动beat进程。

  1. 使用Celery

在Django项目中调用任务task1和task2:

from tasks import task1, task2

task1.delay()
task2.delay()

以上代码中,task1.delay()和task2.delay()表示异步调用任务task1和task2。

注意:

为了使用Celery,还需要安装Redis和Django Celery Results,使用以下命令安装:

pip install redis
pip install django-celery-results
pip install django-celery-beat

在Django项目的urls.py文件中加入如下路由,检查Celery是否正常运行:

from django.urls import path
from django_celery_results.views import TaskResultView

urlpatterns = [
    path('celery-progress/<task_id>/', TaskResultView.as_view(), name='celery_progress'),
]

相关文章