TP5.0.24+Workerman+定时任务

浏览:875 发布日期:2019/08/02 分类:功能实现 关键字: 定时任务 Workerman 聊天
TP5.0.24+Workerman+定时任务
优秀开源项目公众号+小程序:http://github.crmeb.net/u/liaofei

1.安装 Workerman
安装GatewayWorker内核文件(不包含start_gateway.php start_businessworker.php等启动入口文件),直接上composercomposer require workerman/gateway-worker2.创建 Workerman 启动文件
创建一个自定义命令类文件来启动 Socket 服务端,新建application/push/command/Workerman.phpnamespace app\push\command;

use Workerman\Worker;
use GatewayWorker\Register;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;

class Workerman extends Command
{

    protected function configure()
    {
        $this->setName('workerman')
            ->addArgument('action', Argument::OPTIONAL, "action  start|stop|restart")
            ->addArgument('type', Argument::OPTIONAL, "d -d")
            ->setDescription('workerman chat');
    }

    protected function execute(Input $input, Output $output)
    {
        global $argv;
        $action = trim($input->getArgument('action'));
        $type   = trim($input->getArgument('type')) ? '-d' : '';

        $argv[0] = 'chat';
        $argv[1] = $action;
        $argv[2] = $type ? '-d' : '';

        $this->start();
    }

    private function start()
    {
        $this->startGateWay();
        $this->startBusinessWorker();
        $this->startRegister();
        Worker::runAll();
    }

    private function startBusinessWorker()
    {
        // bussinessWorker 进程
        $worker = new BusinessWorker();
        // worker名称
        $worker->name = 'YourAppBusinessWorker';
        // bussinessWorker进程数量
        $worker->count = 4;
        //设置处理业务的类,此处制定Events的命名空间
        $worker->eventHandler= \app\push\controller\Events::class;
        // 服务注册地址
        $worker->registerAddress = '127.0.0.1:1238';
    }

    private function startGateWay()
    {
        // gateway 进程,这里使用Text协议,可以用telnet测试
        $gateway = new Gateway("websocket://0.0.0.0:8282");
        // gateway名称,status方便查看
        $gateway->name = 'YourAppGateway';
        // gateway进程数
        $gateway->count = 4;
        // 本机ip,分布式部署时使用内网ip
        $gateway->lanIp = '127.0.0.1';
        // 内部通讯起始端口,假如$gateway->count=4,起始端口为4000
        // 则一般会使用4000 4001 4002 4003 4个端口作为内部通讯端口
        $gateway->startPort = 20003;
        // 服务注册地址
        $gateway->registerAddress = '127.0.0.1:1238';
        // 心跳间隔
        $gateway->pingInterval = 55;
        $gateway->pingNotResponseLimit = 1;
        // 心跳数据
        $gateway->pingData = '';
    }

    private function startRegister()
    {
        new Register('text://0.0.0.0:1238');
    }
}
配置 application/command.php 文件return [
    'app\common\command\Workerman',
];
3.创建事件监听文件
创建 application/push/controller/Events.php 文件来监听处理 workerman 的各种事件<?php

namespace app\push\controller;

use GatewayWorker\Lib\Gateway;
use think\Hook;
use Workerman\Lib\Timer;

class Events
{

    //定时器间隔
    protected static $interval = 2;
    //定时器
    protected static $timer = null;
    //事件处理类
    protected static $evevtRunClass = \app\push\controller\EvevtRun::class;
    /*
     * 消息事件回调 class
     *
     * */
    protected static $eventClassName = \app\push\controller\Push::class;
    /**
     * 当客户端发来消息时触发
     * @param int $client_id 连接id
     * @param mixed $message 具体消息
     */
    public static function onMessage($client_id, $message)
    {
        $message_data = json_decode($message,true);
        if (!$message_data) return ;
        try{
            if(!isset($message_data['type'])) throw new \Exception('缺少消息参数类型');
            //消息回調处理
            $evevtName = self::$eventClassName.'::instance';
            if(is_callable($evevtName))
                $evevtName()->start($message_data['type'],$client_id,$message_data);
            else
                throw new \Exception('消息处理回调不存在。['+$evevtName+']');
        }catch (\Exception $e){
            var_dump([
                'file'=>$e->getFile(),
                'code'=>$e->getCode(),
                'msg'=>$e->getMessage(),
                'line'=>$e->getLine()
            ]);
        }
    }

    /**
     * 当用户连接时触发的方法
     * @param integer $client_id 连接的客户端
     * @return void
     */
    public static function onConnect($client_id)
    {
        Gateway::sendToClient($client_id, json_encode(array(
            'type'      => 'init',
            'client_id' => $client_id
        )));
    }

    /**
     * 当用户断开连接时触发的方法
     * @param integer $client_id 断开连接的客户端
     * @return void
     */
    public static function onClose($client_id)
    {
        Gateway::sendToClient($client_id,json_encode([
            'type'=>'logout',
            'message'=>"client[$client_id]"
        ]));
    }

    /**
     * 当进程启动时
     * @param integer $businessWorker 进程实例
     */
    public static function onWorkerStart($worker)
    {
        //在进程1上开启定时器 每self::$interval秒执行
        if($worker->id === 0){
            $last = time();
            $task = [6 => $last, 10 => $last, 30 => $last, 60 => $last, 180 => $last, 300 => $last];
            self::$timer = Timer::add(self::$interval, function() use(&$task) {
                try {
                    $now = time();
                    Hook::exec(self::$evevtRunClass);
                    foreach ($task as $sec => &$time) {
                        if (($now - $time) >= $sec) {
                            $time = $now;
                            Hook::exec(self::$evevtRunClass,'task_'.$sec);
                        }
                    }
                } catch (\Throwable $e) {}

            });
        }

    }

    /**
     * 当进程关闭时
     * @param integer $businessWorker 进程实例
     */
    public static function onWorkerStop($worker)
    {
        if($worker->id === 0) Timer::del(self::$timer);
    }
}
消息事件回调 class 方法里的处理根据自身情况编写<?php

namespace app\push\controller;

use app\wap\model\live\LiveUser;
use GatewayWorker\Lib\Gateway;
use app\wap\model\live\LiveHonouredGuest;
use app\wap\model\user\User;
use app\wap\model\live\LiveBarrage;

class Push
{

    /*
     * @var array 消息内容
     * */
    protected $message_data = [
        'type' => '',
        'message'=>'',
    ];
    /*
     * @var string 消息类型
     * */
    protected $message_type = '';
    /*
     * @var string $client_id
     * */
    protected $client_id    = '';
    /*
     * @var int 当前登陆用户
     * */
    protected $uid = null;
    /*
     * @var null 本类实例化结果
     * */
    protected static $instance = null;
    /*
     *
     * */
    protected function __construct($message_data = [])
    {

    }
    /*
     * 实例化本类
     * */
    public static function instance()
    {
        if(is_null(self::$instance)) self::$instance = new static();
        return self::$instance;
    }

    /*
     * 检测参数并返回
     * @param array || string $keyValue 需要提取的键值
     * @param null || bool $value
     * @return array;
     * */
    protected function checkValue($keyValue = null,$value = null)
    {
        if(is_null($keyValue))
            $message_data = $this->message_data;
        if(is_string($keyValue))
            $message_data = isset($this->message_data[$keyValue]) ? $this->message_data[$keyValue] : (is_null($value) ? '': $value);
        if(is_array($keyValue))
            $message_data = array_merge($keyValue,$this->message_data);
        if(is_bool($value) && $value === true && is_array($message_data) && is_array($keyValue)){
            $newData = [];
            foreach ($keyValue as $key => $item){
                $newData [] = $message_data[$key];
            }
            return $newData;
        }
        return $message_data;
    }

    /*
     * 开始设置回调
     * @param string $typeFnName 回调函数名
     * @param string $client_id
     * @param array $message_data
     *
     * */
    public function start($typeFnName,$client_id,$message_data)
    {

        $this->message_type = $typeFnName;

        $this->message_data = $message_data;

        $this->client_id    = $client_id;

        $this->uid = Gateway::getUidByClientId($client_id);
        //记录用户上线
        if($this->uid && Gateway::isOnline($client_id) && ($live_id = $this->checkValue('room')))
        {
            LiveUser::setLiveUserOnline($live_id,$this->uid,1);
        }

        if(method_exists($this,$typeFnName))
            call_user_func([$this,$typeFnName]);
        else
            throw new \Exception('缺少回调方法');
    }

    /*
     * 心跳检测
     *
     * */
    protected  function ping()
    {
        return ;
    }

    /*
     * 绑定用户相应客户端
     * @param string $client_id
     * @param array $message_data
     * @return
     * */
    protected function handshake()
    {
        $message_data = $this->checkValue(['uid'=>0,'room'=>0]);
        if(!$message_data['uid'])  throw new \Exception("缺少用户uid,无法绑定用户");
        $new_message    = [
            'type'      => $this->message_type,
            'client_id' => $this->client_id,
            'time'      => date('H:i:s'),
            'msg'       => '绑定成功!'
        ];
        Gateway::bindUid($this->client_id,$message_data['uid']);

        //如果有群组id加入群组
        if($message_data['room']){
            // 加入某个群组(可调用多次加入多个群组) 将clientid加入roomid分组中
            Gateway::joinGroup($this->client_id, $message_data['room']);
        }

        Gateway::sendToClient($this->client_id, json_encode($new_message));
    }

    /*
     * 接受客户端发送的消息
     * @param string $client_id 客户端client_id
     * @param array $message_data 发送的数据
     * @return
     *
     * */
    protected function send()
    {
        list($toUid,$message,$room,$type) = $this->checkValue(['uid'=>0,'content'=>'','room'=>false,'ms_type' => 0],true);
        $client_id      = $this->client_id;
        if(!$this->uid) {
            //认证用户信息失败,关闭用户链接
            Gateway::closeClient($client_id);
            throw new \Exception("缺少用户uid");
        }
        $userInfo = User::get($this->uid);
        if(!$userInfo){
            //认证用户信息失败,关闭用户链接
            Gateway::closeClient($client_id);
            throw new \Exception("用户信息缺少");
        }
        if($room && Gateway::getClientIdCountByGroup($room)){
            $user_type = LiveHonouredGuest::where(['uid'=>$this->uid,'live_id'=>$room])->value('type');
            if(is_null($user_type)) $user_type = 2;
            $res = LiveBarrage::set([
                'live_id'=>$room,
                'uid'=>$this->uid,
                'type'=>$type,
                'barrage'=>$message,
                'add_time'=>time(),
                'is_show'=>1
            ]);
            if(!$res) throw new \Exception("写入历史记录失败");
            Gateway::sendToGroup($room,json_encode([
                'message'=>$message,
                'm_type'=>$type,
                'type'=>'message',
                'user_type'=>$user_type,
                'userInfo'=>$userInfo,
                'id'=>$res['id']
            ]));
        }else{
            $new_message    = [
                'type'      => 'reception',
                'content'   => $message,
                'time'      => date('H:i:s'),
                'timestamp' => time(),
            ];
            if(Gateway::isUidOnline($toUid)) return Gateway::sendToUid($toUid, json_encode($new_message));
        }
    }

    /*
     * 消息撤回
     * @param string $client_id
     * @param array $message_data
     * */
    protected function recall()
    {
        list($id,$room) = $this->checkValue(['id'=>0,'room'=>''],true);

        if(!$id)
            throw new \Exception('缺少撤回消息的id');

        if(!$room)
            throw new \Exception('缺少房间号');

        if(LiveBarrage::del($id)){
            Gateway::sendToGroup($room,json_encode([
                'type'=>'recall',
                'id'=>$id
            ]),Gateway::getClientIdByUid($this->uid));
        }
    }

}
定时任务事件处理类 按照自身情况编写方法内逻辑<?php

namespace app\push\controller;

use GatewayWorker\Lib\Gateway;

/*
 * 定时任务
 *
 * */

class EvevtRun
{

    /*
     * 默认定时器执行事件
     * */
    public function run()
    {

    }

    /*
     * 每隔6秒执行
     * */
    public function task_6()
    {

    }

    /*
     * 每隔10秒执行
     * */
    public function task_10()
    {

    }

    /*
     * 每隔30秒执行
     * */
    public function task_30()
    {

    }

    /*
     * 每隔60秒执行
     * */
    public function task_60()
    {

    }
    /*
     * 每隔180秒执行
     * */
    public function task_180()
    {

    }

    /*
     * 每隔300秒执行
     * */
    public function task_300()
    {

    }

}
4.启动 Workerman 服务端
以debug(调试)方式启动以debug(调试)方式启动
php think workerman start
//以daemon(守护进程)方式启动
php think workerman start d
//停止
php think workerman stop
//重启
php think workerman restart
//平滑重启
php think workerman reload
//查看状态
php think workerman status

//当你看到如下结果的时候,workerman已经启动成功了。
Workerman[chat] start in DEBUG mode
----------------------- WORKERMAN -----------------------------
Workerman version:3.5.11          PHP version:7.0.29
------------------------ WORKERS -------------------------------
user          worker          listen                    processes status
tegic         Gateway         websocket://0.0.0.0:8282   4         [OK]
tegic         BusinessWorker  none                       1         [OK]
tegic         Register        text://0.0.0.0:1236        4         [OK]
----------------------------------------------------------------
Press Ctrl+C to stop. Start success.
5.客户端连接使用

socket.ws.send()调用可发送消息,socket.onmessage 内是处理消息类型,即可实现长链接(function (global) {

    var socketDebug = window.socketDebug == undefined ? false : window.socketDebug;
    var socket = {
        ws:null,
        connect:function () {
            var that= this;
            that.ws = new WebSocket("ws://"+document.domain+":"+window.workermanConfig.port);//这里如果使用127.0.0.1或者localhost会出现连接失败。当时为了方便以后的维护,这里在php的全局文件里定义了一个常量来定义ip,后来本地开发完提交到linux服务器环境之后发现链接失败!按照此行代码会有效连接~
            that.ws.onopen = this.onopen;
            that.ws.onmessage = this.onmessage;
            that.ws.onclose = function(e) {
                socketDebug && console.log("连接关闭,定时重连");
                that.connect();
            };
            that.ws.onerror = function(e) {
                socketDebug && console.log("出现错误");
            };
        },
        onopen:function () {
            var joint = '{"type":"handshake","role":"user","uid":'+window.uid+',"room":'+window.room+'}';
            socket.ws.send(joint);
            socket.heartCheck.start();
        },
        sendMsg:function(content,type,id){
            socket.ws.send("{content:'"+content+"',m_type:'"+type+"',room:"+id+",type:'send'}")
        },
        onmessage:function (e) {
            try {
                var data = JSON.parse(e.data);
                socketDebug && console.log(data)
                switch(data.type){
                    case 'init':

                        break;
                    // 服务端ping客户端
                    case 'ping':

                        break;
                    // 登录 更新用户列表
                    case 'handshake':

                        break;
                    // 提醒
                    case 'reception':

                        break;
                    //直播进行中
                    case 'live_ing':

                        break;
                    //直播结束
                    case 'live_end':

                        break;
                    //消息提醒
                    case 'message':

                        break;
                    //消息撤回
                    case 'recall':

                        break;
                    case 'ban':

                        break;
                }
            }catch (e) {
                socketDebug && console.info(e);
            }

        },
        heartCheck:{
            timeout: 3000,
            timeoutObj: null,
            start: function(){
                this.timeoutObj = setInterval(function(){
                    socket.ws.send("{'type':'ping'}");
                }, this.timeout);
            }
        }
    };

    window.onload=function () {
        socket.connect();
    };

    global.socket = socket;

    return socket
}(this));
注意在windows 上无法启动;




评论( 相关
后面还有条评论,点击查看>>