一次强制执行一个异步可观察对象

编程入门 行业动态 更新时间:2024-10-28 17:18:56
本文介绍了一次强制执行一个异步可观察对象的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

限时送ChatGPT账号..

我正在尝试使用 Observable.FromAsync 将一些 TPL 异步集成到更大的 Rx 链中,就像这个小例子一样:

I'm trying to integrate some TPL async into a larger Rx chain using Observable.FromAsync, like in this small example:

using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace rxtest
{
    class Program
    {
        static void Main(string[] args)
        {
            MainAsync().Wait();
        }

        static async Task MainAsync()
        {
            await Observable.Generate(new Random(), x => true,
                                      x => x, x => x.Next(250, 500))
                .SelectMany((x, idx) => Observable.FromAsync(async ct =>
                {
                    Console.WriteLine("start:  " + idx.ToString());
                    await Task.Delay(x, ct);
                    Console.WriteLine("finish: " + idx.ToString());
                    return idx;
                }))
                .Take(10)
                .LastOrDefaultAsync();
        }
    }
}

但是,我注意到这似乎是同时启动所有异步任务,而不是一次执行一个,这会导致应用程序的内存使用量激增.SelectMany 似乎与 Merge 没有什么不同.

However, I've noticed that this seems to start all the async tasks concurrently rather than doing them one at a time, which causes memory usage of the app to balloon. The SelectMany appears to be acting no different than a Merge.

在这里,我看到这样的输出:

Here, I see output like this:

start:  0
start:  1
start:  2
...

我想看看:

start:  0
finish: 0
start:  1
finish: 1
start:  2
finish: 2
...

我怎样才能做到这一点?

How can I achieve this?

推荐答案

SelectMany 更改为带有 ConcatSelect:

Change the SelectMany to a Select with a Concat:

    static async Task MainAsync()
    {
        await Observable.Generate(new Random(), x => true,
                                  x => x, x => x.Next(250, 500))
            .Take(10)
            .Select((x, idx) => Observable.FromAsync(async ct =>
            {
                Console.WriteLine("start:  " + idx.ToString());
                await Task.Delay(x, ct);
                Console.WriteLine("finish: " + idx.ToString());
                return idx;
            }))
            .Concat()
            .LastOrDefaultAsync();
    }

编辑 - 我将 Take(10) 移到链上,因为 Generate 不会阻塞 - 所以它阻止了它逃跑.

EDIT - I moved the Take(10) up the chain because the Generate won't block - so it stops this running away.

Select 将每个事件投射到一个流中,该流表示将在订阅上启动的异步任务.Concat 接受一个流,并在前一个子流完成时订阅每个连续的子流,将所有流连接成一个平面流.

The Select projects each event into a stream representing an async task that will start on Subscription. Concat accepts a stream of streams and subscribes to each successive sub-stream when the previous has completed, concatenating all the streams into a single flat stream.

这篇关于一次强制执行一个异步可观察对象的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

更多推荐

[db:关键词]

本文发布于:2023-05-01 12:25:48,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1409769.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:强制执行   对象

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!