The following Scala snippet doesn't seem to return:
    val queue = Source.queue[Unit](10, OverflowStrategy.fail)
      .throttle(1, 1 second, 1, ThrottleMode.shaping)
      .to(Sink.ignore)
      .run()

    Await.result(
      (1 to 15).map(_ => queue.offer(())).last,
      Duration.Inf)
Is this a bug in Akka streams or am I doing something wrong?
Edit: just to circle back, this bug was opened and accepted in Akka: github/akka/akka/issues/23078
Recommended answer

This program gives more insight:

    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Keep, Sink, Source}
    import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}

    import scala.concurrent.Await
    import scala.concurrent.duration._

    object Test extends App {
      implicit val actorSystem = ActorSystem()
      implicit val materializer = ActorMaterializer()
      import actorSystem.dispatcher

      val (queue, finalFuture) = Source.queue[Unit](10, OverflowStrategy.fail)
        .map(_ => println("Before throttle"))
        .throttle(1, 1.second, 1, ThrottleMode.shaping)
        .map(_ => println("After throttle"))
        .toMat(Sink.ignore)(Keep.both)
        .run()

      finalFuture.onComplete(r => println(s"Materialized future from ignore completed: $r"))

      Await.result((1 to 25).map(_ => queue.offer(()).map(e => println(s"Offer result: $e"))).last, Duration.Inf)
    }
It prints the following for me:
    Offer result: Enqueued
    After throttle
    Before throttle
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Failure(akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 10)!)
    Materialized future from ignore completed: Failure(akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 10)!)

But sometimes it ends with an exception:
    Before throttle
    After throttle
    Before throttle
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Enqueued
    Offer result: Failure(akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 10)!)
    Materialized future from ignore completed: Failure(akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 10)!)
    Exception in thread "main" java.lang.IllegalStateException: Stream is terminated. SourceQueue is detached
        at akka.stream.impl.QueueSource$$anon$1$$anonfun$postStop$1.apply(Sources.scala:57)
        at akka.stream.impl.QueueSource$$anon$1$$anonfun$postStop$1.apply(Sources.scala:56)
        at akka.stream.stage.CallbackWrapper$$anonfun$invoke$1.apply$mcV$sp(GraphStage.scala:1373)
        at akka.stream.stage.CallbackWrapper$class.akka$stream$stage$CallbackWrapper$$locked(GraphStage.scala:1379)
        at akka.stream.stage.CallbackWrapper$class.invoke(GraphStage.scala:1369)
        at akka.stream.impl.QueueSource$$anon$1.invoke(Sources.scala:47)
        at akka.stream.impl.QueueSource$$anon$2.offer(Sources.scala:180)
        at test.Test$$anonfun$4.apply(Test.scala:25)
        at test.Test$$anonfun$4.apply(Test.scala:25)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.Range.foreach(Range.scala:160)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at test.Test$.delayedEndpoint$test$Test$1(Test.scala:25)
        at test.Test$delayedInit$body.apply(Test.scala:10)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
        at scala.App$class.main(App.scala:76)
That is, you're seeing concurrency in action: the futures you submit are executed in parallel. One of them finishes with a failure, but much more often they just hang. If they arrive in an order where the failed future comes first, you get an exception; otherwise the await never returns.
To determine whether your stream has actually terminated, you have to observe it directly, as is done above. Most importantly, you had better push no more than the configured number of events into the queue. If you do want to push more and you use OverflowStrategy.backpressure, you always need to wait for the future returned by the last offer() to complete before calling the next offer().
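The advice about waiting for each offer() before issuing the next one could be sketched as follows. This is a minimal illustration rather than the original answer's code: the object name SequentialOffers, the helper offerAll, the 100 ms throttle interval, and the 30-second timeouts are all assumptions made for the example, and it uses the same ActorMaterializer-style setup as the program above.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult, ThrottleMode}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

// Hypothetical helper object, not part of Akka or of the original answer.
object SequentialOffers {

  // Offers `count` elements one at a time and returns every offer result.
  def offerAll(count: Int): Seq[QueueOfferResult] = {
    implicit val system: ActorSystem = ActorSystem("sequential-offers")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // backpressure instead of fail: a full buffer delays the offer's future
    // rather than failing the stream. The 100 ms interval is an assumption
    // chosen only to keep the example short.
    val (queue, done) =
      Source.queue[Unit](10, OverflowStrategy.backpressure)
        .throttle(1, 100.millis, 1, ThrottleMode.shaping)
        .toMat(Sink.ignore)(Keep.both)
        .run()

    // Chain the offers: offer n+1 is issued only after the future of
    // offer n has completed, so there is never more than one pending offer.
    val results: Future[Vector[QueueOfferResult]] =
      (1 to count).foldLeft(Future.successful(Vector.empty[QueueOfferResult])) {
        (acc, _) => acc.flatMap(seen => queue.offer(()).map(seen :+ _))
      }

    try {
      // Finite timeouts instead of Duration.Inf, so a stuck stream surfaces
      // as a TimeoutException instead of an endless wait.
      val offered = Await.result(results, 30.seconds)
      queue.complete()                // signal that no more elements will come
      Await.result(done, 30.seconds)  // observe the stream's own termination
      offered
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    offerAll(15).foreach(r => println(s"Offer result: $r"))
}
```

Because the offers are chained with flatMap, a full buffer simply delays the next offer until the throttle lets an element through, and every result is expected to be Enqueued. Awaiting the materialized future of Sink.ignore (done) is what tells you the stream itself has finished, independently of the individual offer futures.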