Is there any way in Airflow to create a workflow such that the number of tasks B.* is unknown until completion of Task A? I have looked at subdags but it looks like it can only work with a static set of tasks that have to be determined at Dag creation.
Would dag triggers work? And if so could you please provide an example.
I have an issue where it is impossible to know the number of task B's that will be needed to calculate Task C until Task A has been completed. Each Task B.* will take several hours to compute and cannot be combined.
             |---> Task B.1 --|
             |---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
             |      ....      |
             |---> Task B.N --|
Idea #1
I don't like this solution because I have to create a blocking ExternalTaskSensor and all the Task B.* will take between 2-24 hours to complete. So I do not consider this a viable solution. Surely there is an easier way? Or was Airflow not designed for this?
Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG through python_callable in TriggerDagRunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|
Edit 1:
As of now this question still does not have a great answer. I have been contacted by several people looking for a solution.
Answer:
Here is how I handled a similar request without any subdags:
First create a method that returns whatever values you want
    def values_function():
        # Return the list of values that determines how many jobs to create
        return values
Next create method that will generate the jobs dynamically:
    def group(number, **kwargs):
        # Load the values if needed in the command you plan to execute
        dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
        return BashOperator(
            task_id='JOB_NAME_{}'.format(number),
            bash_command='script.sh {} {}'.format(dyn_value, number),
            dag=dag)

Then combine them:
    push_func = PythonOperator(
        task_id='push_func',
        provide_context=True,
        python_callable=values_function,
        dag=dag)

    complete = DummyOperator(
        task_id='All_jobs_completed',
        dag=dag)

    for i in values_function():
        push_func >> group(i) >> complete
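To see what the loop above actually wires up, here is a minimal sketch you can run without an Airflow install: plain dicts stand in for the operators (the `JOB_NAME_*` ids and `values_function` mirror the answer's code; everything else is a stand-in for illustration), and the dependency map is exactly what `push_func >> group(i) >> complete` builds.

```python
# Sketch of the fan-out pattern: one upstream task fans out into N
# dynamically named tasks that all converge on a final task.
# Dicts stand in for Airflow operators; no scheduler is needed.

def values_function():
    # In the real DAG this could read a config file, a database, etc.
    return [1, 2, 3]

def build_dependencies():
    # Maps each task id to the list of its downstream task ids.
    downstream = {'push_func': [], 'All_jobs_completed': []}
    for number in values_function():
        task_id = 'JOB_NAME_{}'.format(number)
        downstream[task_id] = ['All_jobs_completed']  # group(i) >> complete
        downstream['push_func'].append(task_id)       # push_func >> group(i)
    return downstream

deps = build_dependencies()
print(deps['push_func'])  # ['JOB_NAME_1', 'JOB_NAME_2', 'JOB_NAME_3']
```

One caveat worth noting: `values_function()` runs when the scheduler parses the DAG file, so the number of B.* tasks is fixed at parse time, not mid-run; the task count only changes on the next parse.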