如何跳过Airflow上的任务?

编程入门 行业动态 更新时间:2024-10-13 16:18:57
本文介绍了如何跳过Airflow上的任务?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我试图了解Airflow是否支持在DAG中跳过临时执行的任务?

让我说一下我的DAG图如下: task1> task2> task3> task4

我想从task3手动启动DAG,最好的方法是什么?

我已经阅读了 ShortCircuitOperator ,但是我正在寻找更多的临时解决方案,一旦执行被触发,它们便可以应用。 / p>

谢谢!

解决方案

您可以合并 SkipMixin 表示ShortCircuitOperator 在后台使用来跳过下游任务。

来自airflow.models从airflow.utils.decorators导入BaseOperator,SkipMixin import apply_defaults 类mySkippingOperator(BaseOperator,SkipMixin) @apply_defaults def __init __(self, condition, * args, ** kwargs): super().__ init __(* args,** kwargs) self.condition = condition def execute(self,context): if self.condition: self.log.info('进行下游任务...' ) return self.log.info('跳过下游任务...') 下游_任务=上下文['任务'] .get_flat_relatives(上游= False) self.log.debug(下游task_ids%s,下游任务) 如果下游任务: self.skip(context ['dag_run'] ,context ['ti']。execution_date,下游任务) self.log.info( Done。)

I'm trying to understand whether Airflow supports skipping tasks in a DAG for ad-hoc executions?

Lets say my DAG graph look like this: task1 > task2 > task3 > task4

And I would like to start my DAG manually from task3, what is the best way of doing that?

I've read about ShortCircuitOperator, but I'm looking for more ad-hoc solution which can apply once the execution is triggered.

Thanks!

解决方案

You can incorporate the SkipMixin that the ShortCircuitOperator uses under the hood to skip downstream tasks.

from airflow.models import BaseOperator, SkipMixin from airflow.utils.decorators import apply_defaults class mySkippingOperator(BaseOperator, SkipMixin) @apply_defaults def __init__(self, condition, *args, **kwargs): super().__init__(*args, **kwargs) self.condition = condition def execute(self, context): if self.condition: self.log.info('Proceeding with downstream tasks...') return self.log.info('Skipping downstream tasks...') downstream_tasks = context['task'].get_flat_relatives(upstream=False) self.log.debug("Downstream task_ids %s", downstream_tasks) if downstream_tasks: self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks) self.log.info("Done.")

更多推荐

如何跳过Airflow上的任务?

本文发布于:2023-11-23 16:20:11,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1622113.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:跳过   Airflow

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!