我试图了解Airflow是否支持在DAG中跳过临时执行的任务?
让我说一下我的DAG图如下: task1> task2> task3> task4
我想从task3手动启动DAG,最好的方法是什么?
我已经阅读了 ShortCircuitOperator ,但是我正在寻找更多的临时解决方案,一旦执行被触发,它们便可以应用。 / p>
谢谢!
解决方案您可以合并 SkipMixin 表示ShortCircuitOperator 在后台使用来跳过下游任务。
来自airflow.models从airflow.utils.decorators导入BaseOperator,SkipMixin import apply_defaults 类mySkippingOperator(BaseOperator,SkipMixin) @apply_defaults def __init __(self, condition, * args, ** kwargs): super().__ init __(* args,** kwargs) self.condition = condition def execute(self,context): if self.condition: self.log.info('进行下游任务...' ) return self.log.info('跳过下游任务...') 下游_任务=上下文['任务'] .get_flat_relatives(上游= False) self.log.debug(下游task_ids%s,下游任务) 如果下游任务: self.skip(context ['dag_run'] ,context ['ti']。execution_date,下游任务) self.log.info( Done。)
I'm trying to understand whether Airflow supports skipping tasks in a DAG for ad-hoc executions?
Lets say my DAG graph look like this: task1 > task2 > task3 > task4
And I would like to start my DAG manually from task3, what is the best way of doing that?
I've read about ShortCircuitOperator, but I'm looking for more ad-hoc solution which can apply once the execution is triggered.
Thanks!
解决方案You can incorporate the SkipMixin that the ShortCircuitOperator uses under the hood to skip downstream tasks.
from airflow.models import BaseOperator, SkipMixin from airflow.utils.decorators import apply_defaults class mySkippingOperator(BaseOperator, SkipMixin) @apply_defaults def __init__(self, condition, *args, **kwargs): super().__init__(*args, **kwargs) self.condition = condition def execute(self, context): if self.condition: self.log.info('Proceeding with downstream tasks...') return self.log.info('Skipping downstream tasks...') downstream_tasks = context['task'].get_flat_relatives(upstream=False) self.log.debug("Downstream task_ids %s", downstream_tasks) if downstream_tasks: self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks) self.log.info("Done.")
更多推荐
如何跳过Airflow上的任务?
发布评论