在Akka Dispatcher上启动时,为什么Futures中的Futures按顺序运行

编程入门 行业动态 更新时间:2024-10-17 02:50:17
本文介绍了在Akka Dispatcher上启动时,为什么Futures中的Futures按顺序运行的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

当我们尝试从参与者的接收方法中启动多个期货时,我们观察到一种奇怪的行为. 如果我们将配置的调度程序用作ExecutionContext,则期货将在同一线程上按顺序运行.如果我们使用ExecutionContext.Implicits.global,则期货将按预期并行运行.

We observed a strange behavior when we tried to start a number of futures from within an actor's receive method. If we use our configured dispatchers as ExecutionContext, the futures run on the same thread and sequentially. If we use ExecutionContext.Implicits.global, the futures run in parallel as expected.

我们将代码简化为以下示例(下面是一个更完整的示例):

We boiled down the code to the following example (a more complete example is below):

implicit val ec = context.getDispatcher Future{ doWork() } // <-- all running parallel Future{ doWork() } Future{ doWork() } Future{ doWork() } Future { Future{ doWork() } Future{ doWork() } // <-- NOT RUNNING PARALLEL!!! WHY!!! Future{ doWork() } Future{ doWork() } }

一个可编译的示例如下:

A compilable example would be like this:

import akka.actor.ActorSystem import scala.concurrent.{ExecutionContext, Future} object WhyNotParallelExperiment extends App { val actorSystem = ActorSystem(s"Experimental") // Futures not started in future: running in parallel startFutures(runInFuture = false)(actorSystem.dispatcher) Thread.sleep(5000) // Futures started in future: running in sequentially. Why???? startFutures(runInFuture = true)(actorSystem.dispatcher) Thread.sleep(5000) actorSystem.terminate() private def startFutures(runInFuture: Boolean)(implicit executionContext: ExecutionContext): Unit = { if (runInFuture) { Future{ println(s"Start Futures on thread ${Thread.currentThread().getName()}") (1 to 9).foreach(startFuture) println(s"Started Futures on thread ${Thread.currentThread().getName()}") } } else { (11 to 19).foreach(startFuture) } } private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future{ println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}") Thread.sleep(500) println(s"Future $id finished on thread ${Thread.currentThread().getName()}") } }

我们尝试使用线程池执行器和fork-join-executor来获得相同的结果.

We tried with both, thread-pool-executor and fork-join-executor, with the same result.

我们以错误的方式使用期货吗? 然后应该如何生成并行任务?

Are we using futures in the wrong way? How should you then spawn parallel tasks?

推荐答案

来自Akka内部 BatchingExecutor (重点是我):

From the description of Akka's internal BatchingExecutor (emphasis mine):

执行器的Mixin特征,该执行器将多个嵌套的Runnable.run()调用分组为一个可传递给原始执行器的Runnable .这可能是有用的优化,因为它绕过了原始上下文的任务队列,并将相关(嵌套)代码保留在单个线程上,这可以提高CPU的亲和力.但是,如果传递给执行程序的任务阻塞或代价高昂,则此优化可以防止窃取工作并使性能恶化....如果代码在适当的时候不使用scala.concurrent.blocking,则批处理执行程序可以创建死锁,因为在内部执行的任务其他任务将阻止外部任务的完成.

Mixin trait for an Executor which groups multiple nested Runnable.run() calls into a single Runnable passed to the original Executor. This can be a useful optimization because it bypasses the original context's task queue and keeps related (nested) code on a single thread which may improve CPU affinity. However, if tasks passed to the Executor are blocking or expensive, this optimization can prevent work-stealing and make performance worse....A batching executor can create deadlocks if code does not use scala.concurrent.blocking when it should, because tasks created within other tasks will block on the outer task completing.

如果您使用的是混合在BatchingExecutor中的调度程序,即 MessageDispatcher -您可以使用scala.concurrent.blocking构造来启用嵌套期货的并行性:

If you're using a dispatcher that mixes in BatchingExecutor--namely, a subclass of MessageDispatcher--you could use the scala.concurrent.blocking construct to enable parallelism with nested Futures:

Future { Future { blocking { doBlockingWork() } } }

在您的示例中,您将在startFuture方法中添加blocking:

In your example, you would add blocking in the startFuture method:

private def startFuture(id: Int)(implicit executionContext: ExecutionContext): Future[Unit] = Future { blocking { println(s"Future $id should run for 500 millis on thread ${Thread.currentThread().getName()}") Thread.sleep(500) println(s"Future $id finished on thread ${Thread.currentThread().getName()}") } }

运行startFutures(true)(actorSystem.dispatcher)并进行上述更改的示例输出:

Sample output from running startFutures(true)(actorSystem.dispatcher) with the above change:

Start Futures on thread Experimental-akka.actor.default-dispatcher-2 Started Futures on thread Experimental-akka.actor.default-dispatcher-2 Future 1 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-2 Future 3 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-3 Future 5 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-6 Future 7 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-7 Future 4 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-5 Future 9 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-10 Future 6 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-8 Future 8 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-9 Future 2 should run for 500 millis on thread Experimental-akka.actor.default-dispatcher-4 Future 1 finished on thread Experimental-akka.actor.default-dispatcher-2 Future 3 finished on thread Experimental-akka.actor.default-dispatcher-3 Future 5 finished on thread Experimental-akka.actor.default-dispatcher-6 Future 4 finished on thread Experimental-akka.actor.default-dispatcher-5 Future 8 finished on thread Experimental-akka.actor.default-dispatcher-9 Future 7 finished on thread Experimental-akka.actor.default-dispatcher-7 Future 9 finished on thread Experimental-akka.actor.default-dispatcher-10 Future 6 finished on thread Experimental-akka.actor.default-dispatcher-8 Future 2 finished on thread Experimental-akka.actor.default-dispatcher-4

更多推荐

在Akka Dispatcher上启动时,为什么Futures中的Futures按顺序运行

本文发布于:2023-11-25 15:16:48,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1630261.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:启动时   顺序   Akka   Dispatcher   Futures

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!