Django中的定时任务:使用Celery Beat
Django中的定时任务通常使用Celery Beat来实现。Celery Beat是Celery的一个子模块,它提供了定时调度任务的功能。
首先,在Django项目的settings.py文件中添加以下配置:
# Celery配置 CELERY_BROKER_URL = 'redis://localhost:6379/0' CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' CELERY_TIMEZONE = 'Asia/Shanghai' # Celery Beat配置 CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
这里使用Redis作为消息代理和结果存储,设置Celery的时区为上海,使用Django自带的数据库作为Celery Beat的调度器。
接下来,创建一个定时任务,例如发送邮件。首先在Django项目中创建一个tasks.py文件,并在其中定义发送邮件的任务:
from celery import shared_task from django.core.mail import send_mail @shared_task def send_email(subject, message, from_email, recipient_list): send_mail(subject, message, from_email, recipient_list)
然后,在Django的app中的models.py文件中定义一个定时任务模型ScheduledTask:
from django.db import models from django.utils import timezone class ScheduledTask(models.Model): task_name = models.CharField(max_length=200) task_args = models.CharField(max_length=200, blank=True) task_kwargs = models.CharField(max_length=200, blank=True) enabled = models.BooleanField(default=True) schedule_type = models.CharField(max_length=200) schedule_args = models.CharField(max_length=200) last_run = models.DateTimeField(null=True, blank=True) def __str__(self): return self.task_name def run(self): task = app.tasks.get(self.task_name) if task: self.last_run = timezone.now() if self.schedule_type == 'interval': task.apply_async(args=self.task_args, kwargs=self.task_kwargs, interval=self.schedule_args, expires=60) elif self.schedule_type == 'crontab': task.apply_async(args=self.task_args, kwargs=self.task_kwargs, crontab=self.schedule_args, expires=60) self.save()
定时任务模型ScheduledTask包含了任务名称、参数、调度类型、调度参数等字段,还有一个run()方法,执行该任务的逻辑。在这个方法中,先获取任务对象,然后根据不同的调度类型,调用Celery的apply_async()方法来启动异步任务,最后更新任务的最后执行时间。
接着,在Django项目的urls.py文件中,添加一个视图函数schedule_task(),用于启动定时任务:
from django.shortcuts import HttpResponse from app.models import ScheduledTask def schedule_task(request): task = ScheduledTask.objects.get(task_name='app.tasks.send_email') task.run() return HttpResponse('启动定时任务成功')
这个视图函数只是简单地获取任务模型对象,然后调用run()方法来启动任务。
最后,在命令行中启动Celery Beat:
$ celery -A myproject beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
这里使用了myproject作为Django项目的名称,上面提到的CELERY_BEAT_SCHEDULER配置指定了使用Django自带的数据库作为Celery Beat的调度器,使用info级别输出日志。
现在,可以通过访问schedule_task()视图函数启动定时任务了:
$ curl http://localhost:8000/schedule_task
以上就是使用Celery Beat在Django中实现定时任务的示例代码。在调用完run()方法后,Celery Beat会将该任务的调度信息存储在数据库中,并定期从数据库中获取任务信息并启动异步任务。
相关文章