问题标签 [airflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
6 回答
78174 浏览

python - 如何在 Windows 上运行 Airflow

运行 Airflow 的常规说明不适用于 Windows 环境:

Airflow 实用程序在命令行中不可用,我无法在其他地方找到它来手动添加。Airflow 如何在 Windows 上运行?

0 投票
2 回答
14956 浏览

python - 气流没有正确调度Python

代码:

Python 版本 2.7.x 和气流版本 1.5.1

我的 dag 脚本是这样的

从那里你可以看到我正在创建一个包含 6 个任务的 DAG,第一个任务(Start1)首先启动,然后所有其他五个任务启动

目前我在 DAG 开始之间延迟了 5 分钟

对于第一种类型的所有六个任务,它已经完美运行,但五分钟后 DAG 没有重新启动

已经超过 1 小时 DAG 没有重新启动我真的不知道我错了。

如果有人能指出我出了什么问题,那就太好了。我尝试使用airflow testing clearthen 清除以发生同样的事情。它首先运行然后就站在那里。

命令行显示的唯一内容是Getting all instance for DAG testing

当我更改 schedule_interval 的位置时,它只会在没有任何调度间隔的情况下并行运行。也就是说,在 5 分钟内完成了 300 个或更多任务实例。没有 5 分钟的计划间隔

代码 2:

0 投票
1 回答
3062 浏览

python - 如何使用 python 为 Windows 开发环境设置气流

在工作中,我们正在完成 AirBnB 的 Airflow PoC。标准操作实践是将开发(即 DAG 创建和 python 脚本等)与运行时环境解耦。即我们不允许直接在服务器上编写脚本(无论如何都是不好的做法)。

因此,我已将 intelliJ 配置为在 Windows 中使用 Python(按预期工作)(我们不允许使用 unix 工作站),但我找不到在 Windows 中安装气流的方法。我是具有 Java 开发背景的 Python 新手。

因此,如何为 AirBnb/Airflow 设置本地开发环境 IDEA Ultimate(没有文档?

0 投票
4 回答
30378 浏览

python - 气流 - Python 文件不在同一个 DAG 文件夹中

我正在尝试使用 Airflow 来执行一个简单的任务 python。

如果我尝试,例如:

气流测试 python_test 打印 2015-01-01

有用!

现在我想把我的def print_context(ds, **kwargs)函数放在其他 python 文件中。所以我创建了一个名为:simple_test.py 的文件并更改:

现在我尝试再次运行:

气流测试 python_test 打印 2015-01-01

好!它仍然有效!

但是如果我创建一个模块,例如,带有文件的工作模块SimplePython.py,导入(from worker import SimplePython)它并尝试:

气流测试 python_test 打印 2015-01-01

它给出了信息:

ImportError:没有名为worker的模块

问题:

  1. 是否可以在 DAG 定义中导入模块?
  2. Airflow+Celery 将如何在工作节点之间分发所有必要的 python 源文件?
0 投票
1 回答
1284 浏览

python - 以 DAG 方式调度作业

我们有一个包含不同类型工作的系统。例如,我们称它们为:

它们都需要不同的参数集(和可选参数)。即我们job_1(x)为不同的x= A, B, C .... job_2运行一组参数,这些参数取决于结果job_1(x)job_2加载job_A(x)存储的数据。等等。

结果是依赖关系的树结构。现在,这些工作偶尔会因为某种原因而失败。因此,如果job_Aforx=B失败,该树的分支将完全失败并且不应该运行。所有其他分支都应该运行。

所有作业都用 Python 编写并使用并行性(基于生成 SLURM 作业)。它们是用 cron 安排的。这显然不是很好,并且有两个主要缺点:

  • 调试非常困难。无论树中较高的作业是否失败,所有作业都会运行。如果不深入了解依赖关系,很难看出问题出在哪里。
  • 如果更高的作业(例如job_A)未完成,则job_B可能会安排运行,并且会失败或根据过时的日期运行。

为了解决这个问题,我们正在研究用于调度或可视化的气流,因为它是用 Python 编写的,并且似乎大致符合我们的需求。我看到了不同的挑战:

  • 作业的依赖树要么非常通用(即取决于job_Bjob_A,要么非常宽(即job_B(y)对于 100 个参数取决于job_A(x=A)对于某个参数失败。后一种情况下的可视化树将非常宽,大约有 300 个叶子。它会更准确,但可视化可能难以阅读。我们可以过滤失败的作业,只查看它们的依赖关系吗?
  • 我们在作业中具有并行性(我们需要它,否则作业运行超过一天,并且我们希望每天重新运行全部)这是否会破坏我们的调度?
  • 我们希望尽可能少地改变我们的工作和数据管理。
  • 我们能否以一种易于理解的方式实施接下来要产生哪些工作的规则系统?

气流是一个不错的选择吗?我知道还有其他一些(luigi、Azkaban 等)与 Hadoop 堆栈有些相关(我们没有使用它,因为它不是大数据)。需要多少黑客攻击?多少黑客行为是明智的?

0 投票
1 回答
1226 浏览

python - 配置 SnakeBite HDFS 客户端以使用高可用性模式

我正在使用蛇咬库从我的气流 dags 访问 HDFS。

我的 HDFS 集群已升级到高可用性模式。这意味着配置为仅指向一个名称节点的客户端将在该名称节点不是活动节点时失败。

我可以使用哪些策略来使高可用性模式具有高可用性?我可以将蛇咬客户端配置为故障转移到另一个节点吗?我可以使用某种负载均衡器将流量引导到正确的名称节点吗?

0 投票
1 回答
3673 浏览

python - 气流 HiveOperator 不工作

我正在尝试在气流中使用蜂巢操作员。我安装了所有依赖项(pyhs2、pyhive 并运行了 pip install airflow[hive])。

但是,当我使用下面的代码时

我得到这个错误。我不确定这意味着什么

0 投票
2 回答
4903 浏览

scheduler - 即使每小时安排一次,airbnb 气流调度程序也会保持运行任务

我正在使用 airbnb 的气流,并创建了一个如下所示的简单任务。但是,即使我将间隔设置为每小时或任何其他间隔,调度程序也会继续运行任务。我注意到的另一件事是,如果我将调度间隔设置为“@once”,它将永远不会运行 dag。

我遵循了这里的约定http://airflow.readthedocs.org/en/latest/scheduler.html#dag-runs

我正在使用的简单 dag。

调度程序输出供参考。如您所见,它一直在一遍又一遍地运行,但在我的 dag 中,我有 schedule_interval='@hourly'

0 投票
3 回答
17362 浏览

airflow - Airflow - 如何使 EmailOperator html_content 动态化?

我正在寻找一种允许动态设置给定 EmailOperator 任务发送的电子邮件内容的方法。理想情况下,我想让电子邮件内容依赖于 xcom 调用的结果,最好是通过 html_content 参数。

我注意到 Airflow 文档说 xcom 调用可以嵌入到模板中。也许有一种方法可以使用指定任务 ID 上的模板来制定 xcom 拉取然后将结果作为 html_content 传递?谢谢

0 投票
1 回答
173 浏览

python - 气流回填仅 13 个实例在默认设置下运行

我正在尝试使用airbnb 气流。当我尝试使用 'backfill' 选项以 60 分钟的 timedelta 运行一天时,只执行了 13 个实例。休息显示为等待并且从不执​​行。