Status of an Airflow task within the DAG

Updated: 2024-10-14 08:29:27
Problem description

I need the status of a task, for example whether it is running, up_for_retry, or failed, from within the same DAG. I tried to get it using the code below, but I got no output:

    Auto = PythonOperator(
        task_id='test_sleep',
        python_callable=execute_on_emr,
        op_kwargs={'cmd': 'python /home/hadoop/test/testsleep.py'},
        dag=dag)

    logger.info(Auto)

The intention is to kill certain running tasks once a particular Airflow task completes.

The question is: how do I get the state of a task, for example whether it is running, failed, or successful?

Recommended answer

I am doing something similar. For one task, I need to check whether the previous 10 runs of another task were successful. taky2 sent me on the right path. It is actually fairly easy:

    from airflow.models import TaskInstance

    # your_task is a placeholder for the task (operator) object defined in the DAG
    ti = TaskInstance(your_task, execution_date)
    state = ti.current_state()
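To act on the returned value, it can help to group the possible states. The sketch below assumes that `current_state()` returns one of Airflow's lowercase state strings (such as 'running', 'up_for_retry', 'failed', or 'success'), or None when no record exists for that date. The helper name `classify_state` and the grouping itself are illustrative and not part of Airflow.

```python
# Hypothetical helper: groups the state strings returned by ti.current_state().
# The string values mirror Airflow task states; the grouping is an assumption.
ACTIVE_STATES = {'queued', 'running', 'up_for_retry'}
FINISHED_OK = {'success'}
FINISHED_BAD = {'failed', 'upstream_failed'}


def classify_state(state):
    """Return 'missing', 'active', 'ok', 'bad', or 'other' for a state value."""
    if state is None:
        # No task instance has been recorded for this task and date.
        return 'missing'
    if state in ACTIVE_STATES:
        return 'active'
    if state in FINISHED_OK:
        return 'ok'
    if state in FINISHED_BAD:
        return 'bad'
    # Any state not listed above (for example 'skipped').
    return 'other'


if __name__ == '__main__':
    for value in ('running', 'up_for_retry', 'failed', 'success', None):
        print(value, '->', classify_state(value))
```

In a DAG, the result of `ti.current_state()` would be passed in place of the literal strings used here.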

Because I want to check this from within the DAG, it is not necessary to specify the DAG. I simply created a function that loops through the past n_days and checks the status.

    from datetime import timedelta

    def check_status(**kwargs):
        last_n_days = 10
        for n in range(0, last_n_days):
            date = kwargs['execution_date'] - timedelta(n)
            # my_task is the task you defined within the DAG rather than the
            # task_id (as in the example below: check_success_task rather
            # than 'check_success_days_before')
            ti = TaskInstance(my_task, date)
            state = ti.current_state()
            if state != 'success':
                raise ValueError('Not all previous tasks successfully completed.')

When you call the function, make sure to set provide_context.

    check_success_task = PythonOperator(
        task_id='check_success_days_before',
        python_callable=check_status,
        provide_context=True,
        dag=dag
    )
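The date arithmetic in the loop can be tried out without a running Airflow instance by separating it from the state lookup. This is a minimal sketch, assuming runs are one day apart as in the function above. The names `dates_to_check`, `first_unsuccessful`, and `get_state` are hypothetical; in a DAG, `get_state` would wrap `TaskInstance(my_task, date).current_state()`.

```python
from datetime import datetime, timedelta


def dates_to_check(execution_date, last_n_days=10):
    """Return the current execution date and the preceding ones, one day apart."""
    return [execution_date - timedelta(days=n) for n in range(last_n_days)]


def first_unsuccessful(execution_date, get_state, last_n_days=10):
    """Return the first (date, state) whose state is not 'success', else None.

    get_state is a hypothetical callable mapping a date to a state string.
    """
    for date in dates_to_check(execution_date, last_n_days):
        state = get_state(date)
        if state != 'success':
            return date, state
    return None


if __name__ == '__main__':
    # Fake lookup standing in for the Airflow metadata database.
    fake_states = {datetime(2024, 1, 8): 'failed'}
    result = first_unsuccessful(
        datetime(2024, 1, 10),
        lambda d: fake_states.get(d, 'success'),
        last_n_days=5,
    )
    print(result)
```

Note that `range(0, last_n_days)` starts at 0, so the current execution date is included in the check along with the earlier ones.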

UPDATE: When you want to check a task from another DAG, you need to retrieve it like this:

    from airflow import configuration as conf
    from airflow.models import DagBag, TaskInstance

    dag_folder = conf.get('core', 'DAGS_FOLDER')
    dagbag = DagBag(dag_folder)
    # my_dag_id and my_task_id are placeholders for the other DAG's ids
    check_dag = dagbag.dags[my_dag_id]
    my_task = check_dag.get_task(my_task_id)
    ti = TaskInstance(my_task, date)

Apparently there is now also an API call that does the same thing:

    from airflow.api.common.experimental.get_task_instance import get_task_instance

    # my_dag_id and my_task_id are placeholders for the ids to look up
    ti = get_task_instance(my_dag_id, my_task_id, date)

Published: 2023-07-07 22:09:58
Source: https://www.elefans.com/category/jswz/34/1068450.html