RxJava Observable.fromEmitter 奇怪的背压行为

编程入门 行业动态 更新时间:2024-10-26 20:31:01
本文介绍了RxJava Observable.fromEmitter 奇怪的背压行为的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我一直在使用 Observable.fromEmitter() 作为 Observable.create() 的绝佳替代品.我最近遇到了一些奇怪的行为,我不太明白为什么会这样.我真的很感谢对背压和调度程序有一定了解的人看看这个.

I've been making use of Observable.fromEmitter() as a fantastic alternative to Observable.create(). I've recently run into some weird behaviour and I can't quite work out why this is the case. I'd really appreciate someone with some knowledge on backpressure and schedulers taking a look at this.

public final class EmitterTest { public static void main(String[] args) { Observable<Integer> obs = Observable.fromEmitter(emitter -> { for (int i = 1; i < 1000; i++) { if (i % 5 == 0) { sleep(300L); } emitter.onNext(i); } emitter.onCompleted(); }, Emitter.BackpressureMode.LATEST); obs.subscribeOn(Schedulersputation()) .observeOn(Schedulersputation()) .subscribe(value -> System.out.println("Received " + value)); // Why does this get stuck at "Received 128" sleep(10000L); } private static void sleep(Long duration) { try { Thread.sleep(duration); } catch (InterruptedException e) { throw new RuntimeException(e); } } }

这个应用程序的输出是

Received 1 Received 2 ... Received 128

然后它仍然停留在 128(大概是因为这是 RxJava 的默认缓冲区大小).

Then it remains stuck at 128 (assumedly because this is RxJava's default buffer size).

如果我将 fromEmitter() 中指定的模式更改为 BackpressureMode.NONE,则代码按预期工作.如果我删除对 observeOn() 的调用,它也会按预期工作.有人能解释为什么会这样吗?

If I change the mode specified in fromEmitter() to BackpressureMode.NONE, then the code works as intended. If I remove the call to observeOn(), it also works as intended. Is anyone able to explain why this is the case?

推荐答案

这是同池死锁情况.subscribeOn 将下游 request 安排在它正在使用的同一线程上,但如果该线程忙于睡眠/发射循环,则请求永远不会传递到 fromEmitter 因此在一段时间后 LATEST 开始丢弃元素,直到最后一个值(999)被传递,如果主源等待的时间足够长.(这与我们删除的 onBackpressureBlock 的情况类似.)

This is a same-pool deadlock situation. subscribeOn schedules the downstream request on the same thread it is using but if that thread is busy with a sleep/emission loop, the request gets never delivered to fromEmitter and thus after some time LATEST starts to drop elements up until the very end where the very last value (999) is delivered if the main source waits long enough. (This is a similar situation with onBackpressureBlock we removed.)

如果 subscribeOn 没有执行此请求调度,该示例将正常工作.

If subscribeOn didn't do this scheduling of requests, the example would work propery.

我打开了一个问题来找出解决方案.

I've opened an issue to work out the solutions.

目前的解决方法是使用更大的缓冲区大小和 observeOn(有一个过载)或使用 fromEmitter(f, NONE).subscribeOn().onBackpressureLatest().observeOn()

The workaround for now is to use bigger buffer size with observeOn (there's an overload) or use fromEmitter(f, NONE).subscribeOn().onBackpressureLatest().observeOn()

更多推荐

RxJava Observable.fromEmitter 奇怪的背压行为

本文发布于:2023-11-25 12:22:39,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1629733.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:奇怪   RxJava   Observable   fromEmitter

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!