In my DAG, I have some tasks that should only be run on Saturdays. Therefore I used a BranchPythonOperator to branch between the tasks for Saturdays and a DummyTask. After that, I join both branches and want to run other tasks.
The workflow looks like this: Here I set the trigger rule for dummy3 to 'one_success' and everything works fine.
The problem I encountered is when something upstream of the BranchPythonOperator fails: the BranchPythonOperator and the branches correctly have the state 'upstream_failed', but the task joining the branches becomes 'skipped', so the whole workflow shows 'success'.
I tried using 'all_success' as the trigger rule; then, if something upstream fails, the whole workflow correctly fails, but if nothing fails, dummy3 gets skipped.
I also tried 'all_done' as the trigger rule, then it works correctly if nothing fails, but if something fails dummy3 still gets executed.
My test code looks like this:
from datetime import datetime, date

from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('test_branches',
          description='Test branches',
          catchup=False,
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 1))

def python1():
    raise Exception('Test failure')
    # print 'Test success'

dummy1 = PythonOperator(
    task_id='python1',
    python_callable=python1,
    dag=dag
)

dummy2 = DummyOperator(
    task_id='dummy2',
    dag=dag
)

dummy3 = DummyOperator(
    task_id='dummy3',
    dag=dag,
    trigger_rule='one_success'
)

def is_saturday():
    if date.today().weekday() == 5:  # Saturday (Monday == 0)
        return 'dummy2'
    else:
        return 'today_is_not_saturday'

branch_on_saturday = BranchPythonOperator(
    task_id='branch_on_saturday',
    python_callable=is_saturday,
    dag=dag)

not_saturday = DummyOperator(
    task_id='today_is_not_saturday',
    dag=dag
)

dummy1 >> branch_on_saturday >> dummy2 >> dummy3
branch_on_saturday >> not_saturday >> dummy3

EDIT
I just figured out an ugly workaround: dummy4 represents a task that I actually need to run, dummy5 is just a dummy. dummy3 still has the trigger rule 'one_success'.
Now dummy3 and dummy4 run if there is no upstream failure; dummy5 'runs' if the day is not Saturday and gets skipped if the day is Saturday, which means the DAG is marked as success in both cases. If there is a failure upstream, dummy3 and dummy4 get skipped and dummy5 gets marked as 'upstream_failed', so the DAG is marked as failed.
This workaround makes my DAG run as I want it to, but I'd still prefer a solution without some hacky workaround.
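The workaround's wiring can be sketched as code. Since the original diagram is missing, this layout is inferred from the description above; it assumes the dag, dummy3, and not_saturday objects from the test DAG, and this is only a sketch, not the author's exact file:

```python
# Sketch of the workaround, appended to the test DAG above.
# dummy4 is the task that actually needs to run; dummy5 is a pure
# sentinel whose only job is to fail the DAG run when something
# upstream of the branch fails.
from airflow.operators.dummy_operator import DummyOperator

dummy4 = DummyOperator(task_id='dummy4', dag=dag)  # the real follow-up task
dummy5 = DummyOperator(task_id='dummy5', dag=dag)  # sentinel, does nothing

dummy3 >> dummy4        # dummy3 keeps trigger_rule='one_success'
not_saturday >> dummy5  # on an upstream failure, dummy5 inherits
                        # 'upstream_failed' and the DAG run fails
```

Because dummy5 hangs off the skip branch as a leaf task, an upstream failure propagates to it as 'upstream_failed', which is what marks the whole run failed.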
Solution: Setting the trigger rule for dummy3 to 'none_failed' makes it end with the expected status in all cases.
see airflow.apache/concepts.html#trigger-rules
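To see why 'none_failed' behaves differently at the join, here is a toy simulation of the trigger-rule semantics. This is a deliberately simplified model for illustration, not the real Airflow scheduler logic, and join_outcome is a hypothetical helper:

```python
# Simplified model of how a join task's final state depends on its
# trigger rule and the final states of its direct upstream tasks.
def join_outcome(upstream_states, trigger_rule):
    failed = any(s in ('failed', 'upstream_failed') for s in upstream_states)
    if trigger_rule == 'all_success':
        if failed:
            return 'upstream_failed'
        if 'skipped' in upstream_states:
            return 'skipped'  # the branch's skip blocks the join
        return 'success'
    if trigger_rule == 'all_done':
        return 'success'  # runs once every upstream has finished, even on failure
    if trigger_rule == 'one_success':
        if 'success' in upstream_states:
            return 'success'
        return 'skipped'  # this is what hides the upstream failure
    if trigger_rule == 'none_failed':
        if failed:
            return 'upstream_failed'
        return 'success'  # skipped branches do not block the join
    raise ValueError(trigger_rule)

# Branch taken, other branch skipped: both rules let dummy3 run.
print(join_outcome(['success', 'skipped'], 'one_success'))  # success
print(join_outcome(['success', 'skipped'], 'none_failed'))  # success

# Failure upstream of the branch: both branches are 'upstream_failed'.
print(join_outcome(['upstream_failed', 'upstream_failed'], 'one_success'))  # skipped
print(join_outcome(['upstream_failed', 'upstream_failed'], 'none_failed'))  # upstream_failed
```

With 'one_success' the join is silently skipped after a failure, whereas 'none_failed' propagates the failure to the join and fails the run, while still tolerating the skipped branch on normal days.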
EDIT: it looks like the 'none_failed' trigger rule did not yet exist when this question was asked and answered; it was added in November 2018.
see github/apache/airflow/pull/4182