Observable.ObserveOn()似乎无效

编程入门 行业动态 更新时间:2024-10-10 09:18:21
本文介绍了Observable.ObserveOn()似乎无效的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我正在尝试使用Rx并行处理项目.看来我不能告诉Rx并行运行观察者的OnNext().这是演示的测试代码

I am trying to use use Rx to process items in parallel. It seems I can't tell Rx to run my observer's OnNext() in parallel. Here is test code to demonstrate

[Test] public void ObservableObserveOnNewThreadRunsInParallel() { Console.WriteLine("Starting thread: {0}", Thread.CurrentThread.ManagedThreadId); // store items as they are output var list = new List<Tuple<string, int, int, int, TimeSpan>>(); // used to wait until a sequences are complete var ev = new AutoResetEvent(false); // try these schedulers var schedulers = new[] { Tuple.Create("ThreadPoolScheduler.Instance", (IScheduler)ThreadPoolScheduler.Instance), Tuple.Create("NewThreadScheduler.Default", (IScheduler)NewThreadScheduler.Default), Tuple.Create("TaskPoolScheduler.Default", (IScheduler)TaskPoolScheduler.Default), Tuple.Create("Scheduler.Default", (IScheduler)Scheduler.Default), Tuple.Create("Scheduler.Immediate", (IScheduler)Scheduler.Immediate), }; // try each scheduler foreach (var schedulerTuple in schedulers) { // emit tuples <i, delay> where delay decreases with each new tuple // such that output timing is expected to be reversed var observable = Observable.Range(0, 5) .Select(i => Tuple.Create((int)i, (int)(500 - i * 100))) .Take(5); var dt = DateTime.Now; Tuple<string, IScheduler> scheduler = schedulerTuple; observable // specify the scheduler to use .ObserveOn(schedulerTuple.Item2) .Subscribe( v => { // emulate some work (first items take longer than last items) Thread.Sleep(v.Item2); // record when the item is done recording lock (list) list.Add( Tuple.Create( scheduler.Item1, v.Item1, v.Item2, Thread.CurrentThread.ManagedThreadId, dt - DateTime.Now)); }, // let the test go on () => ev.Set()); // wait until the end of the sequence ev.WaitOne(); } // print observed order foreach (var i in list) { Console.WriteLine(i); } }

输出:

Starting thread: 5 (ThreadPoolScheduler.Instance, 0, 500, 9, -00:00:04.2514251) (ThreadPoolScheduler.Instance, 1, 400, 9, -00:00:04.6524652) (ThreadPoolScheduler.Instance, 2, 300, 9, -00:00:04.9524952) (ThreadPoolScheduler.Instance, 3, 200, 9, -00:00:05.1525152) (ThreadPoolScheduler.Instance, 4, 100, 9, -00:00:05.2525252) (NewThreadScheduler.Default, 0, 500, 11, -00:00:06.5066506) (NewThreadScheduler.Default, 1, 400, 11, -00:00:06.9066906) (NewThreadScheduler.Default, 2, 300, 11, -00:00:07.2067206) (NewThreadScheduler.Default, 3, 200, 11, -00:00:07.4067406) (NewThreadScheduler.Default, 4, 100, 11, -00:00:07.5067506) (TaskPoolScheduler.Default, 0, 500, 12, -00:00:00.5020502) (TaskPoolScheduler.Default, 1, 400, 12, -00:00:00.9020902) (TaskPoolScheduler.Default, 2, 300, 12, -00:00:01.2021202) (TaskPoolScheduler.Default, 3, 200, 12, -00:00:01.4021402) (TaskPoolScheduler.Default, 4, 100, 12, -00:00:01.5021502) (Scheduler.Default, 0, 500, 13, -00:00:00.5020502) (Scheduler.Default, 1, 400, 13, -00:00:00.9020902) (Scheduler.Default, 2, 300, 13, -00:00:01.2021202) (Scheduler.Default, 3, 200, 13, -00:00:01.4021402) (Scheduler.Default, 4, 100, 13, -00:00:01.5021502) (Scheduler.Immediate, 0, 500, 5, -00:00:00.5020502) (Scheduler.Immediate, 1, 400, 5, -00:00:00.9040904) (Scheduler.Immediate, 2, 300, 5, -00:00:01.2041204) (Scheduler.Immediate, 3, 200, 5, -00:00:01.4041404) (Scheduler.Immediate, 4, 100, 5, -00:00:01.5041504)

请注意,即使我显式使用 ObserveOn()指定用于通知的调度程序,每个OnNext调用似乎都在等待上一个调用.

Notice how each OnNext call appear to have waited on the previous call even though I explicitly use ObserveOn() to specify the scheduler to use for notifications.

我希望除Scheduler.Immediate之外的所有人并行运行通知.

I expected all but Scheduler.Immediate to run the notifications in parallel.

有人知道我在做什么错吗?

Anyone know what I am doing wrong ?

推荐答案

这是设计使然.Rx的主要合同之一是所有通知都必须序列化.

This is by design. One of Rx's primary contracts is that all notifications must be serialized.

请参见 Rx设计指南中的§§4.2、6.7.

See §§4.2, 6.7 in the Rx Design Guidelines.

可观察对象表示Rx中的并发性,因此要具有重叠的通知,需要两个或多个可观察对象.通知不会在同一观察者中重叠,但是对于每个观察者而言,它们都会重叠.

Observables represent concurrency in Rx, so to have overlapping notifications requires two or more observables. Notifications won't overlap in the same observer, but they'll overlap with respect to each observer.

例如,如果需要同时执行两个方法(观察者),则需要定义两个可观察对象.

For instance, if you need to execute two methods (observers) concurrently, then you need to define two observables.

从技术上讲,并发所需的不是观察者(订阅),而是观察者(订阅).因此,两次订阅相同的 cold 可观察对象可以产生并发性,具体取决于可观察对象使用的调度程序;但是,两次观察相同的 hot 并不会导致并发.(请参阅我的博客文章:冷热观测值.)

Technically, it's the observers (subscriptions) not the observables that are needed for concurrency; therefore, subscribing to the same cold observable twice can produce concurrency, depending upon the scheduler used by the observable; however, subscribing to the same hot observable twice does not result in concurrency. (See my blog post: Hot and Cold Observables.)

ObserveOn 在通过引入并发性的调度程序时引入并发性.但是如何做到这一点而又不违反§6.7合同呢?好了,它将可观察变量分为两个可观察变量:之前运算符和之后运算符!另外,您可以将其视为两个订阅者或观察者:之前和之后. before 观察者是 ObserveOn 提供的内部观察者. after 观察者是您的观察者,或者是查询中下一个运算符提供的观察者.

ObserveOn introduces concurrency when passed a concurrency-introducing scheduler. But how can it do that without violating the §6.7 contract? Well, it splits the observable into two observables: before the operator and after the operator! Alternatively, you can look at it as two subscriptions or observers: before and after. The before observer is an internal observer that ObserveOn provides. The after observer is your observer, or the observer provided by the next operator in the query.

无论您如何看待,之前中的通知都可以相对于之后中的通知同时发生.但是 after 观察者将仅在 after 可观察的上下文中接收序列化的通知.

No matter how you look at it, the notifications in the before observable can occur concurrently with respect to the notifications in the after observable. But the after observer will only receive serialized notifications in the context of the after observable.

更多推荐

Observable.ObserveOn()似乎无效

本文发布于:2023-10-30 18:28:00,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1543655.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:Observable   ObserveOn

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!