Airflow not scheduling Python correctly

Updated: 2024-10-11 13:25:36
Problem description

Code:

Python version 2.7.x and Airflow version 1.5.1

My DAG script is this:

```python
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'xyz',
    'depends_on_past': False,
    'start_date': datetime(2015, 10, 13),
    'email': ['xyz@email.in'],
    'schedule_interval': timedelta(minutes=5),
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('testing', default_args=default_args)

run_this_first = BashOperator(task_id='Start1', bash_command='date', dag=dag)

for i in range(5):
    t = BashOperator(task_id="Orders1" + str(i), bash_command='sleep 5', dag=dag)
    t.set_upstream(run_this_first)
```

From that you can see that I am creating a DAG with 6 tasks: the first task (Start1) starts first, after which all the other five tasks start.
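The dependency structure described above can be modelled in plain Python to see which tasks become runnable together. This is only a sketch: it uses the standard-library graphlib module (Python 3.9+, unlike the Python 2.7 in the question), it is not how Airflow itself orders tasks, and the dependencies dict is a hypothetical stand-in for the set_upstream calls.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical model of the question's DAG: each key lists the tasks it
# depends on, mirroring t.set_upstream(run_this_first) for the five tasks.
dependencies = {"Orders1" + str(i): {"Start1"} for i in range(5)}

ts = TopologicalSorter(dependencies)
ts.prepare()

# Collect the tasks that become ready together, batch by batch.
batches = []
while ts.is_active():
    ready = ts.get_ready()
    batches.append(sorted(ready))
    ts.done(*ready)  # mark the batch finished so its dependents unlock

print(batches)
# [['Start1'], ['Orders10', 'Orders11', 'Orders12', 'Orders13', 'Orders14']]
```

The second batch holds all five Orders1x tasks, which is why they can run in parallel once Start1 has finished.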

Currently I have set a 5-minute interval between DAG runs.

It ran perfectly for all six tasks the first time, but after five minutes the DAG was not triggered again.

It has been more than an hour and the DAG has still not been triggered again. I really don't know where I went wrong.

It would be really nice if someone could point out what is wrong. I tried clearing it using airflow testing clear, and the same thing happened: it ran the first instance and then just stood there.

The only thing the command line shows is: Getting all instances for DAG testing

When I change the position of schedule_interval, it just runs in parallel without any schedule interval: within 5 minutes, 300 or more task instances have been completed. There is no 5-minute interval between runs.

Code 2:

```python
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'xyz',
    'depends_on_past': False,
    'start_date': datetime(2015, 10, 13),
    'email': ['xyz@email.in'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('testing', schedule_interval=timedelta(minutes=5), default_args=default_args)  # Schedule here

run_this_first = BashOperator(task_id='Start1', bash_command='date', dag=dag)

for i in range(5):
    t = BashOperator(task_id="Orders1" + str(i), bash_command='sleep 5', dag=dag)
    t.set_upstream(run_this_first)
```

Recommended answer

For Code 2, I guess the reason it runs every minute is:

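  • The start date is 2015-10-13 00:00.
  • The schedule interval is 5 minutes.
  • On every scheduler heartbeat (every 5 seconds by default), your DAG is checked:
    • First check: start date (no last execution date is found) + schedule interval < current time? If so, the DAG is executed and the last execution time is recorded (e.g. is 2015-10-13 00:00 + 5 minutes < now?).
    • Second check, on the next heartbeat: last execution time + schedule interval < current time? If so, the DAG is executed again.
    • ...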

The solution is to set the DAG start_date to datetime.now() - schedule_interval.
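Under the same simplified model, the effect of that start_date can be checked directly. This is a sketch, assuming the "execution_date + schedule_interval < now" rule from the answer; count_due is a hypothetical helper and a fixed datetime stands in for datetime.now().

```python
from datetime import datetime, timedelta


def count_due(start_date, schedule_interval, now):
    """Hypothetical helper: how many intervals satisfy
    execution_date + schedule_interval < now, counting from start_date."""
    count = 0
    execution_date = start_date
    while execution_date + schedule_interval < now:
        count += 1
        execution_date += schedule_interval
    return count


schedule_interval = timedelta(minutes=5)
now = datetime(2015, 10, 13, 12, 0)       # fixed stand-in for datetime.now()
start_date = now - schedule_interval      # the start_date the answer suggests

# Just after the DAG is loaded, only a single interval is due ...
print(count_due(start_date, schedule_interval, now + timedelta(seconds=1)))             # 1
# ... and one more becomes due each time another 5 minutes pass.
print(count_due(start_date, schedule_interval, now + timedelta(minutes=5, seconds=1)))  # 2
```

There is no backlog, so runs arrive one per interval. Note that later Airflow documentation (its FAQ) recommends against dynamic start_date values such as datetime.now(), since the value changes every time the DAG file is parsed; treat this as a workaround for the 1.5.x behaviour in the question.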

Also, if you want to debug:

  • Set LOGGINGLEVEL to debug in settings.py.

  • Modify the method is_queueable() of the class airflow.models.TaskInstance to:

```python
def is_queueable(self, flag_upstream_failed=False):
    logging.debug('Checking whether task instance is queueable or not!')
    if self.execution_date > datetime.now() - self.task.schedule_interval:
        logging.debug('Too early to execute: execution_date {0} + task.schedule_interval {1} > datetime.now() {2}'.format(self.execution_date, self.task.schedule_interval, datetime.now()))
        return False
    ...
```
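The "too early" condition logged by the patched is_queueable() can also be tried outside Airflow. This is a minimal sketch, assuming you only want to see when the check fires: is_too_early is a hypothetical helper, and logging.basicConfig here is plain Python logging standing in for the settings.py change, not Airflow's own configuration.

```python
import logging
from datetime import datetime, timedelta

# Stdlib stand-in for raising LOGGINGLEVEL to debug in settings.py:
# without this, the logging.debug() call below would be silent.
logging.basicConfig(level=logging.DEBUG)


def is_too_early(execution_date, schedule_interval, now):
    """Hypothetical extract of the condition in the patched is_queueable():
    a task instance is held back until its whole interval has elapsed."""
    if execution_date > now - schedule_interval:
        logging.debug('Too early to execute: execution_date %s + schedule_interval %s > now %s',
                      execution_date, schedule_interval, now)
        return True
    return False


interval = timedelta(minutes=5)
now = datetime(2015, 10, 13, 12, 0)
print(is_too_early(datetime(2015, 10, 13, 11, 58), interval, now))  # True (also logs a debug line)
print(is_too_early(datetime(2015, 10, 13, 11, 50), interval, now))  # False
```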
Published: 2023-11-23 19:19:14
Link: https://www.elefans.com/category/jswz/34/1622588.html