Proper way to create dynamic workflows in Airflow

Problem description

Is there any way in Airflow to create a workflow such that the number of tasks B.* is unknown until completion of Task A? I have looked at subdags, but it looks like they can only work with a static set of tasks that has to be determined at DAG creation.

Would DAG triggers work? And if so, could you please provide an example?

I have an issue where it is impossible to know the number of Task B's that will be needed to calculate Task C until Task A has been completed. Each Task B.* will take several hours to compute and cannot be combined.

              |---> Task B.1 --|
              |---> Task B.2 --|
Task A -------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|

Idea #1

I don't like this solution because I have to create a blocking ExternalTaskSensor, and all the Task B.* will take between 2-24 hours to complete. So I do not consider this a viable solution. Surely there is an easier way? Or was Airflow not designed for this?

Dag 1

Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (dynamically created DAG through python_callable in TriggerDagRunOperator)

               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|
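For concreteness, here is a rough sketch of what the Dag 1 side of Idea #1 might look like. This is my own illustration, not code from the question: it assumes Airflow 1.10-era import paths, and the DAG ids, task ids and dates are placeholders.

from datetime import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

dag_1 = DAG('dag_1', start_date=datetime(2018, 1, 1), schedule_interval=None)

task_a = DummyOperator(task_id='Task_A', dag=dag_1)   # stands in for the real Task A

trigger_dag_2 = TriggerDagRunOperator(
    task_id='trigger_dag_2',
    trigger_dag_id='dag_2',                            # the dynamically built DAG
    dag=dag_1)

# This sensor is the blocking step the question objects to: it polls until
# 'Task_Dummy_B' in dag_2 succeeds, which can mean waiting 2-24 hours.
# In practice execution_delta or execution_date_fn is usually also needed so
# the sensor checks the triggered run's execution date rather than its own.
wait_for_dag_2 = ExternalTaskSensor(
    task_id='wait_for_dag_2',
    external_dag_id='dag_2',
    external_task_id='Task_Dummy_B',
    dag=dag_1)

task_c = DummyOperator(task_id='Task_C', dag=dag_1)   # stands in for the real Task C

task_a >> trigger_dag_2 >> wait_for_dag_2 >> task_c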

Edit 1:

As of now this question still does not have a great answer. I have been contacted by several people looking for a solution.

Recommended answer

Here is how I did it with a similar request, without any subdags:

First, create a method that returns whatever values you want:

def values_function():
    # Placeholder from the original answer: return the list of values to
    # fan out over, one job per value.
    return values
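If the values need to change without editing the DAG file, one possibility (my assumption, not part of the original answer) is to read them from an Airflow Variable, which is re-read each time the scheduler parses the file:

from airflow.models import Variable

def values_function():
    # Illustrative only: read a comma-separated list from the Airflow
    # Variable 'job_values', falling back to a default when it is unset.
    return Variable.get('job_values', default_var='1,2,3').split(',')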

Next, create a method that will generate the jobs dynamically:

def group(number, **kwargs):
    # Load the values if needed in the command you plan to execute
    dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
    return BashOperator(
        task_id='JOB_NAME_{}'.format(number),
        bash_command='script.sh {} {}'.format(dyn_value, number),
        dag=dag)

Then combine them:

push_func = PythonOperator(
    task_id='push_func',
    provide_context=True,
    python_callable=values_function,
    dag=dag)

complete = DummyOperator(
    task_id='All_jobs_completed',
    dag=dag)

for i in values_function():
    push_func >> group(i) >> complete
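The three fragments above assume a dag object and the usual imports. A minimal preamble in Airflow 1.x style (matching the provide_context=True above; the DAG id and start date are placeholders of mine) might be:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

# Illustrative DAG id and schedule; adjust to your setup.
dag = DAG(
    'dynamic_jobs',
    start_date=datetime(2018, 1, 1),
    schedule_interval=None)

One thing worth noting: the for loop calls values_function() while the scheduler parses the DAG file, so the set of JOB_NAME_* tasks is fixed at parse time. The xcom_pull template inside group() only changes the arguments passed to script.sh at run time; it does not change how many B tasks exist.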
