如何创建一个可以稍后通过方法调用接收元素的源?

编程入门 行业动态 更新时间:2024-10-22 23:11:26
本文介绍了如何创建一个可以稍后通过方法调用接收元素的源?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我想创建一个 Source 并稍后在其上推送元素,例如:

I would like to create a Source and later push elements on it, like in:

val src = ... // create the Source here // and then, do something like this pushElement(x1, src) pushElement(x2, src)

推荐的方法是什么?

谢谢!

推荐答案

可以通过三种方式实现:

There are three ways this can be achieved:

1.使用 SourceQueue 后期实现

您可以使用 Source.queue 将 Flow 实体化为 SourceQueue:

You can use Source.queue that materializes the Flow into a SourceQueue:

case class Weather(zipCode : String, temperature : Double, raining : Boolean) val bufferSize = 100 //if the buffer fills up then this strategy drops the oldest elements //upon the arrival of a new element. val overflowStrategy = akka.stream.OverflowStrategy.dropHead val queue = Source.queue(bufferSize, overflowStrategy) .filter(!_.raining) .to(Sink foreach println) .run() // in order to "keep" the queue Materialized value instead of the Sink's queue offer Weather("02139", 32.0, true)

2.使用 Actor 进行后期实体化

有一个类似的问答此处,要点是将流具体化为 ActorRef 并向该 ref 发送消息:

There is a similar question and answer here, the gist being that you materialize the stream as an ActorRef and send messages to that ref:

val ref = Source.actorRef[Weather](Int.MaxValue, fail) .filter(!_.raining) .to(Sink foreach println ) .run() // in order to "keep" the ref Materialized value instead of the Sink's ref ! Weather("02139", 32.0, true)

3.使用 Actor 预实现

同样,您可以显式创建一个包含消息缓冲区的 Actor,使用该 Actor 来创建一个源,然后按照答案中的描述发送该 Actor 消息 此处:

Similarly, you could explicitly create an Actor that contains a message buffer, use that Actor to create a Source, and then send that Actor messages as described in the answer here:

object WeatherForwarder { def props : Props = Props[WeatherForwarder] } //see provided link for example definition class WeatherForwarder extends Actor {...} val actorRef = actorSystem actorOf WeatherForwarder.props //note the stream has not been instatiated yet actorRef ! Weather("02139", 32.0, true) //stream already has 1 Weather value to process which is sitting in the //ActorRef's internal buffer val stream = Source(ActorPublisher[Weather](actorRef)).runWith{...}

更多推荐

如何创建一个可以稍后通过方法调用接收元素的源?

本文发布于:2023-11-25 02:15:58,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1627912.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:创建一个   稍后   元素   方法

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!