问题描述
限时送ChatGPT账号..我正在尝试使用 Observable.FromAsync
将一些 TPL 异步集成到更大的 Rx 链中,就像这个小例子一样:
I'm trying to integrate some TPL async into a larger Rx chain using Observable.FromAsync
, like in this small example:
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;
namespace rxtest
{
class Program
{
static void Main(string[] args)
{
MainAsync().Wait();
}
static async Task MainAsync()
{
await Observable.Generate(new Random(), x => true,
x => x, x => x.Next(250, 500))
.SelectMany((x, idx) => Observable.FromAsync(async ct =>
{
Console.WriteLine("start: " + idx.ToString());
await Task.Delay(x, ct);
Console.WriteLine("finish: " + idx.ToString());
return idx;
}))
.Take(10)
.LastOrDefaultAsync();
}
}
}
但是,我注意到这似乎是同时启动所有异步任务,而不是一次执行一个,这会导致应用程序的内存使用量激增.SelectMany
似乎与 Merge
没有什么不同.
However, I've noticed that this seems to start all the async tasks concurrently rather than doing them one at a time, which causes memory usage of the app to balloon. The SelectMany
appears to be acting no different than a Merge
.
在这里,我看到这样的输出:
Here, I see output like this:
start: 0
start: 1
start: 2
...
我想看看:
start: 0
finish: 0
start: 1
finish: 1
start: 2
finish: 2
...
我怎样才能做到这一点?
How can I achieve this?
推荐答案
将 SelectMany
更改为带有 Concat
的 Select
:>
Change the SelectMany
to a Select
with a Concat
:
static async Task MainAsync()
{
await Observable.Generate(new Random(), x => true,
x => x, x => x.Next(250, 500))
.Take(10)
.Select((x, idx) => Observable.FromAsync(async ct =>
{
Console.WriteLine("start: " + idx.ToString());
await Task.Delay(x, ct);
Console.WriteLine("finish: " + idx.ToString());
return idx;
}))
.Concat()
.LastOrDefaultAsync();
}
编辑 - 我将 Take(10) 移到链上,因为 Generate 不会阻塞 - 所以它阻止了它逃跑.
EDIT - I moved the Take(10) up the chain because the Generate won't block - so it stops this running away.
Select
将每个事件投射到一个流中,该流表示将在订阅上启动的异步任务.Concat
接受一个流,并在前一个子流完成时订阅每个连续的子流,将所有流连接成一个平面流.
The Select
projects each event into a stream representing an async task that will start on Subscription. Concat
accepts a stream of streams and subscribes to each successive sub-stream when the previous has completed, concatenating all the streams into a single flat stream.
这篇关于一次强制执行一个异步可观察对象的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
更多推荐
[db:关键词]
发布评论