如何在Airflow中创建条件任务

编程入门 行业动态 更新时间:2024-10-13 04:22:46
本文介绍了如何在Airflow中创建条件任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我想在Airflow中创建一个条件任务,如下图所示。预期的情况如下:

I would like to create a conditional task in Airflow as described in the schema below. The expected scenario is the following:

  • 任务1执行
  • 如果任务1成功,则执行任务2a
  • 其他如果任务1失败,则执行任务2b
  • 最后执行任务3
  • Task 1 executes
  • If Task 1 succeed, then execute Task 2a
  • Else If Task 1 fails, then execute Task 2b
  • Finally execute Task 3

上面所有任务都是SSHExecuteOperator。 我想我应该使用ShortCircuitOperator和/或XCom来管理条件,但是我不清楚如何实现。

All tasks above are SSHExecuteOperator. I'm guessing I should be using the ShortCircuitOperator and / or XCom to manage the condition but I am not clear on how to implement that. Could you please describe the solution?

推荐答案

您必须使用气流触发规则

所有运算符都有一个trigger_rule参数,用于定义规则触发生成的任务。

All operators have a trigger_rule argument which defines the rule by which the generated task get triggered.

触发规则的可能性:

ALL_SUCCESS = 'all_success' ALL_FAILED = 'all_failed' ALL_DONE = 'all_done' ONE_SUCCESS = 'one_success' ONE_FAILED = 'one_failed' DUMMY = 'dummy'

这是解决您的问题的想法:

Here is the idea to solve your problem:

from airflow.operators.ssh_execute_operator import SSHExecuteOperator from airflow.utils.trigger_rule import TriggerRule from airflow.contrib.hooks import SSHHook sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>) task_1 = SSHExecuteOperator( task_id='task_1', bash_command=<YOUR COMMAND>, ssh_hook=sshHook, dag=dag) task_2 = SSHExecuteOperator( task_id='conditional_task', bash_command=<YOUR COMMAND>, ssh_hook=sshHook, dag=dag) task_2a = SSHExecuteOperator( task_id='task_2a', bash_command=<YOUR COMMAND>, trigger_rule=TriggerRule.ALL_SUCCESS, ssh_hook=sshHook, dag=dag) task_2b = SSHExecuteOperator( task_id='task_2b', bash_command=<YOUR COMMAND>, trigger_rule=TriggerRule.ALL_FAILED, ssh_hook=sshHook, dag=dag) task_3 = SSHExecuteOperator( task_id='task_3', bash_command=<YOUR COMMAND>, trigger_rule=TriggerRule.ONE_SUCCESS, ssh_hook=sshHook, dag=dag) task_2.set_upstream(task_1) task_2a.set_upstream(task_2) task_2b.set_upstream(task_2) task_3.set_upstream(task_2a) task_3.set_upstream(task_2b)

更多推荐

如何在Airflow中创建条件任务

本文发布于:2023-11-23 16:22:40,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1622119.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:条件   如何在   Airflow

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!