气流ExternalTask​​Sensor被卡住

编程入门 行业动态 更新时间:2024-10-22 21:42:09
本文介绍了气流ExternalTask​​Sensor被卡住的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我正在尝试使用ExternalTask​​Sensor,它卡住了已成功完成的另一个DAG任务。

这里是第一个DAG a完成其任务,然后应该触发通过ExternalTask​​Sensor的第二个DAG b。相反,它被卡在为a.first_task戳上。

第一个DAG:

导入日期时间来自气流从airflow.operators.python_operator导入DAG 导入PythonOperator dag = DAG( dag_id ='a', default_args = {'owner':'airflow' ,'start_date':datetime.datetime.now()}, schedule_interval = None ) def do_first_task(): print('第一个任务完成') PythonOperator( task_id ='first_task', python_callable = do_first_task, dag = dag)

第二个DAG:

导入日期时间从airflow.operators.python_operator导入DAG 从airflow.operators.sensors导入PythonOperator 从externalflowSensor dag = DAG( dag_id ='b', default_args = {'owner':'airflow','start_date':datetime.datetime.now()}, schedule_interval =无) def do_second_task ():打印(第二项任务已完成) ExternalTask​​Sensor( task_id ='wait_for_the_first_task_to_be_completed', external_dag_id ='a', external_task_id ='first_task', dag = dag)> > \ PythonOperator( task_id ='second_task', python_callable = do_second_task, dag = dag)

我在这里想念什么?

解决方案

ExternalTask​​Sensor 假定您依赖于dag中的任务

这意味着在您的情况下, a 和 b 需要按照相同的时间表运行(例如每天的9:00 am或w / e)。

否则,您需要使用实例化 ExternalTask​​Sensor 时, execution_delta 或 execution_date_fn 。以下是操作员内部的文档,以帮助进一步阐明:

:paramexecution_delta:与前一次执行到的时间差,默认值为与当前任务相同的execution_date。 对于昨天,请使用[positive!] datetime.timedelta(days = 1)。 execution_delta或execution_date_fn都可以传递给 ExternalTask​​Sensor,但不能两者都传递。 :type execute_delta:datetime.timedelta :param execute_date_fn:接收当前执行日期并返回所需执行日期以进行查询的函数。可以将execution_delta 或execution_date_fn传递给ExternalTask​​Sensor,但不能两者都传递。 :键入execution_date_fn:可调用的

I'm trying to use ExternalTaskSensor and it gets stuck at poking another DAG's task, which has already been successfully completed.

Here, a first DAG "a" completes its task and after that a second DAG "b" through ExternalTaskSensor is supposed to be triggered. Instead it gets stuck at poking for a.first_task.

First DAG:

import datetime from airflow import DAG from airflow.operators.python_operator import PythonOperator dag = DAG( dag_id='a', default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()}, schedule_interval=None ) def do_first_task(): print('First task is done') PythonOperator( task_id='first_task', python_callable=do_first_task, dag=dag)

Second DAG:

import datetime from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.operators.sensors import ExternalTaskSensor dag = DAG( dag_id='b', default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()}, schedule_interval=None ) def do_second_task(): print('Second task is done') ExternalTaskSensor( task_id='wait_for_the_first_task_to_be_completed', external_dag_id='a', external_task_id='first_task', dag=dag) >> \ PythonOperator( task_id='second_task', python_callable=do_second_task, dag=dag)

What am I missing here?

解决方案

ExternalTaskSensor assumes that you are dependent on a task in a dag run with the same execution date.

This means that in your case dags a and b need to run on the same schedule (e.g. every day at 9:00am or w/e).

Otherwise you need to use the execution_delta or execution_date_fn when you instantiate an ExternalTaskSensor.

Here is the documentation inside the operator itself to help clarify further:

:param execution_delta: time difference with the previous execution to look at, the default is the same execution_date as the current task. For yesterday, use [positive!] datetime.timedelta(days=1). Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both. :type execution_delta: datetime.timedelta :param execution_date_fn: function that receives the current execution date and returns the desired execution date to query. Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both. :type execution_date_fn: callable

更多推荐

气流ExternalTask​​Sensor被卡住

本文发布于:2023-11-23 20:54:58,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1622838.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:气流   ExternalTask   Sensor

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!