I ran the following test command:
airflow test events {task_name_redacted} 2018-12-12
...and got the following output:
Dependencies not met for <TaskInstance: events.{redacted} 2018-12-12T00:00:00+00:00 [None]>, dependency 'Task Instance Slots Available' FAILED: The maximum number of running tasks (16) for this task's DAG 'events' has been reached.
[2019-01-17 19:47:48,978] {models.py:1556} WARNING -
--------------------------------------------------------------------------------
FIXME: Rescheduling due to concurrency limits reached at task runtime. Attempt 1 of 6. State set to NONE.
--------------------------------------------------------------------------------
[2019-01-17 19:47:48,978] {models.py:1559} INFO - Queuing into pool None
My Airflow is configured with a maximum concurrency of 16. Does this mean that I cannot test a task while the DAG is currently running and has used all of its task slots?
Also, it was a little unclear from the docs, but does airflow test actually execute the task? For example, if it were a SparkSubmitOperator, would it actually submit the job?
Answer:

While I am yet to reach the phase of deployment where concurrency will matter, the docs do give a fairly good indication of the problem at hand.
Since at any point in time just one scheduler is running (and you shouldn't be running multiple anyway), it appears that irrespective of whether the DAG runs are live runs or test runs, this limit applies to them collectively. So that is certainly a hurdle.
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16
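Besides editing airflow.cfg directly, any of these options can be overridden with an environment variable of the form AIRFLOW__{SECTION}__{KEY}. A minimal sketch (the helper function is my own, not part of Airflow):

```python
import os

def airflow_env_var(section: str, key: str) -> str:
    """Build the environment-variable name that Airflow recognizes as
    an override for a config option: AIRFLOW__{SECTION}__{KEY}
    (double underscores on both sides)."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"

# Raise dag_concurrency for this process without touching airflow.cfg:
os.environ[airflow_env_var("core", "dag_concurrency")] = "32"

print(airflow_env_var("core", "dag_concurrency"))
# AIRFLOW__CORE__DAG_CONCURRENCY
```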
But beware that merely increasing this number is not enough (assuming you have big-enough boxes for hefty workers / multiple workers); several other configurations will have to be tweaked as well to achieve the kind of parallelism I sense you want.
They are all listed under the [core] section:
# The amount of parallelism as a setting to the executor. This defines the max number of task instances that should run simultaneously on this airflow installation
parallelism = 32
# When not using pools, tasks are run in the "default pool", whose size is guided by this config element
non_pooled_task_slot_count = 128
# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16
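To see how these limits interact, note that a task instance runs only when every applicable limit still has a free slot, so for a single DAG the effective ceiling is roughly the minimum of them. A simplified model in plain Python (my own sketch, not Airflow code):

```python
def effective_dag_slots(parallelism: int,
                        dag_concurrency: int,
                        pool_slots: int) -> int:
    """Rough model: a task instance runs only if the installation-wide
    parallelism, the per-DAG concurrency, and the pool all have a free
    slot, so the single-DAG ceiling is the minimum of the three."""
    return min(parallelism, dag_concurrency, pool_slots)

# With the defaults quoted above, dag_concurrency (16) is the bottleneck:
print(effective_dag_slots(parallelism=32,
                          dag_concurrency=16,
                          pool_slots=128))  # 16
```

So raising dag_concurrency alone helps only until parallelism becomes the next bottleneck.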
But we are still not there, because once you spawn so many tasks simultaneously, the backend metadata db will start choking. While this is likely a minor problem (and might not affect you unless you have some really huge DAGs / a very large number of Variable interactions in your tasks), it's still worth noting as a potential roadblock.
# The SqlAlchemy pool size is the maximum number of database connections in the pool. 0 indicates no limit.
sql_alchemy_pool_size = 5
# The SqlAlchemy pool recycle is the number of seconds a connection can be idle in the pool before it is invalidated. This config does not apply to sqlite. If the number of DB connections is ever exceeded, a lower config value will allow the system to recover faster.
sql_alchemy_pool_recycle = 1800
# How many seconds to retry re-establishing a DB connection after disconnects. Setting this to 0 disables retries.
sql_alchemy_reconnect_timeout = 300
Needless to say, all this is pretty much futile unless you pick the right executor; the SequentialExecutor, in particular, is intended only for testing.
# The executor class that airflow should use. Choices include SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = SequentialExecutor
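As a loose analogy (ordinary Python, not Airflow internals), the difference between the SequentialExecutor and a parallel executor such as the LocalExecutor is the difference between running tasks one after another and running them in a worker pool:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def task(n: int) -> int:
    time.sleep(0.1)  # stand-in for real work
    return n

# "SequentialExecutor": one task at a time, total time ~ 4 * 0.1s
start = time.perf_counter()
sequential = [task(i) for i in range(4)]
sequential_time = time.perf_counter() - start

# "LocalExecutor": four workers in parallel, total time ~ 0.1s
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    parallel = list(pool.map(task, range(4)))
parallel_time = time.perf_counter() - start

print(sequential == parallel)           # same results either way
print(parallel_time < sequential_time)  # the pool finishes sooner
```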
(Forgive me if the answer left you more confused than you originally were, but..)