上课:
class Foo { DateTime Timestamp {get; set;} }...和一个IObservable<Foo>,并且保证单调递增 Timestamp s,我如何才能基于这些Timestamp生成一个IObservable<IList<Foo>>分块到列表中?
...and an IObservable<Foo>, with guaranteed monotonically increasing Timestamps, how can I generate an IObservable<IList<Foo>> chunked into Lists based on those Timestamps?
即每个IList<Foo>应该有五秒钟的事件,或任何其他事件.我知道可以将Buffer与TimeSpan重载一起使用,但是我需要从事件本身中抽出时间,而不是挂钟. (除非这里有巧妙的方法提供IScheduler并将IObservable本身用作.Now的来源?)
I.e. each IList<Foo> should have five seconds of events, or whatever. I know I can use Buffer with a TimeSpan overload, but I need to take the time from the events themselves, not the wall clock. (Unless there a clever way of providing an IScheduler here which uses the IObservable itself as the source of .Now?)
如果我尝试像这样使用Observable.Buffer(this IObservable<Foo> source, IObservable<Foo> bufferBoundaries)重载:
If I try to use the Observable.Buffer(this IObservable<Foo> source, IObservable<Foo> bufferBoundaries) overload like so:
IObservable<Foo> foos = //...; var pub = foos.Publish(); var windows = pub.Select(x => new DateTime( x.Ticks - x.Ticks % TimeSpan.FromSeconds(5).Ticks)).DistinctUntilChanged(); pub.Buffer(windows).Subscribe(x => t.Dump())); // linqpad pub.Connect();...然后IList实例包含导致窗口关闭的项目,但是我真的希望该项目进入下一个窗口/缓冲区.
...then the IList instances contain the item that causes the window to be closed, but I really want this item to go into the next window/buffer.
例如带有时间戳[0, 1, 10, 11, 15],您将获得[[0], [1, 10], [11, 15]]的块,而不是[[0, 1], [10, 11], [15]]
E.g. with timestamps [0, 1, 10, 11, 15] you will get blocks of [[0], [1, 10], [11, 15]] instead of [[0, 1], [10, 11], [15]]
推荐答案这是个主意.组键条件是窗口号",我使用GroupByUntil.这样,您便可以在示例中获得所需的输出(并且我已经像该示例一样使用了int流-但您可以替换需要为窗口编号的任何内容).
Here's an idea. The group key condition is the "window number" and I use GroupByUntil. This gives you the desired output in your example (and I've used an int stream just like that example - but you can substitute whatever you need to number your windows).
public class Tests : ReactiveTest { public void Test() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable<int>( OnNext(0, 0), OnNext(1, 1), OnNext(10, 10), OnNext(11, 11), OnNext(15, 15), OnCompleted(16, 0)); xs.Publish(ps => // (1) ps.GroupByUntil( p => p / 5, // (2) grp => ps.Where(p => p / 5 != grp.Key)) // (3) .SelectMany(x => x.ToList())) // (4) .Subscribe(Console.WriteLine); scheduler.Start(); } }注释
如果您包含nuget包rx-testing,则此示例将在LINQPad中很好地运行.新建一个Tests实例,然后运行Test()方法.
This example will run in LINQPad quite nicely if you include nuget package rx-testing. New up a Tests instance and just run the Test() method.
更多推荐
如何窗口/缓冲IObservable< T>根据Func< T>分成大块.
发布评论