在Laravel中使用Workerman
项目对数据的实时性要求很高,所以必须得使用websocket
,PHP常见的websocket框架有workerman
和swoole
,前者是纯PHP写的,较为轻量,后者的底层是C语言,功能和性能较强。
因为是小项目,于是使用了workerman
,并将其整合进Laravel
项目中。
workerman安装
composer安装
composer 安装wokerman
composer require workerman/workerman
创建artisan命令
创建一个 artisan 命令行工具来启动websocket
服务端,在app/Console/Commands
目录下建立命令行文件WorkermanServer.php
。
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Workerman\Worker;
class WorkermanServer extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'workerman:command {action} {-d}';
/**
* The console command description.
*
* @var string
*/
protected $description = 'Workerman Server';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
*
* @return mixed
*/
public function handle()
{
$worker = new Worker("websocket://127.0.0.1:2000");
$workerid = null;
// 启动4个进程对外提供服务
$worker->count = 4;
// 进程启动后设置一个每秒运行一次的定时器
$worker->onWorkerStart = function ($worker) {
$worker->onConnect = function ($connection) {
};
$worker->onMessage = function ($connection, $data) {
};
$worker->onClose = function ($connection) {
};
};
// 运行worker
Worker::runAll();
}
}
使用命令php artisan workerman:command start d
开启workerman
。
注意!如果是作为服务端监听的话要用websocket://
,作为客户端采用ws://
。
Unkown command 错误
如果出现以下错误
打开/vendor/workerman/workerman/Worker.php
,找到parseCommand()
函数。
将所有的$argv
数组的键值加一,上图中是我已经加过之后的。报这个错是因为workerman
将artisan
作为文件,wokerman:command
作为参数了,不过这个错我目前只在ubantu
上发现过,windows
下没有问题。
业务逻辑文件夹
不可能把所有的业务逻辑都写在一个文件里,因此,创建一个文件夹app/WebSocket
,存放处理业务逻辑的文件。
- Dispatch 判断并分发请求数据到对应的Event、Request、Subscribe
- Event 处理来自Redis的事件(与http请求通信)
- Request 处理请求
- Subscribe 处理订阅
- WS websocket工具存放一些公共方法
JSON-RPC
不同公司不同项目可能都有不同的规范,对于websocket
,我觉得JSON-RPC是很好的规范。我们对其规范中的method
又加以了简单的规范。
// 发起请求
{
method: request.{请求方法名},
params: [],
id: 0,
jsonrpc: 2.0
}
// 发起订阅
{
method: subscribe.{订阅内容},
params: [],
id: 0,
jsonrpc: 2.0
}
代码实现
WorkermanServer.php
<?php
namespace App\Console\Commands;
use App\WebSocket\Dispatch;
use App\WebSocket\WS;
use Illuminate\Console\Command;
use Workerman\Worker;
use Workerman\Lib\Timer;
// composer require clue/redis-react 这是异步redis组件,不能使用laravel封装的redis,因为它是同步阻塞的
use Clue\React\Redis\Factory;
use Clue\React\Redis\Client;
class WorkermanServer extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'workerman:command {action} {-d}';
/**
* The console command description.
*
* @var string
*/
protected $description = 'Workerman Server';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
*
* @return mixed
*/
public function handle()
{
define('HEARTBEAT_TIME', 60); // 心跳时间
$worker = new Worker("websocket://127.0.0.1:2000");
$workerid = null;
// 启动4个进程对外提供服务
$worker->count = 4;
// 进程启动后设置一个每秒运行一次的定时器用于断开无心跳的连接
$worker->onWorkerStart = function ($worker) {
$workerid = $worker->id;
Timer::add(1, function () use ($worker) {
$time_now = time();
foreach ($worker->connections as $connection) {
// 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
if (empty($connection->lastMessageTime)) {
$connection->lastMessageTime = $time_now;
continue;
}
// 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
$connection->close();
}
}
});
// 订阅redis频道,与http通信
$loop = Worker::getEventLoop();
$factory = new Factory($loop);
$factory->createClient('localhost:6379?password=********&timeout=-1')->then(function (Client $client) use ($worker) {
if ($client->subscribe('workerman_channel_event')) {
$client->on('message', function ($channel, $message) use ($worker) {
if ($channel == 'winplay_database_winplay') {
Dispatch::onRedis($worker, $message);
}
});
}
});
$worker->onConnect = function ($connection) {
$connection->lastMessageTime = time();
};
$worker->onMessage = function ($connection, $data) use ($worker) {
Dispatch::onMessage($connection, $worker, $data);
};
$worker->onClose = function ($connection) use ($worker) {
// 删除所有订阅
WS::delSubscribe($connection, $worker);
};
};
// 运行worker
Worker::runAll();
}
}
Dispatch.php
<?php
namespace App\WebSocket;
use App\Utils\JSONrpc;
class Dispatch
{
// 来自客户端的请求
public static function onMessage($connection, $worker, $message)
{
if (!JSONrpc::parse($message)) {
JSONrpc::send($connection, JSONrpc::error(-32700));
return false;
}
$message = \json_decode($message, true);
if (!array_key_exists(0, $message)) {
$message = [0 => $message];
}
$res = [];
foreach ($message as $data) {
$method = explode('.', $data['method']);
$params = [];
$id = null;
if (array_key_exists('params', $data)) {
$params = $data['params'];
}
if (array_key_exists('id', $data)) {
$id = $data['id'];
}
if ($method[0] == 'request') {
$res[] = self::handelRequest($method[1], $params, $id, $worker, $connection);
} else if ($method[0] == 'subscribe') {
$res[] = self::handelSubscribe($method[1], $params, $id, $worker, $connection);
} else if ($method[0] == 'ping') {
$res[] = self::handelHeartbeat($connection, $id);
} else {
$res[] = JSONrpc::error(-32600);
}
}
if (count($res) == 1) {
JSONrpc::send($connection, $res[0]);
} else {
JSONrpc::send($connection, $res);
}
}
// 来自redis的事件
public static function onRedis($worker, $message)
{
$message = json_decode($message, true);
if (!array_key_exists('event', $message)) {
return false;
}
$event = $message['event'];
$params = [];
if (array_key_exists('params', $message)) {
$params = $message['params'];
}
self::handelEvent($worker, $event, $params);
}
// 处理请求
private static function handelRequest($method, $params, $id, $worker, $connection)
{
$request = new Request();
if (!method_exists($request, $method)) {
return JSONrpc::error(-32601, $id);
}
$result = call_user_func_array(array($request, $method), array($params, $id));
return $result;
}
// 处理订阅
private static function handelSubscribe($method, $params, $id, $worker, $connection)
{
$result = 0;
$subscribe = new Subscribe();
if (!method_exists($subscribe, $method)) {
return JSONrpc::error(-32601, $id);
}
$result = call_user_func_array(array($subscribe, $method), array($params, $id, $worker, $connection));
return $result;
}
// 处理心跳
private static function handelHeartbeat($connection, $id)
{
$connection->lastMessageTime = time();
return JSONrpc::result('pong', $id);
}
// 处理事件
private static function handelEvent($worker, $method, $params)
{
$event = new Event();
if (!method_exists($event, $method)) {
return false;
}
if (call_user_func_array(array($event, $method), array($worker, $params))) {
return true;
}
}
}
WS.php
<?php
namespace App\WebSocket;
use Illuminate\Support\Facades\Redis;
class WS
{
private static $subscribe = []; // 存放客户端的订阅信息,建议使用redis
public static function publish($event, $params = [])
{
Redis::publish('event', \json_encode([
'event' => $event,
'params' => $params,
]));
}
// 增加客户端的订阅
public static function addSubscribe($worker, $connection, $sub)
{
$index = $worker->id . '-' . $connection->id;
if (!array_key_exists($index, self::$subscribe)) {
self::$subscribe[$index] = [];
}
self::$subscribe[$index][] = $sub;
}
// 删除客户端的订阅
public static function delSubscribe($worker, $connection, $sub = null)
{
$index = $worker->id . '-' . $connection->id;
if (array_key_exists($index, self::$subscribe)) {
if ($sub) {
unset(self::$subscribe[$index][$sub]);
} else {
unset(self::$subscribe[$index]);
}
}
}
// 判断客户端是否订阅对应内容
public static function isSubscribe($worker, $connection, $sub)
{
$index = $worker->id . '-' . $connection->id;
if (!self::$subscribe) {
return false;
}
if (!array_key_exists($index, self::$subscribe)) {
return false;
}
if (!in_array($sub, self::$subscribe[$index])) {
return false;
}
return true;
}
}
JSONrpc.php
这是一个工具类,方便将数据格式化为JSON-RPC规范
<?php
namespace App\Utils;
class JSONrpc
{
/**
* 校验jsonrpc格式
*/
public static function parse($json)
{
if (!$json) {
return false;
}
if (gettype($json) == 'string') {
$json = json_decode($json, true);
if (!$json) {
return false;
}
}
if (!array_key_exists(0, $json)) {
$json = [0 => $json];
}
foreach ($json as $value) {
if (!array_key_exists('jsonrpc', $value)) {
return false;
}
if ($value['jsonrpc'] != '2.0') {
return false;
}
if (!array_key_exists('method', $value)) {
return false;
}
if ($value['method'] != 'ping' && !array_key_exists('params', $value)) {
return false;
}
}
return true;
}
/**
* @param method 请求方法
* @param params 携带参数 可选
* @param id 请求id 可选
*/
public static function method($method, $params = [], $id = 'empty')
{
$res = [
'jsonrpc' => '2.0',
'method' => $method,
'params' => $params,
];
if ($id != 'empty') {
$res['id'] = $id;
}
return $res;
}
/**
* @param result 返回结果
* @param id 返回id
*/
public static function result($result, $id)
{
$res = [
'jsonrpc' => '2.0',
'result' => $result,
'id' => $id,
];
return $res;
}
/**
* @param code 错误代码
* @param message 错误信息 可选
* @param data 错误数据 可选
*/
public static function error($code, $id = null, $message = '', $data = null)
{
$res = [
'jsonrpc' => '2.0',
'error' => [
'code' => $code,
'message' => $message,
],
'id' => null,
];
if ($data) {
$res['error']['data'] = $data;
}
return $res;
}
/**
* @param conn
* @param jsonrpc
*/
public static function send($conn, $jsonrpc)
{
$conn->send(\json_encode($jsonrpc));
}
}
Event.php、Request.php、Subscribe.php
只需要在里面添加public static function
就行了,Request.php
和Subscribe.php
的方法需要返回JSON-RPC规范格式的数据,例如:
<?php
namespace App\WebSocket;
use App\Utils\JSONrpc;
use App\Models\Order;
class Request {
public function mineLotteryLog ($params, $id) {
if (!array_key_exists('userid', $params)) {
return JSONrpc::error(-32602, $id);
}
if (array_key_exists('page', $params)) {
$page = $params['page'];
} else{
$page = 1;
}
$userid = $params['userid'];
$result = Betting::getBetByUserid($userid, $page);
return JSONrpc::result($result, $id);
}
}
<?php
namespace App\WebSocket;
use App\Utils\JSONrpc;
use App\Models\Item;
class Subscribe
{
public function itemInfo($param, $id, $worker, $connection)
{
WS::addSubscribe($worker, $connection, 'itemInfo'); // 添加订阅信息
JSONrpc::send($connection, JSONrpc::method('update.itemInfo', Item::mainData(40))); // 订阅成功后返回首页全部数据,后续只推送更新的数据
return JSONrpc::result(1, $id); // 返回订阅成功给前端
}
}
与Http的通信
想象一个业务场景,一个用户购买了一件商品,然后所有人都得到推送,这件商品被买下。但因为一些原因购买商品只能通过http
请求,这时就需要http
与websocket
的通信。
在对应的Controller
中引入WS.php
,使用WS::publish()
就行了,然后在Event.php
里添加相同名称的方法就行了。
WS::publish('updateItemInfo');
<?php
namespace App\WebSocket;
use App\Models\ItemInfo;
use App\Utils\JSONrpc;
use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Facades\Cache;
class Event
{
public static function updateItemInfo($worker, $params)
{
if (!array_key_exists('itemName', $params)) {
return false;
}
$itemName = $params['itemName'];
$itemInfo = ItemInfo::itemDetail($itemName);
foreach ($worker->connections as $connection) {
if (WS::isSubscribe($worker, $connection, 'itemInfo')) {
JSONrpc::send($connection, JSONrpc::method('update.itemInfo', $itemInfo));
}
}
}
}
需要注意的是laravel
的redis
配置,在config/database
里找到redis
=>options
=>prefix
,这是laravel
框架使用redis
时会在每个键名前加的前缀,记得和WorkermanServer.php
中的配置保持一致。
Or you can contact me by Email