如何并行化Airflow DAG中的类似BashOperator任务但参数不同

编程入门 行业动态 更新时间:2024-10-14 10:41:06
本文介绍了如何并行化Airflow DAG中的类似BashOperator任务但参数不同的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我在DAG中并行执行以下2个任务在现实世界中,这些任务可能是15或20个任务,其输入参数来自数组,如下所示.

I have parallel execution of 2 tasks below in my DAG In the real world these could be 15 or 20 tasks with the input parameters coming from an array, like below.

fruits = ["apples", "bananas"] bad_dag = DAG('bad_dag_3', default_args=default_args, schedule_interval=None) t0=BashOperator( task_id="print", bash_command='echo "Beginning parallel tasks next..." ', dag=bad_dag) t1=BashOperator( task_id="fruit_"+fruits[0], params={"fruits": fruits}, bash_command='echo fruit= {{params.fruits[0]}} ', dag=bad_dag) t2=BashOperator( task_id="fruit_"+fruits[1], params={"fruits": fruits}, bash_command='echo fruit= {{params.fruits[1]}} ', dag=bad_dag) t0>>[t1, t2]

对我来说,编写此DAG的最佳方法是什么,所以我不必像上面一样重复一遍又一遍地重写同一BashOperator.

Whats the best way for me to write this DAG, so I dont have to re-write the same BashOperator over and over again like I have above.

我不能使用循环,因为如果使用循环,则无法并行化任务.

I cannot use a loop because I cannot parallelize the tasks if I use a loop.

推荐答案

使用以下DAG.这个想法是每个任务的 task_id 应该是唯一的,其余部分将由气流处理.

Use the below DAG. The idea is that the task_id for each task should be unique, airflow will handle the rest.

fruits = ["apples", "bananas"] bad_dag = DAG('bad_dag_3', default_args=default_args, schedule_interval=None) t0=BashOperator( task_id="print", bash_command='echo "Beginning parallel tasks next..." ', dag=bad_dag) for fruit in fruits: task_t = BashOperator( task_id="fruit_"+fruit, params={"fruit": fruit}, bash_command='echo fruit= {{params.fruit}} ', dag=bad_dag) t0 >> task_t

更多推荐

如何并行化Airflow DAG中的类似BashOperator任务但参数不同

本文发布于:2023-07-07 22:10:29,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1068457.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:类似   参数   Airflow   DAG   BashOperator

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!