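I've created a worker that sends documents to ElasticSearch, using Symfony2 and the RabbitMqBundle. Indexing documents one at a time is much slower than using the ElasticSearch bulk API, so I created a buffer that flushes documents to ES in batches of a thousand. The code looks (a bit simplified) as follows: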
class SearchIndexator
{
    protected $elasticaService;
    protected $buffer = [];
    protected $bufferSize = 0;

    // The maximum number of documents to keep in the buffer.
    // If the buffer reaches this number of documents, the buffer's content
    // is sent to ElasticSearch for indexation.
    const MAX_BUFFER_SIZE = 1000;

    public function __construct(ElasticaService $elasticaService)
    {
        $this->elasticaService = $elasticaService;
    }

    /**
     * Destructor
     *
     * Flush any documents that remain in the buffer.
     */
    public function __destruct()
    {
        $this->flush();
    }

    /**
     * Add a document to the indexation buffer.
     */
    public function onMessage(array $document)
    {
        // Prepare the document for indexation.
        $this->doHeavyWeightStuff($document);

        // Create an Elastica document.
        $document = new \Elastica\Document(
            $document['key'],
            $document
        );

        // Add the document to the buffer.
        $this->buffer[] = $document;

        // Flush the buffer when the maximum buffer size has been reached.
        if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
            $this->flush();
        }
    }

    /**
     * Send the current buffer to ElasticSearch for indexation.
     */
    public function flush()
    {
        // Send documents to ElasticSearch for indexation.
        if (1 <= $this->bufferSize) {
            $this->elasticaService->addDocuments($this->buffer);
        }

        // Clear the buffer.
        $this->buffer = [];
        $this->bufferSize = 0;
    }
}
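This all works very well, but there is one small problem. The queue fills up with messages at an unpredictable rate: sometimes 100,000 in 5 minutes, sometimes not a single one for an hour. For example, when 82,671 documents have been queued, the last 671 documents are not indexed until another 329 documents arrive, which could take hours. I would like to be able to do the following:
Warning: sci-fi code! This obviously won't work: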
class SearchIndexator
{
    protected $elasticaService;
    protected $buffer = [];
    protected $bufferSize = 0;
    protected $flushTimer;

    // The maximum number of documents to keep in the buffer.
    // If the buffer reaches this number of documents, the buffer's content
    // is sent to ElasticSearch for indexation.
    const MAX_BUFFER_SIZE = 1000;

    public function __construct(ElasticaService $elasticaService)
    {
        $this->elasticaService = $elasticaService;

        // Highly sci-fi code: this Timer class does not exist.
        $this->flushTimer = new Timer();
        // Flush the buffer after 5 minutes of inactivity.
        $this->flushTimer->setTimeout(5 * 60);
        $this->flushTimer->setCallback([$this, 'flush']);
    }

    /**
     * Destructor
     *
     * Flush any documents that remain in the buffer.
     */
    public function __destruct()
    {
        $this->flush();
    }

    /**
     * Add a document to the indexation buffer.
     */
    public function onMessage(array $document)
    {
        // Prepare the document for indexation.
        $this->doHeavyWeightStuff($document);

        // Create an Elastica document.
        $document = new \Elastica\Document(
            $document['key'],
            $document
        );

        // Add the document to the buffer.
        $this->buffer[] = $document;

        // Flush the buffer when the maximum buffer size has been reached.
        if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
            $this->flush();
        } else {
            // Start a timer that will flush the buffer after a timeout.
            $this->initTimer();
        }
    }

    /**
     * Send the current buffer to ElasticSearch for indexation.
     */
    public function flush()
    {
        // Send documents to ElasticSearch for indexation.
        if (1 <= $this->bufferSize) {
            $this->elasticaService->addDocuments($this->buffer);
        }

        // Clear the buffer.
        $this->buffer = [];
        $this->bufferSize = 0;

        // There are no more messages to be sent, so stop the timer.
        $this->flushTimer->stop();
    }

    protected function initTimer()
    {
        // Start or restart the timer.
        $this->flushTimer->isRunning()
            ? $this->flushTimer->reset()
            : $this->flushTimer->start();
    }
}
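Now, I know about the limitation that PHP is not event-driven. But this is 2015 and there are solutions like ReactPHP, so this should be possible, right? For ØMQ this functionality exists. What would be a solution that works for RabbitMQ, or independently of any message queue extension?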
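A minimal sketch of one queue-independent direction, assuming the consumer loop can wake up periodically even when no message arrives (for example through a timeout on its blocking wait call, if the client library exposes one). Instead of a real timer, the buffer records when the last document was added and is asked to check its own deadline. `TimedBuffer`, `tick()` and the `$now` parameter are hypothetical names chosen for illustration; the clock is passed in so the behaviour can be checked without sleeping.

```php
<?php
// Hypothetical sketch: TimedBuffer, tick() and the $now parameter are
// illustrative names, not part of Elastica, RabbitMqBundle or ReactPHP.
final class TimedBuffer
{
    private $items = [];
    private $lastAddedAt = 0.0;
    private $sink;
    private $maxSize;
    private $maxIdleSeconds;

    public function __construct(callable $sink, int $maxSize = 1000, float $maxIdleSeconds = 300.0)
    {
        $this->sink = $sink;
        $this->maxSize = $maxSize;
        $this->maxIdleSeconds = $maxIdleSeconds;
    }

    /** Buffer one item; flush immediately once the size limit is reached. */
    public function add($item, float $now): void
    {
        $this->items[] = $item;
        $this->lastAddedAt = $now;
        if (count($this->items) >= $this->maxSize) {
            $this->flush();
        }
    }

    /** Call periodically; flushes if nothing was added for $maxIdleSeconds. */
    public function tick(float $now): void
    {
        if ($this->items !== [] && $now - $this->lastAddedAt >= $this->maxIdleSeconds) {
            $this->flush();
        }
    }

    /** Hand the buffered items to the sink and clear the buffer. */
    public function flush(): void
    {
        if ($this->items === []) {
            return;
        }
        $batch = $this->items;
        $this->items = [];
        ($this->sink)($batch);
    }
}

// Usage with a fake clock; the sink stands in for addDocuments().
$batches = [];
$buffer = new TimedBuffer(function (array $batch) use (&$batches) {
    $batches[] = $batch;
}, 3, 300.0);

$buffer->add('a', 0.0);
$buffer->add('b', 10.0);
$buffer->tick(100.0); // only 90 s idle: nothing is flushed
$buffer->tick(310.0); // 300 s since the last add: flushes ['a', 'b']
```

In a real worker `$now` would come from `microtime(true)`, and `tick()` would be called from wherever the loop regains control. How precise the timeout is depends entirely on how often that happens.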
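Solutions I'm skeptical about:
- There is crysalead/code. It uses declare(ticks = 1);. I'm not sure whether this is a performant and robust approach. Any ideas?
- I could run a cronjob that publishes a "FLUSH" message to the same queue every 5 minutes and then explicitly flush the buffer when that message is received, but that would be cheating.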
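For reference, the cronjob variant from the last item could look roughly like the sketch below. `FlushAwareIndexer` and the plain string payload `'FLUSH'` are assumptions made for illustration; a real consumer would receive AMQP message objects and would need to tell control messages from documents in some agreed way.

```php
<?php
// Hypothetical sketch: FlushAwareIndexer and the 'FLUSH' payload are
// illustrative; the sink stands in for the call to addDocuments().
final class FlushAwareIndexer
{
    const FLUSH_COMMAND = 'FLUSH';

    private $buffer = [];
    private $sink;
    private $maxSize;

    public function __construct(callable $sink, int $maxSize = 1000)
    {
        $this->sink = $sink;
        $this->maxSize = $maxSize;
    }

    /** Buffer a document, or flush when the control message arrives. */
    public function onMessage($payload): void
    {
        if ($payload === self::FLUSH_COMMAND) {
            $this->flush();
            return;
        }
        $this->buffer[] = $payload;
        if (count($this->buffer) >= $this->maxSize) {
            $this->flush();
        }
    }

    /** Send the buffered documents to the sink and clear the buffer. */
    public function flush(): void
    {
        if ($this->buffer === []) {
            return;
        }
        $batch = $this->buffer;
        $this->buffer = [];
        ($this->sink)($batch);
    }
}

$sent = [];
$indexer = new FlushAwareIndexer(function (array $batch) use (&$sent) {
    $sent[] = $batch;
});

$indexer->onMessage(['key' => 1]);
$indexer->onMessage(['key' => 2]);
$indexer->onMessage('FLUSH'); // what the cronjob would publish
```

One caveat with this design: if several workers consume from the same queue, each "FLUSH" message is delivered to only one of them, so the other workers' buffers would stay unflushed unless the control message is broadcast to all of them.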