laravel集成workerman实现websocket多端及时通讯代码示例

2023-06-01 00:00:00 代码 示例 集成

1、创建启动命令文件

在 app/Console/Commands/ 目录下创建 WorkerMan.php

namespace App\Console\Commands;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use GatewayWorker\Register;
use Illuminate\Console\Command;
use Workerman\Worker;
class WorkerMan extends Command
{
    protected $signature = 'wk {action} {--d}';
    protected $description = 'Start a Workerman server.';
    public function handle()
    {
        global $argv;
        $action = $this->argument('action');
        $argv[0] = 'wk';
        $argv[1] = $action;
        $argv[2] = $this->option('d') ? '-d' : '';
        $this->start();
    }
    private function start()
    {
        $this->startGateWay();
        $this->startBusinessWorker();
        $this->startRegister();
        $workerPath = storage_path('workerman/');
        if (!is_dir($workerPath))
            mkdir($workerPath, 0755, true);
        Worker::$pidFile = $workerPath . config('app.name') . '_workman.pid';
        $logPath = $workerPath . date('Ym') . '/';
        if (!is_dir($logPath))
            mkdir($logPath, 0755, true);
        Worker::$logFile = $logPath . date('d') . '.log';
        Worker::runAll();
    }
    private function startBusinessWorker()
    {
        $worker = new BusinessWorker();
        $worker->name = 'BusinessWorker';
        $worker->count = 3;
        $worker->registerAddress = '127.0.0.1:1236';
        $worker->eventHandler = \App\Workerman\Events::class;
    }
    private function startGateWay()
    {
        //http://doc.workerman.net/faq/secure-websocket-server.html
        // 证书最好是申请的证书
//        $context = array(
//            // 更多ssl选项请参考手册 http://php.net/manual/zh/context.ssl.php
//            'ssl' => array(
        // 请使用绝对路径
        // 'local_cert' => '/www/wwwroot/cattle_car.com/public/ssl/4035525_niuniu.micropig.cn.pem', // 也可以是crt文件
        //'local_pk' => '/www/wwwroot/cattle_car.com/public/ssl/4035525_niuniu.micropig.cn.key',
//                'local_cert' => '/www/server/panel/vhost/cert/cattle_car.com/fullchain.pem',
//                'local_pk' => '/www/server/panel/vhost/cert/cattle_car.com/privkey.pem',
//                'verify_peer' => false,
//                'verify_peer_name' => false,
        // 'allow_self_signed' => true, //如果是自签名证书需要开启此选项
//            )
//        );
        //$gateway = new Gateway("websocket://0.0.0.0:2346", $context);
        $gateway = new Gateway("websocket://0.0.0.0:2346");
        //$gateway->transport = 'ssl';
        $gateway->name = 'Gateway';
        $gateway->count = 1;
        $gateway->lanIp = '127.0.0.1';
        $gateway->startPort = 2300;
        $gateway->pingInterval = 30;
        $gateway->pingNotResponseLimit = 0;
        $gateway->pingData = '{"type":"ping"}';
        $gateway->registerAddress = '127.0.0.1:1236';
    }
    private function startRegister()
    {
        new Register('text://127.0.0.1:1236');
    }
}

启动命令

[[email protected]]# php artisan wk start

守护模式启动

[[email protected]]# php artisan wk start --d

其它命令:

查看状态:status 
停止: stop 
重载: reload 
重启: restart [–d]


2、创建 windows 环境启动命令文件

由于 windows 不支持批量启动服务,所以每个服务需要单独启动

namespace App\Console\Commands;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use GatewayWorker\Register;
use Illuminate\Console\Command;
use Workerman\Worker;
class WorkerManWin extends Command
{
    //兼容win
    protected $signature = 'wk
                            {action : action}
                            {--start=all : start}
                            {--d : daemon mode}';
    protected $description = 'Start a Workerman server.';
    public function handle()
    {
        global $argv;
        $action = $this->argument('action');
        //针对 Windows 一次执行,无法注册多个协议的特殊处理
        if ($action === 'single') {
            $start = $this->option('start');
            if ($start === 'register') {
                $this->startRegister();
            } elseif ($start === 'gateway') {
                $this->startGateWay();
            } elseif ($start === 'worker') {
                $this->startBusinessWorker();
            }
            Worker::runAll();
            return;
        }
        $argv[1] = $action;
        $argv[2] = $this->option('d') ? '-d' : '';
        $this->start();
    }
    private function start()
    {
        $this->startGateWay();
        $this->startBusinessWorker();
        $this->startRegister();
        Worker::runAll();
    }
    private function startBusinessWorker()
    {
        $worker = new BusinessWorker();
        $worker->name = 'BusinessWorker';
        $worker->count = 1;
        $worker->registerAddress = '127.0.0.1:1236';
        $worker->eventHandler = \App\Workerman\Events::class;
    }
    private function startGateWay()
    {
        $gateway = new Gateway("websocket://0.0.0.0:2346");
        $gateway->name = 'Gateway';
        $gateway->count = 1;
        $gateway->lanIp = '127.0.0.1';
        $gateway->startPort = 2300;
        $gateway->pingInterval = 30;
        $gateway->pingNotResponseLimit = 0;
        $gateway->pingData = '{"type":"ping"}';
        $gateway->registerAddress = '127.0.0.1:1236';
    }
    private function startRegister()
    {
        new Register('text://0.0.0.0:1236');
    }
}

启动命令

[[email protected]]# php artisan wk single --start=gateway
[[email protected]]# php artisan wk single --start=worker
[[email protected]]# php artisan wk single --start=register


3、定义事件类

在 app/Workerman/ 目录下创建 Events.php

namespace App\Workerman;
use App\Helpers\Jwt;
use App\Models\UserModel;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Lib\Gateway;
use Illuminate\Support\Facades\Log;
use Workerman\Lib\Timer;
class Events
{
    /**
     * 业务服务启动事件
     * @param BusinessWorker $businessWorker
     * @return void
     */
    public static function onWorkerStart(BusinessWorker $businessWorker)
    {
        self::log(__FUNCTION__, $businessWorker->workerId);
        Timer::add(1, function () use ($businessWorker) {
            $time_now = time();
            foreach ($businessWorker->connections as $connection) {
                // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
                if (empty($connection->lastMessageTime)) {
                    $connection->lastMessageTime = $time_now;
                    continue;
                }
                // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
                if ($time_now - $connection->lastMessageTime > 30) {
                    if ($connection->id) {
                        //todo
                    }
                    //断开后的回调
                    echo "Client ip {$connection->getRemoteIp()} timeout!!!\n";
                    $connection->close();
                }
            }
        });
    }
    /**
     * 客户端连接事件
     * @param string $clientId
     * @return void
     */
    public static function onConnect(string $clientId)
    {
        self::log(__FUNCTION__, $clientId);
    }
    /**
     * 客户端websocket 连接事件
     * @param string $clientId
     * @param mixed $data
     * @return void
     */
    public static function onWebSocketConnect(string $clientId, $data)
    {
        self::log(__FUNCTION__, $clientId, $data);
    }
    /**
     * 客户端websocket消息
     * @param string $clientId
     * @param string $messageJson
     * @return void
     */
    public static function onMessage(string $clientId, string $messageJson)
    {
        self::log(__FUNCTION__, $clientId, $messageJson);
        $message = json_decode($messageJson);
        if (empty($message->type)) {
            self::sendMessage(500, '请配置type');
            return;
        }
        switch ($message->type) {
            case 'login':
                // 登录业务
                break;
            case 'ping':
                self::sendMessage(201, 'pong');
                break;
            default:
                self::sendMessage(500, '消息类型不支持');
        }
    }
    /**
     * 关闭客户端websocket
     * @param string $clientId
     * @return void
     */
    public static function onClose(string $clientId)
    {
        self::log(__FUNCTION__, $clientId);
        Gateway::destoryClient($clientId);
    }
    /**
     * 写日志
     * @param string $title
     * @param $data
     * @return void
     */
    protected static function log(string $title, ...$data): void
    {
        if (config('app.debug')) {
            var_dump("========== {$title} ==========");
            var_dump($data);
            Log::info("{$title} | " . json_encode($data, 256));
        }
    }
    /**
     * 发送客户端消息
     * @param int $code
     * @param mixed $message
     * @param array|null $data
     * @param string $clientId
     * @return void
     */
    protected static function sendMessage(int $code, $message, ?array $data = null, string $clientId = ''): void
    {
        $sendMessage = json_encode([
            'code' => $code,
            'msg' => $message,
            'data' => $data,
        ]);
        if ($clientId)
            Gateway::sendToClient($clientId, $sendMessage);
        else
            Gateway::sendToCurrentClient($sendMessage);
    }
}

测试可以在:www.websocket-test.com/ 连接服务并接收消息


4、js 连接 websocket 服务

let ws = new WebSocket('ws://192.168.0.100:2346');
// 获取连接状态
console.log('ws连接状态:' + ws.readyState);
//监听是否连接成功
ws.onopen = function () {
    console.log('ws连接状态:' + ws.readyState);
    //连接成功则发送登录请求
 let message =  {type: "login", token: "JWT授权码"};
 ws.send(JSON.stringify(message));
}
// 接听服务器发回的信息并处理展示
ws.onmessage = function (data) {
    console.log('接收到来自服务器的消息:');
    console.log(data);
    //完成通信后关闭WebSocket连接
    ws.close();
}
// 监听连接关闭事件
ws.onclose = function () {
    // 监听整个过程中websocket的状态
    console.log('ws连接状态:' + ws.readyState);
}
// 监听并处理error事件
ws.onerror = function (error) {
    console.log(error);
}


5、后端向客户端发送消息

\GatewayClient\Gateway::$registerAddress = '127.0.0.1:1236';
        Gateway::sendToUid(29, '{"type": "update", "data": {"name": "张三", "aratar": "..."}}');

注意:

此处是单向 api 方式发送消息,registerAddress 地址与启动服务注册的 registerAddress 地址保持对应,如本地可直接使用默认设置。内网调用时,服务与客户端使用内网 IP。


Gateway 服务端与客户端使用方法,请查阅官方手册:

www.workerman.net/doc/gateway-work


相关文章

Laravel框架中集成并使用workerman、websocket流程步骤

https://www.zongscan.com/demo333/89432.html


相关文章