0

我使用 APScheduler 安排了三个作业,它们以 2 分钟、3 分钟和 10 分钟的间隔运行。所有三个作业都访问 SQLite 数据库。

在作业执行过程中,有可能当作业 1 锁定 SQLite 数据库时,作业 2 和作业 3 也尝试访问它,从而引发错误sqlite3.OperationalError: database is locked。因此,某些要插入数据库的数据未完成,并导致数据库丢失数据。

在这种情况下,我想确保 Job 2 和 Job 3 线程等待 Job 1 释放锁,以便它们可以访问数据。

以下是配置片段:

jobs.search_events:
  - trigger: interval
  - every: 10
  - unit: minutes

jobs.check_search_status_events:
  - trigger: interval
  - every: 2
  - unit: minutes

jobs.delete_searches:
  - trigger: interval
  - every: 3
  - unit: minutes
  1. search_events: 调用一些外部 API 并将搜索请求的响应写入状态
  2. check_search_status_events:检查搜索请求的状态,如果完成,将发送另一个请求来获取数据并写入数据库中的数据。
  3. delete_searches:如果搜索状态标记为完成,则该作业将删除所有此类搜索记录。

这些配置被读取然后发送到 APScheduler 的BackgroundScheduler

以下是我使用 APScheduler 创建的 Schedule 类:

class Schedule(object):
    """Schedule processes to be run in the background via APScheduler.

    Thin wrapper around :class:`BackgroundScheduler` that adds job stores,
    executors and jobs from configuration mappings.
    """
    scheduler: BackgroundScheduler

    def __init__(self) -> None:
        """Initialize the scheduler daemon in the background."""
        self.scheduler = BackgroundScheduler(daemon=True)

    def add_job_storage(self, name: str, job_store: str) -> None:
        """Add persistent job storage to the scheduler.

        :param name: Name of the job store
        :type name: str
        :param job_store: URL / location of the job store
        :type job_store: str
        """
        self.scheduler.add_jobstore(name, url=job_store)

    def add_configurations(self, jobstores: KeyValue, executors: KeyValue, job_defaults: KeyValue) -> None:
        """Configure job stores, executors and job defaults on the scheduler.

        :param jobstores: Job stores to be used
        :type jobstores: dict
        :param executors: Executors to be used
        :type executors: dict
        :param job_defaults: Default configuration applied to every job
        :type job_defaults: dict
        """
        self.scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults)

    def add_jobs(self, jobs: KeyValue) -> None:
        """Add jobs to the scheduler from configuration.

        Each key is a dotted path ``package.module<SEP>function`` (split on
        ``JOB_SPLIT_CHAR``); the value is a mapping of trigger options.

        :param jobs: Jobs to be added to the scheduler
        :type jobs: dict
        """
        for func_name, config in jobs.items():
            package_path, function = func_name.rsplit(JOB_SPLIT_CHAR, 1)
            module = importlib.import_module(package_path)
            func = getattr(module, function)

            # Work on a copy: the original deleted keys from the caller's
            # mapping, corrupting it for any later use.
            options = dict(config)
            kwargs = {}

            if EVERY in options:
                # Interval-style schedule: translate {unit: u, every: n}
                # into the keyword APScheduler expects, e.g. minutes=10.
                kwargs[options.pop(UNIT)] = options.pop(EVERY)

            if options.get(TRIGGER) == CRON:
                # Cron triggers carry no interval unit. pop() with a default
                # avoids the KeyError the original raised when UNIT was
                # already consumed by the EVERY branch above (or missing).
                options.pop(UNIT, None)

            # Remaining keys (trigger, id, ...) pass straight through.
            kwargs.update(options)

            self.scheduler.add_job(func, **kwargs)

    def add_job(self, function: Callable[[KeyValue], Any], schedule: KeyValue) -> None:
        """Add a single job to the scheduler.

        :param function: Callable to be scheduled
        :type function: object
        :param schedule: Schedule options for the function
        :type schedule: dict
        """
        self.scheduler.add_job(function, **schedule)

    def start(self) -> None:
        """Start the configured executors and job stores and begin processing scheduled jobs."""
        self.scheduler.start()

    def shutdown(self) -> None:
        """Shut down the scheduler, along with its executors and job processes."""
        self.scheduler.shutdown()

    def pause(self) -> None:
        """Pause job processing in the scheduler.

        This prevents the scheduler from waking up to do job processing
        until resume() is called. It does not stop already running jobs.
        """
        self.scheduler.pause()

    def resume(self) -> None:
        """Resume job processing in the scheduler."""
        self.scheduler.resume()

    def is_running(self) -> bool:
        """Check if the scheduler is running.

        :return: True if running, else False
        :rtype: bool
        """
        return bool(self.scheduler.running)

    def is_paused(self) -> bool:
        """Check if the scheduler is paused.

        :return: True if paused, else False
        :rtype: bool
        """
        return STATE_PAUSED == int(self.scheduler.state)

    def is_stopped(self) -> bool:
        """Check if the scheduler is stopped.

        :return: True if stopped, else False
        :rtype: bool
        """
        return STATE_STOPPED == int(self.scheduler.state)

    def is_job_exists(self, job_id: str) -> bool:
        """Check whether a job exists in the job store.

        :param job_id: Job id to look up in the job store
        :type job_id: str
        :return: True if the job exists, else False
        :rtype: bool
        """
        return self.scheduler.get_job(job_id) is not None

    def get_job(self, job_id: str) -> Any:
        """Get the job specified by job_id.

        :param job_id: Job identifier
        :type job_id: str
        :return: Job object if it exists, else None
        :rtype: object/None
        """
        return self.scheduler.get_job(job_id)

    def reschedule(self, job_id: str, schedule: KeyValue) -> None:
        """Reschedule the given job with the new schedule values provided.

        :param job_id: Job to be rescheduled
        :type job_id: str
        :param schedule: New schedule for the job
        :type schedule: dict
        """
        self.scheduler.reschedule_job(job_id, **schedule)

    def remove_job(self, job_id: str) -> None:
        """Remove a job if it exists in the scheduler.

        :param job_id: Job to be removed
        :type job_id: str
        """
        if self.is_job_exists(job_id):
            self.scheduler.remove_job(job_id)

搜索事件功能:

def search_events():
    """Call the external search API and persist the response to SQLite.

    Uses a connect timeout so concurrent jobs wait for a writer holding
    the SQLite lock instead of failing with "database is locked", and
    always closes the connection so the lock is released promptly.
    """
    response = requests.get('https://some_url_with_query_params')
    if requests.codes.okay == response.status_code:
        # process the response and create insert query
        query = function_generating_query_from_response(response)
        try:
            # timeout=30: wait up to 30 s for another job's write lock
            # instead of raising sqlite3.OperationalError immediately.
            conn = sqlite3.connect("database_location", timeout=30)
            try:
                # `with conn:` commits on success and rolls back on error,
                # so a failed insert never leaves a half-open transaction.
                with conn:
                    conn.execute(query)
            finally:
                # Original leaked the connection, keeping the lock held.
                conn.close()
        except Exception as e:
            logger.exception(e)


def check_search_status_events():
    """Check search-request status via the external API and store results.

    Uses a connect timeout so concurrent jobs wait for a writer holding
    the SQLite lock instead of failing with "database is locked", and
    always closes the connection so the lock is released promptly.
    """
    response = requests.get('https://status_check_url_with_query_params')
    if requests.codes.okay == response.status_code:
        # process the response and create insert query
        query = function_generating_query_from_response(response)
        try:
            # timeout=30: wait up to 30 s for another job's write lock
            # instead of raising sqlite3.OperationalError immediately.
            conn = sqlite3.connect("database_location", timeout=30)
            try:
                # `with conn:` commits on success and rolls back on error.
                with conn:
                    conn.execute(query)
            finally:
                # Original leaked the connection, keeping the lock held.
                conn.close()
        except Exception as e:
            logger.exception(e)

def delete_searches():
    """Delete search records whose status is marked completed.

    Uses a connect timeout so concurrent jobs wait for a writer holding
    the SQLite lock instead of failing with "database is locked", and
    always closes the connection so the lock is released promptly.
    """
    query = function_generating_delete_query()
    try:
        # timeout=30: wait up to 30 s for another job's write lock
        # instead of raising sqlite3.OperationalError immediately.
        conn = sqlite3.connect("database_location", timeout=30)
        try:
            # `with conn:` commits on success and rolls back on error.
            with conn:
                conn.execute(query)
        finally:
            # Original leaked the connection, keeping the lock held.
            conn.close()
    except Exception as e:
        logger.exception(e)

如何确保由 APScheduler 创建的线程会等待其他线程完成对数据库的读/写工作,然后再执行自己的读/写?

4

0 回答 0