本文介绍了气流在单个DAG中生成动态任务,任务N + 1取决于TaskN的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
动态生成任务时,我需要让任务2依赖于任务1,任务1 >>任务2或task2.set_upstream(task1).
When generating tasks dynamically, I need to have Task 2 be dependent of Task 1, Task1 >> Task 2 or task2.set_upstream(task1).
由于task_ids是经过评估的,或者似乎是预先确定的,因此我无法提前设置依赖项,因此将不胜感激.
Since the task_ids are evaluated, or seem to be upfront, I cannot set the dependency in advance, any help would be appreciated.
Component(I)任务可以很好地运行,只是它们一次运行即可.
The Component(I) tasks generate fine, except that they all run at once.
for i in range(1,10): task_id='Component'+str(i) task_id = BashOperator( task_id='Component'+str(i), bash_command="echo {{ ti.xcom_pull task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i) , xcom_push=True, dag=dag) ?????.set_upstream(??????)推荐答案
使用以下代码:
a = [] for i in range(0,10): a.append(BashOperator( task_id='Component'+str(i), bash_command="echo {{ ti.xcom_pull task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i) , xcom_push=True, dag=dag)) if i not in [0]: a[i-1] >> a[i]使用DummyOperator,代码如下:
a = [] for i in range(0,10): a.append(DummyOperator( task_id='Component'+str(i), dag=dag)) if i not in [0]: a[i-1] >> a[i]这将生成以下DAG:
更多推荐
气流在单个DAG中生成动态任务,任务N + 1取决于TaskN
发布评论