本文介绍了如果任何任务失败,如何将Airflow DAG运行标记为失败?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
如果任何任务失败,是否有可能使Airflow DAG失败?
Is it possible to make an Airflow DAG fail if any task fails?
我通常在DAG结束时有一些清理任务,现在,每当最后一个任务成功完成时,整个DAG都被标记为成功。
I usually have some cleaning up tasks at the end of a DAG and as it is now, whenever the last task succeeds the whole DAG is marked as a success.
推荐答案另一种解决方案是添加最终的PythonOperator检查此运行中所有任务的状态:
Another solution can be to add a final PythonOperator that checks the status of all tasks in this run:
final_status = PythonOperator( task_id='final_status', provide_context=True, python_callable=final_status, trigger_rule=TriggerRule.ALL_DONE, # Ensures this task runs even if upstream fails dag=dag, ) def final_status(**kwargs): for task_instance in kwargs['dag_run'].get_task_instances(): if task_instance.current_state() != State.SUCCESS and \ task_instance.task_id != kwargs['task_instance'].task_id: raise Exception("Task {} failed. Failing this DAG run".format(task_instance.task_id))更多推荐
如果任何任务失败,如何将Airflow DAG运行标记为失败?
发布评论