深入理解GatewayWorker框架_houzhyan-博客-CSDN博客_gatewayworker

$this->id

来源: 深入理解GatewayWorker框架_houzhyan-博客-CSDN博客_gatewayworker

原文地址:http://www.php-master.com/post/342621.html

序言

本文只是结合GatewayWorker和Workerman的官方文档和源码,深入了解执行过程。以便更深入的了解并使用

GatewayWorker基于Workerman开发的一个项目框架。Register进程负责保存Gateway进程和BusinessWorker进程的地址,建立两者的连接。Gateway进程负责维持客户端连接,并转发客户端的数据给BusinessWorker进程处理,BusinessWorker进程负责处理实际的业务逻辑(默认调用Events.php处理业务),并将结果推送给对应的客户端。Register、Gateway、BusinessWorker进程都是继承Worker类实现各自的功能,所以了解GatewayWorker框架的内部执行过程,需要优先理解Worker的内容

GatewayWorker目录结构

  1. ├── Applications // 这里是所有开发者应用项目
  2. │ └── YourApp // 其中一个项目目录,目录名可以自定义
  3. │ ├── Events.php // 开发者只需要关注这个文件
  4. │ ├── start_gateway.php // gateway进程启动脚本,包括端口 号等设置
  5. │ ├── start_businessworker.php // businessWorker进程启动 脚本
  6. │ └── start_register.php // 注册服务启动脚本
  7. ├── start.php // 全局启动脚本,此脚本会依次加载Applications/项目/start_*.php启动脚本
  8. └── vendor // GatewayWorker框架和Workerman框架源码目 录,此目录开发者不用关心

start.php 为启动脚本,在该脚本中,统一加载start_gateway.php start_businessworker.php start_register.php进程脚本,最后通过Worker::runAll();运行所有服务。

工作原理

  1. 1、Register、Gateway、BusinessWorker进程启动
  2. 2、Gateway、BusinessWorker进程启动后向Register服务进程发起长连接注册自己
  3. 3、Register服务收到Gateway的注册后,把所有Gateway的通讯地址保存在内存中
  4. 4、Register服务收到BusinessWorker的注册后,把内存中所有的Gateway的通讯地址发给BusinessWorker
  5. 5、BusinessWorker进程得到所有的Gateway内部通讯地址后尝试连接Gateway
  6. 6、如果运行过程中有新的Gateway服务注册到Register(一般是分布式部署加机器),则将新的Gateway内部通讯地址列表将广播给所有BusinessWorker,BusinessWorker收到后建立连接
  7. 7 、如果有Gateway下线,则Register服务会收到通知,会将对应的内部通讯地址删除,然后广播新的内部通讯地址列表给所有BusinessWorker,BusinessWorker不再连接下线的Gateway
  8. 8、至此Gateway与BusinessWorker通过Register已经建立起长连接
  9. 9、客户端的事件及数据全部由Gateway转发给BusinessWorker处理,BusinessWorker默认调用Events.php中的onConnect onMessage onClose处理业务逻辑。
  10. 10、BusinessWorker的业务逻辑入口全部在Events.php中,包括onWorkerStart进程启动事件(进程事件)、onConnect连接事件(客户端事件)、onMessage消息事件(客户端事件)、onClose连接关闭事件(客户端事件)、onWorkerStop进程退出事件(进程事件)

1 Register、Gateway、BusinessWorker进程启动

项目根目录下的start.php 为启动脚本,在该脚本中,加载start_gateway.php start_businessworker.php start_register.php进程脚本,完成各个服务的Worker初始化:

  1. // 加载所有Applications/*/start.php,以便启动所有服务
  2. foreach(glob(__DIR__.‘/Applications/*/start*.php’) as $start_file)
  3. {
  4. require_once $start_file;
  5. }

最后通过Worker::runAll();运行所有服务。

  1. // 运行所有服务
  2. Worker::runAll();

运行所有服务,先看一遍runAll()方法的执行内容

  1. public static function runAll()
  2. {
  3. // 检查运行环境
  4. self::checkSapiEnv();
  5. //初始化环境变量
  6. self::init();
  7. // 解析命令
  8. self::parseCommand();
  9. // 尝试以守护进程模式运行
  10. self::daemonize();
  11. // 初始化所有worker实例,主要是监听端口
  12. self::initWorkers();
  13. // 初始化所有信号处理函数
  14. self::installSignal();
  15. // 保存主进程pid
  16. self::saveMasterPid();
  17. // 展示启动界面
  18. self::displayUI();
  19. // 创建子进程(worker进程),然后给每个子进程绑定loop循环监听事件tcp
  20. self::forkWorkers();
  21. // 尝试重定向标准输入输出
  22. self::resetStd();
  23. // 监控所有子进程(worker进程)
  24. self::monitorWorkers();
  25. }

self::init()初始化环境变量中,有以下部分代码,保存$_idMap从PID映射到工作进程ID

  1. // Init data for worker id.
  2. self::initId();
  3. protected static function initId()
  4. {
  5. foreach (self::$_workers as $worker_id => $worker) {
  6. $new_id_map = array();
  7. for($key = 0; $key < $worker->count; $key++) {
  8. $new_id_map[$key] = isset(self::$_idMap[$worker_id] [$key]) ? self::$_idMap[$worker_id][$key] : 0;
  9. }
  10. self::$_idMap[$worker_id] = $new_id_map;
  11. }
  12. }

self::forkWorkers()方法通过循环self::$_workers数组,fork各自worker的count数量的进程。然后通过调用

$worker->run();

运行当前worker实例,在run方法中通过

  1. if (!self::$globalEvent) {
  2. $event_loop_class = self::getEventLoopName();
  3. self::$globalEvent = new $event_loop_class;
  4. // Register a listener to be notified when server socket is ready to read.
  5. if ($this->_socketName) {
  6. if ($this->transport !== ‘udp’) {
  7. self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ,
  8. array($this, ‘acceptConnection’));
  9. } else {
  10. self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ,
  11. array($this, ‘acceptUdpConnection’));
  12. }
  13. }

获取一个当前可用的事件轮询方式,然后根据当前的协议类型添加一个监听到事件轮询中
然后,尝试出发当前进程模型的onWorkerStart回调,此回调会在Gateway类以及BusinessWorker类中都会定义,代码

  1. if ($this->onWorkerStart) {
  2. try {
  3. call_user_func($this->onWorkerStart, $this);
  4. } catch (\Exception $e) {
  5. self::log($e);
  6. // Avoid rapid infinite loop exit.
  7. sleep(1);
  8. exit(250);
  9. } catch (\Error $e) {
  10. self::log($e);
  11. // Avoid rapid infinite loop exit.
  12. sleep(1);
  13. exit(250);
  14. }
  15. }

最后,执行事件的循环等待socket事件,处理读写等操作,代码

  1. // Main loop.
  2. self::$globalEvent->loop();

以上是runAll()方法的部分内容,会在了解GatewayWorker的工作原理的时候用到

2.1 Gateway进程向Register服务进程发起长连接注册自己

初始化Gateway

$gateway = new Gateway("text://0.0.0.0:8383");

在Gateway类中重写run方法,当调用runAll()方法启动进程时,fork进程之后,运行worker实例的时候,会调用到此重写的run方法

  1. public function run()
  2. {
  3. // 保存用户的回调,当对应的事件发生时触发
  4. $this->_onWorkerStart = $this->onWorkerStart;
  5. $this->onWorkerStart = array($this, ‘onWorkerStart’);
  6. // 保存用户的回调,当对应的事件发生时触发
  7. $this->_onConnect = $this->onConnect;
  8. $this->onConnect = array($this, ‘onClientConnect’);
  9. // onMessage禁止用户设置回调
  10. $this->onMessage = array($this, ‘onClientMessage’);
  11. // 保存用户的回调,当对应的事件发生时触发
  12. $this->_onClose = $this->onClose;
  13. $this->onClose = array($this, ‘onClientClose’);
  14. // 保存用户的回调,当对应的事件发生时触发
  15. $this->_onWorkerStop = $this->onWorkerStop;
  16. $this->onWorkerStop = array($this, ‘onWorkerStop’);
  17. $this->_startTime = time();
  18. // 运行父方法
  19. parent::run();
  20. }

定义了$this->onWorkerStart回调,

$this->onWorkerStart  = array($this, 'onWorkerStart');

 

 

执行到Worker类中的run()方法时,被触发。即,上边提到的

Worker脚本中的run方法

调用Gateway类中的onWorkerStart方法,代码

  1. public function onWorkerStart()
  2. {
  3. $this->lanPort = $this->startPort + $this->id;
  4. if ($this->pingInterval > 0) {
  5. $timer_interval = $this->pingNotResponseLimit > 0 ? $this->pingInterval / 2 : $this->pingInterval;
  6. Timer::add($timer_interval, array($this, ‘ping’));
  7. }
  8. if ($this->lanIp !== ‘127.0.0.1’) {
  9. Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, array($this, ‘pingBusinessWorker’));
  10. }
  11. if (strpos($this->registerAddress, ‘127.0.0.1’) !== 0) {
  12. Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, array($this, ‘pingRegister’));
  13. }
  14. if (!class_exists(‘\Protocols\GatewayProtocol’)) {
  15. class_alias(‘GatewayWorker\Protocols\GatewayProtocol’, ‘Protocols\GatewayProtocol’);
  16. }
  17. // 初始化 gateway 内部的监听,用于监听 worker 的连接已经连接上发来的数据
  18. $this->_innerTcpWorker = new Worker(“GatewayProtocol://{$this->lanIp}:{$this->lanPort});
  19. $this->_innerTcpWorker->listen();
  20. // 重新设置自动加载根目录
  21. Autoloader::setRootPath($this->_autoloadRootPath);
  22. // 设置内部监听的相关回调
  23. $this->_innerTcpWorker->onMessage = array($this, ‘onWorkerMessage’);
  24. $this->_innerTcpWorker->onConnect = array($this, ‘onWorkerConnect’);
  25. $this->_innerTcpWorker->onClose = array($this, ‘onWorkerClose’);
  26. // 注册 gateway 的内部通讯地址,worker 去连这个地址,以便 gateway 与 worker 之间建立起 TCP 长连接
  27. $this->registerAddress();
  28. if ($this->_onWorkerStart) {
  29. call_user_func($this->_onWorkerStart, $this);
  30. }
  31. }

$this->startPort : 内部通讯起始端口,假如$gateway->count=4,起始端口为4000,可在gateway启动脚本中自定义
$this->id : 基于worker实例分配的进程编号,当前从0开始,根据count自增。在fork进程的时候生成

Worker.php

$this->_innerTcpWorker:用于监听 worker 的连接已经连接上发来的数据。在工作原理5中,BusinessWorker进程得到所有的Gateway内部通讯地址后尝试连接Gateway以及其他两者之间的通信(连接,消息,关闭)会被调用
$this->registerAddress(): 代码中$this->registerAddress是在start_gateway.php初始化Gateway类之后定义的。该端口是Register进程所监听。此处异步的向Register进程发送数据,存储当前 Gateway 的内部通信地址

 

  1. public function registerAddress()
  2. {
  3. $address = $this->lanIp . ‘:’ . $this->lanPort;
  4. $this->_registerConnection = new AsyncTcpConnection(“text://{$this->registerAddress});
  5. $this->_registerConnection->send(‘{“event”:”gateway_connect”, “address”:”‘ . $address . ‘”, “secret_key”:”‘ . $this->secretKey . ‘”}’);
  6. $this->_registerConnection->onClose = array($this, ‘onRegisterConnectionClose’);
  7. $this->_registerConnection->connect();
  8. }

$this->lanIp: Gateway所在服务器的内网IP

2.2 BusinessWorker进程向Register服务进程发起长连接注册自己

BusinessWorker类中同样重写run方法,定义了$this->onWorkerStart

  1. public function run()
  2. {
  3. $this->_onWorkerStart = $this->onWorkerStart;
  4. $this->_onWorkerReload = $this->onWorkerReload;
  5. $this->_onWorkerStop = $this->onWorkerStop;
  6. $this->onWorkerStop = array($this, ‘onWorkerStop’);
  7. $this->onWorkerStart = array($this, ‘onWorkerStart’);
  8. $this->onWorkerReload = array($this, ‘onWorkerReload’);
  9. parent::run();
  10. }

执行Worker类中的run方法,触发BusinessWorker中的onWorkerStart

  1. protected function onWorkerStart()
  2. {
  3. if (!class_exists(‘\Protocols\GatewayProtocol’)) {
  4. class_alias(‘GatewayWorker\Protocols\GatewayProtocol’, ‘Protocols\GatewayProtocol’);
  5. }
  6. $this->connectToRegister();
  7. \GatewayWorker\Lib\Gateway::setBusinessWorker($this);
  8. \GatewayWorker\Lib\Gateway::$secretKey = $this->secretKey;
  9. if ($this->_onWorkerStart) {
  10. call_user_func($this->_onWorkerStart, $this);
  11. }
  12. if (is_callable($this->eventHandler . ‘::onWorkerStart’)) {
  13. call_user_func($this->eventHandler . ‘::onWorkerStart’, $this);
  14. }
  15. if (function_exists(‘pcntl_signal’)) {
  16. // 业务超时信号处理
  17. pcntl_signal(SIGALRM, array($this, ‘timeoutHandler’), false);
  18. } else {
  19. $this->processTimeout = 0;
  20. }
  21. // 设置回调
  22. if (is_callable($this->eventHandler . ‘::onConnect’)) {
  23. $this->_eventOnConnect = $this->eventHandler . ‘::onConnect’;
  24. }
  25. if (is_callable($this->eventHandler . ‘::onMessage’)) {
  26. $this->_eventOnMessage = $this->eventHandler . ‘::onMessage’;
  27. } else {
  28. echo “Waring: {$this->eventHandler}::onMessage is not callable\n”;
  29. }
  30. if (is_callable($this->eventHandler . ‘::onClose’)) {
  31. $this->_eventOnClose = $this->eventHandler . ‘::onClose’;
  32. }
  33. // 如果Register服务器不在本地服务器,则需要保持心跳
  34. if (strpos($this->registerAddress, ‘127.0.0.1’) !== 0) {
  35. Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, array($this, ‘pingRegister’));
  36. }
  37. }

通过connectToRegister方法,发送数据到Register进程,连接服务注册中心

  1. public function connectToRegister()
  2. {
  3. $this->_registerConnection = new AsyncTcpConnection(“text://{$this->registerAddress});
  4. $this->_registerConnection->send(‘{“event”:”worker_connect”,”secret_key”:”‘ . $this->secretKey . ‘”}’);
  5. $this->_registerConnection->onClose = array($this, ‘onRegisterConnectionClose’);
  6. $this->_registerConnection->onMessage = array($this, ‘onRegisterConnectionMessage’);
  7. $this->_registerConnection->connect();
  8. }

3 Register服务收到Gateway的注册后,把所有的Gateway的通讯地址保存在内存中

在Register类中,重写了run方法,定义了当前的

  1. $this->onConnect = array($this, ‘onConnect’);
  2. // 设置 onMessage 回调
  3. $this->onMessage = array($this, ‘onMessage’);
  4. // 设置 onClose 回调
  5. $this->onClose = array($this, ‘onClose’);

三个属性,当Register启动的进程收到消息时,会触发onMessage方法

  1. public function onMessage($connection, $buffer)
  2. {
  3. // 删除定时器
  4. Timer::del($connection->timeout_timerid);
  5. $data = @json_decode($buffer, true);
  6. if (empty($data[‘event’])) {
  7. $error = “Bad request for Register service. Request info(IP:”.$connection->getRemoteIp().“, Request Buffer:$buffer). See http://wiki.workerman.net/Error4 for detail”;
  8. Worker::log($error);
  9. return $connection->close($error);
  10. }
  11. $event = $data[‘event’];
  12. $secret_key = isset($data[‘secret_key’]) ? $data[‘secret_key’] : ;
  13. // 开始验证
  14. switch ($event) {
  15. // 是 gateway 连接
  16. case ‘gateway_connect’:
  17. if (empty($data[‘address’])) {
  18. echo “address not found\n”;
  19. return $connection->close();
  20. }
  21. if ($secret_key !== $this->secretKey) {
  22. Worker::log(“Register: Key does not match “.var_export($secret_key, true).” !== “.var_export($this->secretKey, true));
  23. return $connection->close();
  24. }
  25. $this->_gatewayConnections[$connection->id] = $data[‘address’];
  26. $this->broadcastAddresses();
  27. break;
  28. // 是 worker 连接
  29. case ‘worker_connect’:
  30. if ($secret_key !== $this->secretKey) {
  31. Worker::log(“Register: Key does not match “.var_export($secret_key, true).” !== “.var_export($this->secretKey, true));
  32. return $connection->close();
  33. }
  34. $this->_workerConnections[$connection->id] = $connection;
  35. $this->broadcastAddresses($connection);
  36. break;
  37. case ‘ping’:
  38. break;
  39. default:
  40. Worker::log(“Register unknown event:$event IP: “.$connection->getRemoteIp().” Buffer:$buffer. See http://wiki.workerman.net/Error4 for detail”);
  41. $connection->close();
  42. }
  43. }

当$event = ‘gateway_connect’时,是Gateway发来的注册消息,保存到$this->_gatewayConnections数组中,在通过broadcastAddresses方法将当前$this->_gatewayConnections中所有的Gatewat通讯地址转发给所有BusinessWorker进程

4 Register服务收到BusinessWorker的注册后,把内存中所有的Gateway的通讯地址发给BusinessWorker

 

 

同第3步中,Register类收到BusinessWorker的注册时,会触发onMessage方法中的worker_connect,case选项。

image.png

 

同时,将当前worker连接加入到$_workerConnections数组中,在通过broadcastAddresses方法将当前$this->_gatewayConnections中所有的Gatewat通讯地址转发给所有BusinessWorker进程。

5 BusinessWorker进程得到所有的Gateway内部通讯地址后尝试连接Gateway

在BusinessWoker类的启动中,通过重写run方法,定义的启动onWorkerStart方法中,通过connectToRegister方法注册服务中心的同时,也定义了onMessage匿名函数,用于接收消息回调。

$this->_registerConnection->onMessage = array($this, 'onRegisterConnectionMessage');

即,当注册中心发来消息时候,回调到此处

  1. public function onRegisterConnectionMessage($register_connection, $data)
  2. {
  3. $data = json_decode($data, true);
  4. if (!isset($data[‘event’])) {
  5. echo “Received bad data from Register\n”;
  6. return;
  7. }
  8. $event = $data[‘event’];
  9. switch ($event) {
  10. case ‘broadcast_addresses’:
  11. if (!is_array($data[‘addresses’])) {
  12. echo “Received bad data from Register. Addresses empty\n”;
  13. return;
  14. }
  15. $addresses = $data[‘addresses’];
  16. $this->_gatewayAddresses = array();
  17. foreach ($addresses as $addr) {
  18. $this->_gatewayAddresses[$addr] = $addr;
  19. }
  20. $this->checkGatewayConnections($addresses);
  21. break;
  22. default:
  23. echo “Receive bad event:$event from Register.\n”;
  24. }
  25. }

其中Register类发来的数据是

  1. $data = array(
  2. ‘event’ => ‘broadcast_addresses’,
  3. ‘addresses’ => array_unique(array_values($this->_gatewayConnections)),
  4. );

这个时候,就会通过checkGatewayConnections方法检查gateway的这些通信端口是否都已经连接,在通过tryToConnectGateway方法尝试连接gateway的这些内部通信地址

6 Gateway进程收到BusinessWorker进程的连接消息

同样,在Gateway进程启动的时候,触发的onWorkerStart方法中,也定义了一个内部通讯的onWorkerMessage

$this->_innerTcpWorker->onMessage = array($this, 'onWorkerMessage');

由此来接收BusinessWorker进程发来的连接消息,部分代码

  1. public function onWorkerMessage($connection, $data)
  2. {
  3. $cmd = $data[‘cmd’];
  4. if (empty($connection->authorized) && $cmd !== GatewayProtocol::CMD_WORKER_CONNECT && $cmd !== GatewayProtocol::CMD_GATEWAY_CLIENT_CONNECT) {
  5. self::log(“Unauthorized request from “ . $connection->getRemoteIp() . “:” . $connection->getRemotePort());
  6. return $connection->close();
  7. }
  8. switch ($cmd) {
  9. // BusinessWorker连接Gateway
  10. case GatewayProtocol::CMD_WORKER_CONNECT:
  11. $worker_info = json_decode($data[‘body’], true);
  12. if ($worker_info[‘secret_key’] !== $this->secretKey) {
  13. self::log(“Gateway: Worker key does not match “.var_export($this->secretKey, true).” !== “. var_export($this->secretKey));
  14. return $connection->close();
  15. }
  16. $key = $connection->getRemoteIp() . ‘:’ . $worker_info[‘worker_key’];
  17. // 在一台服务器上businessWorker->name不能相同
  18. if (isset($this->_workerConnections[$key])) {
  19. self::log(“Gateway: Worker->name conflict. Key:{$key});
  20. $connection->close();
  21. return;
  22. }
  23. $connection->key = $key;
  24. $this->_workerConnections[$key] = $connection;
  25. $connection->authorized = true;
  26. return;
  27. // GatewayClient连接Gateway

将worker的进程连接保存到$this->_workerConnections[$key] = $connection;

7 Gateway进程收到客户端的连接,消息时,会通过Gateway转发给worker处理

  1. // Gateway类的run方法中定义此属性
  2. $this->onMessage = array($this, ‘onClientMessage’);
  3. // 收到客户端消息的时候出发此函数
  4. public function onClientMessage($connection, $data)
  5. {
  6. $connection->pingNotResponseCount = –1;
  7. $this->sendToWorker(GatewayProtocol::CMD_ON_MESSAGE, $connection, $data);
  8. }

在sendToWorker方法中,将数据发给worker进程处理

赞(0) 打赏
分享到: 更多 (0)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏