设置:带有 Kubernetes 1.18 和 Python 3.8 的 Airflow 2.0.1,Kubernetes 客户端:18.17.x
Pod 模板文件:
apiVersion: v1
kind: Pod
metadata:
name: workerPod
spec:
containers:
- args: []
command: []
env:
- name: <Key>
value: "<value>"
envFrom: []
name: base
image: "<image_name>"
imagePullSecrets: [name: "<image_pull_secrets>"]
imagePullPolicy: "Always"
ports: []
volumeMounts:
- mountPath: "<path>"
name: "<name>"
在airflow.cfg 中设置的默认配置如下:
[kubernetes]
pod_template_file = <path to template file>
worker_container_repository = <base-default-image>
worker_container_tag = <tag>
namespace = airflow
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
multi_namespace_mode = False
in_cluster = True
kube_client_request_args =
delete_option_kwargs =
enable_tcp_keepalive = False
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
dags_in_image = True
dags_volume_mount_point = <volume-mount-point>
image_pull_secrets = <default-pull-secrets>
问题是,例如,虽然从 pod_template_file 中正确读取了某些键,但我可以看到所有键都被正确env variables
设置以及imagePullPolicy
被正确读取(通过覆盖imagePullPolicy: "Always"
from的值来验证imagePullPolicy: "IfNotPresent"
),但键imagePullSecrets
不是被正确阅读。我可以验证这一点,因为Base credentials not provided
从 ecr 存储库中提取图像时出现错误。我已经验证了凭据是正确的,并且我可以在尝试明确创建一个 pod 时创建一个 pod。
即使尝试直接设置imagePullSecrets
,airflow.cfg
我仍然会得到同样的错误。
我还尝试使用 V1 api 显式创建 pod 覆盖,如下所示:
start_task = PythonOperator(
task_id=<start_task_id>, python_callable=<start_task_callabel>, op_args=[<args>], dag=dag,
executor_config={
"pod_template_file": "<path_to_template>",
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
image="<image_override>",
image_pull_policy="<pull_policy>"
),
],
image_pull_secrets=[k8s.V1LocalObjectReference('<image_pull_secrets>')],
)
),
},
)
在这种情况下,我可以正确下载要使用的 docker 映像,而不会出现任何身份验证错误。但不幸的是,pod 抛出了一个错误:AttributeError: 'V1Container' object has no attribute '_startup_probe'
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/usr/local/lib/python3.8/dist-packages/airflow/__main__.py", line 40, in main
args.func(args)
File "/usr/local/lib/python3.8/dist-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/airflow/utils/cli.py", line 89, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 234, in task_run
_run_task_by_selected_method(args, dag, ti)
File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
_run_task_by_local_task_job(args, ti)
File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 120, in _run_task_by_local_task_job
run_job.run()
File "/usr/local/lib/python3.8/dist-packages/airflow/jobs/base_job.py", line 237, in run
self._execute()
File "/usr/local/lib/python3.8/dist-packages/airflow/jobs/local_task_job.py", line 84, in _execute
if not self.task_instance.check_and_change_state_before_execution(
File "/usr/local/lib/python3.8/dist-packages/airflow/utils/session.py", line 65, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1029, in check_and_change_state_before_execution
session.commit()
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 1046, in commit
self.transaction.commit()
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 504, in commit
self._prepare_impl()
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
self.session.flush()
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2540, in flush
self._flush(objects)
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2682, in _flush
transaction.rollback(_capture_exception=True)
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
compat.raise_(
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2642, in _flush
flush_context.execute()
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
rec.execute(self)
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/unitofwork.py", line 586, in execute
persistence.save_obj(
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 230, in save_obj
_emit_update_statements(
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 885, in _emit_update_statements
for (
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 626, in _collect_update_commands
state.manager[propkey].impl.is_equal(
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/sql/sqltypes.py", line 1738, in compare_values
return x == y
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod.py", line 221, in __eq__
return self.to_dict() == other.to_dict()
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod.py", line 196, in to_dict
result[attr] = value.to_dict()
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod_spec.py", line 1004, in to_dict
result[attr] = list(map(
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod_spec.py", line 1005, in <lambda>
lambda x: x.to_dict() if hasattr(x, "to_dict") else x,
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_container.py", line 660, in to_dict
value = getattr(self, attr)
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_container.py", line 458, in startup_probe
return self._startup_probe
AttributeError: 'V1Container' object has no attribute '_startup_probe'