我正在尝试构建一个可以做一些事情的小应用程序。
一个主类将产生、销毁、倾听和管理工人......
<?php
namespace Queue;
class Controller extends Process
{
public $workers = array();
function __construct()
{
//Run the Process constructor before doing anything else.
parent::__construct();
}
function spawnWorker($businessId)
{
$this->workers[$businessId] = new \React\ChildProcess\Process("php index.php worker $businessId");
$this->workers[$businessId]->start($this->loop);
$this->workers[$businessId]->stdout->on("data", function($output) use(&$businessId) {
$this->channels->out->write("Child $businessId Said: " . $output);
});
}
function outputToWorker($businessId, $string)
{
$this->workers[$businessId]->stdin->write($string);
}
function run()
{
$this->loop->run();
}
}
一群由控制器管理的工作人员(最终他们将有一个长时间运行的进程来管理数据库中定义的队列)......
<?php
namespace Queue;
class Worker extends Process
{
function __construct($businessId)
{
//Run the Process constructor before doing anything else.
parent::__construct();
// Assign a business to this worker.
$this->businessId = $businessId;
// Create a queue object for this worker.
$this->queue = new \stdClass;
}
function run()
{
$this->channels->in->on("data", function($input) {
$this->sendOutput($input);
});
$this->loop->run();
}
}
但是,我遇到了代码没有达到我预期的情况......
// Based on command line argument, decide which kind of process to spawn.
if (!isset($argv[1]) || $argv[1] === "controller")
{
$controller = new Controller;
$controller->spawnWorker(1);
$controller->outputToWorker(1, "Before Run");
$controller->run();
$controller->outputToWorker(1, "After Run");
} else if (strtolower($argv[1]) === "worker" && isset($argv[2]))
{
$worker = new Worker($argv[1]);
$worker->run();
}
最后一段代码是我实际运行以启动应用程序的文件。它根据命令行参数路由应该生成哪种进程。(这样我就不必为一堆不同类型的进程创建一个文件。)
工人正确生成,第一条消息(Child 1 Said: Before Run
)被发送给工人,工人将其发送给stdout
并controller
听到该事件并将其发送给它自己的stdout
。
但是,似乎监听器是一次性监听器,并且事件循环并没有重新处理它。这意味着,在初始之后的任何时候run()
,controller
它都不会响应stdout
来自工作人员的事件。
我是否必须使用计时器创建自己的显式侦听器?