I need the status of a task within the same DAG, e.g. whether it is running, up_for_retry, or failed. I tried to get it using the code below, though I got no output...
```python
Auto = PythonOperator(
    task_id='test_sleep',
    python_callable=execute_on_emr,
    op_kwargs={'cmd': 'python /home/hadoop/test/testsleep.py'},
    dag=dag)

logger.info(Auto)
```

The intention is to kill certain running tasks once a particular Airflow task completes.
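As a side note, the "kill other jobs once a task completes" logic can be sketched as a simple polling loop. `get_state` and `kill_job` below are hypothetical stand-ins (not Airflow APIs) for whatever state lookup and kill mechanism you use, e.g. `TaskInstance.current_state()` and an EMR step cancellation:

```python
import time

def kill_when_done(get_state, kill_job, poll_seconds=30, max_polls=120):
    """Poll a watched task's state; once it reaches a terminal state,
    invoke the kill callback and return the final state observed.

    get_state -- callable returning the watched task's state string
    kill_job  -- callable that kills the long-running job
    """
    terminal = {'success', 'failed', 'upstream_failed'}
    for _ in range(max_polls):
        state = get_state()
        if state in terminal:
            kill_job()
            return state
        time.sleep(poll_seconds)
    return None  # gave up waiting

# Demo with stubbed callables: the watched task succeeds on the third poll.
states = iter(['running', 'running', 'success'])
killed = []
result = kill_when_done(lambda: next(states),
                        lambda: killed.append(True),
                        poll_seconds=0)
print(result)  # -> 'success'
print(killed)  # -> [True]
```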
The question is: how do I get a task's status, e.g. whether it is running, failed, or successful?
I am doing something similar: I need to check, for one task, whether the previous 10 runs of another task were successful. taky2 sent me down the right path, and it is actually fairly easy:
```python
from airflow.models import TaskInstance

ti = TaskInstance(*your_task*, execution_date)
state = ti.current_state()
```

Since I want to check this within the DAG, it is not necessary to specify the DAG. I simply created a function that loops over the past n_days and checks the status.
```python
from datetime import timedelta

from airflow.models import TaskInstance

def check_status(**kwargs):
    last_n_days = 10
    for n in range(0, last_n_days):
        date = kwargs['execution_date'] - timedelta(n)
        # my_task is the task object defined within the DAG, not the task_id
        # (in the example below: check_success_task rather than 'check_success_days_before')
        ti = TaskInstance(*my_task*, date)
        state = ti.current_state()
        if state != 'success':
            raise ValueError('Not all previous tasks successfully completed.')
```

When you call the function, make sure to set `provide_context=True`.
```python
check_success_task = PythonOperator(
    task_id='check_success_days_before',
    python_callable=check_status,
    provide_context=True,
    dag=dag
)
```
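The past-n-days loop above can be exercised outside Airflow by injecting the state lookup. `lookup_state` here is a hypothetical stand-in for `TaskInstance(...).current_state()`, so the date arithmetic and early-exit logic can be tested on their own:

```python
from datetime import date, timedelta

def all_recent_runs_successful(execution_date, lookup_state, last_n_days=10):
    """Return True iff the watched task was 'success' on each of the past n days."""
    for n in range(last_n_days):
        if lookup_state(execution_date - timedelta(days=n)) != 'success':
            return False
    return True

# Stubbed lookup: every run succeeded except the one three days back.
history = {date(2023, 1, 7): 'failed'}
state_of = lambda d: history.get(d, 'success')
print(all_recent_runs_successful(date(2023, 1, 10), state_of))  # -> False
```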
UPDATE: When you want to reference a task from another DAG, you need to look it up like this:
```python
from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance

dag_folder = conf.get('core', 'DAGS_FOLDER')
dagbag = DagBag(dag_folder)
check_dag = dagbag.dags[*my_dag_id*]
my_task = check_dag.get_task(*my_task_id*)
ti = TaskInstance(my_task, date)
```

Apparently there is, by now, also an API call that does the same thing:
```python
from airflow.api.common.experimental.get_task_instance import get_task_instance

ti = get_task_instance(*my_dag_id*, *my_task_id*, date)
```