如何窗口/缓冲IObservable< T>根据Func< T>分成大块.

编程入门 行业动态 更新时间:2024-10-12 12:25:54
本文介绍了如何窗口/缓冲IObservable< T>根据Func< T>分成大块.的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

上课:

class Foo { DateTime Timestamp {get; set;} }

...和一个IObservable<Foo>,并且保证单调递增 Timestamp s,我如何才能基于这些Timestamp生成一个IObservable<IList<Foo>>分块到列表中?

...and an IObservable<Foo>, with guaranteed monotonically increasing Timestamps, how can I generate an IObservable<IList<Foo>> chunked into Lists based on those Timestamps?

即每个IList<Foo>应该有五秒钟的事件,或任何其他事件.我知道可以将Buffer与TimeSpan重载一起使用,但是我需要从事件本身中抽出时间,而不是挂钟. (除非这里有巧妙的方法提供IScheduler并将IObservable本身用作.Now的来源?)

I.e. each IList<Foo> should have five seconds of events, or whatever. I know I can use Buffer with a TimeSpan overload, but I need to take the time from the events themselves, not the wall clock. (Unless there a clever way of providing an IScheduler here which uses the IObservable itself as the source of .Now?)

如果我尝试像这样使用Observable.Buffer(this IObservable<Foo> source, IObservable<Foo> bufferBoundaries)重载:

If I try to use the Observable.Buffer(this IObservable<Foo> source, IObservable<Foo> bufferBoundaries) overload like so:

IObservable<Foo> foos = //...; var pub = foos.Publish(); var windows = pub.Select(x => new DateTime( x.Ticks - x.Ticks % TimeSpan.FromSeconds(5).Ticks)).DistinctUntilChanged(); pub.Buffer(windows).Subscribe(x => t.Dump())); // linqpad pub.Connect();

...然后IList实例包含导致窗口关闭的项目,但是我真的希望该项目进入下一个窗口/缓冲区.

...then the IList instances contain the item that causes the window to be closed, but I really want this item to go into the next window/buffer.

例如带有时间戳[0, 1, 10, 11, 15],您将获得[[0], [1, 10], [11, 15]]的块,而不是[[0, 1], [10, 11], [15]]

E.g. with timestamps [0, 1, 10, 11, 15] you will get blocks of [[0], [1, 10], [11, 15]] instead of [[0, 1], [10, 11], [15]]

推荐答案

这是个主意.组键条件是窗口号",我使用GroupByUntil.这样,您便可以在示例中获得所需的输出(并且我已经像该示例一样使用了int流-但您可以替换需要为窗口编号的任何内容).

Here's an idea. The group key condition is the "window number" and I use GroupByUntil. This gives you the desired output in your example (and I've used an int stream just like that example - but you can substitute whatever you need to number your windows).

public class Tests : ReactiveTest { public void Test() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable<int>( OnNext(0, 0), OnNext(1, 1), OnNext(10, 10), OnNext(11, 11), OnNext(15, 15), OnCompleted(16, 0)); xs.Publish(ps => // (1) ps.GroupByUntil( p => p / 5, // (2) grp => ps.Where(p => p / 5 != grp.Key)) // (3) .SelectMany(x => x.ToList())) // (4) .Subscribe(Console.WriteLine); scheduler.Start(); } }

注释

  • 我们发布源流,因为我们将订阅不止一次.
  • 这是创建组密钥的功能-使用此密钥可根据您的商品类型生成一个窗口编号.
  • 这是组终止条件-使用此条件检查源流中另一个窗口中的项目.请注意,这意味着直到窗口外部的元素到达或源流终止,窗口才会关闭.如果您考虑一下,这是显而易见的-窗口结束后,所需的输出需要考虑下一个元素.请注意,如果您的源与实时有任何关系,则可以将其与Observable.Timer+Select合并,该Observable.Timer+Select输出您的术语的null/默认实例,以更早地终止流.
  • SelectMany将组放入列表中并拉平流.
  • We publish the source stream because we will subscribe more than once.
  • This is a function to create a group key - use this to generate a window number from your item type.
  • This is the group termination condition - use this to inspect the source stream for an item in another window. Note that means a window won't close until an element outside of it arrives, or the source stream terminates. This is obvious if you think about it - your desired output requires consideration of next element after a window ends. Note if your source bears any relation to real time, you could merge this with an Observable.Timer+Select that outputs a null/default instance of your term to terminate the stream earlier.
  • SelectMany puts the groups into lists and flattens the stream.
  • 如果您包含nuget包rx-testing,则此示例将在LINQPad中很好地运行.新建一个Tests实例,然后运行Test()方法.

    This example will run in LINQPad quite nicely if you include nuget package rx-testing. New up a Tests instance and just run the Test() method.

    更多推荐

    如何窗口/缓冲IObservable&lt; T&gt;根据Func&lt; T&gt;分成大块.

    本文发布于:2023-11-12 10:23:18,感谢您对本站的认可!
    本文链接:https://www.elefans.com/category/jswz/34/1581216.html
    版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
    本文标签:窗口   IObservable   amp   Func   gt

    发布评论

    评论列表 (有 0 条评论)
    草根站长

    >www.elefans.com

    编程频道|电子爱好者 - 技术资讯及电子产品介绍!