在Django中配置Celery以处理多个任务队列
- 安装Celery
在Django项目的虚拟环境下运行以下命令安装Celery:
pip install Celery
- 配置Celery
在Django项目的settings.py文件中,加入如下配置:
INSTALLED_APPS = [ # ... 'django_celery_results', 'django_celery_beat', ] CELERY_BROKER_URL = 'redis://localhost:6379/0' # 指定消息代理的地址,这里使用Redis作为消息代理 CELERY_RESULT_BACKEND = 'django-db' CELERY_ACCEPT_CONTENT = ['json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_BEAT_SCHEDULE = { 'task1': { 'task': 'tasks.task1', 'schedule': timedelta(minutes=5), # 定时任务1,每5分钟执行一次 }, 'task2': { 'task': 'tasks.task2', 'schedule': crontab(minute='*/15'), # 定时任务2,每15分钟执行一次 }, } CELERY_IMPORTS = ('tasks',)
- 定义任务
在Django项目下创建一个名为tasks.py的文件,定义两个任务task1()和task2():
from celery import shared_task @shared_task def task1(): print('Task1: pidancode.com') @shared_task def task2(): print('Task2: 皮蛋编程')
- 运行Celery
在虚拟环境中运行以下命令启动Celery worker和Celery beat:
celery -A proj worker --loglevel=info celery -A proj beat --loglevel=info
以上命令中,-A参数指定Django项目的名称,worker参数指定启动worker进程,beat参数指定启动beat进程。
- 使用Celery
在Django项目中调用任务task1和task2:
from tasks import task1, task2 task1.delay() task2.delay()
以上代码中,task1.delay()和task2.delay()表示异步调用任务task1和task2。
注意:
为了使用Celery,还需要安装Redis和Django Celery Results,使用以下命令安装:
pip install redis pip install django-celery-results pip install django-celery-beat
在Django项目的urls.py文件中加入如下路由,检查Celery是否正常运行:
from django.urls import path from django_celery_results.views import TaskResultView urlpatterns = [ path('celery-progress/<task_id>/', TaskResultView.as_view(), name='celery_progress'), ]
相关文章