回放历史数据Akka Stream(Playback historical data Akka Stream)

编程入门 行业动态 更新时间:2024-10-28 13:26:15
回放历史数据Akka Stream(Playback historical data Akka Stream)

是否有可能根据Akka Streams定义的时钟发射数据? 或者他们只是在数据到达时放出(忽略背压)? 我特别想知道是否可以用“嘲弄”的时钟回放历史数据,或许可以使用Source.tick 。

Is it possible to emit data according to a defined clock with Akka Streams? Or do they just emit (ignoring backpressure) as fast as their data arrive? I'm particularly wondering if it's possible to playback historical data with a "mocked" clock, somehow using Source.tick perhaps.

最满意答案

这取决于你“定义的时钟”是什么意思。

实际的墙壁时间

正如你所提到的, Source.tick是从系统时钟获取实际时钟的一种可能性。 问题是Sink可能不会以大于或等于Source生成滴答的时间间隔的速率发出需求信号。 例如,您的Sink可能只会每分钟发出一次需求信号,但您在Source.tick中的间隔可能为10 seconds 。 在这种情况下,将从文档中删除5个中间记号:

如果消费者在制作刻度元素时没有请求任何元素,那么它将不会在稍后收到该刻度元素。

模拟时间

始终可以使用Source来模拟时间。

我们可以首先创建一个函数,使用开始时间,结束时间和间隔模拟一个时钟:

type MillisFromEpoch = Long type MillisInterval = Long val clock : (MillisFromEpoch, MillisFromEpoch, MillisInterval) => () => Iterator[MillisFromEpoch] = (startTime, stopTime, interval) => () => new Iterator[MillisFromEpoch] { var currentTime = startTime override def hasNext : Boolean = currentTime < stopTime override def next() : MillisFromEpoch = { val returnMilis = currentTime currentTime += interval return returnMillis } }

这个时钟现在可以提供一个源。 作为一个例子,我们可以创建一个从unix时代开始的时钟,并且增加1秒直到时间结束:

val epoch : MillisFromEpoch = 0L val second : MillisInterval = 1000L val simulatedClockFromEpochSource : Source[MillisFromEpoch,_] = Source fromIterator clock(epoch, Long.MaxValue, 1*second)

或者我们可以创建一个现在开始的时钟,并在60秒内以5秒为间隔递增:

val now : MillisFromEpoch = System.currentTimeMillis() val simulatedClockFromNowSource : Source[MillisFromEpoch,_] = Source fromIterator clock(now, now + 60*second, 5*second)

采样频率

即使下游消费者的速度低于Source指定的时间间隔,也可以使用Source.tick。 我们可以创建一个Flow.filter ,它不断地向Source发送需求信号,但是只会经过一段定义的增量。

我们可以从一个函数开始,用一个内部变量来跟踪时间间隔:

val frequencySample : (MillisInterval) => (MillisFromEpoch) => Boolean = (interval) => { var lastValidTime : MillisFromEpoch = -1 (timeToCheck) => { if(lastValidTime < 0 || timeToCheck >= lastValidTime + interval) { lastValidTime = timeToCheck true } else { false } } }

现在这个函数可以用来创建Flow:

val frequencySampleFlow : (MillisInterval) => Flow[MillisFromEpoch, MillisFromEpoch, _] = (frequency) => Flow[MillisFromEpoch] filter frequencySample(frequency)

现在我们可以创建一个具有较高频率(例如1秒)的源频率较低(例如10秒)的流:

val slowFrequency : MillisInterval = 10 * second //simulatedClockFromEpoch ticks every 1 second //frequnencySampleFlow only passes every 10 second tick through val slowSource = simulatedClockFromEpochSource via frequencySampleFlow(slowFrequency)

It depends on what you mean by "defined clock".

Actual Wall Time

As you mentioned Source.tick is one possibility to get a clock of actual times coming from the system clock. The problem is that the Sink may not signal demand at a rate that is greater than or equal to the interval that the Source generates ticks. For example, your Sink may only signal demand once every minute but your interval in Source.tick may be 10 seconds. In this case the 5 intermediate ticks will be dropped, from the documentation:

If a consumer has not requested any elements at the point in time when the tick element is produced it will not receive that tick element later.

Simulated Time

It is always possible to simulate time using a Source.

We can first create a function that will simulate a clock using a start time, end time, and interval:

type MillisFromEpoch = Long type MillisInterval = Long val clock : (MillisFromEpoch, MillisFromEpoch, MillisInterval) => () => Iterator[MillisFromEpoch] = (startTime, stopTime, interval) => () => new Iterator[MillisFromEpoch] { var currentTime = startTime override def hasNext : Boolean = currentTime < stopTime override def next() : MillisFromEpoch = { val returnMilis = currentTime currentTime += interval return returnMillis } }

This clock can now feed a Source. As an example we can create a clock that start at unix epoch and increments 1 second until the end of time:

val epoch : MillisFromEpoch = 0L val second : MillisInterval = 1000L val simulatedClockFromEpochSource : Source[MillisFromEpoch,_] = Source fromIterator clock(epoch, Long.MaxValue, 1*second)

Or we can create a clock that starts now, and ends in 60 seconds incrementing by 5 second intervals:

val now : MillisFromEpoch = System.currentTimeMillis() val simulatedClockFromNowSource : Source[MillisFromEpoch,_] = Source fromIterator clock(now, now + 60*second, 5*second)

Sampling Frequency

There is a way to use Source.tick even when the downstream consumer is slower than the tick interval specified at the Source. We can create a Flow.filter that is constantly signaling demand to the Source but will only pass through times that are a defined increment apart.

We can start with a function that does the tracking of the time interval with an internal variable:

val frequencySample : (MillisInterval) => (MillisFromEpoch) => Boolean = (interval) => { var lastValidTime : MillisFromEpoch = -1 (timeToCheck) => { if(lastValidTime < 0 || timeToCheck >= lastValidTime + interval) { lastValidTime = timeToCheck true } else { false } } }

And now this function can be used to create the Flow:

val frequencySampleFlow : (MillisInterval) => Flow[MillisFromEpoch, MillisFromEpoch, _] = (frequency) => Flow[MillisFromEpoch] filter frequencySample(frequency)

Now we can create a Flow that has a slow frequency (e.g. 10 seconds) that is attached to a Source with a higher frequency (e.g. 1 second):

val slowFrequency : MillisInterval = 10 * second //simulatedClockFromEpoch ticks every 1 second //frequnencySampleFlow only passes every 10 second tick through val slowSource = simulatedClockFromEpochSource via frequencySampleFlow(slowFrequency)

更多推荐

本文发布于:2023-07-07 13:34:00,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1063922.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:历史数据   Akka   Stream   data   historical

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!