0

我正在使用带有 redistogo addon:nano 包的 heroku 上托管的 django 应用程序。我正在使用 rq,在后台执行任务——这些任务是由在线用户发起的。我害怕连接数量的增加和有限的资源受到限制。

我目前有一个工作人员在运行“n”个队列。每个队列使用连接池中的一个连接实例来处理“n”个不同类型的任务。例如,假设 4 个用户启动相同类型的任务,我想让我的主要工作人员动态创建子进程来处理它。有没有办法实现所需的多处理和并发?

我尝试使用multiprocessing模块,最初没有介绍Lock();但这会使用先前的请求数据公开并覆盖用户传递给启动函数的数据。应用锁后,它通过返回一个限制第二个用户发起请求server error - 500

github 链接 #1:看起来团队正在做 PR;虽然还没有发布!

github 链接 #2:这篇文章有助于解释在运行时创建更多工作人员。然而,这个解决方案也覆盖了数据。新请求再次使用先前的请求数据进行处理。

如果您需要查看一些代码,请告诉我。我将尝试发布一个最小的可重现片段。

有什么想法/建议/指南吗?

4

2 回答 2

2

您有机会尝试AutoWorker吗?

自动生成 RQ Worker。

from autoworker import AutoWorker
aw = AutoWorker(queue='high', max_procs=6)
aw.work()

它使用multiprocessingwith StrictRedisfromredis模块并遵循从rq

from rq.contrib.legacy import cleanup_ghosts
from rq.queue import Queue
from rq.worker import Worker, WorkerStatus
于 2019-06-28T22:57:40.457 回答
0

在深入了解之后,我意识到Worker类已经实现了多处理。

work函数在内部调用execute_job(job, queue)其反过来在模块中引用

产生一匹工作马来执行实际工作并将其传递给工作。

工人将等待工作马并确保它在给定的超时范围内执行,

或者将使用 SIGALRM 结束工作马。

execute_job()函数会隐式调用 to fork_work_horse(job, queue),它会生成一个工作马来执行实际工作,并按照以下逻辑将工作传递给它:


def fork_work_horse(self, job, queue):

        child_pid = os.fork()
        os.environ['RQ_WORKER_ID'] = self.name
        os.environ['RQ_JOB_ID'] = job.id
        if child_pid == 0:
            self.main_work_horse(job, queue)
        else:
            self._horse_pid = child_pid
            self.procline('Forked {0} at {1}'.format(child_pid, time.time()))


main_work_horse进行内部调用,然后调用perform_job(job, queue)其他一些调用来实际执行工作。

在rq 的官方文档页面上提到的有关 Worker 生命周期的所有步骤都在这些调用中得到处理。

这不是我所期望的多处理,但我猜他们有做事的方法。但是我的原始帖子仍然没有得到答复,而且我仍然不确定并发性..

那里的文档仍然需要处理,因为它几乎没有涵盖这个库的真正本质!

于 2019-06-22T10:34:43.507 回答