如何在Django Celery中使用Webhooks

2023-04-11 00:00:00 django webhooks 如何在

使用Webhooks在Django Celery中可以实现异步任务的自动触发和执行,下面是详细的步骤和代码演示:

步骤一:安装和配置Celery和Django Webhooks

1.安装Celery和Django Webhooks:

pip install celery
pip install django-webhooks

2.在Django的settings.py文件中配置Celery:

# Celery配置
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

3.在Django的settings.py文件中配置Webhooks:

WEBHOOKS_CONFIG = {
    'WEBHOOKS_BACKEND': 'django_webhooks.backends.redis.RedisWebhookBackend',
    'WEBHOOKS_BACKEND_OPTIONS': {
        'url': 'redis://localhost:6379/0',
        'channel': 'webhooks'
    }
}

步骤二:定义Celery的异步任务并在Webhooks中创建触发器

1.定义Celery的异步任务:

from celery import shared_task
import time

@shared_task
def print_info(info):
    time.sleep(2)
    print(info)
    return 'info printed'

2.在Webhooks中创建触发器:

from django_webhooks import api

api.create_trigger(
    name='demo trigger',
    url='http://localhost:8000/hooks',
    event='task_complete',
    payload='{"task_id": "{{ task_id }}", "task_result": "{{ task_result }}", "info": "pidancode.com"}'
)

上面的代码示例中,我们创建了一个名为“demo trigger”的Webhooks触发器,它会在Celery任务完成时触发事件,发送一个包含任务ID、任务结果和信息“pidancode.com”的payload。

步骤三:创建Webhooks视图函数

我们需要创建一个Django视图函数来处理Webhooks事件的请求,从请求的payload中获取任务ID和结果,并将它们发送到Celery任务队列中。

from django.http import HttpResponseBadRequest
from django.views.decorators.csrf import csrf_exempt
from celery.result import AsyncResult
from django_webhooks import receive_webhook

@csrf_exempt
@receive_webhook(event="task_complete")
def handle_task_complete(request, trigger, webhook):
    task_id = webhook.payload['task_id']
    task_result = webhook.payload['task_result']
    print(f'task completed with id {task_id} and result {task_result}')

    result = AsyncResult(task_id)
    if not result.successful():
        return HttpResponseBadRequest(f"Task with id {task_id} is not successful")

    return HttpResponse('Task completed')

这个视图函数的作用是获取Webhooks请求的payload中的任务ID和结果,并将它们发送到Celery任务队列中,最后返回一个HTTP响应。

完整代码示例:

from django.http import HttpResponseBadRequest
from django.views.decorators.csrf import csrf_exempt
from celery.result import AsyncResult
from django_webhooks import api, receive_webhook, send_webhook
from celery import shared_task
import time

# Celery配置
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# 定义Celery的异步任务
@shared_task
def print_info(info):
    time.sleep(2)
    print(info)
    return 'info printed'

# 在Webhooks中创建触发器
api.create_trigger(
    name='demo trigger',
    url='http://localhost:8000/hooks',
    event='task_complete',
    payload='{"task_id": "{{ task_id }}", "task_result": "{{ task_result }}", "info": "pidancode.com"}'
)

# 创建Webhooks视图函数
@csrf_exempt
@receive_webhook(event="task_complete")
def handle_task_complete(request, trigger, webhook):
    task_id = webhook.payload['task_id']
    task_result = webhook.payload['task_result']
    print(f'task completed with id {task_id} and result {task_result}')

    result = AsyncResult(task_id)
    if not result.successful():
        return HttpResponseBadRequest(f"Task with id {task_id} is not successful")

    return HttpResponse('Task completed')

# 触发异步任务并等待任务完成
task = print_info.delay('pidancode.com')
print(task.id)
task.wait()

# 发送Webhooks事件
send_webhook('http://localhost:8000/hooks', 'task_complete', {'task_id': task.id, 'task_result': task.result})

相关文章