我在 akka 流中实现了一个自定义组件,它将元素作为输入,根据一个键对它们进行分组和合并,然后通过十几个出口之一将它们发送出去.您可以将此组件视为一种 GroupBy 组件,它不会将流划分为子流,而是实际的流.除了对传入元素进行分区之外,它还将它们合并为一个元素,即在组件内部发生了一些缓冲,使得 1 个元素进入并不一定意味着 1 个元素通过出口传出.
I have implemented a custom component in akka stream which takes elements as input, groups and merges them based on a key and sends them out through one of a dozen outlets. You can think of this component as a kind of GroupBy component which does not partition the flow into subflows, but actual flows. In addition to partitioning incoming elements, it merges them into one element, i.e. there is some buffering happening inside the component such that 1 element in does not necessarily mean 1 element out through an outlet.
以下是所述组件的简化实现.
Below is a simplified implementation of said component.
class CustomGroupBy[A,B](k: Int, f: A => Int) extends GraphStage[FlowShape[B, B]] { val in = Inlet[A]("CustomGroupBy.in") val outs = (0 until k).map(i => Outlet[B](s"CustomGroupBy.$i.out")) override val shape = new AmorphousShape(scala.collection.immutable.Seq(in), outs) /* ... */ }我现在如何将该组件的每个出口连接到不同的 Sink 并组合所有这些 Sink 的物化值.
I now what to connect each outlet of that component to a different Sink and combine the materialized value of all these sinks.
我已经尝试了一些使用图形 DSL 的方法,但还没有完全让它工作.有没有人愿意为我提供一个片段来做到这一点,或者为我指明正确的方向?
I have tried a few things with the graph DSL, but have not quite managed to get it working. Would anyone be so kind as to provide me with a snippet to do that or point me in the right direction?
提前致谢!
推荐答案您很可能需要内置的 广播 阶段.示例用法可以在此处找到:
You most likely want the built-in broadcast stage. Example usage can be found here:
val bcast = builder.add(Broadcast[Int](2)) in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out bcast ~> f4 ~> merge更多推荐
Akka Stream 连接到多个接收器
发布评论