将Akka流源分成两个

编程入门 行业动态 更新时间:2024-10-17 23:25:44
本文介绍了将Akka流源分成两个的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我有一个Akka流源,我想根据一个谓词将其分为两个源。

I have an Akka Streams Source which I want to split into two sources according to a predicate.

例如有一个源(有意简化了类型):

E.g. having a source (types are simplified intentionally):

val source: Source[Either[Throwable, String], NotUsed] = ???

和两种方法:

def handleSuccess(source: Source[String, NotUsed]): Future[Unit] = ??? def handleFailure(source: Source[Throwable, NotUsed]): Future[Unit] = ???

我希望能够拆分来源根据 _。isRight 谓词,将右侧部分传递给 handleSuccess 方法,将左侧部分传递给 handleFailure 方法。

I would like to be able to split the source according to _.isRight predicate and pass the right part to handleSuccess method and left part to handleFailure method.

我尝试使用 Broadcast 拆分器,但它需要接收器结束。

I tried using Broadcast splitter but it requires Sinks at the end.

推荐答案

尽管您可以选择您要从中检索项目的 Source 无法创建 Source 并产生两个输出的 Source 就像您最终想要的那样。

Although you can choose which side of the Source you want to retrieve items from it's not possible to create a Source that that yields two outputs which is what it seems like you would ultimately want.

给出下面的 GraphStage 基本上将左和右值分成两个输出...

Given the GraphStage below which essentially splits the left and right values into two outputs...

/** * Fans out left and right values of an either * @tparam L left value type * @tparam R right value type */ class EitherFanOut[L, R] extends GraphStage[FanOutShape2[Either[L, R], L, R]] { import akka.stream.{Attributes, Outlet} import akka.stream.stage.GraphStageLogic override val shape: FanOutShape2[Either[L, R], L, R] = new FanOutShape2[Either[L, R], L, R]("EitherFanOut") override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { var out0demand = false var out1demand = false setHandler(shape.in, new InHandler { override def onPush(): Unit = { if (out0demand && out1demand) { grab(shape.in) match { case Left(l) => out0demand = false push(shape.out0, l) case Right(r) => out1demand = false push(shape.out1, r) } } } }) setHandler(shape.out0, new OutHandler { @scala.throws[Exception](classOf[Exception]) override def onPull(): Unit = { if (!out0demand) { out0demand = true } if (out0demand && out1demand) { pull(shape.in) } } }) setHandler(shape.out1, new OutHandler { @scala.throws[Exception](classOf[Exception]) override def onPull(): Unit = { if (!out1demand) { out1demand = true } if (out0demand && out1demand) { pull(shape.in) } } }) } }

..您可以将它们路由到仅接收一侧:

.. you can route them to only receive one side:

val sourceRight: Source[String, NotUsed] = Source.fromGraph(GraphDSL.create(source) { implicit b => s => import GraphDSL.Implicits._ val eitherFanOut = b.add(new EitherFanOut[Throwable, String]) s ~> eitherFanOut.in eitherFanOut.out0 ~> Sink.ignore SourceShape(eitherFanOut.out1) }) Await.result(sourceRight.runWith(Sink.foreach(println)), Duration.Inf)

...或者可能更理想,将它们路由到两个单独的 Sink s:

... or probably more desirable, route them to two seperate Sinks:

val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s")) val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s")) val flow = RunnableGraph.fromGraph(GraphDSL.create(source, leftSink, rightSink)((_, _, _)) { implicit b => (s, l, r) => import GraphDSL.Implicits._ val eitherFanOut = b.add(new EitherFanOut[Throwable, String]) s ~> eitherFanOut.in eitherFanOut.out0 ~> l.in eitherFanOut.out1 ~> r.in ClosedShape }) val r = flow.run() Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)

(导入和初始设置)

import akka.NotUsed import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source} import akka.stream.stage.{GraphStage, InHandler, OutHandler} import akka.stream._ import akka.actor.ActorSystem import com.typesafe.config.ConfigFactory import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Await import scala.concurrent.duration.Duration val classLoader = getClass.getClassLoader implicit val system = ActorSystem("QuickStart", ConfigFactory.load(classLoader), classLoader) implicit val materializer = ActorMaterializer() val values: List[Either[Throwable, String]] = List( Right("B"), Left(new Throwable), Left(new RuntimeException), Right("B"), Right("C"), Right("G"), Right("I"), Right("F"), Right("T"), Right("A") ) val source: Source[Either[Throwable, String], NotUsed] = Source.fromIterator(() => values.toIterator)

更多推荐

将Akka流源分成两个

本文发布于:2023-11-25 23:01:35,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1631691.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:两个   Akka

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!