Airflow "dag_id could not be found" when using the Kubernetes Executor

Updated: 2024-10-27 00:29:01
This article describes how to resolve the "dag_id could not be found" error that can occur when running Airflow with the Kubernetes Executor, which may be a useful reference for anyone hitting the same problem.

Problem description

I am using the Airflow stable Helm chart with the Kubernetes Executor. A new pod gets scheduled for the DAG, but it fails with a "dag_id could not be found" error. I am using git-sync to fetch the DAGs. Below are the error and the Kubernetes config values. Can someone please help me resolve this issue?

Error:

[2020-07-01 23:18:36,939] {__init__.py:51} INFO - Using executor LocalExecutor
[2020-07-01 23:18:36,940] {dagbag.py:396} INFO - Filling up the DagBag from /opt/airflow/dags/dags/etl/sampledag_dag.py
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 37, in <module>
    args.func(args)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 75, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/bin/cli.py", line 523, in run
    dag = get_dag(args)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/bin/cli.py", line 149, in get_dag
    'parse.'.format(args.dag_id))
airflow.exceptions.AirflowException: dag_id could not be found: sampledag. Either the dag did not exist or it failed to parse.

Config:

AIRFLOW__KUBERNETES__DELETE_WORKER_PODS: false
AIRFLOW__KUBERNETES__GIT_REPO: git@git/dags.git
AIRFLOW__KUBERNETES__GIT_BRANCH: master
AIRFLOW__KUBERNETES__GIT_DAGS_FOLDER_MOUNT_POINT: /dags
AIRFLOW__KUBERNETES__GIT_SSH_KEY_SECRET_NAME: git-secret
AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY: airflow-repo
AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG: tag
AIRFLOW__KUBERNETES__RUN_AS_USER: "50000"

sampledag

import datetime
import logging
import os

from airflow import models
from airflow.contrib.operators import kubernetes_pod_operator

args = {'owner': 'airflow'}

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

try:
    print("Entered try block")
    with models.DAG(
            dag_id='sampledag',
            schedule_interval=datetime.timedelta(days=1),
            start_date=YESTERDAY) as dag:
        print("Initialized dag")
        kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
            # The ID specified for the task.
            task_id='trigger-task',
            # Name of the task you want to run, used to generate the pod ID.
            name='trigger-name',
            namespace='scheduler',
            in_cluster=True,
            cmds=["./docker-run.sh"],
            is_delete_operator_pod=False,
            image='imagerepo:latest',
            image_pull_policy='Always',
            dag=dag)
        print("done")
except Exception as e:
    print(str(e))
    logging.error("Error at {}, error={}".format(__file__, str(e)))
    raise

Solution

I had the same issue. I solved it by adding the following to my config:

AIRFLOW__KUBERNETES__DAGS_VOLUME_SUBPATH: repo/

What happens is that the init container downloads your DAGs into [AIRFLOW__KUBERNETES__GIT_DAGS_FOLDER_MOUNT_POINT]/[AIRFLOW__KUBERNETES__GIT_SYNC_DEST], and AIRFLOW__KUBERNETES__GIT_SYNC_DEST defaults to repo (airflow.apache/docs/stable/configurations-ref.html#git-sync-dest)
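The path mismatch described above can be sketched with a small calculation (an illustration only, assuming the config values from the question and the default git-sync-dest of repo; the variable names mirror the Airflow config keys):

```python
import os

# Values from the question's config; GIT_SYNC_DEST falls back to its
# documented default, "repo".
mount_point = "/dags"  # AIRFLOW__KUBERNETES__GIT_DAGS_FOLDER_MOUNT_POINT
sync_dest = "repo"     # AIRFLOW__KUBERNETES__GIT_SYNC_DEST (default)

# The git-sync init container actually writes the DAG files here:
actual_dags_path = os.path.join(mount_point, sync_dest)
print(actual_dags_path)  # /dags/repo

# Without DAGS_VOLUME_SUBPATH, the worker mounts and scans the bare mount
# point instead, so the DagBag never sees the files and the dag_id lookup
# fails. Setting DAGS_VOLUME_SUBPATH to "repo/" points the worker at the
# same subdirectory git-sync populated.
```

Setting AIRFLOW__KUBERNETES__DAGS_VOLUME_SUBPATH: repo/ makes the two paths agree, which is why the fix above works.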


Published: 2023-11-23 18:19:50
Link: https://www.elefans.com/category/jswz/34/1622430.html