即使DAG未运行,气流变量也会更新

编程入门 行业动态 更新时间:2024-10-12 01:22:22
本文介绍了即使DAG未运行,气流变量也会更新的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我从气流变量中读取一个整数变量,然后在每次DAG运行时将该值加1,并再次将其设置为该变量。

但在下面的代码之后,每次刷新页面时,UI处的变量都会更改。 了解导致此类行为的原因

counter = Variable.get('counter') s = BashOperator( task_id='echo_start_variable', bash_command='echo ' + counter, dag=dag, ) Variable.set("counter", int(counter) + 1) sql_query = "SELECT * FROM UNNEST(SEQUENCE({start}, {end}))" sql_query = sql_query.replace('{start}', start).replace('{end}', end) submit_query = PythonOperator( task_id='submit_athena_query', python_callable=run_athena_query, op_kwargs={'query': sql_query, 'db': 'db', 's3_output': 's3://s3-path/rohan/date=' + current_date + '/'}, dag=dag) e = BashOperator( task_id='echo_end_variable', bash_command='echo ' + counter, dag=dag, ) s >> submit_query >> e Airflow每30秒处理一次推荐答案文件(默认设置为)这意味着您所有顶级代码都是每30秒运行一次,因此 将导致变量计数器每30秒递增1。

在顶级代码中与变量交互是一种糟糕的做法(不管值的增加问题如何)。它每隔30秒打开一个到Metore数据库的连接,这可能会导致严重问题并使数据库不堪重负。

要获取变量的值,可以使用JJJA:

e = BashOperator( task_id='echo_end_variable', bash_command='echo {{ var.value.counter }}', dag=dag, )

这是使用变量的一种安全方式,因为只有在执行运算符时才会检索值。

如果要将变量的值增加1,则使用PythonOpeartor:

def increase(): counter = Variable.get('counter') Variable.set("counter", int(counter) + 1) increase_op = PythonOperator( task_id='increase_task', python_callable=increase, dag=dag)

只有在运算符运行时,才会执行可调用的python。

更多推荐

即使DAG未运行,气流变量也会更新

本文发布于:2023-11-23 13:29:28,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1621617.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:也会   气流   变量   DAG

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!