Airflow ExternalTaskSensor stuck at poking

Programming basics | Industry news | Updated: 2024-10-23 19:30:55

This article covers how to handle an Airflow ExternalTaskSensor that gets stuck at poking.

Problem description


I want one dag to start after the completion of another dag. One solution is using the external sensor function; below you can find my solution. The problem I encounter is that the dependent dag is stuck at poking. I checked this answer and made sure that both of the dags run on the same schedule. My simplified code is as follows; any help would be appreciated. The leader dag:

```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

schedule = '* * * * *'

dag = DAG('leader_dag', default_args=default_args, catchup=False,
          schedule_interval=schedule)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)
```

the dependent dag:

```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.sensors import ExternalTaskSensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 8),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

schedule = '* * * * *'

dag = DAG('dependent_dag', default_args=default_args, catchup=False,
          schedule_interval=schedule)

wait_for_task = ExternalTaskSensor(
    task_id='wait_for_task',
    external_dag_id='leader_dag',
    external_task_id='t1',
    dag=dag)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t1.set_upstream(wait_for_task)
```

the log for leader_dag:

the log for dependent dag:

Solution

First, the task_id in the leader_dag is named print_date, but you set up your dependent_dag with a task wait_for_task that waits for a leader_dag task named t1. There is no task named t1. The variable name you assign it to in the .py file is irrelevant: it is not stored in the Airflow database and is not, transitively, seen by the sensor. The sensor should wait on the task name print_date.

Second, your logs do not line up: the leader_dag run you show is not the one the dependent_dag is waiting for.

Finally, I can't recommend you use Airflow to schedule tasks every minute. Certainly not two dependent tasks together. Consider writing streaming jobs in a different system like Spark, or rolling your own Celery or Dask environment for this.

You could also avoid the ExternalTaskSensor entirely by adding a TriggerDagRunOperator at the end of your leader_dag to trigger the dependent_dag, and removing the dependent_dag's own schedule by setting its schedule_interval to None.
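As a rough sketch of that alternative (assuming the Airflow 1.x import paths used in the code above; the task ids here are illustrative, not from the original post):

```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime

# leader_dag keeps its own schedule and triggers the dependent dag itself.
dag = DAG('leader_dag', start_date=datetime(2018, 10, 8),
          catchup=False, schedule_interval='* * * * *')

t1 = BashOperator(task_id='print_date', bash_command='date', dag=dag)

trigger = TriggerDagRunOperator(
    task_id='trigger_dependent_dag',
    trigger_dag_id='dependent_dag',  # must match the dependent DAG's dag_id
    dag=dag)

t1 >> trigger

# dependent_dag then needs no schedule and no sensor:
dependent_dag = DAG('dependent_dag', start_date=datetime(2018, 10, 8),
                    schedule_interval=None)
```

This sidesteps the alignment problem entirely, because the dependent run is created by the leader rather than matched to it by execution_date.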

What I see in your logs is a log for the leader from 2018-10-13T19:08:11. This at best would be the dagrun for execution_date 2018-10-13 19:07:00 because the minute period starting 19:07 ends at 19:08 which is the earliest it can be scheduled. And I see some delay between scheduling and execution of about 11 seconds if this is the case. However there can be multiple minutes of scheduling lag in Airflow.
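The arithmetic behind that estimate can be checked directly with plain datetime math (the timestamps are the ones quoted from the log):

```python
from datetime import datetime, timedelta

# For a '* * * * *' schedule, a dagrun with execution_date T covers the
# interval [T, T + 1 minute) and can only be scheduled once that interval
# has closed, i.e. at T + 1 minute at the earliest.
execution_date = datetime(2018, 10, 13, 19, 7, 0)
earliest_start = execution_date + timedelta(minutes=1)   # 19:08:00

observed_start = datetime(2018, 10, 13, 19, 8, 11)       # from the log
scheduling_delay = observed_start - earliest_start

print(earliest_start)     # 2018-10-13 19:08:00
print(scheduling_delay)   # 0:00:11
```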

I also see a log from the dependent_dag which runs from 19:14:04 to 19:14:34 and is looking for the completion of the corresponding 19:13:00 dagrun. There's no indication that your scheduler is lag-free enough to have started leader_dag's 19:13:00 dagrun by 19:14:34. You could have better convinced me by showing it poking for 5 minutes or so. Of course it's never going to sense leader_dag.t1, because that isn't what you named the task shown.

So, Airflow has scheduling delay, and if you had a few thousand dags in the system it might exceed 1 minute. With catchup=False you're going to get some runs following each other, i.e. 19:08, 19:09, and some runs that skip a minute (or 6), like 19:10 followed by 19:16. Since the delay is somewhat random on a dag-by-dag basis, you might get unaligned runs with the sensor waiting forever, EVEN if you have the correct task id to wait for:

```diff
 wait_for_task = ExternalTaskSensor(
     task_id='wait_for_task',
     external_dag_id='leader_dag',
-    external_task_id='t1',
+    external_task_id='print_date',
     dag=dag)
```
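To illustrate the misalignment concretely (the minute values below are invented for illustration, not taken from the logs): with catchup=False, a lagging scheduler simply skips minutes rather than backfilling them, and each dag skips different minutes. The default ExternalTaskSensor pokes for a leader dagrun with exactly the same execution_date as its own, so any dependent run whose minute the leader skipped pokes forever:

```python
# Hypothetical execution_date minutes actually created for each dag
# (with catchup=False, skipped minutes are never backfilled).
leader_runs = {'19:08', '19:09', '19:10', '19:16'}     # leader skipped 19:11-19:15
dependent_runs = {'19:08', '19:09', '19:13', '19:14'}  # dependent lagged differently

# With the default execution_delta, the sensor in each dependent dagrun
# looks for the leader dagrun with the SAME execution_date.
stuck = sorted(dependent_runs - leader_runs)
print(stuck)  # ['19:13', '19:14'] -> these sensors poke until they time out
```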


Published: 2023-11-23 18:33:54. Original link: https://www.elefans.com/category/jswz/34/1622468.html