本文介绍了如何在单个Dag气流中处理不同的任务间隔?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我有一个简单的任务,具有多个具有简单结构的任务,任务A,B和C可以在开始时运行而没有任何依赖性,但是任务D取决于A,这不是我的问题:
I have a single dag with multiple tasks with this simple structure that tasks A, B, and C can run at the start without any dependencies but task D depends on A no here is my question:
任务A,B和C每天运行,但我需要任务D在A成功之后每周运行。
tasks A, B, and C run daily but I need task D to run weekly after A succeeds. how can I setup this dag?
是否更改任务工作的schedule_interval?
does changing schedule_interval of task work? Is there any best practice to this problem?
感谢您的帮助。
推荐答案您可以使用 ShortCircuitOperator 进行此操作。 / p>
You can use a ShortCircuitOperator to do this.
import airflow from airflow.operators.python_operator import ShortCircuitOperator from airflow.operators.dummy_operator import DummyOperator from airflow.models import DAG args = { 'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(2), 'schedule_interval': '0 10 * * *' } dag = DAG(dag_id='example', default_args=args) a = DummyOperator(task_id='a', dag=dag) b = DummyOperator(task_id='b', dag=dag) c = DummyOperator(task_id='c', dag=dag) d = DummyOperator(task_id='d', dag=dag) def check_trigger(execution_date, **kwargs): return execution_date.weekday() == 0 check_trigger_d = ShortCircuitOperator( task_id='check_trigger_d', python_callable=check_trigger, provide_context=True, dag=dag ) a.set_downstream(b) b.set_downstream(c) a.set_downstream(check_trigger_d) # Perform D only if trigger function returns a true value check_trigger_d.set_downstream(d)更多推荐
如何在单个Dag气流中处理不同的任务间隔?
发布评论