Airflow 1.10.3 SubDag即使并发性为8也只能并行运行1个任务

编程入门 行业动态 更新时间:2024-10-09 04:21:12
本文介绍了Airflow 1.10.3 SubDag即使并发性为8也只能并行运行1个任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

最近,我将Airflow从1.9升级到1.10.3(最新版本)。

但是,我确实注意到了与SubDag并发性相关的性能问题。只能代答SubDag内的1个任务,这不是应有的方式,我们的SubDag的并发设置为8。

请参阅以下内容: get_monthly_summary-214和get_monthly_summary-215是两个子DAG,可以通过父DAG并发在并行控制器中运行

但当放大到SubDag时,请说get_monthly_summary-214,然后 你可以清楚地看到,一次只有一个任务在运行,其他任务都在排队,它一直以这种方式运行。当我们检查SubDag并发性时,它实际上是我们在代码中指定的8:

我们确实设置了池槽大小,它是32,我们确实有8个芹菜工人来接手排队的任务,并且我们与并发关联的气流配置如下:

# The amount of parallelism as a setting to the executor. This defines # the max number of task instances that should run simultaneously # on this airflow installation parallelism = 32 # The number of task instances allowed to run concurrently by the scheduler dag_concurrency = 16 # The app name that will be used by celery celery_app_name = airflow.executors.celery_executor # The concurrency that will be used when starting workers with the # "airflow worker" command. This defines the number of task instances that # a worker will take, so size up your workers based on the resources on # your worker box and the nature of your tasks worker_concurrency = 16 此外,所有SubDag都是使用名为mini的队列配置的,而它的所有内部任务都是名为default的默认队列,因为如果我们在同一队列上同时运行SubDag运算符和SubDag内部任务,可能会有一些deadlock problems。我还尝试为所有任务和操作员使用default队列,但没有帮助。

旧版本1.9似乎很好,每个SubDag可以并行执行多个任务,我们有什么遗漏吗?

推荐答案

基于上面发布的@kaxil的发现,如果您仍然希望并行执行子DAG中的任务,解决方案是创建一个包装函数以显式传递executorWhen构造SubDagOperator:

from airflow.operators.subdag_operator import SubDagOperator from airflow.executors import GetDefaultExecutor def sub_dag_operator_with_default_executor(subdag, *args, **kwargs): return SubDagOperator(subdag=subdag, executor=GetDefaultExecutor(), *args, **kwargs) 在创建subdag操作符时调用sub_dag_operator_with_default_executor。为了减轻子DAG操作员的工作量performance concerns

我们应该将subdag_OPERATOR的默认执行器更改为SequentialExecutor。子操作员不支持气流池,因此它可能会消耗所有的工作资源(例如,在celeryExecutor中)。这会导致Airflow-74中提到的问题,并限制subdag_OPERATOR的使用。我们通过指定使用顺序执行程序来在生产中使用subdag_OPERATOR。

我们建议创建一个特殊的队列(我们在本例中指定Queue=‘mini’)和芹菜工人来处理subdag_OPERATOR,这样它就不会消耗您普通芹菜工人的所有资源。具体如下:

dag = DAG( dag_id=DAG_NAME, description=f"{DAG_NAME}-{__version__}", ... ) with dag: ur_operator = sub_dag_operator_with_default_executor( task_id=f"your_task_id", subdag=load_sub_dag( parent_dag_name=DAG_NAME, child_dag_name=f"your_child_dag_name", args=args, concurrency=dag_config.get("concurrency_in_sub_dag") or DEFAULT_CONCURRENCY, ), queue="mini", dag=dag )

那么当您创建您的特殊芹菜工人(我们使用的是2核和3G内存这样的轻量级主机)时,指定AIRFLOW__CELERY__DEFAULT_QUEUE为mini,取决于您希望并行运行多少子DAG操作符,您应该创建多个专门的芹菜工人来负载均衡资源,我们建议每个特殊的芹菜工人一次最多注意2个子DAG操作符,否则会耗尽(例如,在2核和3G内存主机上内存用完)

您还可以通过在Airflow UIVariables配置页面中创建的ENV VARconcurrency_in_sub_dag在Subdag内部调整concurrency。

更新[22/05/2020]以上仅适用于气流(<;=1.10.3,>=1.10.0) 对于超过1.10.3的气流,请使用

from airflow.executors import get_default_executor

相反。

更多推荐

Airflow 1.10.3 SubDag即使并发性为8也只能并行运行1个任务

本文发布于:2023-11-23 21:33:16,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1622920.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:性为   Airflow   SubDag

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!