问题描述
限时送ChatGPT账号..我尝试编写控制台可观察对象,如下例所示,但它不起作用.订阅存在一些问题.如何解决这些问题?
静态类程序{静态异步任务 Main(string[] args){//var observable = Observable.Interval(TimeSpan.FromMilliseconds(1000)).Publish().RefCount();//有效//var observable = FromConsole().Publish().RefCount();//不起作用var observable = FromConsole();//不起作用observable.Subscribe(Console.WriteLine);等待 Task.Delay(1500);observable.Subscribe(Console.WriteLine);等待新的 TaskCompletionSource().Task;}静态 IObservable从控制台(){return Observable.Create(异步观察者=>{而(真){观察者.OnNext(Console.ReadLine());}});}}
如果我使用 Observable.Interval
,它会订阅两次,并且我有一个输入有两个输出.如果我使用了任何版本的 FromConsole
,我就有一个订阅和一个被阻止的线程.
首先,通常最好避免使用 Observable.Create
来创建 observables - 它当然是为了这个目的,但是它可以创建 observables,由于它们的阻塞性质,它们的行为不像你认为的那样.正如你所发现的!
相反,在可能的情况下,使用内置运算符来创建可观察对象.在这种情况下可以做到这一点.
我的 FromConsole
版本是这样的:
static IObservableFromConsole() =>可观察的.Defer(() =>可观察的.Start(() => Console.ReadLine())).重复();
Observable.Start
实际上就像可观察对象的 Task.Run
.它为我们调用 Console.ReadLine()
而不会阻塞.
Observable.Defer
/Repeat
对重复调用 Observable.Start(() => Console.ReadLine())
.如果没有 Defer
,它只会调用 Observable.Start
并永远重复返回一个字符串.
这就解决了.
现在,第二个问题是您希望看到 Console.ReadLine()
两个订阅对 FromConsole()
observable 输出的值.>
由于 Console.ReadLine
的工作方式,您从每个订阅中获取值,但一次只能获取一个.试试这个代码:
static async Task Main(string[] args){var observable = FromConsole();observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);await new TaskCompletionSource().Task;}静态 IObservableFromConsole() =>可观察的.Defer(() =>可观察的.Start(() => Console.ReadLine())).重复();
当我运行时,我得到这样的输出:
1:ddfd2:dff1:dfsdfs2:sdffdfd1:sdfsdfsdf
这样做的原因是每个订阅都会启动对 FromConsole
的新订阅.因此,您有两次对 Console.ReadLine()
的调用,它们有效地排队,并且每个调用只获取每个备用输入.因此 1
& 之间的交替2
.
所以,要解决这个问题,您只需要 .Publish().RefCount()
运算符对.
试试这个:
static async Task Main(string[] args){var observable = FromConsole().Publish().RefCount();observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);await new TaskCompletionSource().Task;}静态 IObservableFromConsole() =>可观察的.Defer(() =>可观察的.Start(() => Console.ReadLine())).重复();
我现在得到:
1:你好2:你好1:世界2:世界
简而言之,它结合了非阻塞 FromConsole
observable 和 .Publish().RefCount()
的使用,这使得它以您的方式工作期待.
I have tried to write console observable as in the example below, but it doesn't work. There are some issues with subscriptions. How to solve these issues?
static class Program
{
static async Task Main(string[] args)
{
// var observable = Observable.Interval(TimeSpan.FromMilliseconds(1000)).Publish().RefCount(); // works
// var observable = FromConsole().Publish().RefCount(); // doesn't work
var observable = FromConsole(); // doesn't work
observable.Subscribe(Console.WriteLine);
await Task.Delay(1500);
observable.Subscribe(Console.WriteLine);
await new TaskCompletionSource().Task;
}
static IObservable<string> FromConsole()
{
return Observable.Create<string>(async observer =>
{
while (true)
{
observer.OnNext(Console.ReadLine());
}
});
}
}
If I used Observable.Interval
, it subscribes two times and I have two outputs for one input. If I used any version of FromConsole
, I have one subscription and a blocked thread.
To start with, it is usually best to avoid using Observable.Create
to create observables - it's certainly there for that purpose, but it can create observables that don't behave like you think they should because of their blocking nature. As you've discovered!
Instead, when possible, use the built-in operators to create observables. And that can be done in this case.
My version of FromConsole
is this:
static IObservable<string> FromConsole() =>
Observable
.Defer(() =>
Observable
.Start(() => Console.ReadLine()))
.Repeat();
Observable.Start
effectively is like Task.Run
for observables. It calls Console.ReadLine()
for us without blocking.
The Observable.Defer
/Repeat
pair repeatedly calls Observable.Start(() => Console.ReadLine())
. Without the Defer
it would just call Observable.Start
and repeatedly return the one string forever.
That solves that.
Now, the second issue is that you want to see the value from the Console.ReadLine()
output by both subscriptions to the FromConsole()
observable.
Due to the way Console.ReadLine
works, you are getting values from each subscription, but only one at a time. Try this code:
static async Task Main(string[] args)
{
var observable = FromConsole();
observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);
observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);
await new TaskCompletionSource<int>().Task;
}
static IObservable<string> FromConsole() =>
Observable
.Defer(() =>
Observable
.Start(() => Console.ReadLine()))
.Repeat();
When I run that I get this kind of output:
1:ddfd
2:dfff
1:dfsdfs
2:sdffdfd
1:sdfsdfsdf
The reason for this is that each subscription starts up a fresh subscription to FromConsole
. So you have two calls to Console.ReadLine()
they effectively queue and each one only gets each alternate input. Hence the alternation between 1
& 2
.
So, to solve this you simply need the .Publish().RefCount()
operator pair.
Try this:
static async Task Main(string[] args)
{
var observable = FromConsole().Publish().RefCount();
observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);
observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);
await new TaskCompletionSource<int>().Task;
}
static IObservable<string> FromConsole() =>
Observable
.Defer(() =>
Observable
.Start(() => Console.ReadLine()))
.Repeat();
I now get:
1:Hello
2:Hello
1:World
2:World
In a nutshell, it's the combination of the non-blocking FromConsole
observable and the use of .Publish().RefCount()
that makes this work the way you expect.
这篇关于如何制作 IObservable<string>从控制台输入的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
更多推荐
[db:关键词]
发布评论