How to control the parallelism or concurrency of an Airflow installation?

Updated: 2024-10-09 12:34:52
Question

In some of my Apache Airflow installations, DAGs or tasks that are scheduled to run do not run even when the scheduler doesn't appear to be fully loaded. How can I increase the number of DAGs or tasks that can run concurrently?

Similarly, if my installation is under high load and I want to limit how quickly my Airflow workers pull queued tasks (such as to reduce resource consumption), what can I adjust to reduce the average load?

Recommended answer

Here's an expanded list of configuration options that are available since Airflow v1.10.2. Some can be set on a per-DAG or per-operator basis, but may also fall back to the setup-wide defaults when they are not specified.

Options that can be specified on a per-DAG basis:

  • concurrency: the number of task instances allowed to run concurrently across all active runs of the DAG this is set on. Defaults to core.dag_concurrency if not set
  • max_active_runs: maximum number of active runs for this DAG. The scheduler will not create new active DAG runs once this limit is hit. Defaults to core.max_active_runs_per_dag if not set

Example:

from airflow import DAG

# Only allow one run of this DAG to be running at any given time
dag = DAG('my_dag_id', max_active_runs=1)

# Allow a maximum of 10 tasks to be running across a max of 2 active DAG runs
dag = DAG('example2', concurrency=10, max_active_runs=2)
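The fallback from a per-DAG argument to the setup-wide default can be pictured with a small sketch that does not import Airflow. The function name `effective_limit` and the `CORE_DEFAULTS` dictionary are illustrative assumptions rather than Airflow internals, and the numbers stand in for whatever your own airflow.cfg contains.

```python
from typing import Optional

# Hypothetical stand-ins for values read from airflow.cfg; real values may differ.
CORE_DEFAULTS = {"dag_concurrency": 16, "max_active_runs_per_dag": 16}


def effective_limit(dag_value: Optional[int], core_key: str) -> int:
    """Return the per-DAG value if one was given, else the setup-wide default."""
    if dag_value is not None:
        return dag_value
    return CORE_DEFAULTS[core_key]


# Mirrors DAG('my_dag_id', max_active_runs=1): the explicit value wins,
# while the unset concurrency falls back to the [core] default.
print(effective_limit(1, "max_active_runs_per_dag"))
print(effective_limit(None, "dag_concurrency"))
```

Under this reading, a DAG that sets neither argument is governed entirely by the `[core]` values, so changing those defaults affects every such DAG at once.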

Options that can be specified on a per-operator basis:

  • pool: the pool to execute the task in. Pools can be used to limit parallelism for only a subset of tasks
  • task_concurrency: concurrency limit for task runs with the same execution date

Example:

# task_id is required by BaseOperator; 'my_task' is a placeholder name
t1 = BaseOperator(task_id='my_task', pool='my_custom_pool', task_concurrency=12)
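One way to think about a pool is as a fixed number of slots shared by the tasks assigned to it. The sketch below is a simplified model, not Airflow's scheduler code: it assumes every task occupies exactly one slot, and the function name `pick_runnable` and the pool sizes are hypothetical.

```python
from collections import Counter
from typing import Dict, List, Tuple


def pick_runnable(queued: List[Tuple[str, str]],
                  pool_slots: Dict[str, int]) -> List[str]:
    """Pick queued tasks in order while their pool still has a free slot.

    queued holds (task_id, pool_name) pairs; pool_slots maps pool name to size.
    """
    used: Counter = Counter()
    runnable = []
    for task_id, pool in queued:
        # A task may start only if its pool has not used up all its slots.
        if used[pool] < pool_slots.get(pool, 0):
            used[pool] += 1
            runnable.append(task_id)
    return runnable


queued = [("a", "my_custom_pool"), ("b", "my_custom_pool"),
          ("c", "my_custom_pool"), ("d", "other_pool")]
print(pick_runnable(queued, {"my_custom_pool": 2, "other_pool": 5}))
# prints ['a', 'b', 'd']
```

Task "c" stays queued because `my_custom_pool` is full, while "d" is unaffected, which is the sense in which a pool limits parallelism for only a subset of tasks.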

Options that are specified across an entire Airflow setup:

  • core.parallelism: maximum number of tasks running across an entire Airflow installation
  • core.dag_concurrency: max number of tasks that can be running per DAG (across multiple DAG runs)
  • core.non_pooled_task_slot_count: number of task slots allocated to tasks not running in a pool
  • core.max_active_runs_per_dag: maximum number of active DAG runs, per DAG
  • scheduler.max_threads: how many threads the scheduler process should use to schedule DAGs
  • celery.worker_concurrency: max number of task instances that a worker will process at a time if using CeleryExecutor
  • celery.sync_parallelism: number of processes CeleryExecutor should use to sync task state
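In the dotted names above, the part before the dot is the section of airflow.cfg and the part after it is the key. Airflow also accepts overrides through environment variables named `AIRFLOW__{SECTION}__{KEY}`. The following sketch parses an illustrative fragment with Python's standard `configparser` and prints the matching variable names; the numbers are placeholders rather than recommendations, and `env_var_name` is a helper introduced here only for illustration.

```python
import configparser

# Illustrative airflow.cfg fragment; the values are placeholders, not advice.
CFG_TEXT = """
[core]
parallelism = 32
dag_concurrency = 16
non_pooled_task_slot_count = 128
max_active_runs_per_dag = 16

[scheduler]
max_threads = 2

[celery]
worker_concurrency = 16
"""


def env_var_name(section: str, key: str) -> str:
    """Build the AIRFLOW__{SECTION}__{KEY} environment variable name."""
    return "AIRFLOW__{}__{}".format(section.upper(), key.upper())


parser = configparser.ConfigParser()
parser.read_string(CFG_TEXT)

# List every option together with the environment variable that overrides it.
for section in parser.sections():
    for key, value in parser.items(section):
        print("{}.{} = {}  (env: {})".format(
            section, key, value, env_var_name(section, key)))
```

To lower the load on a busy installation, the values to reduce would typically be `core.parallelism` and, with CeleryExecutor, `celery.worker_concurrency`; to let more work run at once, raise them along with `core.dag_concurrency`.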

Published: 2023-11-23 21:28:35
Article link: https://www.elefans.com/category/jswz/34/1622916.html