热的可观察性和IDisposable(Hot observable and IDisposable)

编程入门 行业动态 更新时间:2024-10-26 13:23:47
热的可观察性和IDisposable(Hot observable and IDisposable)

我想在hot observable和IDisposable对象上找到最佳实践作为事件类型。

假设我的代码将Bitmap对象生成为热可观察对象,并且我有几个订阅者。 例如:

public static IObservable<Bitmap> ImagesInFolder(string path, IScheduler scheduler) { return Directory.GetFiles(path, "*.bmp") .ToObservable(scheduler) .Select(x => new Bitmap(x)) .Publish() .RefCount(); } public void Main() { var images = ImagesInFolder("c:\Users\VASIYA\Desktop\Sample Images", TaskPoolScheduler.Instance); var process1 = images.Subscribe(SaveBwImages); var process2 = images.Subscribe(SaveScaledImages); var process3 = images.Select(Cats).Subscribe(SaveCatsImages); }

所以问题是:处理热量可观察源的一次性资源的最佳实践是什么?

在这个例子中,我想在使用后处理图像,但我无法弄清楚 - 究竟是什么时候?

在订阅订阅事件的顺序中,这并不明显,因此我无法处理“最后”的事件。

提前致谢。

I'd like to find best practices on hot observable and IDisposable objects as event type.

Assume my code produce Bitmap objects as a hot observable and I have several subscribers. For example:

public static IObservable<Bitmap> ImagesInFolder(string path, IScheduler scheduler) { return Directory.GetFiles(path, "*.bmp") .ToObservable(scheduler) .Select(x => new Bitmap(x)) .Publish() .RefCount(); } public void Main() { var images = ImagesInFolder("c:\Users\VASIYA\Desktop\Sample Images", TaskPoolScheduler.Instance); var process1 = images.Subscribe(SaveBwImages); var process2 = images.Subscribe(SaveScaledImages); var process3 = images.Select(Cats).Subscribe(SaveCatsImages); }

So the question is: what is the best practices to handle disposable resources that are source of a hot observable?

In this example I want to Dispose images after use, but I can't figure out - when exactly?

That is not obvious in which order subscribe events will be called so I cannot dispose on a 'last' one.

Thanks in advance.

最满意答案

你的观察不热。 它是一个带有共享源的冷可观察对象,它只会使后续的观察者表现得好像它们有一个热的可观察量。 它最好被描述为温暖的可观察者。

我们来看一个例子:

var query = Observable.Range(0, 3).ObserveOn(Scheduler.Default).Publish().RefCount(); query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); }); query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("B"); }); query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("C"); }); Thread.Sleep(10000); query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("D"); }); Observable .Range(0, 3) .ObserveOn(Scheduler.Default) .Publish() .RefCount() .Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("E"); });

当我运行这个时,我得到:

A A B C A B C E E E

“B”和“C”观察者错过了序列的第一个值。

并且,在完成“A”,“B”和“C”观察者之后,序列结束,因此“D”永远不会得到值。 我必须创建一个全新的observable才能显示值“E”。

所以,在你的代码中你有一个问题,如果第一个观察者在第二个和第三个订阅之前完成一个或多个值,那么那些观察者会错过值。 那是你要的吗?

然而,你的问题是关于如何处理从一个可观察者返回的一次性价值。 如果您使用Observable.Using这很简单。

这与您的代码类似:

public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler) { return Observable .Range(0, 3) .ObserveOn(Scheduler.Default) .SelectMany(x => Observable .Using( () => Disposable.Create(() => Console.WriteLine("Disposed!")), y => Observable.Return(y))) .Publish() .RefCount(); }

现在,如果我运行此代码:

var query = ImagesInFolder(Scheduler.Default); query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); }); query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("B"); }); query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("C"); }); Thread.Sleep(10000); query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("D"); });

我得到这个输出:

A B C Disposed! A B C Disposed! A B C Disposed!

同样“D”永远不会产生任何值 - 并且“B”和“C”可能会错过值,但这确实显示了如何返回一个可观察的值,该值可以在观察者完成时自动处理。

你的代码看起来像这样:

public static IObservable<System.Drawing.Bitmap> ImagesInFolder(string path, IScheduler scheduler) { return Directory .GetFiles(path, "*.bmp") .ToObservable(scheduler) .SelectMany(x => Observable .Using( () => new System.Drawing.Bitmap(x), bm => Observable.Return(bm))) .Publish() .RefCount(); }

但是,你仍然处于可能缺失价值的土地上。

因此你需要真正做到这一点:

public static IConnectableObservable<System.Drawing.Bitmap> ImagesInFolder(string path, IScheduler scheduler) { return Directory .GetFiles(path, "*.bmp") .ToObservable(scheduler) .SelectMany(x => Observable .Using( () => new System.Drawing.Bitmap(x), bm => Observable.Return(bm))) .Publish(); }

然后你这样称呼它:

public void Main() { var images = ImagesInFolder("c:\Users\VASIYA\Desktop\Sample Images", TaskPoolScheduler.Instance); var process1 = images.Subscribe(SaveBwImages); var process2 = images.Subscribe(SaveScaledImages); var process3 = images.Select(Cats).Subscribe(SaveCatsImages); images.Connect(); }

另一个选项是删除整个.Publish().RefCount()代码并确保在订阅时自己正确执行。

试试这段代码:

void Main() { ImagesInFolder(Scheduler.Default) .Publish(iif => Observable .Merge( iif.Select(x => { Thread.Sleep(1000); Console.WriteLine("A"); return "A"; }), iif.Select(x => { Thread.Sleep(3000); Console.WriteLine("B"); return "B"; }), iif.Select(x => { Thread.Sleep(2000); Console.WriteLine("C"); return "C"; }))) .Subscribe(); } public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler) { return Observable .Range(0, 3) .ObserveOn(Scheduler.Default) .SelectMany(x => Observable .Using( () => Disposable.Create(() => Console.WriteLine("Disposed!")), y => Observable.Return(y))); }

我明白了:

A B C Disposed! A B C Disposed! A B C Disposed!

再次,一个Disposed! 在每个观察者运行之后,但现在的问题是我改变了每个观察者处理的延迟,但代码仍然输出的是观察者被添加的顺序。 问题是Rx按顺序运行每个观察者,并且每个生成的值都是顺序的。

我希望您认为可以使用.Publish()并行处理。 你没有。

.Publish()并行运行的方法是完全删除.Publish() 。

做这种事:

void Main() { ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); }); ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(3000); Console.WriteLine("B"); }); ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(2000); Console.WriteLine("C"); }); } public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler) { return Observable .Range(0, 3) .ObserveOn(Scheduler.Default) .SelectMany(x => Observable .Using( () => Disposable.Create(() => Console.WriteLine("Disposed!")), y => Observable.Return(y))); }

我现在得到这个:

A Disposed! C Disposed! A Disposed! B Disposed! A Disposed! C Disposed! C Disposed! B Disposed! B Disposed!

代码现在并行运行并尽快完成 - 并在订阅完成时正确处理IDisposable 。 你只是没有获得与每个观察者共享一个可支配资源的乐趣,但你也没有得到所有的行为问题。

Your observable isn't hot. It's a cold observable with a shared source and it only makes the subsequent observers behave as if they got a hot observable. It's probably best described as a warm observable.

Let's look at an example:

var query = Observable.Range(0, 3).ObserveOn(Scheduler.Default).Publish().RefCount(); query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); }); query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("B"); }); query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("C"); }); Thread.Sleep(10000); query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("D"); }); Observable .Range(0, 3) .ObserveOn(Scheduler.Default) .Publish() .RefCount() .Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("E"); });

When I run this I get:

A A B C A B C E E E

The "B" & "C" observers miss the first value of the sequence.

And, after the "A", "B", and "C" observers are done the sequence is finished, so "D" never gets a value. I've had to create a brand new observable to get the values "E" to display.

So, in your code you have a problem, if the first observer finishes one or more values before the second and third subscribe then those observers miss values. Is that what you want?

Nevertheless, your question asks about how to deal with disposable values returned from an observable. It's simple if you use Observable.Using.

Here's a similar situation to your code:

public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler) { return Observable .Range(0, 3) .ObserveOn(Scheduler.Default) .SelectMany(x => Observable .Using( () => Disposable.Create(() => Console.WriteLine("Disposed!")), y => Observable.Return(y))) .Publish() .RefCount(); }

Now if I run this code:

var query = ImagesInFolder(Scheduler.Default); query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); }); query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("B"); }); query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("C"); }); Thread.Sleep(10000); query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("D"); });

I get this output:

A B C Disposed! A B C Disposed! A B C Disposed!

Again "D" never produces any values - and it's possible for "B" & "C" to miss values, but this does show how to return an observable value that automatically gets disposed with the observer/s is/are finished.

Your code would look like this:

public static IObservable<System.Drawing.Bitmap> ImagesInFolder(string path, IScheduler scheduler) { return Directory .GetFiles(path, "*.bmp") .ToObservable(scheduler) .SelectMany(x => Observable .Using( () => new System.Drawing.Bitmap(x), bm => Observable.Return(bm))) .Publish() .RefCount(); }

However, you're still in the land of possibly missing values.

Therefore you need to really do this:

public static IConnectableObservable<System.Drawing.Bitmap> ImagesInFolder(string path, IScheduler scheduler) { return Directory .GetFiles(path, "*.bmp") .ToObservable(scheduler) .SelectMany(x => Observable .Using( () => new System.Drawing.Bitmap(x), bm => Observable.Return(bm))) .Publish(); }

Then you call it like this:

public void Main() { var images = ImagesInFolder("c:\Users\VASIYA\Desktop\Sample Images", TaskPoolScheduler.Instance); var process1 = images.Subscribe(SaveBwImages); var process2 = images.Subscribe(SaveScaledImages); var process3 = images.Select(Cats).Subscribe(SaveCatsImages); images.Connect(); }

The other option is to drop the whole .Publish().RefCount() code and make sure you do it properly yourself when you subscribe.

Try this code:

void Main() { ImagesInFolder(Scheduler.Default) .Publish(iif => Observable .Merge( iif.Select(x => { Thread.Sleep(1000); Console.WriteLine("A"); return "A"; }), iif.Select(x => { Thread.Sleep(3000); Console.WriteLine("B"); return "B"; }), iif.Select(x => { Thread.Sleep(2000); Console.WriteLine("C"); return "C"; }))) .Subscribe(); } public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler) { return Observable .Range(0, 3) .ObserveOn(Scheduler.Default) .SelectMany(x => Observable .Using( () => Disposable.Create(() => Console.WriteLine("Disposed!")), y => Observable.Return(y))); }

I get this out:

A B C Disposed! A B C Disposed! A B C Disposed!

Again, one Disposed! after each observer has run, but the problem now is that I changed the delay in the processing of each observer, but the code still output the is the order that the observers were added. The issue is that Rx runs each observer in sequence and each value produced is in sequence.

I expect that you thought you might get parallel processing using .Publish(). You don't.

The way to get this to run in parallel is to drop the .Publish() entirely.

Just do this kind of thing:

void Main() { ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); }); ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(3000); Console.WriteLine("B"); }); ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(2000); Console.WriteLine("C"); }); } public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler) { return Observable .Range(0, 3) .ObserveOn(Scheduler.Default) .SelectMany(x => Observable .Using( () => Disposable.Create(() => Console.WriteLine("Disposed!")), y => Observable.Return(y))); }

I now get this:

A Disposed! C Disposed! A Disposed! B Disposed! A Disposed! C Disposed! C Disposed! B Disposed! B Disposed!

The code now runs in parallel and finishes as fast as possible - and correctly disposes of the IDisposable when the subscription finishes. You just don't get the joy of sharing a single disposable resource with each observer, but you don't get all of the behavioural issues either.

更多推荐

本文发布于:2023-07-19 09:11:00,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1176893.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:IDisposable   Hot   observable

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!