我正在尝试从pubnub订阅中创建一个observable。
使用pubnub,您可以订阅实时流,您必须提供三个回调:一个用于新收到的数据,一个用于发生错误,一个用于连接状态更改。我有以下代码(writelines只是为了编译):
var pubnub = new Pubnub(" demo"," xxxxxxxxxxxxxxxxxxxxxxxxxxxx" ); 行动< dynamic> userCallback = o => Console.Out.WriteLine(" Data received:{0}",o); 行动< dynamic> connectCallback = o => Console.Out.WriteLine(" Connection changed:{0}",o); 行动< dynamic> errorCallback = o => Console.Out.WriteLine(" Error:{0}",o); pubnub.Subscribe(" 057bdc6b-9f9c-44e4-bc1a-363e4443ce87",userCallback,connectCallback,errorCallback); IObservable< dynamic> obs = .................
现在我要创建一个obtableable,它将userCallback与onNext和errorCallback绑定到onError。有人可以给我一个指示如何做到这一点。我现在已经使用实现IObservable< T>的自定义类完成了它。但我不相信这是最佳做法?!
亲切的问候,
Arno
解决方案
这是一个相当普遍的模式,Rx解决得非常好。
您希望将服务订阅包装在Observable.Create中,然后可以将各种回调转换为Observable通知:
var stream = Observable.Create< dynamic>( obs => { var pubnub = new Pubnub(" demo"," xxxxxxxxxxxxxxxxxxxxxxxxxxxx"); pubnub.Subscribe(" 057bdc6b-9f9c-44e4-bc1a-363e4443ce87",o => { Console.Out.WriteLine("收到的数据:{0}",o); obs.O nNext(o); }, o => Console.Out.WriteLine(" Connection changed:{0}",o), ex => { Console.Out.WriteLine(" Error:{0}",o); obs.OnError(ex); }); return()=> pubnub.Unsubscribe(...); });这也意味着您还可以在流完成时显式取消订阅基础服务(我假设这将是必需的)。
Hi,
I'm trying to create an observable from a pubnub subscription.
With pubnub you can subscribe to a real time stream and you have to supply three callbacks: One for new data that is received, one for if an error occurred and one for if the connectionstate changes. I have the following code (writelines are just to make it compile):
var pubnub = new Pubnub("demo", "xxxxxxxxxxxxxxxxxxxxxxxxxxxx"); Action<dynamic> userCallback = o => Console.Out.WriteLine("Data received: {0}", o); Action<dynamic> connectCallback = o => Console.Out.WriteLine("Connection changed: {0}", o); Action<dynamic> errorCallback = o => Console.Out.WriteLine("Error: {0}", o); pubnub.Subscribe("057bdc6b-9f9c-44e4-bc1a-363e4443ce87", userCallback, connectCallback, errorCallback); IObservable<dynamic> obs = .................
Now I want to create an observable that ties the userCallback to onNext and errorCallback to onError. Can someone give me an indication how to do this. I now have done it with a custom class that implements IObservable<T> but I don’t believe that this is best practice?!
Kind regards,
Arno
解决方案This is a fairly common pattern that Rx solves particularly well.
You want to wrap your service subscription in an Observable.Create, which can then turn the various callbacks into Observable notifications:
var stream = Observable.Create<dynamic>( obs => { var pubnub = new Pubnub("demo", "xxxxxxxxxxxxxxxxxxxxxxxxxxxx"); pubnub.Subscribe("057bdc6b-9f9c-44e4-bc1a-363e4443ce87", o => { Console.Out.WriteLine("Data received: {0}", o); obs.OnNext(o); }, o => Console.Out.WriteLine("Connection changed: {0}", o), ex => { Console.Out.WriteLine("Error: {0}", o); obs.OnError(ex); }); return () => pubnub.Unsubscribe(...); });This also means you can also explicitly unsubscribe from the underlying service when your stream completes (I'm assuming this will be required).
更多推荐
从单独的Action< t>创建observable
发布评论