取消 RX.Net Observer 正在进行的 OnNext 方法

编程入门 行业动态 更新时间:2024-10-23 09:41:16
本文介绍了取消 RX.Net Observer 正在进行的 OnNext 方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

限时送ChatGPT账号..

如我的原始问题所述(参见 将相互依赖的事件流与RX.Net) 我有一个 RX 事件流,只要没有触发某个其他事件,它就只会调用观察者的 OnNext 方法(基本上,只要系统连接,就可以处理更改-* 事件,断开连接时暂停并在系统重新连接后重新开始处理 Change-* 事件).

As described in my original question (see Correlate interdependent Event Streams with RX.Net) I have an RX event stream that shall only call the observer's OnNext method as long as a certain other event is not triggered (basically 'Handle Change-* Events as long as the system is connected, pause while disconnected and re-start handling of the Change-* events once the system has re-connected).

然而,虽然这对新事件很顺利,但我如何取消/向正在进行 .OnNext() 调用发出取消信号?

However, while this works smoothly with new events, how would I cancel / signal cancellation to ongoing .OnNext() calls?

推荐答案

由于您的观察者已经被编写为接受 CancellationToken,我们可以修改您的 Rx 流以提供一个与事件数据一起.我们将使用 Rx CancellationDisposable,我们将在取消订阅流时处理它.

Since your observer is already written to accept a CancellationToken, we can just modify your Rx stream to supply one along with the event data. We'll use the Rx CancellationDisposable that we will dispose of whenever the stream is unsubscribed.

// Converts into a stream that supplies a `CancellationToken` that will be cancelled when the stream is unsubscribed
public static IObservable<Tuple<CancellationToken, T>> CancelOnUnsubscribe<T>(this IObservable<T> source)
{
    return Observable.Using(
        () => new CancellationDisposable(),
        cts => source.Select(item => Tuple.Create(cts.Token, item)));
}

将其与另一个问题的解决方案放在一起:

Putting this together with the solution from the other question:

DataSourceLoaded
    .SelectMany(_ => DataSourceFieldChanged
        .Throttle(x)
        .CancelOnUnsubscribe()
        .TakeUntil(DataSourceLoaded))
    .Subscribe(c => handler(c.Item1, c.Item2));

TakeUntil 子句被触发时,它将取消订阅 CancelOnUnsubscribe observable,这将依次处理 CancellationDisposable 并导致要取消的令牌.您的观察者可以观察此令牌并在发生这种情况时停止其工作.

When the TakeUntil clause is triggered, it will unsubscribe from the CancelOnUnsubscribe observable, which will in turn dispose of the CancellationDisposable and cause the token to be cancelled. Your observer can watch this token and stop its work when this happens.

这篇关于取消 RX.Net Observer 正在进行的 OnNext 方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

更多推荐

[db:关键词]

本文发布于:2023-05-01 12:24:30,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1409401.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:正在进行   方法   Net   RX   OnNext

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!