I need to traverse an API that is shaped like a tree, for example a directory structure or discussion threads. It can be modeled with the following flow:
```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

type ItemId = Int
type Data = String
case class Item(data: Data, kids: List[ItemId])

def randomData(): Data =
  scala.util.Random.alphanumeric.take(2).mkString

// 0 => [1, 9]
// 1 => [10, 19]
// 2 => [20, 29]
// ...
// 9 => [90, 99]
// _ => []
// NB. I don't have access to this function, only the itemFlow.
def nested(id: ItemId): List[ItemId] =
  if (id == 0) (1 to 9).toList
  else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
  else Nil

val itemFlow: Flow[ItemId, Item, NotUsed] =
  Flow.fromFunction(id => Item(randomData(), nested(id)))
```
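For reference, here is what a complete traversal has to produce. This sketch assumes direct access to `nested`, which the question says the real code does not have (only `itemFlow`), and simply enumerates the modeled tree with plain recursion: 1 + 9 + 90 = 100 items, each id from 0 to 99 exactly once. `TreeShape` and `allIds` are hypothetical names used only for illustration.

```scala
object TreeShape {
  type ItemId = Int

  // Copy of `nested` from the question, used here only to enumerate the tree.
  def nested(id: ItemId): List[ItemId] =
    if (id == 0) (1 to 9).toList
    else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
    else Nil

  // Depth-first, pre-order: an id, then the subtrees of its kids in order.
  def allIds(id: ItemId): List[ItemId] =
    id :: nested(id).flatMap(allIds)

  def main(args: Array[String]): Unit = {
    val ids = allIds(0)
    println(ids.size)     // 100
    println(ids.take(12)) // List(0, 1, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19)
  }
}
```

Any streaming solution should emit the same 100 items, although the order may differ (a breadth-first traversal visits 0 to 99 in numeric order).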
How can I traverse this data? I got the following working:
```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._

import scala.concurrent.Await
import scala.concurrent.duration.Duration

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val loop = GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  val source = b.add(Flow[Int])
  val merge  = b.add(Merge[Int](2))
  val fetch  = b.add(itemFlow)
  val bcast  = b.add(Broadcast[Item](2))
  val kids   = b.add(Flow[Item].mapConcat(_.kids))
  val data   = b.add(Flow[Item].map(_.data))
  val buffer = Flow[Int].buffer(100, OverflowStrategy.dropHead)

  source ~> merge ~> fetch ~> bcast ~> data
            merge <~ buffer <~ kids <~ bcast

  FlowShape(source.in, data.out)
}

val flow = Flow.fromGraph(loop)

Await.result(
  Source.single(0).via(flow).runWith(Sink.foreach(println)),
  Duration.Inf
)

system.terminate()
```
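However, because the loop goes through a buffer, the stream never completes. The documentation of `Flow.buffer` says it:

"Completes when upstream completes and buffered elements have been drained"

In this cycle, the buffer's upstream (the broadcast of fetched items) can only complete after the merge completes, and the merge waits for all of its inputs to complete, including the buffer.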
I have read the "Graph cycles, liveness, and deadlocks" section of the Akka Streams documentation several times, and I'm still struggling to find an answer.
This creates a livelock:
```scala
import java.util.concurrent.atomic.AtomicInteger

def unfold[S, E](seed: S, flow: Flow[S, E, NotUsed])(loop: E => List[S]): Source[E, NotUsed] = {
  // number of ids sent into the loop that takeWhile has not processed yet
  val remaining = new AtomicInteger(1) // 1 = seed

  // should be > max loop(x)
  val bufferSize = 10000

  val (ref, publisher) =
    Source.actorRef[S](bufferSize, OverflowStrategy.fail)
      .toMat(Sink.asPublisher(true))(Keep.both)
      .run()

  ref ! seed

  Source.fromPublisher(publisher)
    .via(flow)
    .map { x =>
      loop(x).foreach { c =>
        remaining.incrementAndGet()
        ref ! c
      }
      x
    }
    .takeWhile(_ => remaining.decrementAndGet > 0)
}
```
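The counter in this attempt encodes a workable completion rule: the traversal is over once every id that has been discovered has also been emitted. The plain-Scala sketch below (no Akka; `PendingCounter`, `Node` and `fetch` are hypothetical, with `fetch` standing in synchronously for one trip through `itemFlow`) applies that rule in isolation. It emits each element before counting it as finished, because `takeWhile` in its default, non-inclusive form does not emit the element for which its predicate first returns false.

```scala
import scala.collection.mutable

object PendingCounter {
  final case class Node(id: Int, kids: List[Int])

  // Hypothetical stand-in for one round trip through `itemFlow`,
  // reusing the tree shape of `nested` from the question.
  def fetch(id: Int): Node = {
    val kids =
      if (id == 0) (1 to 9).toList
      else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
      else Nil
    Node(id, kids)
  }

  def traverse(seed: Int): List[Node] = {
    val pending = mutable.Queue(seed) // ids waiting to be fetched
    var outstanding = 1               // ids discovered but not yet emitted; starts with the seed
    val emitted = List.newBuilder[Node]
    while (outstanding > 0) {
      val node = fetch(pending.dequeue())
      pending ++= node.kids
      outstanding += node.kids.size // every kid is new work
      emitted += node               // emit first...
      outstanding -= 1              // ...then count this id as finished
    }
    emitted.result()
  }

  def main(args: Array[String]): Unit =
    println(traverse(0).map(_.id) == (0 to 99).toList) // true
}
```

Since `outstanding` always equals the number of queued ids, the loop stops exactly when the queue is empty, without dropping the last element.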
Edit: I added a Git repo to test your solutions: github/MasseGuillaume/source-unfold
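Recommended answer

I solved this problem by writing my own GraphStage. It keeps a frontier of nodes still to expand and a buffer of fetched elements, sends the frontier through the flow in batches of at most bufferSize elements, and completes once no batch is in flight and both queues are empty.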
```scala
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

import scala.collection.mutable
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}

def unfoldTree[S, E](seeds: List[S],
                     flow: Flow[S, E, NotUsed],
                     loop: E => List[S],
                     bufferSize: Int)(implicit ec: ExecutionContext): Source[E, NotUsed] =
  Source.fromGraph(new UnfoldSource(seeds, flow, loop, bufferSize))

object UnfoldSource {
  implicit class MutableQueueExtensions[A](private val self: mutable.Queue[A]) extends AnyVal {
    // Dequeue exactly n elements (n must not exceed the queue size).
    def dequeueN(n: Int): List[A] = {
      val b = List.newBuilder[A]
      var i = 0
      while (i < n) {
        b += self.dequeue()
        i += 1
      }
      b.result()
    }
  }
}

class UnfoldSource[S, E](seeds: List[S],
                         flow: Flow[S, E, NotUsed],
                         loop: E => List[S],
                         bufferSize: Int)(implicit ec: ExecutionContext)
    extends GraphStage[SourceShape[E]] {

  import UnfoldSource._ // brings dequeueN into scope

  val out: Outlet[E] = Outlet("UnfoldSource.out")
  override val shape: SourceShape[E] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {
      // Nodes to expand
      val frontier = mutable.Queue[S]()
      frontier ++= seeds

      // Nodes expanded but not yet pushed downstream
      val buffer = mutable.Queue[E]()

      // A batch is currently going through the flow
      var inFlight = false

      // Downstream pulled while the buffer was empty
      var downstreamWaiting = false

      def isBufferFull(): Boolean = buffer.size >= bufferSize

      def fillBuffer(): Unit = {
        val batchSize = math.min(bufferSize - buffer.size, frontier.size)
        val batch = frontier.dequeueN(batchSize)
        inFlight = true

        val toProcess =
          Source(batch)
            .via(flow)
            .runWith(Sink.seq)(materializer)

        val callback = getAsyncCallback[Try[Seq[E]]] {
          case Failure(ex) =>
            fail(out, ex)
          case Success(es) =>
            inFlight = false
            es.foreach { e =>
              buffer += e
              frontier ++= loop(e)
            }
            if (downstreamWaiting && buffer.nonEmpty) {
              downstreamWaiting = false
              sendOne(buffer.dequeue())
            } else if (downstreamWaiting && frontier.nonEmpty) {
              // The flow emitted nothing for this batch: fetch the next one.
              fillBuffer()
            } else {
              checkCompletion()
            }
        }

        toProcess.onComplete(callback.invoke)
      }

      override def preStart(): Unit = checkCompletion()

      def checkCompletion(): Unit =
        if (!inFlight && buffer.isEmpty && frontier.isEmpty) completeStage()

      def sendOne(e: E): Unit = {
        push(out, e)
        checkCompletion()
      }

      override def onPull(): Unit = {
        if (buffer.nonEmpty) sendOne(buffer.dequeue())
        else downstreamWaiting = true

        // Keep at most one batch in flight so that `inFlight` stays accurate.
        if (!inFlight && !isBufferFull() && frontier.nonEmpty) fillBuffer()
      }

      setHandler(out, this)
    }
}
```
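The bookkeeping of this stage can be checked without Akka. The sketch below is a synchronous model, not the stage itself: `UnfoldModel`, `Node`, `fetch` and `fetchBatch` are hypothetical, and `fetchBatch` stands in for `Source(batch).via(flow).runWith(Sink.seq)` as if that future completed instantly. It keeps the same two queues, refills under the same condition as `onPull` (room in the buffer and work left), serves one pull per iteration, and finishes when both queues are empty.

```scala
import scala.collection.mutable

object UnfoldModel {
  final case class Node(id: Int, kids: List[Int])

  // Same tree as `nested` in the question.
  def fetch(id: Int): Node = {
    val kids =
      if (id == 0) (1 to 9).toList
      else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
      else Nil
    Node(id, kids)
  }

  // Returns (emitted elements, size of every fetched batch).
  def unfoldTree[S, E](seeds: List[S], fetchBatch: List[S] => Seq[E],
                       loop: E => List[S], bufferSize: Int): (List[E], List[Int]) = {
    require(bufferSize > 0, "bufferSize must be positive")
    val frontier = mutable.Queue[S]() // nodes to expand
    frontier ++= seeds
    val buffer  = mutable.Queue[E]()  // expanded nodes not yet emitted
    val emitted = List.newBuilder[E]
    val batches = List.newBuilder[Int]

    // Each iteration models one downstream pull; nothing is ever in flight here.
    while (buffer.nonEmpty || frontier.nonEmpty) {
      if (buffer.size < bufferSize && frontier.nonEmpty) {
        val n = math.min(bufferSize - buffer.size, frontier.size)
        val batch = List.fill(n)(frontier.dequeue())
        batches += n
        fetchBatch(batch).foreach { e =>
          buffer += e
          frontier ++= loop(e)
        }
      }
      if (buffer.nonEmpty) emitted += buffer.dequeue()
    }
    (emitted.result(), batches.result())
  }

  def main(args: Array[String]): Unit = {
    val (nodes, batches) =
      unfoldTree[Int, Node](List(0), _.map(fetch), _.kids, bufferSize = 10)
    println(nodes.map(_.id) == (0 to 99).toList) // true
    println(batches.max <= 10)                   // true
  }
}
```

With Akka on the classpath and the implicit ActorSystem and Materializer from the question in scope, the real stage would presumably be consumed with something like `unfoldTree(List(0), itemFlow, (item: Item) => item.kids, bufferSize = 100)(system.dispatcher).runWith(Sink.foreach(println))`. The lambda's parameter type is written out because Scala 2 does not infer it from the other arguments in the same parameter list.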