气流如何设置dag

编程入门 行业动态 更新时间:2024-10-17 11:24:59
本文介绍了气流如何设置dag_run.conf的默认值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述 我正在尝试设置一个气流DAG,该DAG提供来自dag_run.conf的默认值。当从WebUI使用";run w/Config";选项运行DAG时,此功能非常有效。但按计划运行时,dag_run.conf字典不存在,任务会失败,如

jinja2.exceptions.UndefinedError: 'dict object' has no attribute 'key1'

下面是示例作业。

是否可以使dag_run.conf始终包含params此处定义的词典?

from airflow import DAG from airflow.utils.dates import hours_ago from airflow.operators.bash import BashOperator from datetime import timedelta def do_something(val1: str, val2: str) -> str: return f'echo vars are: "{val1}, {val2}"' params = { 'key1': 'def1', 'key2': 'def2', } default_args = { 'retries': 0, } with DAG( 'template_test', default_args=default_args, schedule_interval=timedelta(minutes=1), start_date=hours_ago(1), params = params, ) as dag: hello_t = BashOperator( task_id='example-command', bash_command=do_something('{{dag_run.conf["key1"]}}', '{{dag_run.conf["key2"]}}'), config=params, )

我见过的最接近的是在For Apache Airflow, How can I pass the parameters when manually trigger DAG via CLI?中,但是在那里他们利用JJJA和if/Else-这将需要定义这些默认参数两次。我只想定义它们一次。

推荐答案

您可以使用DAGparams来实现您想要的功能:

params(Dict)-可在模板中访问的DAG级别参数字典,命名空间在params下。可以在任务级别覆盖这些参数。

您可以在DAG或任务级别定义params,也可以从触发器DAG w/config部分的UI添加或修改它们。

DAG示例:

default_args = { "owner": "airflow", } dag = DAG( dag_id="example_dag_params", default_args=default_args, schedule_interval="*/5 * * * *", start_date=days_ago(1), params={"param1": "first_param"}, catchup=False, ) with dag: bash_task = BashOperator( task_id="bash_task", bash_command="echo bash_task: {{ params.param1 }}" )

输出日志:

[2021-10-02 20:23:25,808] {logging_mixin.py:104} INFO - Running <TaskInstance: example_dag_params.bash_task 2021-10-02T23:15:00+00:00 [running]> on host worker_01 [2021-10-02 20:23:25,867] {taskinstance.py:1302} INFO - Exporting the following env vars: AIRFLOW_CTX_DAG_OWNER=*** AIRFLOW_CTX_DAG_ID=example_dag_params AIRFLOW_CTX_TASK_ID=bash_task AIRFLOW_CTX_EXECUTION_DATE=2021-10-02T23:15:00+00:00 AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-10-02T23:15:00+00:00 [2021-10-02 20:23:25,870] {subprocess.py:52} INFO - Tmp dir root location: /tmp [2021-10-02 20:23:25,871] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo bash_task: first_param'] [2021-10-02 20:23:25,884] {subprocess.py:74} INFO - Output: [2021-10-02 20:23:25,886] {subprocess.py:78} INFO - bash_task: first_param [2021-10-02 20:23:25,887] {subprocess.py:82} INFO - Command exited with return code 0

从日志中,请注意dag_run是计划的,并且参数仍在那里。

您可以在this answer中找到有关使用参数的更广泛示例。

希望这对您有效!

更多推荐

气流如何设置dag

本文发布于:2023-11-23 19:04:12,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1622548.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:气流   如何设置   dag

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!