Django Celery与Amazon SQS的集成指南
Django Celery是一个强大的任务队列框架,而Amazon SQS是一种可靠的消息队列服务,将它们集成在一起可以让我们更好地管理和分配任务。以下是详细的集成指南:
- 安装Django Celery
首先,我们需要安装Django Celery。可以通过pip安装:
pip install celery
在Django项目的settings.py文件中添加以下代码:
INSTALLED_APPS = [ ... 'django_celery_results', 'celery', ] CELERY_RESULT_BACKEND = 'django-db' CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//' CELERY_ACCEPT_CONTENT = ['application/json'] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json'
- 安装Amazon SQS
我们还需要安装Amazon SQS。可以通过AWS SDK for Python (Boto3)安装:
pip install boto3
接下来需要创建一个IAM用户,并授权该用户访问Amazon SQS服务。然后在settings.py文件中添加以下配置:
AWS_ACCESS_KEY_ID = 'YOUR_ACCESS_KEY' AWS_SECRET_ACCESS_KEY = 'YOUR_SECRET_ACCESS_KEY' AWS_REGION_NAME = 'YOUR_REGION_NAME'
- 配置Celery
现在我们需要配置Celery并将它与Amazon SQS集成。以下是一个示例配置:
from kombu import Queue CELERY_BEAT_SCHEDULE = {} CELERY_TIMEZONE = 'UTC' CELERY_QUEUES = ( Queue('default', routing_key='task.#'), Queue('high', routing_key='task.high.#'), ) BROKER_TRANSPORT_OPTIONS = { 'region': AWS_REGION_NAME, 'polling_interval': 1, 'visibility_timeout': 3600, } CELERY_DEFAULT_QUEUE = 'default' CELERY_DEFAULT_EXCHANGE = 'tasks' CELERY_DEFAULT_EXCHANGE_TYPE = 'topic' CELERY_DEFAULT_ROUTING_KEY = 'task.default' CELERY_ROUTES = { 'tasks.some_task': {'queue': 'high', 'routing_key': 'task.high'} } CELERY_TASK_DEFAULT_PRIORITY = 5
在这个示例配置中,我们使用了两个Celery队列:default和high。我们还定义了一个名为some_task的任务,并将其分配到high队列中。
- 定义任务
现在我们可以开始定义任务。以下是一个简单的示例:
from celery import shared_task import time @shared_task def some_task(): print('任务开始...') time.sleep(5) print('任务结束')
在这个示例中,我们定义了一个名为some_task的任务,它会打印一些文本然后等待5秒钟,在等待结束后再次打印一些文本。
- 启动Celery Worker
现在我们可以启动Celery worker。在终端中输入以下命令:
celery -A your_project_name worker -l info --concurrency=4 --logfile=./celery.log
这个命令将启动4个worker,其中info日志级别详细记录了执行过程,而logfile选项将记录日志以便于调试。
- 发布任务
最后,我们需要发布任务到Amazon SQS队列。以下是一个示例代码:
from django_celery_results.models import TaskResult from celery import current_app from boto3 import resource sqs = resource('sqs', region_name=AWS_REGION_NAME, aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY) queue = sqs.get_queue_by_name(QueueName='test') task = some_task.delay() result = TaskResult(task_id=task.id) result.save() queue.send_message(MessageBody=task.id)
在这个示例中,我们获取名为test的队列的引用,然后我们使用some_task.delay()方法发布了一个任务并获取了任务的ID。我们还保存了任务ID以便将来查询任务的结果。最后,我们使用队列的send_message()方法将任务ID发送到Amazon SQS队列中。
这些就是将Django Celery与Amazon SQS集成的详细指南。希望对你有帮助!
相关文章