Django Celery与Amazon SQS的集成指南

2023-04-11 00:00:00 celery 集成 指南

Django Celery是一个强大的任务队列框架,而Amazon SQS是一种可靠的消息队列服务,将它们集成在一起可以让我们更好地管理和分配任务。以下是详细的集成指南:

  1. 安装Django Celery

首先,我们需要安装Django Celery。可以通过pip安装:

pip install celery

在Django项目的settings.py文件中添加以下代码:

INSTALLED_APPS = [
    ...
    'django_celery_results',
    'celery',
]

CELERY_RESULT_BACKEND = 'django-db'
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
  1. 安装Amazon SQS

我们还需要安装Amazon SQS。可以通过AWS SDK for Python (Boto3)安装:

pip install boto3

接下来需要创建一个IAM用户,并授权该用户访问Amazon SQS服务。然后在settings.py文件中添加以下配置:

AWS_ACCESS_KEY_ID = 'YOUR_ACCESS_KEY'
AWS_SECRET_ACCESS_KEY = 'YOUR_SECRET_ACCESS_KEY'
AWS_REGION_NAME = 'YOUR_REGION_NAME'
  1. 配置Celery

现在我们需要配置Celery并将它与Amazon SQS集成。以下是一个示例配置:

from kombu import Queue

CELERY_BEAT_SCHEDULE = {}

CELERY_TIMEZONE = 'UTC'

CELERY_QUEUES = (
    Queue('default', routing_key='task.#'),
    Queue('high', routing_key='task.high.#'),
)

BROKER_TRANSPORT_OPTIONS = {
   'region': AWS_REGION_NAME,
   'polling_interval': 1,
   'visibility_timeout': 3600,
}

CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'tasks'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'task.default'

CELERY_ROUTES = {
    'tasks.some_task': {'queue': 'high', 'routing_key': 'task.high'}
}

CELERY_TASK_DEFAULT_PRIORITY = 5

在这个示例配置中,我们使用了两个Celery队列:default和high。我们还定义了一个名为some_task的任务,并将其分配到high队列中。

  1. 定义任务

现在我们可以开始定义任务。以下是一个简单的示例:

from celery import shared_task
import time

@shared_task
def some_task():
    print('任务开始...')
    time.sleep(5)
    print('任务结束')

在这个示例中,我们定义了一个名为some_task的任务,它会打印一些文本然后等待5秒钟,在等待结束后再次打印一些文本。

  1. 启动Celery Worker

现在我们可以启动Celery worker。在终端中输入以下命令:

celery -A your_project_name worker -l info --concurrency=4 --logfile=./celery.log

这个命令将启动4个worker,其中info日志级别详细记录了执行过程,而logfile选项将记录日志以便于调试。

  1. 发布任务

最后,我们需要发布任务到Amazon SQS队列。以下是一个示例代码:

from django_celery_results.models import TaskResult
from celery import current_app
from boto3 import resource

sqs = resource('sqs', region_name=AWS_REGION_NAME, aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
queue = sqs.get_queue_by_name(QueueName='test')

task = some_task.delay()
result = TaskResult(task_id=task.id)
result.save()
queue.send_message(MessageBody=task.id)

在这个示例中,我们获取名为test的队列的引用,然后我们使用some_task.delay()方法发布了一个任务并获取了任务的ID。我们还保存了任务ID以便将来查询任务的结果。最后,我们使用队列的send_message()方法将任务ID发送到Amazon SQS队列中。

这些就是将Django Celery与Amazon SQS集成的详细指南。希望对你有帮助!

相关文章