
I am using Airbnb's Airflow and have created a simple task as shown below. However, the scheduler keeps running the task over and over no matter what interval I set, hourly or otherwise. Another thing I noticed is that if I set the schedule interval to '@once', the dag never runs at all.

I followed the conventions described here: http://airflow.readthedocs.org/en/latest/scheduler.html#dag-runs

The simple dag I am using:

"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 1, 5),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'schedule_test', default_args=default_args, schedule_interval='@hourly')

# t1 is an example of a task created by instantiating an operator
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

Scheduler output for reference. As you can see, it keeps running the task over and over, even though my dag has schedule_interval='@hourly':

2016-01-06 20:34:37,266 - root - INFO - Starting the scheduler
2016-01-06 20:34:37,267 - root - INFO - Filling up the DagBag from /usr/local/airflow/dags
2016-01-06 20:34:37,267 - root - INFO - Importing /usr/local/airflow/dags/test.py
2016-01-06 20:34:37,272 - root - INFO - Loaded DAG <DAG: schedule_test>
2016-01-06 20:34:37,281 - root - INFO - Prioritizing 0 queued jobs
2016-01-06 20:34:37,288 - root - INFO - Importing /usr/local/airflow/dags/test.py
2016-01-06 20:34:37,291 - root - INFO - Loaded DAG <DAG: schedule_test>
2016-01-06 20:34:37,318 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 00:00:00: scheduled__2016-01-06T00:00:00, externally triggered: False>
2016-01-06 20:34:37,321 - root - INFO - Getting list of tasks to skip for active runs.
2016-01-06 20:34:37,323 - root - INFO - Checking dependencies on 1 tasks instances, minus 0 skippable ones
2016-01-06 20:34:37,326 - root - INFO - Adding to queue: airflow run schedule_test print_date 2016-01-06T00:00:00 --local -sd DAGS_FOLDER/test.py 
2016-01-06 20:34:37,347 - root - INFO - Done queuing tasks, calling the executor's heartbeat
2016-01-06 20:34:37,347 - root - INFO - Loop took: 0.071298 seconds
2016-01-06 20:34:37,356 - root - INFO - Finding 'running' jobs without a recent heartbeat
2016-01-06 20:34:37,357 - root - INFO - Failing jobs without heartbeat after 2016-01-06 20:32:22.357089
2016-01-06 20:34:42,269 - root - INFO - Prioritizing 0 queued jobs
2016-01-06 20:34:42,274 - root - INFO - Importing /usr/local/airflow/dags/test.py
2016-01-06 20:34:42,277 - root - INFO - Loaded DAG <DAG: schedule_test>
2016-01-06 20:34:42,295 - root - INFO - Done queuing tasks, calling the executor's heartbeat
2016-01-06 20:34:42,296 - root - INFO - Loop took: 0.029931 seconds
2016-01-06 20:34:42,309 - root - INFO - Finding 'running' jobs without a recent heartbeat
2016-01-06 20:34:42,310 - root - INFO - Failing jobs without heartbeat after 2016-01-06 20:32:27.310303
2016-01-06 20:34:42,314 - root - INFO - commandairflow run schedule_test print_date 2016-01-06T00:00:00 --local -sd DAGS_FOLDER/test.py 
Logging into: /usr/local/airflow/logs/schedule_test/print_date/2016-01-06T00:00:00
2016-01-06 20:34:47,895 - root - INFO - Prioritizing 0 queued jobs
2016-01-06 20:34:47,900 - root - INFO - Importing /usr/local/airflow/dags/test.py
2016-01-06 20:34:47,904 - root - INFO - Loaded DAG <DAG: schedule_test>
2016-01-06 20:34:47,922 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 00:00:00: scheduled__2016-01-06T00:00:00, externally triggered: False>
2016-01-06 20:34:47,925 - root - INFO - Marking run <DagRun schedule_test @ 2016-01-06 00:00:00: scheduled__2016-01-06T00:00:00, externally triggered: False> successful
2016-01-06 20:34:47,926 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 01:00:00: scheduled__2016-01-06T01:00:00, externally triggered: False>
2016-01-06 20:34:47,928 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 02:00:00: scheduled__2016-01-06T02:00:00, externally triggered: False>
2016-01-06 20:34:47,937 - root - INFO - Getting list of tasks to skip for active runs.
2016-01-06 20:34:47,940 - root - INFO - Checking dependencies on 2 tasks instances, minus 0 skippable ones
2016-01-06 20:34:47,943 - root - INFO - Adding to queue: airflow run schedule_test print_date 2016-01-06T01:00:00 --local -sd DAGS_FOLDER/test.py 
2016-01-06 20:34:47,947 - root - INFO - Adding to queue: airflow run schedule_test print_date 2016-01-06T02:00:00 --local -sd DAGS_FOLDER/test.py 
2016-01-06 20:34:47,960 - root - INFO - Done queuing tasks, calling the executor's heartbeat
2016-01-06 20:34:47,960 - root - INFO - Loop took: 0.067789 seconds
2016-01-06 20:34:47,968 - root - INFO - Finding 'running' jobs without a recent heartbeat
2016-01-06 20:34:47,968 - root - INFO - Failing jobs without heartbeat after 2016-01-06 20:32:32.968940
2016-01-06 20:34:52,901 - root - INFO - Prioritizing 0 queued jobs
2016-01-06 20:34:52,906 - root - INFO - Importing /usr/local/airflow/dags/test.py
2016-01-06 20:34:52,909 - root - INFO - Loaded DAG <DAG: schedule_test>
2016-01-06 20:34:52,942 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 01:00:00: scheduled__2016-01-06T01:00:00, externally triggered: False>
2016-01-06 20:34:52,946 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 02:00:00: scheduled__2016-01-06T02:00:00, externally triggered: False>
2016-01-06 20:34:52,948 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 03:00:00: scheduled__2016-01-06T03:00:00, externally triggered: False>
2016-01-06 20:34:52,950 - root - INFO - Getting list of tasks to skip for active runs.
2016-01-06 20:34:52,953 - root - INFO - Checking dependencies on 3 tasks instances, minus 0 skippable ones
2016-01-06 20:34:52,961 - root - INFO - Adding to queue: airflow run schedule_test print_date 2016-01-06T03:00:00 --local -sd DAGS_FOLDER/test.py 
2016-01-06 20:34:52,975 - root - INFO - Done queuing tasks, calling the executor's heartbeat
2016-01-06 20:34:52,976 - root - INFO - Loop took: 0.07741 seconds
2016-01-06 20:34:52,982 - root - INFO - Finding 'running' jobs without a recent heartbeat
2016-01-06 20:34:52,983 - root - INFO - Failing jobs without heartbeat after 2016-01-06 20:32:37.983542
2016-01-06 20:34:52,987 - root - INFO - commandairflow run schedule_test print_date 2016-01-06T02:00:00 --local -sd DAGS_FOLDER/test.py 
Logging into: /usr/local/airflow/logs/schedule_test/print_date/2016-01-06T02:00:00
2016-01-06 20:34:58,583 - root - INFO - commandairflow run schedule_test print_date 2016-01-06T01:00:00 --local -sd DAGS_FOLDER/test.py 
Logging into: /usr/local/airflow/logs/schedule_test/print_date/2016-01-06T01:00:00
2016-01-06 20:35:04,215 - root - INFO - Prioritizing 0 queued jobs
2016-01-06 20:35:04,223 - root - INFO - Importing /usr/local/airflow/dags/test.py
2016-01-06 20:35:04,229 - root - INFO - Loaded DAG <DAG: schedule_test>
2016-01-06 20:35:04,263 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 01:00:00: scheduled__2016-01-06T01:00:00, externally triggered: False>
2016-01-06 20:35:04,267 - root - INFO - Marking run <DagRun schedule_test @ 2016-01-06 01:00:00: scheduled__2016-01-06T01:00:00, externally triggered: False> successful
2016-01-06 20:35:04,268 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 02:00:00: scheduled__2016-01-06T02:00:00, externally triggered: False>
2016-01-06 20:35:04,272 - root - INFO - Marking run <DagRun schedule_test @ 2016-01-06 02:00:00: scheduled__2016-01-06T02:00:00, externally triggered: False> successful
2016-01-06 20:35:04,273 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 03:00:00: scheduled__2016-01-06T03:00:00, externally triggered: False>
2016-01-06 20:35:04,276 - root - INFO - Checking state for <DagRun schedule_test @ 2016-01-06 04:00:00: scheduled__2016-01-06T04:00:00, externally triggered: False>

2 Answers


I have seen this issue. In the airflow.cfg file you can see the number of workers and the worker refresh time, which is generally 30 seconds, so every 30 seconds new workers are started, load all the dags again, and run them. I set the worker refresh to 0 to stop this problem.
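For reference, the knobs this answer appears to describe live in the [webserver] section of airflow.cfg (assuming it means the Gunicorn webserver workers; option names may vary across Airflow versions, and the values below are a sketch with the refresh interval set to 0 as the answer suggests):

[webserver]
# Number of Gunicorn workers serving the web server
workers = 4
# Seconds to wait before refreshing a batch of workers (default is 30;
# 0 disables the periodic refresh)
worker_refresh_interval = 0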

answered 2017-07-06T13:00:16.977

Is the start_date you provided correct? It can be supplied in the dag file or when running the scheduler. If that date is in the past, Airflow will simply work out all the past instances in which your dag should have executed and queue them for execution (backfill).
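That matches the log above: with start_date=datetime(2016, 1, 5) and schedule_interval='@hourly', the scheduler creates a dag run for every hour between the start date and now, which is exactly the stream of scheduled__2016-01-06T0X:00:00 runs being queued. A minimal sketch of avoiding the backfill by pinning start_date to a recent fixed time (the date below is hypothetical; a static date is used rather than datetime.now(), since a moving start_date prevents the scheduler from ever seeing a completed interval):

from datetime import datetime

default_args = {
    'owner': 'airflow',
    # Hypothetical recent, static date -- just before "now" at deploy time
    'start_date': datetime(2016, 3, 13),
}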

answered 2016-03-14T05:49:46.303