laravel集成workerman实现websocket多端及时通讯代码示例
1、创建启动命令文件
在 app/Console/Commands/ 目录下创建 WorkerMan.php
namespace App\Console\Commands;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use GatewayWorker\Register;
use Illuminate\Console\Command;
use Workerman\Worker;
class WorkerMan extends Command
{
protected $signature = 'wk {action} {--d}';
protected $description = 'Start a Workerman server.';
public function handle()
{
global $argv;
$action = $this->argument('action');
$argv[0] = 'wk';
$argv[1] = $action;
$argv[2] = $this->option('d') ? '-d' : '';
$this->start();
}
private function start()
{
$this->startGateWay();
$this->startBusinessWorker();
$this->startRegister();
$workerPath = storage_path('workerman/');
if (!is_dir($workerPath))
mkdir($workerPath, 0755, true);
Worker::$pidFile = $workerPath . config('app.name') . '_workman.pid';
$logPath = $workerPath . date('Ym') . '/';
if (!is_dir($logPath))
mkdir($logPath, 0755, true);
Worker::$logFile = $logPath . date('d') . '.log';
Worker::runAll();
}
private function startBusinessWorker()
{
$worker = new BusinessWorker();
$worker->name = 'BusinessWorker';
$worker->count = 3;
$worker->registerAddress = '127.0.0.1:1236';
$worker->eventHandler = \App\Workerman\Events::class;
}
private function startGateWay()
{
//http://doc.workerman.net/faq/secure-websocket-server.html
// 证书最好是申请的证书
// $context = array(
// // 更多ssl选项请参考手册 http://php.net/manual/zh/context.ssl.php
// 'ssl' => array(
// 请使用绝对路径
// 'local_cert' => '/www/wwwroot/cattle_car.com/public/ssl/4035525_niuniu.micropig.cn.pem', // 也可以是crt文件
//'local_pk' => '/www/wwwroot/cattle_car.com/public/ssl/4035525_niuniu.micropig.cn.key',
// 'local_cert' => '/www/server/panel/vhost/cert/cattle_car.com/fullchain.pem',
// 'local_pk' => '/www/server/panel/vhost/cert/cattle_car.com/privkey.pem',
// 'verify_peer' => false,
// 'verify_peer_name' => false,
// 'allow_self_signed' => true, //如果是自签名证书需要开启此选项
// )
// );
//$gateway = new Gateway("websocket://0.0.0.0:2346", $context);
$gateway = new Gateway("websocket://0.0.0.0:2346");
//$gateway->transport = 'ssl';
$gateway->name = 'Gateway';
$gateway->count = 1;
$gateway->lanIp = '127.0.0.1';
$gateway->startPort = 2300;
$gateway->pingInterval = 30;
$gateway->pingNotResponseLimit = 0;
$gateway->pingData = '{"type":"ping"}';
$gateway->registerAddress = '127.0.0.1:1236';
}
private function startRegister()
{
new Register('text://127.0.0.1:1236');
}
}
启动命令
[[email protected]]# php artisan wk start
守护模式启动
[[email protected]]# php artisan wk start --d
其它命令:
查看状态:status
停止: stop
重载: reload
重启: restart [–d]
2、创建 windows 环境启动命令文件
由于 windows 不支持批量启动服务,所以每个服务需要单独启动
namespace App\Console\Commands;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use GatewayWorker\Register;
use Illuminate\Console\Command;
use Workerman\Worker;
class WorkerManWin extends Command
{
//兼容win
protected $signature = 'wk
{action : action}
{--start=all : start}
{--d : daemon mode}';
protected $description = 'Start a Workerman server.';
public function handle()
{
global $argv;
$action = $this->argument('action');
//针对 Windows 一次执行,无法注册多个协议的特殊处理
if ($action === 'single') {
$start = $this->option('start');
if ($start === 'register') {
$this->startRegister();
} elseif ($start === 'gateway') {
$this->startGateWay();
} elseif ($start === 'worker') {
$this->startBusinessWorker();
}
Worker::runAll();
return;
}
$argv[1] = $action;
$argv[2] = $this->option('d') ? '-d' : '';
$this->start();
}
private function start()
{
$this->startGateWay();
$this->startBusinessWorker();
$this->startRegister();
Worker::runAll();
}
private function startBusinessWorker()
{
$worker = new BusinessWorker();
$worker->name = 'BusinessWorker';
$worker->count = 1;
$worker->registerAddress = '127.0.0.1:1236';
$worker->eventHandler = \App\Workerman\Events::class;
}
private function startGateWay()
{
$gateway = new Gateway("websocket://0.0.0.0:2346");
$gateway->name = 'Gateway';
$gateway->count = 1;
$gateway->lanIp = '127.0.0.1';
$gateway->startPort = 2300;
$gateway->pingInterval = 30;
$gateway->pingNotResponseLimit = 0;
$gateway->pingData = '{"type":"ping"}';
$gateway->registerAddress = '127.0.0.1:1236';
}
private function startRegister()
{
new Register('text://0.0.0.0:1236');
}
}
启动命令
[[email protected]]# php artisan wk single --start=gateway
[[email protected]]# php artisan wk single --start=worker
[[email protected]]# php artisan wk single --start=register
3、定义事件类
在 app/Workerman/ 目录下创建 Events.php
namespace App\Workerman;
use App\Helpers\Jwt;
use App\Models\UserModel;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Lib\Gateway;
use Illuminate\Support\Facades\Log;
use Workerman\Lib\Timer;
class Events
{
/**
* 业务服务启动事件
* @param BusinessWorker $businessWorker
* @return void
*/
public static function onWorkerStart(BusinessWorker $businessWorker)
{
self::log(__FUNCTION__, $businessWorker->workerId);
Timer::add(1, function () use ($businessWorker) {
$time_now = time();
foreach ($businessWorker->connections as $connection) {
// 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
if (empty($connection->lastMessageTime)) {
$connection->lastMessageTime = $time_now;
continue;
}
// 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
if ($time_now - $connection->lastMessageTime > 30) {
if ($connection->id) {
//todo
}
//断开后的回调
echo "Client ip {$connection->getRemoteIp()} timeout!!!\n";
$connection->close();
}
}
});
}
/**
* 客户端连接事件
* @param string $clientId
* @return void
*/
public static function onConnect(string $clientId)
{
self::log(__FUNCTION__, $clientId);
}
/**
* 客户端websocket 连接事件
* @param string $clientId
* @param mixed $data
* @return void
*/
public static function onWebSocketConnect(string $clientId, $data)
{
self::log(__FUNCTION__, $clientId, $data);
}
/**
* 客户端websocket消息
* @param string $clientId
* @param string $messageJson
* @return void
*/
public static function onMessage(string $clientId, string $messageJson)
{
self::log(__FUNCTION__, $clientId, $messageJson);
$message = json_decode($messageJson);
if (empty($message->type)) {
self::sendMessage(500, '请配置type');
return;
}
switch ($message->type) {
case 'login':
// 登录业务
break;
case 'ping':
self::sendMessage(201, 'pong');
break;
default:
self::sendMessage(500, '消息类型不支持');
}
}
/**
* 关闭客户端websocket
* @param string $clientId
* @return void
*/
public static function onClose(string $clientId)
{
self::log(__FUNCTION__, $clientId);
Gateway::destoryClient($clientId);
}
/**
* 写日志
* @param string $title
* @param $data
* @return void
*/
protected static function log(string $title, ...$data): void
{
if (config('app.debug')) {
var_dump("========== {$title} ==========");
var_dump($data);
Log::info("{$title} | " . json_encode($data, 256));
}
}
/**
* 发送客户端消息
* @param int $code
* @param mixed $message
* @param array|null $data
* @param string $clientId
* @return void
*/
protected static function sendMessage(int $code, $message, ?array $data = null, string $clientId = ''): void
{
$sendMessage = json_encode([
'code' => $code,
'msg' => $message,
'data' => $data,
]);
if ($clientId)
Gateway::sendToClient($clientId, $sendMessage);
else
Gateway::sendToCurrentClient($sendMessage);
}
}
测试可以在:www.websocket-test.com/ 连接服务并接收消息
4、js 连接 websocket 服务
let ws = new WebSocket('ws://192.168.0.100:2346');
// 获取连接状态
console.log('ws连接状态:' + ws.readyState);
//监听是否连接成功
ws.onopen = function () {
console.log('ws连接状态:' + ws.readyState);
//连接成功则发送登录请求
let message = {type: "login", token: "JWT授权码"};
ws.send(JSON.stringify(message));
}
// 接听服务器发回的信息并处理展示
ws.onmessage = function (data) {
console.log('接收到来自服务器的消息:');
console.log(data);
//完成通信后关闭WebSocket连接
ws.close();
}
// 监听连接关闭事件
ws.onclose = function () {
// 监听整个过程中websocket的状态
console.log('ws连接状态:' + ws.readyState);
}
// 监听并处理error事件
ws.onerror = function (error) {
console.log(error);
}
5、后端向客户端发送消息
\GatewayClient\Gateway::$registerAddress = '127.0.0.1:1236';
Gateway::sendToUid(29, '{"type": "update", "data": {"name": "张三", "aratar": "..."}}');
注意:
此处是单向 api 方式发送消息,registerAddress 地址与启动服务注册的 registerAddress 地址保持对应,如本地可直接使用默认设置。内网调用时,服务与客户端使用内网 IP。
Gateway 服务端与客户端使用方法,请查阅官方手册:
www.workerman.net/doc/gateway-work
相关文章
Laravel框架中集成并使用workerman、websocket流程步骤
https://www.zongscan.com/demo333/89432.html
相关文章