我有一个最初收听一个来源的IObservable:
_itemsChanged = Observable.Merge(item1.ObserveItemChanged); _itemsChanged.Subscribe(_ => Console.WriteLine("item changed"));当应用程序运行时,可能会添加其他项目,我想将新项目的流合并到现有项目中:
var item2 = new Item(); _itemsChanged.Merge(item2.ObserveItemChanged);但是,我只在item1更改时才看到输出。 我是否必须以某种方式更新订阅? 我希望合并的流可以立即被接收。
I have an IObservable that is initially listening to one source:
_itemsChanged = Observable.Merge(item1.ObserveItemChanged); _itemsChanged.Subscribe(_ => Console.WriteLine("item changed"));As the app runs, additional items might be added and I want to merge the new item's stream to my existing one:
var item2 = new Item(); _itemsChanged.Merge(item2.ObserveItemChanged);However, I only see output when item1 is changed. Do I have to refresh the subscription somehow? I expected the merged stream to be picked up right away.
最满意答案
基本思想是使用您订阅的Subject<IObservable<T>> 。 这使您可以动态添加更多项目。 这是一种抽象它的方法(主题是Subject<Item>然后我们投影并合并到Observable<T> ):
var subject = new Subject<Item>(); IObserver<Item> newItems = subject; IObservable<T> itemsChanged = subject.SelectMany(item => item.ObserveItemChanged); itemsChanged.Subscribe(_ => Console.WriteLine("change")); // add items newItems.OnNext(item1); newItems.OnNext(item2);The basic idea is to use a Subject<IObservable<T>> that you are subscribed to. This lets you add more items on the fly. Here is a way to abstract it a bit (the subject is a Subject<Item> which we then project and merge into the Observable<T>):
var subject = new Subject<Item>(); IObserver<Item> newItems = subject; IObservable<T> itemsChanged = subject.SelectMany(item => item.ObserveItemChanged); itemsChanged.Subscribe(_ => Console.WriteLine("change")); // add items newItems.OnNext(item1); newItems.OnNext(item2);更多推荐
发布评论