How to create an Akka Streams Source from a Flow that recursively generates values?

Updated: 2024-10-11 09:24:15
Problem description

I need to traverse an API that is shaped like a tree, for example a directory structure or threads of discussion. It can be modeled with the following flow:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

type ItemId = Int
type Data = String
case class Item(data: Data, kids: List[ItemId])

def randomData(): Data = scala.util.Random.alphanumeric.take(2).mkString

// 0 => [1, 9]
// 1 => [10, 19]
// 2 => [20, 29]
// ...
// 9 => [90, 99]
// _ => []
// NB. I don't have access to this function, only the itemFlow.
def nested(id: ItemId): List[ItemId] =
  if (id == 0) (1 to 9).toList
  else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
  else Nil

val itemFlow: Flow[ItemId, Item, NotUsed] =
  Flow.fromFunction(id => Item(randomData(), nested(id)))
```
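Before looking at Akka, it helps to pin down what a complete traversal should emit. The plain-Scala sketch below uses no Akka and calls `nested` directly, which the real code cannot do; `TreeShapeSketch` and `unfoldBfs` are illustrative names, not part of any library. Unfolding breadth-first from the seed `0` visits 1 + 9 + 90 = 100 ids, and for this particular tree breadth-first order happens to be 0, 1, ..., 99.

```scala
import scala.collection.immutable.Queue

object TreeShapeSketch {
  type ItemId = Int

  // Copy of `nested` from the question. In the real problem it is only reachable
  // through `itemFlow`; it is used directly here just to show the expected result.
  def nested(id: ItemId): List[ItemId] =
    if (id == 0) (1 to 9).toList
    else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
    else Nil

  // Breadth-first unfold: emit each node, then enqueue its children.
  def unfoldBfs[S](seeds: List[S])(children: S => List[S]): List[S] = {
    @annotation.tailrec
    def go(frontier: Queue[S], acc: List[S]): List[S] =
      frontier.dequeueOption match {
        case None            => acc.reverse
        case Some((s, rest)) => go(rest ++ children(s), s :: acc)
      }
    go(Queue(seeds: _*), Nil)
  }

  def main(args: Array[String]): Unit = {
    val ids = unfoldBfs(List(0))(nested)
    println(ids.size) // 100: the root, 9 children and 90 grandchildren
  }
}
```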

How can I traverse this data? I got the following working:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._

import scala.concurrent.Await
import scala.concurrent.duration.Duration

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val loop = GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  val source = b.add(Flow[Int])
  val merge  = b.add(Merge[Int](2))
  val fetch  = b.add(itemFlow)
  val bcast  = b.add(Broadcast[Item](2))
  val kids   = b.add(Flow[Item].mapConcat(_.kids))
  val data   = b.add(Flow[Item].map(_.data))
  val buffer = Flow[Int].buffer(100, OverflowStrategy.dropHead)

  source ~> merge ~> fetch ~> bcast ~> data
            merge <~ buffer <~ kids <~ bcast

  FlowShape(source.in, data.out)
}

val flow = Flow.fromGraph(loop)

Await.result(
  Source.single(0).via(flow).runWith(Sink.foreach(println)),
  Duration.Inf
)

system.terminate()
```

However, because I'm using a flow with a buffer inside the cycle, the stream never completes.

From the `Flow.buffer` documentation: "Completes when upstream completes and buffered elements have been drained."
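A likely reason the cycle never finishes: with its default `eagerComplete = false`, `Merge` completes only after all of its inputs complete, and the feedback input (`buffer <~ kids <~ bcast`) can only complete after `merge` itself has completed. Upstream completion therefore never arrives, so the end of the traversal has to be detected some other way, for instance by counting elements that have entered the loop but have not been processed yet. The sketch below models that counting rule in plain Scala without Akka; the four-node `tree` and the name `OutstandingCounterSketch` are made up for illustration.

```scala
import scala.collection.mutable

object OutstandingCounterSketch {
  // Hypothetical four-node tree: 0 -> [1, 2], 1 -> [3]
  val tree: Map[Int, List[Int]] = Map(0 -> List(1, 2), 1 -> List(3))
  def children(id: Int): List[Int] = tree.getOrElse(id, Nil)

  // Runs the feedback loop until no element is outstanding.
  // Returns the processed elements and the counter value after each step.
  def runLoop[S](seeds: List[S])(next: S => List[S]): (List[S], List[Int]) = {
    val loop = mutable.Queue[S](seeds: _*)
    var outstanding = seeds.size
    val emitted = List.newBuilder[S]
    val trace = List.newBuilder[Int]
    while (outstanding > 0) {
      val s = loop.dequeue()
      val kids = next(s)
      loop ++= kids            // feed the children back into the loop
      outstanding += kids.size // they are now outstanding as well
      outstanding -= 1         // and `s` itself is finished
      emitted += s
      trace += outstanding
    }
    (emitted.result(), trace.result())
  }

  def main(args: Array[String]): Unit = {
    val (emitted, trace) = runLoop(List(0))(children)
    println(emitted) // List(0, 1, 2, 3)
    println(trace)   // List(2, 2, 1, 0): zero means the loop is done
  }
}
```

The counter equals the number of elements still queued in the loop, so reaching zero is exactly the "buffer drained and nothing left to expand" condition that a cyclic graph cannot learn from upstream completion.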

I read the Graph cycles, liveness, and deadlocks section multiple times and I'm still struggling to find an answer.

The following attempt, which feeds the children back in through an actor-backed source, creates a livelock:

```scala
import java.util.concurrent.atomic.AtomicInteger

def unfold[S, E](seed: S, flow: Flow[S, E, NotUsed])(loop: E => List[S]): Source[E, NotUsed] = {
  // keep track of how many elements are still flowing through the loop
  val remaining = new AtomicInteger(1) // 1 = seed

  // should be > max loop(x)
  val bufferSize = 10000

  val (ref, publisher) = Source.actorRef[S](bufferSize, OverflowStrategy.fail)
    .toMat(Sink.asPublisher(true))(Keep.both)
    .run()

  ref ! seed

  Source.fromPublisher(publisher)
    .via(flow)
    .map { x =>
      loop(x).foreach { c =>
        remaining.incrementAndGet()
        ref ! c
      }
      x
    }
    .takeWhile(_ => remaining.decrementAndGet > 0)
}
```
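Separately from the livelock, the counter in this attempt has an off-by-one: the predicate decrements `remaining` before testing it, so the element that brings the count to zero (the last one) fails the predicate, and `takeWhile` does not emit the element that fails. The sketch below reproduces the pipeline with a plain Scala `Iterator`, whose `takeWhile` behaves the same way, on a made-up four-node tree; `TakeWhileCounterSketch` and `takeWhileInclusive` are illustrative helpers. Akka Streams' `takeWhile` also has an overload with an `inclusive` flag (Akka 2.5 and later, as far as I know) that emits that final element.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

object TakeWhileCounterSketch {
  // Hypothetical four-node tree: 0 -> [1, 2], 1 -> [3]
  val tree: Map[Int, List[Int]] = Map(0 -> List(1, 2), 1 -> List(3))
  def children(id: Int): List[Int] = tree.getOrElse(id, Nil)

  // Emits elements while `p` holds, plus the first element for which it fails.
  def takeWhileInclusive[A](it: Iterator[A])(p: A => Boolean): List[A] = {
    val b = List.newBuilder[A]
    var keepGoing = true
    while (keepGoing && it.hasNext) {
      val a = it.next()
      b += a
      keepGoing = p(a)
    }
    b.result()
  }

  // Mirrors the question's pipeline: feed children back and count them, then decrement.
  def run(inclusive: Boolean): List[Int] = {
    val loop = mutable.Queue(0)
    val remaining = new AtomicInteger(1) // 1 = seed
    val processed = Iterator.continually(loop.dequeue()).map { x =>
      children(x).foreach { c =>
        remaining.incrementAndGet()
        loop += c
      }
      x
    }
    val p: Int => Boolean = _ => remaining.decrementAndGet() > 0
    if (inclusive) takeWhileInclusive(processed)(p)
    else processed.takeWhile(p).toList
  }

  def main(args: Array[String]): Unit = {
    println(run(inclusive = false)) // List(0, 1, 2): node 3 is dropped
    println(run(inclusive = true))  // List(0, 1, 2, 3)
  }
}
```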

Edit: I added a git repo to test your solution: github/MasseGuillaume/source-unfold

Answer

I solved this problem by writing my own GraphStage.

```scala
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

import scala.collection.mutable
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}

def unfoldTree[S, E](seeds: List[S],
                     flow: Flow[S, E, NotUsed],
                     loop: E => List[S],
                     bufferSize: Int)(implicit ec: ExecutionContext): Source[E, NotUsed] =
  Source.fromGraph(new UnfoldSource(seeds, flow, loop, bufferSize))

object UnfoldSource {
  implicit class MutableQueueExtensions[A](private val self: mutable.Queue[A]) extends AnyVal {
    // Dequeues exactly n elements; the caller makes sure n <= self.size
    def dequeueN(n: Int): List[A] = {
      val b = List.newBuilder[A]
      var i = 0
      while (i < n) {
        b += self.dequeue()
        i += 1
      }
      b.result()
    }
  }
}

class UnfoldSource[S, E](seeds: List[S],
                         flow: Flow[S, E, NotUsed],
                         loop: E => List[S],
                         bufferSize: Int)(implicit ec: ExecutionContext)
    extends GraphStage[SourceShape[E]] {

  // Brings dequeueN into scope (companion members are not imported automatically)
  import UnfoldSource._

  val out: Outlet[E] = Outlet("UnfoldSource.out")
  override val shape: SourceShape[E] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {
      // Nodes to expand
      val frontier = mutable.Queue[S]()
      frontier ++= seeds
      // Nodes expanded
      val buffer = mutable.Queue[E]()
      // Using the flow to fetch more data
      var inFlight = false
      // Downstream pulled but the buffer was empty
      var downstreamWaiting = false

      def isBufferFull(): Boolean = buffer.size >= bufferSize

      def fillBuffer(): Unit = {
        val batchSize = Math.min(bufferSize - buffer.size, frontier.size)
        val batch = frontier.dequeueN(batchSize)
        inFlight = true

        // Run the batch through the flow as a separate, short-lived stream
        val toProcess = Source(batch)
          .via(flow)
          .runWith(Sink.seq)(materializer)

        // The Future completes on another thread; re-enter the stage through an async callback
        val callback = getAsyncCallback[Try[Seq[E]]] {
          case Failure(ex) =>
            fail(out, ex)
          case Success(es) =>
            inFlight = false
            es.foreach { e =>
              buffer += e
              frontier ++= loop(e)
            }
            if (downstreamWaiting && buffer.nonEmpty) {
              downstreamWaiting = false
              sendOne(buffer.dequeue())
            } else if (downstreamWaiting && frontier.nonEmpty) {
              // The batch produced nothing, but there is still work to do
              fillBuffer()
            } else {
              checkCompletion()
            }
        }
        toProcess.onComplete(callback.invoke)
      }

      override def preStart(): Unit = checkCompletion()

      def checkCompletion(): Unit =
        if (!inFlight && buffer.isEmpty && frontier.isEmpty) completeStage()

      def sendOne(e: E): Unit = {
        push(out, e)
        checkCompletion()
      }

      override def onPull(): Unit = {
        if (buffer.nonEmpty) sendOne(buffer.dequeue())
        else downstreamWaiting = true

        // Keep at most one batch in flight so that `inFlight` stays accurate
        if (!inFlight && !isBufferFull() && frontier.nonEmpty) fillBuffer()
      }

      setHandler(out, this)
    }
}
```
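The stage's bookkeeping (frontier, buffer, `bufferSize`, a waiting downstream) can be checked without Akka. The sketch below is a synchronous model, not the stage itself: `fetch` is a plain function standing in for the flow, every batch completes immediately, and `UnfoldSourceModel` is an illustrative name. Under those assumptions, and given a flow that emits one element per input as `itemFlow` does, it emits all 100 ids of the question's tree in breadth-first order and never holds more than `bufferSize` fetched elements.

```scala
import scala.collection.mutable

object UnfoldSourceModel {
  // The question's `nested`, standing in for the kids that `itemFlow` reports.
  def nested(id: Int): List[Int] =
    if (id == 0) (1 to 9).toList
    else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
    else Nil

  // Synchronous model of the stage: `fetch` replaces the flow and each batch
  // completes immediately. Returns the emitted elements and the largest buffer size seen.
  def unfold[S, E](seeds: List[S], fetch: S => E, loop: E => List[S], bufferSize: Int): (List[E], Int) = {
    require(bufferSize > 0, "bufferSize must be positive")
    val frontier = mutable.Queue[S](seeds: _*)
    val buffer = mutable.Queue[E]()
    val out = List.newBuilder[E]
    var maxBuffered = 0
    // Each iteration stands for one downstream pull (onPull)
    while (buffer.nonEmpty || frontier.nonEmpty) {
      val waiting = buffer.isEmpty
      if (!waiting) out += buffer.dequeue()
      // Like fillBuffer(): take at most enough seeds to top the buffer up to bufferSize
      if (buffer.size < bufferSize && frontier.nonEmpty) {
        val n = math.min(bufferSize - buffer.size, frontier.size)
        val batch = List.fill(n)(frontier.dequeue())
        batch.map(fetch).foreach { e =>
          buffer += e
          frontier ++= loop(e)
        }
        maxBuffered = math.max(maxBuffered, buffer.size)
        // Like the async callback: a waiting downstream is served right away
        if (waiting && buffer.nonEmpty) out += buffer.dequeue()
      }
    }
    (out.result(), maxBuffered)
  }

  def main(args: Array[String]): Unit = {
    val (emitted, maxBuffered) = unfold[Int, Int](List(0), id => id, nested, bufferSize = 10)
    println(emitted.size) // 100
    println(maxBuffered)  // never more than bufferSize
  }
}
```

With Akka on the classpath, the answer's stage itself would be driven along the lines of `unfoldTree(List(0), itemFlow, (item: Item) => item.kids, bufferSize = 100).runForeach(println)`, with an implicit `ExecutionContext` (for example `system.dispatcher`) and the usual implicit materializer in scope.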

Published: 2023-11-25 11:16:15