5

我最近一直在使用Storm,它包含一个名为fields grouping(与 Celery 中的概念无关的group()概念),其中具有特定键的消息将始终路由到同一个工作人员。

只是为了更清楚地定义我的意思,这里是来自 Storm wiki。

字段分组:流按分组中指定的字段进行分区。例如,如果流按“user-id”字段分组,具有相同“user-id”的元组将始终执行相同的任务,但具有不同“user-id”的元组可能会执行不同的任务.

例如,从单词列表中读取,我想将以 a、b、c 开头的单词仅路由到工作进程,将 d、e、f 路由到另一个等。

想要这样做的原因可能是因为我希望一个进程负责数据库读取/写入一组相同的数据,以便进程之间不存在竞争条件。

我正在尝试找出在 Celery 中实现这一目标的最佳方法。

到目前为止,我最好的解决方案是为每个“组”(例如 letter.a、letters.d)使用一个队列,并确保工作进程的数量与队列的数量完全匹配。缺点是每个工作人员只能运行一个进程,以及各种情况,例如工作人员死亡或添加/删除工作人员时。

我是 Celery 的新手,所以如果我所指的概念不正确,请纠正我。

4

2 回答 2

6

涉及到一些胶水,但这是概念:

有一种方法可以使用CELERY_WORKER_DIRECT. 将其设置为True创建到每个工人的路线。

我通过使用celery.current_app.control.inspect().ping()或确定活动主机定期确定活动工作人员。例如:

>>> hosts = sorted(celery.current_app.control.inspect().ping().keys())
['host5', 'host6']

当我需要按键路由时,我会散列该值,然后以工人数量为模。这将平均分配任务,并保持相同的密钥到同一个工人。例如:

>>> host_id = hash('hello') % len(hosts)
1
>>> host = hosts[host_id]
'host6'

然后在执行任务时,我只需指定交换和路由键,如下所示:

my_task.apply_async(exchange='C.dq', routing_key=host)

有几个缺点:

  1. 据我所知,在工作人员上设置 > 1 的并发性将使每个进程消耗相同的内容,从而否定整个练习。不幸的解决方法是将其保持在 1。
  2. 如果工作人员宕机,介于ping()和之间apply_async,消息将被发送到不存在的路由。解决此问题的方法是捕获超时、重新声明可用主机、重新散列并重新发送。
于 2013-11-11T11:09:02.717 回答
1

celery 的重点是你不需要管理单个工人。

如果您需要任务来获得数据的所有权,那么任务应该在运行开始时获得所有权。

如果你想单独管理工人,可能不要使用芹菜。您可能应该自己编写工作人员,然后使用消息队列(或者风暴)。为正确的工作使用正确的工具。

于 2013-09-10T18:27:16.413 回答