本文介绍了如何使用Python在Airflow中成功触发另一个DAG时触发DAG?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我有一个python DAG Parent Job 和DAG Child Job 。成功完成每天运行的 Parent Job 任务后,应触发 Child Job 中的任务。如何添加外部作业触发器?
我的代码
从datetime导入datetime,timedelta 从airflow导入DAG 从airflow.operators.postgres_operator导入PostgresOperator 从utils导入FAILURE_EMAILS 昨天=日期时间bine(datetime.today()-timedelta(1),datetime.min.time()) default_args = {'所有者':'气流','depends_on_past':否,'start_date':昨天,'email':FAILURE_EMAILS,'email_on_failure':False,'email_on_retry':False,'重试':1,'retry_delay':timedelta(minutes = 5)} dag = DAG('Child Job',default_args = default_args,schedule_interval ='@ daily') execute_notebook = PostgresOperator( task_id ='data_sql', postgres_conn_id ='REDSHIFT_CONN', sql = SELECT * FROM athena_rs。装运限制5, dag = dag )解决方案
答案在
Parent_dag
Child_dag
I have a python DAG Parent Job and DAG Child Job. The tasks in the Child Job should be triggered on the successful completion of the Parent Job tasks which are run daily. How can add external job trigger ?
MY CODE
from datetime import datetime, timedelta from airflow import DAG from airflow.operators.postgres_operator import PostgresOperator from utils import FAILURE_EMAILS yesterday = datetimebine(datetime.today() - timedelta(1), datetime.min.time()) default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': yesterday, 'email': FAILURE_EMAILS, 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5) } dag = DAG('Child Job', default_args=default_args, schedule_interval='@daily') execute_notebook = PostgresOperator( task_id='data_sql', postgres_conn_id='REDSHIFT_CONN', sql="SELECT * FROM athena_rs.shipments limit 5", dag=dag )解决方案
Answer is in this thread already. Below is demo code:
Parent dag:
from datetime import datetime from airflow import DAG from airflow.operators.dummy_operator import DummyOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2020, 4, 29), } dag = DAG('Parent_dag', default_args=default_args, schedule_interval='@daily') leave_work = DummyOperator( task_id='leave_work', dag=dag, ) cook_dinner = DummyOperator( task_id='cook_dinner', dag=dag, ) leave_work >> cook_dinnerChild dag:
from datetime import datetime, timedelta from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.sensors import ExternalTaskSensor default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2020, 4, 29), } dag = DAG('Child_dag', default_args=default_args, schedule_interval='@daily') # Use ExternalTaskSensor to listen to the Parent_dag and cook_dinner task # when cook_dinner is finished, Child_dag will be triggered wait_for_dinner = ExternalTaskSensor( task_id='wait_for_dinner', external_dag_id='Parent_dag', external_task_id='cook_dinner', start_date=datetime(2020, 4, 29), execution_delta=timedelta(hours=1), timeout=3600, ) have_dinner = DummyOperator( task_id='have_dinner', dag=dag, ) play_with_food = DummyOperator( task_id='play_with_food', dag=dag, ) wait_for_dinner >> have_dinner wait_for_dinner >> play_with_foodImages:
Dags
Parent_dag
Child_dag
更多推荐
如何使用Python在Airflow中成功触发另一个DAG时触发DAG?
发布评论