项目对数据的实时性要求很高,所以必须得使用websocket,PHP常见的websocket框架有workermanswoole,前者是纯PHP写的,较为轻量,后者的底层是C语言,功能和性能较强。

因为是小项目,于是使用了workerman,并将其整合进Laravel项目中。

workerman安装

composer安装

composer 安装wokerman

composer require workerman/workerman

创建artisan命令

创建一个 artisan 命令行工具来启动websocket服务端,在app/Console/Commands目录下建立命令行文件WorkermanServer.php

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Workerman\Worker;

class WorkermanServer extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'workerman:command {action} {-d}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Workerman Server';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        $worker = new Worker("websocket://127.0.0.1:2000");
        $workerid = null;

        // 启动4个进程对外提供服务
        $worker->count = 4;

        // 进程启动后设置一个每秒运行一次的定时器
        $worker->onWorkerStart = function ($worker) {
            $worker->onConnect = function ($connection) {

            };
            $worker->onMessage = function ($connection, $data) {

            };
            $worker->onClose = function ($connection) {

            };
        };

        // 运行worker
        Worker::runAll();
    }
}

使用命令php artisan workerman:command start d开启workerman

开启workerman.png

注意!如果是作为服务端监听的话要用websocket://,作为客户端采用ws://

Unkown command 错误

如果出现以下错误

Unkown command 错误.png

打开/vendor/workerman/workerman/Worker.php,找到parseCommand()函数。

parseCommand.png

将所有的$argv数组的键值加一,上图中是我已经加过之后的。报这个错是因为workermanartisan作为文件,wokerman:command作为参数了,不过这个错我目前只在ubantu上发现过,windows下没有问题。

业务逻辑文件夹

不可能把所有的业务逻辑都写在一个文件里,因此,创建一个文件夹app/WebSocket,存放处理业务逻辑的文件。

处理业务逻辑.png

  • Dispatch 判断并分发请求数据到对应的Event、Request、Subscribe
  • Event 处理来自Redis的事件(与http请求通信)
  • Request 处理请求
  • Subscribe 处理订阅
  • WS websocket工具存放一些公共方法

JSON-RPC

不同公司不同项目可能都有不同的规范,对于websocket,我觉得JSON-RPC是很好的规范。我们对其规范中的method又加以了简单的规范。

// 发起请求
{
  method: request.{请求方法名},
  params: [],
  id: 0,
  jsonrpc: 2.0
}

// 发起订阅
{
  method: subscribe.{订阅内容},
  params: [],
  id: 0,
  jsonrpc: 2.0
}

代码实现

WorkermanServer.php

<?php

namespace App\Console\Commands;

use App\WebSocket\Dispatch;
use App\WebSocket\WS;
use Illuminate\Console\Command;
use Workerman\Worker;
use Workerman\Lib\Timer;

// composer require clue/redis-react 这是异步redis组件,不能使用laravel封装的redis,因为它是同步阻塞的
use Clue\React\Redis\Factory;
use Clue\React\Redis\Client;

class WorkermanServer extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'workerman:command {action} {-d}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Workerman Server';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        define('HEARTBEAT_TIME', 60); // 心跳时间

        $worker = new Worker("websocket://127.0.0.1:2000");
        $workerid = null;

        // 启动4个进程对外提供服务
        $worker->count = 4;

        // 进程启动后设置一个每秒运行一次的定时器用于断开无心跳的连接
        $worker->onWorkerStart = function ($worker) {
            $workerid = $worker->id;
            Timer::add(1, function () use ($worker) {
                $time_now = time();
                foreach ($worker->connections as $connection) {
                    // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
                    if (empty($connection->lastMessageTime)) {
                        $connection->lastMessageTime = $time_now;
                        continue;
                    }
                    // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
                    if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
                        $connection->close();
                    }
                }
            });

            // 订阅redis频道,与http通信
            $loop = Worker::getEventLoop();
            $factory = new Factory($loop);

            $factory->createClient('localhost:6379?password=********&timeout=-1')->then(function (Client $client) use ($worker) {
                if ($client->subscribe('workerman_channel_event')) {
                    $client->on('message', function ($channel, $message) use ($worker) {
                        if ($channel == 'winplay_database_winplay') {
                            Dispatch::onRedis($worker, $message);
                        }
                    });
                }
            });


            $worker->onConnect = function ($connection) {
                $connection->lastMessageTime = time();
            };
            $worker->onMessage = function ($connection, $data) use ($worker) {
                Dispatch::onMessage($connection, $worker, $data);
            };
            $worker->onClose = function ($connection) use ($worker) {
                // 删除所有订阅
                WS::delSubscribe($connection, $worker);
            };
        };

        // 运行worker
        Worker::runAll();
    }
}

Dispatch.php

<?php

namespace App\WebSocket;

use App\Utils\JSONrpc;

class Dispatch
{

    // 来自客户端的请求
    public static function onMessage($connection, $worker, $message)
    {
        if (!JSONrpc::parse($message)) {
            JSONrpc::send($connection, JSONrpc::error(-32700));
            return false;
        }
        $message = \json_decode($message, true);
        if (!array_key_exists(0, $message)) {
            $message = [0 => $message];
        }
        $res = [];
        foreach ($message as $data) {
            $method = explode('.', $data['method']);
            $params = [];
            $id = null;
            if (array_key_exists('params', $data)) {
                $params = $data['params'];
            }
            if (array_key_exists('id', $data)) {
                $id = $data['id'];
            }

            if ($method[0] == 'request') {
                $res[] = self::handelRequest($method[1], $params, $id, $worker, $connection);

            } else if ($method[0] == 'subscribe') {
                $res[] = self::handelSubscribe($method[1], $params, $id, $worker, $connection);

            } else if ($method[0] == 'ping') {
                $res[] = self::handelHeartbeat($connection, $id);

            } else {
                $res[] = JSONrpc::error(-32600);
            }
        }

        if (count($res) == 1) {
            JSONrpc::send($connection, $res[0]);
        } else {
            JSONrpc::send($connection, $res);
        }

    }

    // 来自redis的事件
    public static function onRedis($worker, $message)
    {
        $message = json_decode($message, true);
        if (!array_key_exists('event', $message)) {
            return false;
        }
        $event = $message['event'];
        $params = [];
        if (array_key_exists('params', $message)) {
            $params = $message['params'];
        }

        self::handelEvent($worker, $event, $params);
    }

    // 处理请求
    private static function handelRequest($method, $params, $id, $worker, $connection)
    {
        $request = new Request();
        if (!method_exists($request, $method)) {
            return JSONrpc::error(-32601, $id);
        }
        $result = call_user_func_array(array($request, $method), array($params, $id));

        return $result;

    }

    // 处理订阅
    private static function handelSubscribe($method, $params, $id, $worker, $connection)
    {
        $result = 0;

        $subscribe = new Subscribe();
        if (!method_exists($subscribe, $method)) {
            return JSONrpc::error(-32601, $id);
        }

        $result = call_user_func_array(array($subscribe, $method), array($params, $id, $worker, $connection));

        return $result;

    }

    // 处理心跳
    private static function handelHeartbeat($connection, $id)
    {
        $connection->lastMessageTime = time();
        return JSONrpc::result('pong', $id);
    }

    // 处理事件
    private static function handelEvent($worker, $method, $params)
    {
        $event = new Event();
        if (!method_exists($event, $method)) {
            return false;
        }
        if (call_user_func_array(array($event, $method), array($worker, $params))) {
            return true;
        }
    }

}

WS.php

<?php

namespace App\WebSocket;

use Illuminate\Support\Facades\Redis;

class WS
{

    private static $subscribe = []; // 存放客户端的订阅信息,建议使用redis

    public static function publish($event, $params = [])
    {
        Redis::publish('event', \json_encode([
            'event' => $event,
            'params' => $params,
        ]));
    }

    // 增加客户端的订阅
    public static function addSubscribe($worker, $connection, $sub)
    {
        $index = $worker->id . '-' . $connection->id;
        if (!array_key_exists($index, self::$subscribe)) {
            self::$subscribe[$index] = [];
        }
        self::$subscribe[$index][] = $sub;
    }

    // 删除客户端的订阅
    public static function delSubscribe($worker, $connection, $sub = null)
    {
        $index = $worker->id . '-' . $connection->id;
        if (array_key_exists($index, self::$subscribe)) {
            if ($sub) {
                unset(self::$subscribe[$index][$sub]);
            } else {
                unset(self::$subscribe[$index]);
            }
        }
    }

    // 判断客户端是否订阅对应内容
    public static function isSubscribe($worker, $connection, $sub)
    {
        $index = $worker->id . '-' . $connection->id;
        if (!self::$subscribe) {
            return false;
        }
        if (!array_key_exists($index, self::$subscribe)) {
            return false;
        }
        if (!in_array($sub, self::$subscribe[$index])) {
            return false;
        }
        return true;
    }
}

JSONrpc.php

这是一个工具类,方便将数据格式化为JSON-RPC规范

<?php
namespace App\Utils;

class JSONrpc
{

    /**
     * 校验jsonrpc格式
     */
    public static function parse($json)
    {
        if (!$json) {
            return false;
        }

        if (gettype($json) == 'string') {
            $json = json_decode($json, true);
            if (!$json) {
                return false;
            }
        }
        if (!array_key_exists(0, $json)) {
            $json = [0 => $json];
        }
        foreach ($json as $value) {
            if (!array_key_exists('jsonrpc', $value)) {
                return false;
            }
            if ($value['jsonrpc'] != '2.0') {
                return false;
            }
            if (!array_key_exists('method', $value)) {
                return false;
            }
            if ($value['method'] != 'ping' && !array_key_exists('params', $value)) {
                return false;
            }

        }
        return true;
    }

    /**
     * @param method 请求方法
     * @param params 携带参数 可选
     * @param id 请求id 可选
     */
    public static function method($method, $params = [], $id = 'empty')
    {
        $res = [
            'jsonrpc' => '2.0',
            'method' => $method,
            'params' => $params,
        ];
        if ($id != 'empty') {
            $res['id'] = $id;
        }
        return $res;
    }

    /**
     * @param result 返回结果
     * @param id 返回id
     */
    public static function result($result, $id)
    {
        $res = [
            'jsonrpc' => '2.0',
            'result' => $result,
            'id' => $id,
        ];
        return $res;
    }

    /**
     * @param code 错误代码
     * @param message 错误信息 可选
     * @param data 错误数据 可选
     */
    public static function error($code, $id = null, $message = '', $data = null)
    {
        $res = [
            'jsonrpc' => '2.0',
            'error' => [
                'code' => $code,
                'message' => $message,
            ],
            'id' => null,
        ];
        if ($data) {
            $res['error']['data'] = $data;
        }
        return $res;
    }

    /**
     * @param conn
     * @param jsonrpc
     */
    public static function send($conn, $jsonrpc)
    {
        $conn->send(\json_encode($jsonrpc));
    }
}

Event.php、Request.php、Subscribe.php

只需要在里面添加public static function就行了,Request.phpSubscribe.php的方法需要返回JSON-RPC规范格式的数据,例如:

<?php

namespace App\WebSocket;

use App\Utils\JSONrpc;
use App\Models\Order;

class Request {
    public function mineLotteryLog ($params, $id) {
        if (!array_key_exists('userid', $params)) {
            return JSONrpc::error(-32602, $id);
        }
        if (array_key_exists('page', $params)) {
            $page = $params['page'];
        } else{
            $page = 1;
        }
        $userid = $params['userid'];
        $result = Betting::getBetByUserid($userid, $page);
        return JSONrpc::result($result, $id);
    }
}
<?php

namespace App\WebSocket;

use App\Utils\JSONrpc;
use App\Models\Item;

class Subscribe
{
    public function itemInfo($param, $id, $worker, $connection)
    {
        WS::addSubscribe($worker, $connection, 'itemInfo'); // 添加订阅信息
        JSONrpc::send($connection, JSONrpc::method('update.itemInfo', Item::mainData(40))); // 订阅成功后返回首页全部数据,后续只推送更新的数据
        return JSONrpc::result(1, $id); // 返回订阅成功给前端
    }
}

与Http的通信

想象一个业务场景,一个用户购买了一件商品,然后所有人都得到推送,这件商品被买下。但因为一些原因购买商品只能通过http请求,这时就需要httpwebsocket的通信。

在对应的Controller中引入WS.php,使用WS::publish()就行了,然后在Event.php里添加相同名称的方法就行了。

WS::publish('updateItemInfo');
<?php

namespace App\WebSocket;

use App\Models\ItemInfo;
use App\Utils\JSONrpc;
use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Facades\Cache;

class Event
{
    public static function updateItemInfo($worker, $params)
    {
        if (!array_key_exists('itemName', $params)) {
			return false;
        }
		$itemName = $params['itemName'];
        $itemInfo = ItemInfo::itemDetail($itemName);
        foreach ($worker->connections as $connection) {
            if (WS::isSubscribe($worker, $connection, 'itemInfo')) {
                JSONrpc::send($connection, JSONrpc::method('update.itemInfo', $itemInfo));
            }
        }
    }
}

需要注意的是laravelredis配置,在config/database里找到redis=>options=>prefix,这是laravel框架使用redis时会在每个键名前加的前缀,记得和WorkermanServer.php中的配置保持一致。