42

我们有一个用金字塔制作的网络应用程序,并通过 gunicorn+nginx 提供服务。它适用于 8 个工作线程/进程

我们需要工作,我们选择了调度程序。这是我们启动它的方式

from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from apscheduler.scheduler import Scheduler

rerun_monitor = Scheduler()
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)

问题是 gunicorn 的所有工作进程都选择了调度程序。我们尝试实现文件锁定,但它似乎不是一个足够好的解决方案。确保在任何给定时间只有一个工作进程选择预定事件并且没有其他线程在下一个之前选择它的最佳方法是什么JOB_INTERVAL

如果我们决定稍后切换到 apache2+modwsgi,该解决方案甚至需要使用 mod_wsgi。它需要与作为服务员的单进程开发服务器一起使用。

来自赏金赞助者的更新

我面临着 OP 描述的相同问题,只是使用 Django 应用程序。如果原始问题,我很确定添加此细节不会有太大变化。出于这个原因,为了获得更多的可见性,我还用 标记了这个问题django

4

3 回答 3

47

因为 Gunicorn 从 8 个工作人员开始(在您的示例中),所以这会将应用程序 8 次分叉为 8 个进程。这 8 个进程是从主进程派生出来的,进程监控每个进程的状态并能够添加/删除工作人员。

每个进程都会获得您的 APScheduler 对象的副本,该对象最初是您的主进程的 APScheduler 的精确副本。这导致每个“nth”工作人员(进程)执行每个作业总共“n”次。

解决此问题的方法是使用以下选项运行 gunicorn:

env/bin/gunicorn module_containing_app:app -b 0.0.0.0:8080 --workers 3 --preload

--preload标志告诉 Gunicorn “在分叉工作进程之前加载应用程序”。通过这样做,每个工作人员都“获得了应用程序的副本,已经由 Master 实例化,而不是实例化应用程序本身”。这意味着以下代码仅在 Master 进程中执行一次:

rerun_monitor = Scheduler()
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)

此外,我们需要将jobstore设置为:memory:以外的任何内容。这样,虽然每个 worker 都是自己的独立进程,无法与其他 7 个进程进行通信,但通过使用本地数据库(而不是内存),我们保证一个作业存储上的 CRUD 操作的真实点。

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

rerun_monitor = Scheduler(
    jobstores={'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')})
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)

最后,我们想使用BackgroundScheduler,因为它实现了start(). 当我们调用start()BackgroundScheduler 时,会在后台启动一个新线程,负责调度/执行作业。这很重要,因为请记住在步骤 (1) 中,由于我们的--preload标志,我们仅start()在 Master Gunicorn 进程中执行该函数一次。根据定义,分叉的进程不会继承其父进程的线程,因此每个工作进程都不会运行 BackgroundScheduler 线程。

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

rerun_monitor = BackgroundScheduler(
    jobstores={'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')})
rerun_monitor.start()
rerun_monitor.add_interval_job(job_to_be_run,\
            seconds=JOB_INTERVAL)

由于这一切,每个 Gunicorn 工作者都有一个 APScheduler 被欺骗进入“已启动”状态,但实际上并没有运行,因为它丢弃了它的父线程!每个实例还能够更新作业存储数据库,只是不执行任何作业!

查看flask-APScheduler以获得在 Web 服务器(如 Gunicorn)中运行 APScheduler 的快速方法,并为每个作业启用 CRUD 操作。

于 2016-10-20T18:47:56.910 回答
22

我找到了一个适用于具有非常相似问题的 Django 项目的修复程序。我只是在调度程序第一次启动时绑定一个 TCP 套接字并随后检查它。我认为以下代码只需稍作调整也可以为您工作。

import sys, socket

try:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(("127.0.0.1", 47200))
except socket.error:
    print "!!!scheduler already started, DO NOTHING"
else:
    from apscheduler.schedulers.background import BackgroundScheduler
    scheduler = BackgroundScheduler()
    scheduler.start()
    print "scheduler started"
于 2014-12-04T20:52:03.667 回答
2

简短的回答:你不能正确地做到这一点而没有后果。

我以 Gunicorn 为例,但对于 uWSGI 来说基本相同。运行多个进程时有各种 hack,仅举几例:

  1. 使用--preload选项
  2. 使用on_startinghook 启动 APScheduler 后台调度程序
  3. 使用when_readyhook 启动 APScheduler 后台调度程序

它们在一定程度上起作用,但可能会出现以下错误:

  1. 工人经常超时
  2. 没有作业时调度程序挂起https://github.com/agronholm/apscheduler/issues/305

APScheduler 设计为在单个进程中运行,它可以完全控制将作业添加到作业存储的过程。它使用threading.Event'swait()set()方法来协调。如果它们由不同的进程运行,则协调将不起作用。

可以在一个进程中在 Gunicorn 中运行它。

  1. 仅使用一个工作进程
  2. 使用post_worker_init钩子启动调度程序,这将确保调度程序只在工作进程中运行,而不是在主进程中运行

作者还指出,多个进程共享作业存储量是不可能的。https://apscheduler.readthedocs.io/en/stable/faq.html#how-do-i-share-a-single-job-store-among-one-or-more-worker-processes他还提供了解决方案使用 RPyC。

虽然用 REST 接口包装 APScheduler 是完全可行的。您可能需要考虑将其作为一个独立的应用程序提供给一名工作人员。换句话说,如果您有其他端点,请将它们放在另一个应用程序中,您可以在其中使用多个工作人员。

于 2020-09-08T15:59:02.623 回答