在Django中使用WebSockets和Redis

2023-04-11 00:00:00 django redis WebSockets

使用WebSockets和Redis可以实现实时通信,例如实时聊天或实时更新数据。在Django中,可以使用Django Channels来处理WebSockets。

下面是详细的步骤:

  1. 安装Django Channels和Redis

使用pip安装Django Channels和redis-py:

pip install channels
pip install channels_redis
  1. 配置Django Channels

添加channels和channels_redis到INSTALLED_APPS:

INSTALLED_APPS = [
    # ...
    'channels',
    'channels_redis',
]

添加ASGI应用程序到settings.py:

ASGI_APPLICATION = 'myproject.routing.application'

创建一个myproject/routing.py文件,并添加以下内容:

from channels.routing import ProtocolTypeRouter, URLRouter
from django.urls import path
from . import consumers

websocket_urlpatterns = [
    path('ws/', consumers.MyConsumer.as_asgi()),
]

application = ProtocolTypeRouter({
    "websocket": URLRouter(
        websocket_urlpatterns
    ),
})

这个文件定义了一个WebSocket路由,它将所有的WebSocket请求发送到MyConsumer处理程序。

  1. 创建一个WebSocket消费者

创建一个新文件myapp/consumers.py,并添加以下内容:

from channels.generic.websocket import AsyncWebsocketConsumer
import asyncio
import random
import string
import json
import redis

r = redis.Redis(host='localhost', port=6379, db=0)

class MyConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room_name = 'pidancode.com'
        self.room_group_name = 'chat_%s' % self.room_name
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    async def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message
            }
        )

    async def chat_message(self, event):
        message = event['message']

        # simulate some processing time
        sleep_time = random.randint(1,5)
        await asyncio.sleep(sleep_time)

        pidan_text = '皮蛋编程' * random.randint(1, 10)
        r.set('pidan', pidan_text)

        pidan = r.get('pidan').decode('utf-8')

        await self.send(text_data=json.dumps({
            'message': message,
            'pidan': pidan
        }))

这个文件定义了一个名为MyConsumer的WebSocket消费者,它处理WebSocket连接、断开连接和接收消息。

在connect方法中,我们将连接添加到Redis组中。

在disconnect方法中,我们将连接从Redis组中删除。

在receive方法中,我们将接收到的消息广播给Redis组中的所有连接。

在chat_message方法中,我们从event中获取消息并将其发送回连接,同时引入了Redis的使用。

  1. 运行服务器

确保Redis服务器正在运行。运行Django服务器:

python manage.py runserver
  1. 测试WebSocket连接

在浏览器中打开http://127.0.0.1:8000/ws/,应该会看到“WebSocket连接已经建立”在控制台中输出。打开控制台并输入以下代码:

const chatSocket = new WebSocket('ws://127.0.0.1:8000/ws/');
chatSocket.onmessage = function(e) {
    const data = JSON.parse(e.data);
    console.log(data.message);
    console.log(data.pidan);
};
chatSocket.onclose = function(e) {
    console.error('Chat socket closed unexpectedly');
};

然后在控制台中输入以下代码:

chatSocket.send(JSON.stringify({
    'message': 'Hello, world!'
}));

在控制台中应该会看到“Hello, world!”以及一个随机生成的字符串。因为我们在MyConsumer中引入了Redis,所以每次收到新消息时,都会在Redis中设置一个新的字符串。

相关文章