如何通过条件任务运行气流DAG

编程入门 行业动态 更新时间:2024-10-12 01:22:23
本文介绍了如何通过条件任务运行气流DAG的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

总共有6个任务,这些任务需要根据输入json中出现的一个字段的( flag_value )值执行. 如果 flag_value 的值为true,则所有任务都需要以以下方式执行: 然后,第一任务1平行于(任务2和任务3在一起),平行于任务4,平行于任务5. 完成所有步骤后,再执行task6. 由于是气流和DAG的新手,我不知道如何在这种情况下运行.

There are total 6 tasks are there.These tasks need to get execute based on one field's(flag_value) value which is coming in input json. If the value of flag_value is true then all tasks need to get execute in such a way that , First task1 then parallell to (task2 & task3 together), parallell to task4, parallell to task5. Once all this finishes then task6. Since am new to airflow and DAG i dont know how to run for this condition.

如果 flag_value 的值为false,则该顺序仅是顺序的 task_1>> task_4>> task_5>> task_6.

If the value of flag_value is false then the order is sequential only task_1 >> task_4 >> task_5 >> task_6.

下面是我的DAG代码.

Below is my code for the DAG.

from airflow import DAG from datetime import datetime from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator default_args = { 'owner': 'airflow', 'depends_on_past': False } dag = DAG('DAG_FOR_TEST',default_args=default_args,schedule_interval=None,max_active_runs=3, start_date=datetime(2020, 7, 8)) #################### CREATE TASK ##################################### task_1 = DatabricksSubmitRunOperator( task_id='task_1', databricks_conn_id='connection_id_details', existing_cluster_id='{{ dag_run.conf.clusterId }}', libraries= [ { 'jar': 'dbfs:/task_1/task_1.jar' } ], spark_jar_task={ 'main_class_name': 'com.task_1.driver.TestClass1', 'parameters' : [ '{{ dag_run.conf.json }}' ] } ) task_2 = DatabricksSubmitRunOperator( task_id='task_2', databricks_conn_id='connection_id_details', existing_cluster_id='{{ dag_run.conf.clusterId }}', libraries= [ { 'jar': 'dbfs:/task_2/task_2.jar' } ], spark_jar_task={ 'main_class_name': 'com.task_2.driver.TestClass2', 'parameters' : [ '{{ dag_run.conf.json }}' ] } ) task_3 = DatabricksSubmitRunOperator( task_id='task_3', databricks_conn_id='connection_id_details', existing_cluster_id='{{ dag_run.conf.clusterId }}', libraries= [ { 'jar': 'dbfs:/task_3/task_3.jar' } ], spark_jar_task={ 'main_class_name': 'com.task_3.driver.TestClass3', 'parameters' : [ '{{ dag_run.conf.json }}' ] } ) task_4 = DatabricksSubmitRunOperator( task_id='task_4', databricks_conn_id='connection_id_details', existing_cluster_id='{{ dag_run.conf.clusterId }}', libraries= [ { 'jar': 'dbfs:/task_4/task_4.jar' } ], spark_jar_task={ 'main_class_name': 'com.task_4.driver.TestClass4', 'parameters' : [ '{{ dag_run.conf.json }}' ] } ) task_5 = DatabricksSubmitRunOperator( task_id='task_5', databricks_conn_id='connection_id_details', existing_cluster_id='{{ dag_run.conf.clusterId }}', libraries= [ { 'jar': 'dbfs:/task_5/task_5.jar' } ], spark_jar_task={ 'main_class_name': 'com.task_5.driver.TestClass5', 'parameters' : [ 'json ={{ dag_run.conf.json }}' ] } ) task_6 = DatabricksSubmitRunOperator( task_id='task_6', databricks_conn_id='connection_id_details', existing_cluster_id='{{ dag_run.conf.clusterId }}', libraries= [ { 'jar': 'dbfs:/task_6/task_6.jar' } ], spark_jar_task={ 'main_class_name': 'com.task_6.driver.TestClass6', 'parameters' : ['{{ dag_run.conf.json }}' ] } ) flag_value='{{ dag_run.conf.json.flag_value }}' #################### ORDER OF OPERATORS ########################### if flag_value == 'true': task_1.dag = dag task_2.dag = dag task_3.dag = dag task_4.dag = dag task_5.dag = dag task_6.dag = dag task_1 >> [task_2 , task_3] >> [task_4] >> [task_5] >> task_6 // Not sure correct else: task_1.dag = dag task_4.dag = dag task_5.dag = dag task_6.dag = dag task_1 >> task_4 >> task_5 >> task_6

推荐答案

首先,依赖关系不正确,这应该可以工作:

First of all, dependency is not correct, this should work:

task_1 >> [task_2 , task_3] >> task_4 >> task_5 >> task_6

不可能用list_1 >> list_2来订购任务,但是有提供此帮助的方法,请参见:跨下游.

It is not possible to order tasks with list_1 >> list_2, but there are helper methods to provide this, see: cross_downstream.

对于分支,您可以使用BranchPythonOperator更改触发规则您的任务.不确定以下代码,可能会有一些小错误,但是这里的想法可行.

For branching, you can use BranchPythonOperator with changing trigger rules of your tasks. Not sure about the following code, it could have minor errors, but the idea here works.

task_4.trigger_rule = "none_failed" dummy = DummyOperator(task_id="dummy", dag=dag) branch = BranchPythonOperator( task_id="branch", # jinja template returns string "True" or "False" python_callable=lambda f: ["task_2" , "task_3"] if f == "True" else "dummy", op_kwargs={"f": flag_value}, dag=dag) task_1 >> branch branch >> [task_2 , task_3, dummy] >> task_4 task_4 >> task_5 >> task_6

可能会有更好的方法.

更多推荐

如何通过条件任务运行气流DAG

本文发布于:2023-10-18 23:24:01,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1505718.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:气流   条件   DAG

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!