Django中的定时任务:使用Celery Beat

2023-04-11 00:00:00 django 定时 Beat

Django中的定时任务通常使用Celery Beat来实现。Celery Beat是Celery的一个子模块,它提供了定时调度任务的功能。

首先,在Django项目的settings.py文件中添加以下配置:

# Celery配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_TIMEZONE = 'Asia/Shanghai'

# Celery Beat配置
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

这里使用Redis作为消息代理和结果存储,设置Celery的时区为上海,使用Django自带的数据库作为Celery Beat的调度器。

接下来,创建一个定时任务,例如发送邮件。首先在Django项目中创建一个tasks.py文件,并在其中定义发送邮件的任务:

from celery import shared_task
from django.core.mail import send_mail

@shared_task
def send_email(subject, message, from_email, recipient_list):
    send_mail(subject, message, from_email, recipient_list)

然后,在Django的app中的models.py文件中定义一个定时任务模型ScheduledTask:

from django.db import models
from django.utils import timezone

class ScheduledTask(models.Model):
    task_name = models.CharField(max_length=200)
    task_args = models.CharField(max_length=200, blank=True)
    task_kwargs = models.CharField(max_length=200, blank=True)
    enabled = models.BooleanField(default=True)
    schedule_type = models.CharField(max_length=200)
    schedule_args = models.CharField(max_length=200)
    last_run = models.DateTimeField(null=True, blank=True)

    def __str__(self):
        return self.task_name

    def run(self):
        task = app.tasks.get(self.task_name)
        if task:
            self.last_run = timezone.now()
            if self.schedule_type == 'interval':
                task.apply_async(args=self.task_args, kwargs=self.task_kwargs,
                    interval=self.schedule_args, expires=60)
            elif self.schedule_type == 'crontab':
                task.apply_async(args=self.task_args, kwargs=self.task_kwargs,
                    crontab=self.schedule_args, expires=60)
            self.save()

定时任务模型ScheduledTask包含了任务名称、参数、调度类型、调度参数等字段,还有一个run()方法,执行该任务的逻辑。在这个方法中,先获取任务对象,然后根据不同的调度类型,调用Celery的apply_async()方法来启动异步任务,最后更新任务的最后执行时间。

接着,在Django项目的urls.py文件中,添加一个视图函数schedule_task(),用于启动定时任务:

from django.shortcuts import HttpResponse
from app.models import ScheduledTask

def schedule_task(request):
    task = ScheduledTask.objects.get(task_name='app.tasks.send_email')
    task.run()
    return HttpResponse('启动定时任务成功')

这个视图函数只是简单地获取任务模型对象,然后调用run()方法来启动任务。

最后,在命令行中启动Celery Beat:

$ celery -A myproject beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

这里使用了myproject作为Django项目的名称,上面提到的CELERY_BEAT_SCHEDULER配置指定了使用Django自带的数据库作为Celery Beat的调度器,使用info级别输出日志。

现在,可以通过访问schedule_task()视图函数启动定时任务了:

$ curl http://localhost:8000/schedule_task

以上就是使用Celery Beat在Django中实现定时任务的示例代码。在调用完run()方法后,Celery Beat会将该任务的调度信息存储在数据库中,并定期从数据库中获取任务信息并启动异步任务。

相关文章