Setup: Airflow 2.0.1 with Kubernetes 1.18 and Python 3.8; Kubernetes client: 18.17.x

Pod template file:

apiVersion: v1
kind: Pod
metadata:
  name: workerPod

spec:
  containers:
    - args: []
      command: []
      env:
        - name: <Key>
          value: "<value>"
      envFrom: []
      name: base
      image: "<image_name>"
      imagePullSecrets: [name: "<image_pull_secrets>"]
      imagePullPolicy: "Always"
      ports: []
      volumeMounts:
        - mountPath: "<path>"
          name: "<name>"

The default configuration set in airflow.cfg is as follows:

[kubernetes]
pod_template_file = <path to template file>
worker_container_repository = <base-default-image>
worker_container_tag = <tag>
namespace = airflow
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
multi_namespace_mode = False
in_cluster = True
kube_client_request_args =
delete_option_kwargs =
enable_tcp_keepalive = False
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
dags_in_image = True
dags_volume_mount_point = <volume-mount-point>
image_pull_secrets = <default-pull-secrets>

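The problem is that only some keys are read correctly from pod_template_file. I can see that all env variables are set correctly, and imagePullPolicy is read correctly (verified by changing the value from imagePullPolicy: "IfNotPresent" to imagePullPolicy: "Always"), but the key imagePullSecrets is not read correctly. I can tell because I get a Base credentials not provided error when pulling the image from the ECR repository. I have verified that the credentials are correct, and I can create a pod successfully when I create one explicitly.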

Even when I set imagePullSecrets directly in airflow.cfg, I still get the same error.
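A possible explanation, offered as an assumption rather than a confirmed diagnosis: in the Kubernetes Pod API, imagePullSecrets is a field of the pod spec, not of a container, whereas the template above nests it inside the container entry. A field at the wrong level may be ignored when the template is parsed. The sketch below checks for that placement on a plain dict shaped like the parsed template; the helper name `misplaced_pod_fields` and the constant `POD_SPEC_ONLY_FIELDS` are hypothetical and not part of Airflow or the Kubernetes client.

```python
# Fields that belong on the pod spec and do not exist on a container.
# Only imagePullSecrets is listed here; extend the set as needed.
POD_SPEC_ONLY_FIELDS = {"imagePullSecrets"}


def misplaced_pod_fields(pod):
    """Return (container_name, field) pairs for pod-level fields found inside containers."""
    found = []
    for container in pod.get("spec", {}).get("containers", []):
        # Iterating a dict yields its keys, so this intersects the field names.
        for field in sorted(POD_SPEC_ONLY_FIELDS.intersection(container)):
            found.append((container.get("name"), field))
    return found


# The template from the question, written as the dict a YAML loader would produce.
template = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {"name": "workerPod"},
    "spec": {
        "containers": [
            {
                "name": "base",
                "image": "<image_name>",
                "imagePullSecrets": [{"name": "<image_pull_secrets>"}],
                "imagePullPolicy": "Always",
            }
        ]
    },
}

print(misplaced_pod_fields(template))
```

If the check reports a hit, moving imagePullSecrets up one level so that it sits next to containers under spec would be the first thing to try.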

I also tried creating a pod override explicitly with the V1 API, as follows:

from airflow.operators.python import PythonOperator
from kubernetes.client import models as k8s

start_task = PythonOperator(
    task_id=<start_task_id>,
    python_callable=<start_task_callable>,
    op_args=[<args>],
    dag=dag,
    executor_config={
        "pod_template_file": "<path_to_template>",
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        image="<image_override>",
                        image_pull_policy="<pull_policy>",
                    ),
                ],
                # Pull secrets are set on the pod spec, not on the container.
                image_pull_secrets=[k8s.V1LocalObjectReference(name="<image_pull_secrets>")],
            )
        ),
    },
)

In this case the Docker image is pulled correctly, without any authentication error. Unfortunately, the pod then throws an error: AttributeError: 'V1Container' object has no attribute '_startup_probe'

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.8/dist-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/usr/local/lib/python3.8/dist-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/utils/cli.py", line 89, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 234, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 120, in _run_task_by_local_task_job
    run_job.run()
  File "/usr/local/lib/python3.8/dist-packages/airflow/jobs/base_job.py", line 237, in run
    self._execute()
  File "/usr/local/lib/python3.8/dist-packages/airflow/jobs/local_task_job.py", line 84, in _execute
    if not self.task_instance.check_and_change_state_before_execution(
  File "/usr/local/lib/python3.8/dist-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1029, in check_and_change_state_before_execution
    session.commit()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 1046, in commit
    self.transaction.commit()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 504, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2540, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2682, in _flush
    transaction.rollback(_capture_exception=True)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
    compat.raise_(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2642, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/unitofwork.py", line 586, in execute
    persistence.save_obj(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 230, in save_obj
    _emit_update_statements(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 885, in _emit_update_statements
    for (
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 626, in _collect_update_commands
    state.manager[propkey].impl.is_equal(
  File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/sql/sqltypes.py", line 1738, in compare_values
    return x == y
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod.py", line 221, in __eq__
    return self.to_dict() == other.to_dict()
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod.py", line 196, in to_dict
    result[attr] = value.to_dict()
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod_spec.py", line 1004, in to_dict
    result[attr] = list(map(
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod_spec.py", line 1005, in <lambda>
    lambda x: x.to_dict() if hasattr(x, "to_dict") else x,
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_container.py", line 660, in to_dict
    value = getattr(self, attr)
  File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_container.py", line 458, in startup_probe
    return self._startup_probe
AttributeError: 'V1Container' object has no attribute '_startup_probe'

1 Answer


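I had a similar problem. In our case we had changed the Airflow container and upgraded the Kubernetes library in the new container. The new Kubernetes library is not necessarily at fault. The issue is that Airflow had already serialized some objects (in our case a TaskInstance, which also seems to be your case judging by the traceback you shared), and it later deserializes them and builds Python objects from them. So in your case it recreates a V1Container object from the serialized form it has stored. The new version of the class sets an attribute _startup_probe in its initializer, but the serialized version does not have that attribute, so it appears to come from a version before this commit. Deserialization by itself does not seem to cause a problem; the error shows up whenever the to_dict method is used. In your case it is used for a comparison (eq); in mine it was used for the repr during logging.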
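The mechanism can be reproduced in a few lines. This is a minimal sketch, not the Kubernetes client's actual code: `V1ContainerSketch` is a hypothetical stand-in for the newer model class, and `restore_without_init` imitates what pickle-style deserialization does by default, namely creating the object without calling `__init__` and restoring the saved attributes.

```python
class V1ContainerSketch:
    """Hypothetical stand-in for the newer model class, which knows about startup_probe."""

    attribute_names = ("name", "startup_probe")

    def __init__(self, name, startup_probe=None):
        self._name = name
        self._startup_probe = startup_probe  # set only when __init__ runs

    @property
    def name(self):
        return self._name

    @property
    def startup_probe(self):
        return self._startup_probe

    def to_dict(self):
        # Reads every known attribute, including ones an old object never had.
        return {attr: getattr(self, attr) for attr in self.attribute_names}

    def __eq__(self, other):
        return self.to_dict() == other.to_dict()


def restore_without_init(cls, state):
    """Rebuild an object the way unpickling does by default: no __init__ call."""
    obj = cls.__new__(cls)
    obj.__dict__.update(state)
    return obj


# State saved by an older library version that had no startup probe field.
old_state = {"_name": "base"}
restored = restore_without_init(V1ContainerSketch, old_state)

restored_name = restored.name  # restoring and reading old fields works

try:
    restored == V1ContainerSketch("base")  # __eq__ calls to_dict()
    error = None
except AttributeError as exc:
    error = str(exc)

print(error)
```

The restored object looks fine until something touches the attribute that only the newer initializer sets, which matches a failure that appears in `__eq__` or `__repr__` rather than at load time.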

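The Airflow Slack community pointed me to a change that should fix this. I have not been able to test it yet, but I am sharing it here in case someone else runs into the same problem.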

Answered 2022-02-17T12:10:42.570