How to trigger a DAG in Airflow when another DAG completes successfully, using Python?

Problem description


I have a Python DAG Parent Job and a DAG Child Job. The tasks in Child Job should be triggered on the successful completion of the Parent Job tasks, which run daily. How can I add an external job trigger?

MY CODE

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Child Job', default_args=default_args, schedule_interval='@daily')

execute_notebook = PostgresOperator(
    task_id='data_sql',
    postgres_conn_id='REDSHIFT_CONN',
    sql="SELECT * FROM athena_rs.shipments limit 5",
    dag=dag
)

Solution

The answer is in this thread already: use an ExternalTaskSensor in the child DAG. Below is demo code:

Parent dag:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

dag = DAG('Parent_dag', default_args=default_args, schedule_interval='@daily')

leave_work = DummyOperator(
    task_id='leave_work',
    dag=dag,
)
cook_dinner = DummyOperator(
    task_id='cook_dinner',
    dag=dag,
)

leave_work >> cook_dinner

Child dag:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

dag = DAG('Child_dag', default_args=default_args, schedule_interval='@daily')

# Use ExternalTaskSensor to listen to Parent_dag's cook_dinner task:
# once cook_dinner has succeeded, Child_dag's downstream tasks can run.
# Note: execution_delta=timedelta(hours=1) assumes the parent's run starts
# one hour before the child's run; if both DAGs share the same schedule,
# omit execution_delta so the sensor looks at the run with the same
# execution date.
wait_for_dinner = ExternalTaskSensor(
    task_id='wait_for_dinner',
    external_dag_id='Parent_dag',
    external_task_id='cook_dinner',
    start_date=datetime(2020, 4, 29),
    execution_delta=timedelta(hours=1),
    timeout=3600,
    dag=dag,
)

have_dinner = DummyOperator(
    task_id='have_dinner',
    dag=dag,
)
play_with_food = DummyOperator(
    task_id='play_with_food',
    dag=dag,
)

wait_for_dinner >> have_dinner
wait_for_dinner >> play_with_food
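Applied to the DAGs in the question, the child DAG would gain an ExternalTaskSensor pointing at the parent's final task, upstream of the existing PostgresOperator. Below is a minimal sketch, assuming the parent's dag_id is 'Parent_Job' and its last task is 'parent_final_task' (both are placeholders for whatever the real parent DAG defines); the email settings from the original default_args are dropped so the example stays self-contained. Note that many Airflow versions reject dag_ids containing spaces, so 'Child_Job' is used instead of 'Child Job'.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.sensors import ExternalTaskSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('Child_Job', default_args=default_args, schedule_interval='@daily')

# Wait for the parent DAG's final task to succeed for the same execution date.
# 'Parent_Job' and 'parent_final_task' are placeholders; substitute the real
# dag_id and task_id. If the two DAGs run on different schedules, add
# execution_delta (or execution_date_fn) so the sensor finds the right run.
wait_for_parent = ExternalTaskSensor(
    task_id='wait_for_parent',
    external_dag_id='Parent_Job',
    external_task_id='parent_final_task',
    timeout=3600,
    dag=dag,
)

execute_notebook = PostgresOperator(
    task_id='data_sql',
    postgres_conn_id='REDSHIFT_CONN',
    sql="SELECT * FROM athena_rs.shipments limit 5",
    dag=dag,
)

wait_for_parent >> execute_notebook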

Images: the original answer included screenshots of the DAGs list and of the Parent_dag and Child_dag graph views (omitted here).
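A push-style alternative, not shown in the answer above, is to have the parent DAG trigger the child explicitly with TriggerDagRunOperator as its final step, instead of the child waiting on a sensor. Below is a rough sketch using the same Airflow 1.x import paths as the demo above; the task and DAG names are illustrative, and in this model the child DAG would normally use schedule_interval=None so it only runs when triggered.

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

dag = DAG('Parent_dag', default_args=default_args, schedule_interval='@daily')

cook_dinner = DummyOperator(
    task_id='cook_dinner',
    dag=dag,
)

# Explicitly trigger Child_dag once cook_dinner has succeeded.
trigger_child = TriggerDagRunOperator(
    task_id='trigger_child',
    trigger_dag_id='Child_dag',
    dag=dag,
)

cook_dinner >> trigger_child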
