3

我想用 React 为经销商消息做一个异步路由器,但它不起作用。http://zguide.zeromq.org/php:rtdealer中的代码正在运行,但我无法确定我在做什么不同。我正在使用 libzmq 4.0.5

这是我的代码:

$context = new React\ZMQ\Context($loop);

$worker = $context->getSocket(\ZMQ::SOCKET_DEALER);
$worker->setSockOpt(\ZMQ::SOCKOPT_IDENTITY, 'A');
$worker->connect('tcp://127.0.0.1:5556');
$worker->send('END');

$worker->on('error', function ($e) {
    var_dump($e->getMessage());
});

$worker->on('messages', function($msg) use ($worker) {
    echo 'Dealer messages'. PHP_EOL;
    var_dump($msg);
});

$worker->on('message', function($msg) use ($worker) {
    echo 'Dealer message'. PHP_EOL;
    var_dump($msg);
});

$router = $context->getSocket(\ZMQ::SOCKET_ROUTER);
$router->bind('tcp://127.0.0.1:5556');

$i = 0;
$loop->addPeriodicTimer(1, function (React\EventLoop\Timer\Timer $timer) use (&$i, $router) {
    echo 'Time to send!'. PHP_EOL;
    $i++;
    $router->send('A', \ZMQ::MODE_SNDMORE);
    $router->send('END');
});

$router->on('messages', function($msg) use ($router) {
    echo 'Router messages'. PHP_EOL;
    var_dump($msg);
});

$router->on('message', function($msg) {
    echo 'Router message'. PHP_EOL;
    var_dump($msg);
});

$loop->run();

问题是只有经销商发送第一条消息“END”。之后,路由器尝试发送消息,但经销商没有收到它们。

此外,这个函数似乎只被调用一次:

// \React\ZMQ\SocketWrapper
public function handleReadEvent()
{
    $messages = $this->socket->recvmulti(ZMQ::MODE_NOBLOCK);
    echo 'Receiving...';    // Added
    var_dump($messages);    // Added

    if ($messages !== false) {
        if (count($messages) === 1) {
            $this->emit('message', array($messages[0]));
        }

        $this->emit('messages', array($messages));
    }
}

输出是:

Receiving...array(2) {
  [0]=>
  string(1) "A"
  [1]=>
  string(3) "END"
}
Router messages
array(2) {
  [0]=>
  string(1) "A"
  [1]=>
  string(3) "END"
}
Time to send!
Time to send!
Time to send!
Time to send!
Time to send!
...

编辑:

修改了在dealer连接前绑定路由器的代码,问题依旧:

$loop = React\EventLoop\Factory::create();

$context = new React\ZMQ\Context($loop);

$router = $context->getSocket(\ZMQ::SOCKET_ROUTER);
$router->bind('tcp://127.0.0.1:5556');

$loop->addPeriodicTimer(10, function (React\EventLoop\Timer\Timer $timer) use ($router) {
    echo 'Router sending messages with an interval of 10 seconds'. PHP_EOL;
    $router->send('A', \ZMQ::MODE_SNDMORE);
    $router->send('END');
});

$router->on('messages', function($msg) use ($router) {
    echo 'Router messages'. PHP_EOL;
    var_dump($msg);
});

$router->on('message', function($msg) {
    echo 'Router message'. PHP_EOL;
    var_dump($msg);
});

$worker = $context->getSocket(\ZMQ::SOCKET_DEALER);

$loop->addPeriodicTimer(5, function (React\EventLoop\Timer\Timer $timer) use ($worker) {
    echo 'After 5 seconds from router binding, connect the dealer and send something'. PHP_EOL;
    $worker->setSockOpt(\ZMQ::SOCKOPT_IDENTITY, 'A');
    $worker->connect('tcp://127.0.0.1:5556');
    $worker->send('END');
    $timer->getLoop()->cancelTimer($timer);     // Cancel the timer after connecting
});

$worker->on('error', function ($e) {
    var_dump($e->getMessage());
});

$worker->on('messages', function($msg) use ($worker) {
    echo 'Dealer messages'. PHP_EOL;
    var_dump($msg);
});

$worker->on('message', function($msg) use ($worker) {
    echo 'Dealer message'. PHP_EOL;
    var_dump($msg);
});

$loop->run();

这是终端输出:

After 5 seconds from router binding, connect the dealer and send something
Receiving...array(2) {
  [0]=>
  string(1) "A"
  [1]=>
  string(3) "END"
}
Router messages
array(2) {
  [0]=>
  string(1) "A"
  [1]=>
  string(3) "END"
}
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
4

1 回答 1

2

我终于解决了这个问题。@Jason 的评论帮助没有使用 PHP/React,你确定你的命名空间是正确的吗?,我看了一下send调用的方法。此方法如下所示:

// \React\ZMQ\SocketWrapper

public function send($message)
{
    echo 'Inside send, sending message'. PHP_EOL; //This line was added by myself
    $this->buffer->send($message);
}

然后,我想:好吧,如果我这样调用这个方法:

$router->send('A', \ZMQ::MODE_SNDMORE);

第二个参数永远不会通过。

然后,我看到该sendmulti方法没有在该类中实现,SocketWrapper__call方法是在这个SocketWrapper类中实现的,直接调用了ZMQ api for PHP中实现的方法,所以我尝试调用sendmulti将这两个参数包装在一个数组中。它起作用了,所以我尝试调用第一个send传递相同数组的方法,我不知道为什么,但它也起作用了,所以诀窍是调用:

$router->send(array('A', \ZMQ::MODE_SNDMORE));
$router->send(array('END'));

代替:

$router->send('A', \ZMQ::MODE_SNDMORE);
$router->send('END');

另外,请注意位于 GitHub 存储库中的 ZMQ PHP API,这是错误的。PhpStorm 以某种方式下载了另一个 ZMQ PHP API,该 API 位于某个似乎正确的位置,sendmulti方法原型在此 API 中如下所示:

public function sendmulti(array $message, $mode = 0)

令人惊讶的是,以下对该方法的调用似乎给出了相同的结果:

$router->send(array('A', \ZMQ::MODE_SNDMORE));
$router->send(array('A'), \ZMQ::MODE_SNDMORE);
于 2015-05-28T10:46:32.480 回答