I have a DAG which runs two tasks: A and B.
Instead of specifying the start_date at the DAG level, I have added it as an attribute on the operators (I am using a PythonOperator in this case) and removed it from the DAG dictionary. Both tasks run daily.
The start_date for A is 2013-01-01 and the start_date for B is 2015-01-01. My problem is that Airflow runs task A for 16 days (because, I guess, I have left the default dag_concurrency = 16 in my airflow.cfg) starting from 2013-01-01, and after that it stops. The DAG runs are in the running state, and the tasks for B have no status.
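The setup described above can be sketched as follows. This is a minimal reconstruction, not code from the original post: `run_a`, `run_b`, and the surrounding DAG object are placeholders, and Airflow itself is only referenced in comments so the snippet stays self-contained.

```python
from datetime import datetime

# Sketch of the setup described above (assumption: with Airflow imported,
# the operators would be created roughly like this):
#
#   task_a = PythonOperator(task_id="A", python_callable=run_a,
#                           start_date=datetime(2013, 1, 1), dag=dag)
#   task_b = PythonOperator(task_id="B", python_callable=run_b,
#                           start_date=datetime(2015, 1, 1), dag=dag)

start_dates = {"A": datetime(2013, 1, 1), "B": datetime(2015, 1, 1)}

# Backfilling daily from A's start_date means roughly 730 runs exist
# before B's start_date is even reached:
gap_days = (start_dates["B"] - start_dates["A"]).days
print(gap_days)  # -> 730
```

This gap is why the scheduler works through two years of backfill for A while every task instance of B stays without a status.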
Clearly I am doing something wrong. I could simply set the start_date at the DAG level and have B run from the start_date of A, but that's not what I want to do.
Alternatively I can split them in separate DAGs, but again, that's not how I want to monitor them.
Is there a way to have a DAG with multiple tasks each having its own start_date? If so, how to do this?
UPDATE:
I know that a ShortCircuitOperator can be added, but this seems to work only for a flow of tasks which are dependent and there is a downstream. In my case A is independent of B.
Solution: Use a BranchPythonOperator and check in that task whether your execution_date >= '2015-01-01' or not. If true, it should execute task B; if not, it should execute a dummy task.
However, I would recommend using a separate DAG.
Documentation on branching: airflow.readthedocs.io/en/1.10.2/concepts.html#branching
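A minimal sketch of the branch callable the answer describes. The task ids `task_b` and `skip_b` are assumptions for illustration, and the Airflow wiring is shown only in comments so that the date check itself runs as plain Python:

```python
from datetime import datetime

def choose_branch(execution_date, **context):
    """Return the task_id to follow for this DAG run.

    In the DAG this would be wired up roughly as (sketch):
        branch = BranchPythonOperator(task_id="branch",
                                      python_callable=choose_branch,
                                      provide_context=True, dag=dag)
        branch >> [task_b, skip_b]   # skip_b would be a DummyOperator
    """
    if execution_date >= datetime(2015, 1, 1):
        return "task_b"   # B is meant to run from 2015-01-01 onward
    return "skip_b"       # earlier runs take the dummy branch

print(choose_branch(datetime(2013, 6, 1)))  # -> skip_b
print(choose_branch(datetime(2016, 6, 1)))  # -> task_b
```

With this, both tasks stay in one DAG: A runs on every schedule from 2013-01-01, while B is effectively skipped (via the dummy branch) until its intended start date.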