Akka Stream Source.queue 的背压策略不起作用

编程入门 行业动态 更新时间:2024-10-12 10:18:46
本文介绍了Akka Stream Source.queue 的背压策略不起作用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我试图理解为什么下面的代码片段正在做它正在做的事情.我会认为,因为接收器不能比源生成内容更快地产生需求,那么我会收到丢弃的消息以响应某些报价(溢出策略设置为丢弃缓冲区)以及错误和队列关闭消息在自毁片之后.

I'm trying to understand why the below code snippet is doing what it's doing. I would have thought that because the Sink cannot produce demand faster than the Source is producing content, then I would be getting dropped messages in response to some of the offers (overflow strategy is set to Drop Buffer) and also an error and queue closed message after the self destruct piece.

片段:

package playground import java.time.LocalDateTime import java.util.concurrent.atomic.AtomicInteger import akka.actor.{Actor, ActorLogging, ActorSystem, Props} import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed} import akka.stream._ import akka.stream.scaladsl.{Sink, Source} import scala.concurrent.duration._ case object MessageToSink object Playground extends App { implicit val actorSystem = ActorSystem("Playground") implicit val execCntxt = actorSystem.dispatcher val sinkActor = actorSystem.actorOf(Props[Actor2SinkFwder]) actorSystem.scheduler.schedule(1 millisecond, 50 milliseconds, sinkActor, MessageToSink) println(s"Playground has started... ${LocalDateTime.now()}") } class Actor2SinkFwder extends Actor with ActorLogging { implicit val materializer = ActorMaterializer() implicit val execCtxt = context.dispatcher val flow = Source.queue[Int](bufferSize = 1, overflowStrategy = OverflowStrategy.dropBuffer) .to(Sink.foreach[Int] { i => println(s"$i Sinking starts at ${LocalDateTime.now()}") Thread.sleep(150) if (i == 5) throw new RuntimeException("KaBoom!") println(s"$i Sinking completes at ${LocalDateTime.now()}") }).run() val i: AtomicInteger = new AtomicInteger(0) override def receive: Receive = { case MessageToSink => val num = i.incrementAndGet() println(s"$num Sink Command received at ${LocalDateTime.now()}") flow.offer(num).collect { case Enqueued => println(s"$num Enqueued ${LocalDateTime.now}") case Dropped => println(s"$num Dropped ${LocalDateTime.now}") case Failure(err) => println(s"$num Failed ${LocalDateTime.now} $err") case QueueClosed => println(s"$num Failed ${LocalDateTime.now} QueueClosed") } } }

输出:

Playground has started... 2016-12-27T18:35:29.574 1 Sink Command received at 2016-12-27T18:35:29.640 2 Sink Command received at 2016-12-27T18:35:29.642 3 Sink Command received at 2016-12-27T18:35:29.642 1 Sinking starts at 2016-12-27T18:35:29.649 1 Enqueued 2016-12-27T18:35:29.650 4 Sink Command received at 2016-12-27T18:35:29.688 5 Sink Command received at 2016-12-27T18:35:29.738 6 Sink Command received at 2016-12-27T18:35:29.788 1 Sinking completes at 2016-12-27T18:35:29.799 2 Sinking starts at 2016-12-27T18:35:29.800 2 Enqueued 2016-12-27T18:35:29.800 7 Sink Command received at 2016-12-27T18:35:29.838 8 Sink Command received at 2016-12-27T18:35:29.888 9 Sink Command received at 2016-12-27T18:35:29.938 2 Sinking completes at 2016-12-27T18:35:29.950 3 Sinking starts at 2016-12-27T18:35:29.951 3 Enqueued 2016-12-27T18:35:29.951 10 Sink Command received at 2016-12-27T18:35:29.988 11 Sink Command received at 2016-12-27T18:35:30.038 12 Sink Command received at 2016-12-27T18:35:30.088 3 Sinking completes at 2016-12-27T18:35:30.101 4 Sinking starts at 2016-12-27T18:35:30.101 4 Enqueued 2016-12-27T18:35:30.101 13 Sink Command received at 2016-12-27T18:35:30.138 14 Sink Command received at 2016-12-27T18:35:30.189 15 Sink Command received at 2016-12-27T18:35:30.238 4 Sinking completes at 2016-12-27T18:35:30.251 5 Sinking starts at 2016-12-27T18:35:30.251 5 Enqueued 2016-12-27T18:35:30.252 16 Sink Command received at 2016-12-27T18:35:30.288 17 Sink Command received at 2016-12-27T18:35:30.338 18 Sink Command received at 2016-12-27T18:35:30.388 19 Sink Command received at 2016-12-27T18:35:30.438 20 Sink Command received at 2016-12-27T18:35:30.488 21 Sink Command received at 2016-12-27T18:35:30.538 22 Sink Command received at 2016-12-27T18:35:30.588 23 Sink Command received at 2016-12-27T18:35:30.638 24 Sink Command received at 2016-12-27T18:35:30.688 25 Sink Command received at 2016-12-27T18:35:30.738 26 Sink Command received at 2016-12-27T18:35:30.788 etc...

我认为我的误解是关于使用 getAsyncCallback 在 QueueSource 类中.即使 QueueSource 中的 offer 调用使用正确的 Offer 详细信息调用 stageLogic,在前一个元素完成处理之前,不会调用阶段逻辑中此代码的实际处理程序,因此没有用于检查缓冲区大小或应用溢出的逻辑策略正在得到应用...... :-/

I think my miss-understanding is around the use of getAsyncCallback in the QueueSource class. Even though the offer call in the QueueSource invokes the stageLogic with the correct Offer details, the actual handler for this code in the stage logic doesnt get invoked until the previous element has finished processing, so none of the logic for checking buffer sizes or applying Overflow Strategies is getting applied... :-/

推荐答案

要查看您期望的结果,您应该在 Source 和你的 接收器.这是一种告诉 Akka 使用两个不同的 Actor 运行两个阶段的方法——通过强制在两者之间建立异步边界.

To see the result you're expecting, you should add an async stage between your Source and your Sink. This is a way to tell Akka to run the two stages using two distinct Actors - by forcing an asynchronous boundary between the two.

如果没有 async,Akka 将通过在一个 Actor 中粉碎所有内容来优化执行,这将使处理顺序化.在您的示例中,正如您所注意到的,在前一条消息的 Thread.sleep(150) 完成之前,一条消息被提供到队列中.可以找到有关该主题的更多信息 这里.

Without the async, Akka will optimize the execution by smashing everything in one actor, which will sequentialise the processing. In your example, as you noticed, a message is offered to the queue until the Thread.sleep(150) of the previous message have been completed. More info on the topic can be found here.

val flow = Source.queue[Int](bufferSize = 1, overflowStrategy = OverflowStrategy.dropBuffer) .async .to(Sink.foreach[Int] {...}).run()

此外,您应该在匹配 .offer 结果时再添加一种情况.这是Future 的Failure,当队列下游出现故障时,Future 完成.这适用于在前 5 个

Also, you should add one more case when matching the .offer result. This is a Failure of the Future, which the Future gets completed with when the queue downstream has been failed. This applies to all messages offered after the first 5

override def receive: Receive = { case MessageToSink => val num = i.incrementAndGet() println(s"$num Sink Command received at ${LocalDateTime.now()}") flow.offer(num).onComplete { case Success(Enqueued) => println(s"$num Enqueued ${LocalDateTime.now}") case Success(Dropped) => println(s"$num Dropped ${LocalDateTime.now}") case Success(Failure(err)) => println(s"$num Failed ${LocalDateTime.now} $err") case Success(QueueClosed) => println(s"$num Failed ${LocalDateTime.now} QueueClosed") case util.Failure(err) => println(s"$num Failed ${LocalDateTime.now} with exception $err") } }

请注意,即使执行上述所有操作,您也不会看到任何 QueueOfferResult.Dropped 结果.那是因为您选择了 DropBuffer 策略.每个传入的消息都将排队(因此产生一个 Enqueued 消息),踢出现有的缓冲区.如果您将策略更改为 DropNew,您应该会开始看到一些 Dropped 消息.

Note that, even by doing all the above, you will not see any QueueOfferResult.Dropped results. That is because you chose DropBuffer strategy. Every incoming message will be queued (therefore producing an Enqueued message), kicking out the existing buffer. If you change the strategy to DropNew, you should start seeing some Dropped messages.

更多推荐

Akka Stream Source.queue 的背压策略不起作用

本文发布于:2023-11-25 02:33:59,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1627975.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:不起作用   策略   Stream   Akka   queue

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!