我想用 React 为经销商消息做一个异步路由器,但它不起作用。http://zguide.zeromq.org/php:rtdealer中的代码正在运行,但我无法确定我在做什么不同。我正在使用 libzmq 4.0.5
这是我的代码:
$context = new React\ZMQ\Context($loop);
$worker = $context->getSocket(\ZMQ::SOCKET_DEALER);
$worker->setSockOpt(\ZMQ::SOCKOPT_IDENTITY, 'A');
$worker->connect('tcp://127.0.0.1:5556');
$worker->send('END');
$worker->on('error', function ($e) {
var_dump($e->getMessage());
});
$worker->on('messages', function($msg) use ($worker) {
echo 'Dealer messages'. PHP_EOL;
var_dump($msg);
});
$worker->on('message', function($msg) use ($worker) {
echo 'Dealer message'. PHP_EOL;
var_dump($msg);
});
$router = $context->getSocket(\ZMQ::SOCKET_ROUTER);
$router->bind('tcp://127.0.0.1:5556');
$i = 0;
$loop->addPeriodicTimer(1, function (React\EventLoop\Timer\Timer $timer) use (&$i, $router) {
echo 'Time to send!'. PHP_EOL;
$i++;
$router->send('A', \ZMQ::MODE_SNDMORE);
$router->send('END');
});
$router->on('messages', function($msg) use ($router) {
echo 'Router messages'. PHP_EOL;
var_dump($msg);
});
$router->on('message', function($msg) {
echo 'Router message'. PHP_EOL;
var_dump($msg);
});
$loop->run();
问题是只有经销商发送第一条消息“END”。之后,路由器尝试发送消息,但经销商没有收到它们。
此外,这个函数似乎只被调用一次:
// \React\ZMQ\SocketWrapper
public function handleReadEvent()
{
$messages = $this->socket->recvmulti(ZMQ::MODE_NOBLOCK);
echo 'Receiving...'; // Added
var_dump($messages); // Added
if ($messages !== false) {
if (count($messages) === 1) {
$this->emit('message', array($messages[0]));
}
$this->emit('messages', array($messages));
}
}
输出是:
Receiving...array(2) {
[0]=>
string(1) "A"
[1]=>
string(3) "END"
}
Router messages
array(2) {
[0]=>
string(1) "A"
[1]=>
string(3) "END"
}
Time to send!
Time to send!
Time to send!
Time to send!
Time to send!
...
编辑:
修改了在dealer连接前绑定路由器的代码,问题依旧:
$loop = React\EventLoop\Factory::create();
$context = new React\ZMQ\Context($loop);
$router = $context->getSocket(\ZMQ::SOCKET_ROUTER);
$router->bind('tcp://127.0.0.1:5556');
$loop->addPeriodicTimer(10, function (React\EventLoop\Timer\Timer $timer) use ($router) {
echo 'Router sending messages with an interval of 10 seconds'. PHP_EOL;
$router->send('A', \ZMQ::MODE_SNDMORE);
$router->send('END');
});
$router->on('messages', function($msg) use ($router) {
echo 'Router messages'. PHP_EOL;
var_dump($msg);
});
$router->on('message', function($msg) {
echo 'Router message'. PHP_EOL;
var_dump($msg);
});
$worker = $context->getSocket(\ZMQ::SOCKET_DEALER);
$loop->addPeriodicTimer(5, function (React\EventLoop\Timer\Timer $timer) use ($worker) {
echo 'After 5 seconds from router binding, connect the dealer and send something'. PHP_EOL;
$worker->setSockOpt(\ZMQ::SOCKOPT_IDENTITY, 'A');
$worker->connect('tcp://127.0.0.1:5556');
$worker->send('END');
$timer->getLoop()->cancelTimer($timer); // Cancel the timer after connecting
});
$worker->on('error', function ($e) {
var_dump($e->getMessage());
});
$worker->on('messages', function($msg) use ($worker) {
echo 'Dealer messages'. PHP_EOL;
var_dump($msg);
});
$worker->on('message', function($msg) use ($worker) {
echo 'Dealer message'. PHP_EOL;
var_dump($msg);
});
$loop->run();
这是终端输出:
After 5 seconds from router binding, connect the dealer and send something
Receiving...array(2) {
[0]=>
string(1) "A"
[1]=>
string(3) "END"
}
Router messages
array(2) {
[0]=>
string(1) "A"
[1]=>
string(3) "END"
}
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds