如何在Django Celery中使用Webhooks
使用Webhooks在Django Celery中可以实现异步任务的自动触发和执行,下面是详细的步骤和代码演示:
步骤一:安装和配置Celery和Django Webhooks
1.安装Celery和Django Webhooks:
pip install celery pip install django-webhooks
2.在Django的settings.py文件中配置Celery:
# Celery配置 CELERY_TASK_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT = ['application/json'] CELERY_RESULT_SERIALIZER = 'json' CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_ENABLE_UTC = True CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//' CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
3.在Django的settings.py文件中配置Webhooks:
WEBHOOKS_CONFIG = { 'WEBHOOKS_BACKEND': 'django_webhooks.backends.redis.RedisWebhookBackend', 'WEBHOOKS_BACKEND_OPTIONS': { 'url': 'redis://localhost:6379/0', 'channel': 'webhooks' } }
步骤二:定义Celery的异步任务并在Webhooks中创建触发器
1.定义Celery的异步任务:
from celery import shared_task import time @shared_task def print_info(info): time.sleep(2) print(info) return 'info printed'
2.在Webhooks中创建触发器:
from django_webhooks import api api.create_trigger( name='demo trigger', url='http://localhost:8000/hooks', event='task_complete', payload='{"task_id": "{{ task_id }}", "task_result": "{{ task_result }}", "info": "pidancode.com"}' )
上面的代码示例中,我们创建了一个名为“demo trigger”的Webhooks触发器,它会在Celery任务完成时触发事件,发送一个包含任务ID、任务结果和信息“pidancode.com”的payload。
步骤三:创建Webhooks视图函数
我们需要创建一个Django视图函数来处理Webhooks事件的请求,从请求的payload中获取任务ID和结果,并将它们发送到Celery任务队列中。
from django.http import HttpResponseBadRequest from django.views.decorators.csrf import csrf_exempt from celery.result import AsyncResult from django_webhooks import receive_webhook @csrf_exempt @receive_webhook(event="task_complete") def handle_task_complete(request, trigger, webhook): task_id = webhook.payload['task_id'] task_result = webhook.payload['task_result'] print(f'task completed with id {task_id} and result {task_result}') result = AsyncResult(task_id) if not result.successful(): return HttpResponseBadRequest(f"Task with id {task_id} is not successful") return HttpResponse('Task completed')
这个视图函数的作用是获取Webhooks请求的payload中的任务ID和结果,并将它们发送到Celery任务队列中,最后返回一个HTTP响应。
完整代码示例:
from django.http import HttpResponseBadRequest from django.views.decorators.csrf import csrf_exempt from celery.result import AsyncResult from django_webhooks import api, receive_webhook, send_webhook from celery import shared_task import time # Celery配置 CELERY_TASK_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT = ['application/json'] CELERY_RESULT_SERIALIZER = 'json' CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_ENABLE_UTC = True CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//' CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 定义Celery的异步任务 @shared_task def print_info(info): time.sleep(2) print(info) return 'info printed' # 在Webhooks中创建触发器 api.create_trigger( name='demo trigger', url='http://localhost:8000/hooks', event='task_complete', payload='{"task_id": "{{ task_id }}", "task_result": "{{ task_result }}", "info": "pidancode.com"}' ) # 创建Webhooks视图函数 @csrf_exempt @receive_webhook(event="task_complete") def handle_task_complete(request, trigger, webhook): task_id = webhook.payload['task_id'] task_result = webhook.payload['task_result'] print(f'task completed with id {task_id} and result {task_result}') result = AsyncResult(task_id) if not result.successful(): return HttpResponseBadRequest(f"Task with id {task_id} is not successful") return HttpResponse('Task completed') # 触发异步任务并等待任务完成 task = print_info.delay('pidancode.com') print(task.id) task.wait() # 发送Webhooks事件 send_webhook('http://localhost:8000/hooks', 'task_complete', {'task_id': task.id, 'task_result': task.result})
相关文章