Airflow: Passing a dynamic value to Sub DAG operator

Updated: 2024-10-25 16:19:55

Problem description

I am new to Airflow. I have a scenario where a parent DAG needs to pass a dynamic number (say n) to a SubDAG, and the SubDAG should use that number to create n parallel tasks dynamically.

The Airflow documentation doesn't cover a way to achieve this, so I have explored a couple of approaches:

Option - 1

I tried to pass the number as an XCom value, but for some reason the SubDAG does not resolve it to the passed value.

Parent DAG file

    def load_dag(**kwargs):
        number_of_runs = json.dumps(kwargs['dag_run'].conf['number_of_runs'])
        dag_data = json.dumps({
            "number_of_runs": number_of_runs
        })
        return dag_data

    # ------------------ Tasks ------------------------------
    load_config = PythonOperator(
        task_id='load_config',
        provide_context=True,
        python_callable=load_dag,
        dag=dag)

    t1 = SubDagOperator(
        task_id=CHILD_DAG_NAME,
        subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args,
                       "'{{ ti.xcom_pull(task_ids='load_config') }}'"),
        default_args=default_args,
        dag=dag,
    )

Child DAG file

    def sub_dag(parent_dag_name, child_dag_name, args, num_of_runs):
        dag_subdag = DAG(
            dag_id='%s.%s' % (parent_dag_name, child_dag_name),
            default_args=args,
            schedule_interval=None)

        variabe_names = {}

        for i in range(num_of_runs):
            variabe_names['task' + str(i + 1)] = DummyOperator(
                task_id='dummy_task',
                dag=dag_subdag,
            )

        return dag_subdag
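A plain-Python sketch of why the XCom attempt likely fails, written without importing Airflow. The `sub_dag(...)` call runs when the DAG file is parsed, so the Jinja expression is never rendered at that point and the factory receives the literal template string. `count_tasks` is a hypothetical stand-in for the loop inside `sub_dag`, not part of the original code.

```python
# Hypothetical stand-in for the loop in sub_dag(); no Airflow import needed.
def count_tasks(num_of_runs):
    """Return how many tasks the factory loop would create."""
    return len([i for i in range(num_of_runs)])


# At parse time the factory gets the raw template text, not the XCom value.
unrendered = "'{{ ti.xcom_pull(task_ids='load_config') }}'"

try:
    count_tasks(unrendered)
    error = None
except TypeError as exc:
    # range() rejects a str, so the loop fails before any task is created.
    error = exc

print(type(error).__name__)
print(count_tasks(3))
```

With an integer the loop behaves as intended; with the unrendered template it cannot, which is why a value known only at run time has to reach the factory some other way.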

Option - 2

I also tried passing number_of_runs as a global variable, which did not work.

Option - 3

We also tried writing this value to a data file, but the sub DAG throws a "file doesn't exist" error. This might be because the file is generated dynamically.

Can someone help me with this?

Recommended answer

I've done it with Option 3. The key is to return a valid DAG with no tasks if the file does not exist. load_config then generates a file containing your number of tasks, or more information if needed. Your subdag factory would look something like:

    def subdag(...):
        sdag = DAG('%s.%s' % (parent, child), default_args=args,
                   schedule_interval=timedelta(hours=1))
        file_path = "/path/to/generated/file"
        # Until load_config has run, the file is missing and the DAG stays empty.
        if os.path.exists(file_path):
            with open(file_path) as data_file:
                list_tasks = data_file.readlines()
            for task in list_tasks:
                DummyOperator(
                    task_id='task_' + task.strip(),  # drop the trailing newline
                    default_args=args,
                    dag=sdag,
                )
        return sdag

When the DAG is first generated, you will see a subdag with no tasks. When the DAG executes, after load_config has finished, you will see your dynamically generated subdag.
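The file handoff in this answer can be sketched as two plain-Python helpers, assuming one task suffix per line in the generated file. `write_task_file` and `read_task_names` are hypothetical names introduced for illustration: the first stands in for what `load_config` might do at run time, the second for what the subdag factory might do each time the DAG file is parsed.

```python
import os
import tempfile


def write_task_file(file_path, number_of_runs):
    """What load_config might do: write one task suffix per line."""
    with open(file_path, "w") as data_file:
        for i in range(number_of_runs):
            data_file.write("%d\n" % (i + 1))


def read_task_names(file_path):
    """What the subdag factory might do: return task ids, or [] if no file."""
    if not os.path.exists(file_path):
        return []  # a valid, empty subdag until load_config has run
    with open(file_path) as data_file:
        return ["task_" + line.strip() for line in data_file if line.strip()]


# Usage: before the file exists the factory sees no tasks; afterwards it sees n.
demo_path = os.path.join(tempfile.mkdtemp(), "generated_tasks.txt")
before = read_task_names(demo_path)
write_task_file(demo_path, 3)
after = read_task_names(demo_path)
print(before, after)
```

In the factory, each returned name would become the `task_id` of one `DummyOperator`. Stripping the newline matters because a task id with trailing whitespace may be rejected.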

Published: 2023-11-23 18:26:35
Article link: https://www.elefans.com/category/jswz/34/1622446.html