使用故障溢出策略时,Akka流Source.queue挂起

编程入门 行业动态 更新时间:2024-10-12 01:30:05
本文介绍了使用故障溢出策略时,Akka流Source.queue挂起的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

以下Scala代码段似乎未返回:

The following Scala snippet doesn't seem to return:

val queue = Source.queue[Unit](10, OverflowStrategy.fail) .throttle(1, 1 second, 1, ThrottleMode.shaping) .to(Sink.ignore) .run() Await.result( (1 to 15).map(_ => queue.offer(())).last, Duration.Inf)

这是Akka流中的错误还是我做错了?

Is this a bug in Akka streams or am I doing something wrong?

编辑:只是回过头来,此错误已在Akka中打开并接受: github/akka/akka/issues/23078

just to circle back, this bug was opened and accepted in Akka: github/akka/akka/issues/23078

推荐答案

该程序提供了更多见解

import akka.actor.ActorSystem import akka.stream.scaladsl.{Keep, Sink, Source} import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode} import scala.concurrent.Await import scala.concurrent.duration._ object Test extends App { implicit val actorSystem = ActorSystem() implicit val materializer = ActorMaterializer() import actorSystem.dispatcher val (queue, finalFuture) = Source.queue[Unit](10, OverflowStrategy.fail) .map(_ => println("Before throttle")) .throttle(1, 1.second, 1, ThrottleMode.shaping) .map(_ => println("After throttle")) .toMat(Sink.ignore)(Keep.both) .run() finalFuture.onComplete(r => println(s"Materialized future from ignore completed: $r")) Await.result((1 to 25).map(_ => queue.offer(()).map(e => println(s"Offer result: $e"))).last, Duration.Inf) }

它为我打印了以下内容:

It prints the following for me:

Offer result: Enqueued After throttle Before throttle Offer result: Enqueued Offer result: Enqueued Offer result: Enqueued Offer result: Enqueued Offer result: Enqueued Offer result: Enqueued Offer result: Enqueued Offer result: Enqueued Offer result: Enqueued Offer result: Enqueued Offer result: Enqueued Offer result: Failure(akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 10)!) Materialized future from ignore completed: Failure(akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 10)!)

但有时会以例外结尾:

Before throttle After throttle Before throttle Offer result: Enqueued Offer result: Enqueued Offer result: Enqueued Offer result: Enqueued Offer result: Enqueued Offer result: Enqueued Offer result: Enqueued Offer result: Enqueued Offer result: Enqueued Offer result: Enqueued Offer result: Enqueued Offer result: Enqueued Offer result: Failure(akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 10)!) Materialized future from ignore completed: Failure(akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 10)!) Exception in thread "main" java.lang.IllegalStateException: Stream is terminated. SourceQueue is detached at akka.stream.impl.QueueSource$$anon$1$$anonfun$postStop$1.apply(Sources.scala:57) at akka.stream.impl.QueueSource$$anon$1$$anonfun$postStop$1.apply(Sources.scala:56) at akka.stream.stage.CallbackWrapper$$anonfun$invoke$1.apply$mcV$sp(GraphStage.scala:1373) at akka.stream.stage.CallbackWrapper$class.akka$stream$stage$CallbackWrapper$$locked(GraphStage.scala:1379) at akka.stream.stage.CallbackWrapper$class.invoke(GraphStage.scala:1369) at akka.stream.impl.QueueSource$$anon$1.invoke(Sources.scala:47) at akka.stream.impl.QueueSource$$anon$2.offer(Sources.scala:180) at test.Test$$anonfun$4.apply(Test.scala:25) at test.Test$$anonfun$4.apply(Test.scala:25) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.Range.foreach(Range.scala:160) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at test.Test$.delayedEndpoint$test$Test$1(Test.scala:25) at test.Test$delayedInit$body.apply(Test.scala:10) at scala.Function0$class.apply$mcV$sp(Function0.scala:34) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:76) at scala.App$$anonfun$main$1.apply(App.scala:76) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) at scala.App$class.main(App.scala:76)

也就是说,您正在看到并发性-您提交的期货是并行执行的,其中之一他们以失败告终,但更多的是ng。如果您以失败的未来为先的顺序获得它们,则会得到异常,否则将无限期等待。

That is, you're seeing concurrency in action - the futures which you submit are executed in parallel, and one of them finishes with a failure, but much more often they just hang. If you get them in such an order that the failed future comes first, you get an exception, otherwise you get an infinite await.

确定流实际上已终止,您必须像上面一样直接查看它。但最重要的是,您最好不要将配置的事件数最多推送到队列中,或者如果您确实想这样做并且使用 OverflowStrategy.backpressure ,则始终需要等待提交的最后一个将来完成,然后再执行下一个 offer()。

To determine that your stream has actually terminated, you have to look into it directly, like it is done above. But most importantly, you should better push no more than the configured number of events into the queue, or if you do want to do that and you use OverflowStrategy.backpressure, you always need to wait for the last future that you submit to complete before executing the next offer().

更多推荐

使用故障溢出策略时,Akka流Source.queue挂起

本文发布于:2023-11-25 02:32:12,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1627967.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:挂起   故障   策略   Akka   Source

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!