5

我使用 Symfony2 和RabbitMqBundle创建了一个将文档发送到 ElasticSearch 的工作程序。以一对一的速度索引文档比使用 ElasticSearch 批量 API 慢得多。因此,我创建了一个缓冲区,以千为一组将文档刷新到 ES。代码看起来(有点简化)如下:

class SearchIndexator
{
    protected $elasticaService;
    protected $buffer = [];
    protected $bufferSize = 0;

    // The maximum number of documents to keep in the buffer.
    // If the buffer reaches this amount of documents, then the buffers content
    // is send to elasticsearch for indexation.
    const MAX_BUFFER_SIZE = 1000;

    public function __construct(ElasticaService $elasticaService)
    {
        $this->elasticaService = $elasticaService;
    }

    /**
     * Destructor
     *
     * Flush any documents that remain in the buffer.
     */
    public function __destruct()
    {
        $this->flush();
    }

    /**
     * Add a document to the indexation buffer.
     */
    public function onMessage(array $document)
    {
        // Prepare the document for indexation.
        $this->doHeavyWeightStuff($document);

        // Create an Elastica document
        $document = new \Elastica\Document(
            $document['key'],
            $document
        );

        // Add the document to the buffer.
        $this->buffer[] = $document;

        // Flush the buffer when max buffersize has been reached.
        if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
            $this->flush();
        }
    }

    /**
     * Send the current buffer to ElasticSearch for indexation.
     */
    public function flush()
    {
        // Send documents to ElasticSearch for indexation.
        if (1 <= $this->bufferSize) {
            $this->elasticaService->addDocuments($this->buffer);
        }

        // Clear buffer
        $this->buffer = [];
        $this->bufferSize = 0;
    }
}

这一切都很好,但有一个小问题。队列以不可预测的速度充满了消息。有时 100000 在 5 分钟内,有时不是一个小时。例如,当有 82671 个文档排队时,最后 671 个文档在收到另外 329 个文档之前不会被索引,这可能需要几个小时。我希望能够做到以下几点:

警告:科幻代码!这显然行不通:

class SearchIndexator
{
    protected $elasticaService;
    protected $buffer = [];
    protected $bufferSize = 0;
    protected $flushTimer;

    // The maximum number of documents to keep in the buffer.
    // If the buffer reaches this amount of documents, then the buffers content
    // is send to elasticsearch for indexation.
    const MAX_BUFFER_SIZE = 1000;

    public function __construct(ElasticaService $elasticaService)
    {
        $this->elasticaService = $elasticaService;

        // Highly Sci-fi code
        $this->flushTimer = new Timer();
        // Flush buffer after 5 minutes of inactivity.
        $this->flushTimer->setTimeout(5 * 60);
        $this->flushTimer->setCallback([$this, 'flush']);
    }

    /**
     * Destructor
     *
     * Flush any documents that remain in the buffer.
     */
    public function __destruct()
    {
        $this->flush();
    }

    /**
     * Add a document to the indexation buffer.
     */
    public function onMessage(array $document)
    {
        // Prepare the document for indexation.
        $this->doHeavyWeightStuff($document);

        // Create an Elastica document
        $document = new \Elastica\Document(
            $document['key'],
            $document
        );

        // Add the document to the buffer.
        $this->buffer[] = $document;

        // Flush the buffer when max buffersize has been reached.
        if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
            $this->flush();
        } else {
            // Start a timer that will flush the buffer after a timeout.
            $this->initTimer();
        }
    }

    /**
     * Send the current buffer to ElasticSearch for indexation.
     */
    public function flush()
    {
        // Send documents to ElasticSearch for indexation.
        if (1 <= $this->bufferSize) {
            $this->elasticaService->addDocuments($this->buffer);
        }

        // Clear buffer
        $this->buffer = [];
        $this->bufferSize = 0;

        // There are no longer messages to be send, stop the timer.
        $this->flushTimer->stop();
    }

    protected function initTimer()
    {
        // Start or restart timer
        $this->flushTimer->isRunning()
          ? $this->flushTimer->reset()
          : $this->flushTimer->start();
    }
}

现在,我知道 PHP 不是事件驱动的局限性。但这是 2015 年,并且有像 ReactPHP 这样的解决方案,所以这应该是可能的吧?对于 ØMQ 有这个功能。什么是适用于 RabbitMQ 或独立于任何消息队列扩展的解决方案?

我怀疑的解决方案:

  1. crysalead/code。它使用declare(ticks = 1);. 我不确定这是否是一种高效且可靠的方法。有任何想法吗?
  2. 我可以运行一个 cronjob,每 5 分钟向同一个队列发布一条“FLUSH”消息,然后在收到此消息时显式刷新缓冲区,但这会作弊。
4

1 回答 1

0

正如我在评论中提到的,您可以使用这些信号。PHP 允许您将信号处理程序注册到脚本信号(即 SIGINT、SIGKILL 等)

对于您的用例,您可以使用 SIGALRM 信号。此信号将在特定时间(您可以设置)到期后向您的脚本发出警报。这些信号的积极方面是它们是非阻塞的。换句话说,您的脚本的正常运行不会受到干扰。

调整后的解决方案(从 PHP 5.3 起不推荐使用刻度线):

function signal_handler($signal) {
    // You would flush here
    print "Caught SIGALRM\n";
    // Set the SIGALRM timer again or it won't trigger again
    pcntl_alarm(300);
}

// register your handler with the SIGALRM signal
pcntl_signal(SIGALRM, "signal_handler", true);
// set the timeout for the SIGALRM signal to 300 seconds
pcntl_alarm(300);

// start loop and check for pending signals
while(pcntl_signal_dispatch() && your_loop_condition) {
    //Execute your code here
}

注意:您只能在脚本中使用 1 个 SIGALRM 信号,如果您使用pcntl_alarm计时器设置信号的时间,则警报将重置(不触发信号)到其新设置的值。

于 2015-12-16T17:58:57.307 回答