Testing an Apache Airflow DAG while it is already scheduled and running?

Problem Description

I ran the following test command:

airflow test events {task_name_redacted} 2018-12-12

...and got the following output:

Dependencies not met for <TaskInstance: events.{redacted} 2018-12-12T00:00:00+00:00 [None]>, dependency 'Task Instance Slots Available' FAILED: The maximum number of running tasks (16) for this task's DAG 'events' has been reached.
[2019-01-17 19:47:48,978] {models.py:1556} WARNING -
--------------------------------------------------------------------------------
FIXME: Rescheduling due to concurrency limits reached at task runtime. Attempt 1 of 6. State set to NONE.
--------------------------------------------------------------------------------
[2019-01-17 19:47:48,978] {models.py:1559} INFO - Queuing into pool None

My Airflow is configured with a maximum concurrency of 16. Does this mean that I cannot test a task while the DAG is currently running and has used all of its task slots?

Also, it was a little unclear from the docs: does airflow test actually execute the task? For example, if it were a SparkSubmitOperator, would it actually submit the job?

Recommended Answer

While I am yet to reach the phase of deployment where concurrency will matter, the docs do give a fairly good indication of the problem at hand.

  • Since at any point in time just one scheduler is running (and you shouldn't be running multiple anyway), it does appear that irrespective of whether the DAG runs are live runs or test runs, this limit applies to them collectively. So that is certainly a hurdle.

    # The number of task instances allowed to run concurrently by the scheduler
    dag_concurrency = 16

  • But beware that merely increasing this number is not enough (assuming you have big-enough boxes for hefty workers / multiple workers); several other configurations will have to be tweaked as well to achieve the kind of parallelism I sense you want (a per-DAG variant of these settings is sketched after this list).

    They are all listed under the [core] section:

    # The amount of parallelism as a setting to the executor. This defines the max number of task instances that should run simultaneously on this airflow installation
    parallelism = 32

    # When not using pools, tasks are run in the "default pool", whose size is guided by this config element
    non_pooled_task_slot_count = 128

    # The maximum number of active DAG runs per DAG
    max_active_runs_per_dag = 16

  • But we are still not there, because once you spawn so many tasks simultaneously, the backend metadata-db will start choking. While this is likely a minor problem (and might not affect you unless you have some really huge DAGs / a very large number of Variable interactions in your tasks), it is still worth noting as a potential roadblock.

    # The SqlAlchemy pool size is the maximum number of database connections in the pool. 0 indicates no limit.
    sql_alchemy_pool_size = 5

    # The SqlAlchemy pool recycle is the number of seconds a connection can be idle in the pool before it is invalidated. This config does not apply to sqlite. If the number of DB connections is ever exceeded, a lower config value will allow the system to recover faster.
    sql_alchemy_pool_recycle = 1800

    # How many seconds to retry re-establishing a DB connection after disconnects. Setting this to 0 disables retries.
    sql_alchemy_reconnect_timeout = 300

  • Needless to say, all this is pretty much futile unless you pick the right executor; SequentialExecutor, in particular, is only intended for testing.

    # The executor class that airflow should use. Choices include SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
    executor = SequentialExecutor

  • But then BaseOperator params like depends_on_past and wait_for_downstream are there to spoil the party as well (they appear in the sketch after this list too).
  • Finally, I leave you with this link related to the Airflow + Spark combination: How to submit Spark jobs to EMR cluster from Airflow? (a small SparkSubmitOperator sketch follows below as well)
  • (Pardon me if the answer confused you more than it helped, but...)
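
To make the settings above a little more concrete, here is a minimal sketch, assuming Airflow 1.x, of how the same limits can also be set per DAG through the DAG constructor's concurrency and max_active_runs arguments, and how depends_on_past / wait_for_downstream are passed through default_args. The DAG id, dates, and task below are hypothetical placeholders, not taken from the question.

    # A minimal sketch, assuming Airflow 1.x; ids, dates and the task are hypothetical.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    default_args = {
        "depends_on_past": False,      # do not wait for the previous run of the same task
        "wait_for_downstream": False,  # do not wait for the previous run's downstream tasks
    }

    dag = DAG(
        dag_id="events_example",           # hypothetical DAG id
        default_args=default_args,
        start_date=datetime(2018, 12, 1),
        schedule_interval="@daily",
        concurrency=32,                    # per-DAG counterpart of dag_concurrency
        max_active_runs=4,                 # per-DAG counterpart of max_active_runs_per_dag
    )

    some_task = DummyOperator(task_id="some_task", dag=dag)

Per-DAG values take precedence over the global defaults in airflow.cfg, so they are one way to keep one busy DAG from being governed solely by the installation-wide limits.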
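And to make the Airflow + Spark combination slightly more tangible, here is a hedged sketch of a SparkSubmitOperator task, assuming the Airflow 1.x contrib import path and an already-configured spark_default connection; the application path and ids are hypothetical.

    # A hedged sketch, assuming Airflow 1.x contrib operators; paths and ids are hypothetical.
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

    dag = DAG(
        dag_id="spark_example",
        start_date=datetime(2019, 1, 1),
        schedule_interval=None,
    )

    submit_job = SparkSubmitOperator(
        task_id="submit_job",
        application="/path/to/job.py",  # hypothetical Spark application
        conn_id="spark_default",        # assumes a Spark connection is configured
        dag=dag,
    )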
