Want to create Airflow tasks that are downstream of the current task

Updated: 2024-10-27 22:28:46
Problem description

I'm mostly brand new to airflow.

I have two steps:

  • Get all the files that match a criterion
  • Uncompress the files

The files are half a gig compressed and 2-3 gigs uncompressed. I can easily have 20+ files to process at a time, which means uncompressing all of them can run longer than just about any reasonable timeout.
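The two steps themselves can be sketched in plain Python. This is a minimal sketch, assuming the files are gzip-compressed (bgzip output, common for gVCF files, is also readable by Python's gzip module) and that "matching" means the file name ends with a given suffix. `get_files` and `expand_file` are hypothetical stand-ins for the question's `getFiles` and `expandFile`, not a known API.

```python
import gzip
import shutil
from pathlib import Path


def get_files(req_dir, match_suffix):
    """Step 1 (hypothetical helper): files in req_dir whose names end with match_suffix.

    The result is sorted so that repeated calls list the files in the same order.
    """
    return sorted(
        p for p in Path(req_dir).iterdir()
        if p.is_file() and p.name.endswith(match_suffix)
    )


def expand_file(the_file, out_dir):
    """Step 2 (hypothetical helper): stream-decompress one .gz file into out_dir.

    copyfileobj copies in 1 MiB chunks, so a 2-3 GB output never has to
    fit in memory. Returns the path of the uncompressed file.
    """
    src = Path(the_file)
    if src.suffix != ".gz":
        raise ValueError("expected a .gz file, got %s" % src.name)
    target = Path(out_dir) / src.stem  # e.g. "x.g.vcf.gz" -> "x.g.vcf"
    with gzip.open(src, "rb") as fin, open(target, "wb") as fout:
        shutil.copyfileobj(fin, fout, 1024 * 1024)
    return target
```

Streaming keeps memory flat but does not shorten the work: looping `expand_file` over every result of `get_files` in a single task is exactly the long-running job described above, whereas one task per file bounds each task to a single file's runtime.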

I could use XCom to get the results of step 1, but what I'd like to do is something like this:

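import os
from airflow.operators.python_operator import PythonOperator

# getFiles, expandFile, dag and runThis are defined elsewhere in the DAG file;
# runThis is the PythonOperator whose python_callable is processFiles.
def processFiles(reqDir, gvcfDir, matchSuffix):
    theFiles = getFiles(reqDir, gvcfDir, matchSuffix)
    for theFile in theFiles:  # assumes getFiles returns one path per matching file
        # one uncompress task per matching file
        task = PythonOperator(
            task_id="Uncompress_" + os.path.basename(theFile),
            python_callable=expandFile,
            op_kwargs={'theFile': theFile},
            dag=dag)
        task.set_upstream(runThis)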

The problem is that "runThis" is the PythonOperator that called processFiles, so it has to be declared after processFiles.

Is there any way to make this work?

Is this the reason that XCom exists, and I should dump this approach and go with XCom?

Recommended answer

Regarding your proposed solution, I don't think you can use XComs to achieve this, as they are only available to task instances at run time, not when you define the DAG (to the best of my knowledge).

You can, however, use a SubDAG to achieve your objective. The SubDagOperator takes a DAG produced by a factory function that runs whenever the DAG file is parsed, which gives you a chance to dynamically create a sub-section of your workflow.

You can test the idea using this simple example, which generates a random number of tasks every time it's invoked:

from random import randint

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
    'start_date': days_ago(2)
}

dag = DAG(dag_id='dynamic_dag', default_args=args)


def generate_subdag(parent_dag, dag_id, default_args):
    # pseudo-randomly determine a number of tasks to be created
    n_tasks = randint(1, 10)

    # a SubDAG's id must be "<parent dag_id>.<SubDagOperator task_id>"
    subdag = DAG(
        '%s.%s' % (parent_dag.dag_id, dag_id),
        schedule_interval=parent_dag.schedule_interval,
        start_date=parent_dag.start_date,
        default_args=default_args
    )
    for i in range(n_tasks):
        i = str(i)
        BashOperator(task_id='echo_%s' % i, bash_command='echo %s' % i, dag=subdag)
    return subdag


subdag_dag_id = 'dynamic_subdag'

# generate_subdag is called each time this file is parsed,
# so the number of tasks can change from one parse to the next
SubDagOperator(
    subdag=generate_subdag(dag, subdag_dag_id, args),
    task_id=subdag_dag_id,
    dag=dag
)

If you execute this, you'll notice that across different runs the SubDAG is likely to contain a different number of tasks (I tested this with version 1.8.0). You can see the SubDAG in the web UI by opening the graph view, clicking the grey SubDAG node and then "Zoom into SubDAG".

You can apply this concept by listing the files and creating one task per file, instead of generating a random number of tasks as in the example. The tasks themselves can be arranged in parallel (as I did), sequentially, or in any valid directed acyclic layout.
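Replacing the random task count with one task per file only changes how the task list is built, so that logic can be kept in plain Python and tested without Airflow. The sketch below assumes suffix matching on file names; `uncompress_task_specs`, `layout_edges` and `execution_order` are hypothetical helpers, and the task-id character rule (letters, digits, underscores, dots, dashes) mirrors what Airflow versions I know of enforce, so treat it as an assumption for your version. `graphlib` needs Python 3.9+.

```python
import re
from graphlib import TopologicalSorter  # Python 3.9+
from pathlib import Path

# Assumed task_id rule: only word characters, '.' and '-' are allowed.
_INVALID_TASK_ID_CHARS = re.compile(r"[^\w.-]")


def uncompress_task_specs(req_dir, match_suffix):
    """Return (task_id, path) pairs, one per matching file, in a stable order."""
    files = sorted(
        p for p in Path(req_dir).iterdir()
        if p.is_file() and p.name.endswith(match_suffix)
    )
    return [("Uncompress_" + _INVALID_TASK_ID_CHARS.sub("_", p.name), str(p))
            for p in files]


def layout_edges(task_ids, mode="parallel"):
    """Return (upstream, downstream) edges for the chosen arrangement."""
    if mode == "parallel":
        return []  # no dependencies between the per-file tasks
    if mode == "sequential":
        return list(zip(task_ids, task_ids[1:]))  # chain: t0 -> t1 -> t2 ...
    raise ValueError("unknown mode: %s" % mode)


def execution_order(task_ids, edges):
    """Check that the layout is acyclic and return one valid execution order."""
    graph = {t: set() for t in task_ids}  # node -> set of upstream nodes
    for upstream, downstream in edges:
        graph[downstream].add(upstream)
    # static_order() raises graphlib.CycleError if the edges form a cycle
    return list(TopologicalSorter(graph).static_order())
```

Inside a factory like `generate_subdag`, each `(task_id, path)` pair would become one `PythonOperator`, and each edge one `set_upstream` call. Sorting the file list matters because the DAG file is parsed repeatedly: a stable order keeps the task ids and any sequential chain identical between parses as long as the directory contents do not change.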

Published: 2023-11-23 17:41:04
Link: https://www.elefans.com/category/jswz/34/1622328.html