将 Akka Stream Source 一分为二

编程入门 行业动态 更新时间:2024-10-12 14:24:45
本文介绍了将 Akka Stream Source 一分为二的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我有一个 Akka Streams Source,我想根据谓词将其拆分为两个源.

I have an Akka Streams Source which I want to split into two sources according to a predicate.

例如有一个来源(类型被有意简化):

E.g. having a source (types are simplified intentionally):

val source: Source[Either[Throwable, String], NotUsed] = ???

还有两种方法:

def handleSuccess(source: Source[String, NotUsed]): Future[Unit] = ??? def handleFailure(source: Source[Throwable, NotUsed]): Future[Unit] = ???

我希望能够根据 _.isRight 谓词拆分 source 并将右侧部分传递给 handleSuccess 方法和左侧handleFailure 方法的一部分.

I would like to be able to split the source according to _.isRight predicate and pass the right part to handleSuccess method and left part to handleFailure method.

我尝试使用 Broadcast 拆分器,但它最后需要 Sinks.

I tried using Broadcast splitter but it requires Sinks at the end.

推荐答案

编辑:这个其他答案与 divertTo 是一个比我的更好的解决方案,IMO.我将保留我的答案以供后人使用.


Edit: this other answer with divertTo is a better solution than mine, IMO. I'll leave my answer as-is for posterity.

原始答案:

这在 akka-stream-contrib 中实现为 PartitionWith.将此依赖项添加到 SBT 以将其添加到您的项目中:

This is implemented in akka-stream-contrib as PartitionWith. Add this dependency to SBT to pull it in to your project:

libraryDependencies += "com.typesafe.akka" %% "akka-stream-contrib" % "0.9"``` `PartitionWith` is shaped like a `Broadcast(2)`, but with potentially different types for each of the two outlets. You provide it with a predicate to apply to each element, and depending on the outcome, they get routed to the applicable outlet. You can then attach a `Sink` or `Flow` to each of these outlets independently as appropriate. Building on [cessationoftime's example](stackoverflow/a/39744355/147806), with the `Broadcast` replaced with a `PartitionWith`: val eitherSource: Source[Either[Throwable, String], NotUsed] = Source.empty val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s")) val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s")) val flow = RunnableGraph.fromGraph(GraphDSL.create(eitherSource, leftSink, rightSink) ((_, _, _)) { implicit b => (s, l, r) => import GraphDSL.Implicits._ val pw = b.add( PartitionWith.apply[Either[Throwable, String], Throwable, String](identity) ) eitherSource ~> pw.in pw.out0 ~> leftSink pw.out1 ~> rightSink ClosedShape }) val r = flow.run() Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)

更多推荐

将 Akka Stream Source 一分为二

本文发布于:2023-11-25 02:34:36,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1627977.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:为二   Akka   Stream   Source

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!