为什么我的 RxJava Flowable 在使用 observeOn 时不尊重背压?

编程入门 行业动态 更新时间:2024-10-10 08:22:09
本文介绍了为什么我的 RxJava Flowable 在使用 observeOn 时不尊重背压?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述

我正在尝试创建一个 Flowable,它发出有关背压的事件以避免内存问题,同时并行运行每个转换阶段以提高效率.我创建了一个简单的测试程序来推理我的程序不同步骤的行为,以及何时发出事件与等待不同阶段.

I am trying to create a Flowable which emits events respecting backpressure to avoid memory issues, while running each stage of transformation in parallel for efficiency. I have created a simple test program to reason about the behavior of the different steps of my program and when events are being emitted vs. waiting on different stages.

我的程序如下:

public static void main(String[] args) throws ExecutionException, InterruptedException { Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList()) .stream().map(i -> { System.out.println("emitting:" + i); return i; }); Flowable<Integer> flowable = Flowable.fromIterable(() -> ints.iterator()); System.out.println(String.format("Buffer size: %d", flowable.bufferSize())); Long count = flowable.onBackpressureBuffer(10) .buffer(10) .flatMap(buf -> { System.out.println("Sleeping 500 for batch"); Thread.sleep(500); System.out.println("Got batch of events"); return Flowable.fromIterable(buf); }, 1) .map(x -> x + 1) .doOnNext(i -> { System.out.println(String.format("Sleeping : %d", i)); Thread.sleep(100); System.out.println(i); }) .count() .blockingGet(); System.out.println("count: " + count); }

当我运行这个时,我得到的输出符合预期的背压,其中一批事件被发送到 buffer 中的大小,然后它们被平面映射,最后采取一些行动它们是一一打印的:

When I run this, I get output that respects backpressure as expected, where a batch of events is emmited up to the size in buffer, then they are flatmapped, and finally some action is taken where they are printed one-by-one:

Buffer size: 128 emitting:0 emitting:1 emitting:2 emitting:3 emitting:4 emitting:5 emitting:6 emitting:7 emitting:8 emitting:9 Sleeping 500 for batch Got batch of events Sleeping : 1 1 Sleeping : 2 2 Sleeping : 3 3 Sleeping : 4 4 Sleeping : 5 5 Sleeping : 6 6 Sleeping : 7 7 Sleeping : 8 8 Sleeping : 9 9 Sleeping : 10 10 emitting:10 emitting:11 emitting:12 emitting:13 emitting:14 emitting:15 emitting:16 emitting:17 emitting:18 emitting:19 Sleeping 500 for batch Got batch of events Sleeping : 11 11 Sleeping : 12 12 Sleeping : 13

但是,如果我尝试通过添加一些对 .observeOn(Schedulersputation()) 的调用来并行化不同的操作阶段,那么我的程序似乎不再考虑背压.我的代码现在看起来像:

However if I attempt to parallelize the different stages of operation here by adding some calls to .observeOn(Schedulersputation()) then it seems like my program no longer respects backpressure. My code now looks like:

public static void main(String[] args) throws ExecutionException, InterruptedException { Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList()) .stream().map(i -> { System.out.println("emitting:" + i); return i; }); Flowable<Integer> flowable = Flowable.fromIterable(() -> ints.iterator()); System.out.println(String.format("Buffer size: %d", flowable.bufferSize())); Long count = flowable.onBackpressureBuffer(10) .buffer(10) .observeOn(Schedulersputation()) .flatMap(buf -> { System.out.println("Sleeping 500 for batch"); Thread.sleep(500); System.out.println("Got batch of events"); return Flowable.fromIterable(buf); }, 1) .map(x -> x + 1) .observeOn(Schedulersputation()) .doOnNext(i -> { System.out.println(String.format("Sleeping : %d", i)); Thread.sleep(100); System.out.println(i); }) .observeOn(Schedulersputation()) .count() .blockingGet(); System.out.println("count: " + count); }

我的输出如下,我的所有事件都是预先发出的,而不是遵守各个执行阶段指定的背压和缓冲区:

And my output is the following, where all of my events are emitted upfront instead of respecting the backpressure and buffers specified by the various stages of execution:

Buffer size: 128 emitting:0 emitting:1 emitting:2 emitting:3 emitting:4 emitting:5 emitting:6 emitting:7 emitting:8 emitting:9 emitting:10 Sleeping 500 for batch emitting:11 emitting:12 ... everything else is emitted here ... emitting:998 emitting:999 Got batch of events Sleeping 500 for batch Sleeping : 1 1 Sleeping : 2 2 Sleeping : 3 3 Sleeping : 4 4 Sleeping : 5 Got batch of events Sleeping 500 for batch 5 Sleeping : 6 6 Sleeping : 7 7 Sleeping : 8 8 Sleeping : 9 9 Sleeping : 10 Got batch of events Sleeping 500 for batch 10 Sleeping : 11 11 Sleeping : 12 12 Sleeping : 13 13 Sleeping : 14 14 Sleeping : 15 Got batch of events Sleeping 500 for batch 15 Sleeping : 16 16 Sleeping : 17 17 Sleeping : 18 18 Sleeping : 19 19 Sleeping : 20 Got batch of events Sleeping 500 for batch 20 Sleeping : 21 21 Sleeping : 22 22 Sleeping : 23 23 Sleeping : 24 24 Sleeping : 25 Got batch of events Sleeping 500 for batch 25

假设我的批处理阶段正在调用外部服务,但由于延迟,我希望它们并行运行.我还希望在给定时间控制内存中的项目数量,因为最初发出的项目数量可能变化很大,并且批处理运行的阶段比事件的初始发出要慢得多.

Pretend my stages of batching are calling out to external services, but that I want them to run in parallel because of latency. I also want to have control of the number of items in memory at a given time because the number of items emitted initially could be highly variable, and the stages operating on batches run much slower than the initial emission of events.

如何让我的 Flowable 遵守 Scheduler 的背压?为什么当我添加对 observeOn 的调用时,它似乎只是不尊重背压?

How can I have my Flowable respect backpressure across a Scheduler? Why does it seem to only disrespect backpressure when I sprinkle in calls to observeOn?

推荐答案

如何让我的 Flowable 尊重背压跨调度程序

How can I have my Flowable respect backpressure across a Scheduler

实际上,应用 onBackpressureBuffer 会使它上面的源与下游应用的任何背压断开连接,因为它是一个无界操作符.你不需要它,因为 Flowable.fromIterable(顺便说一句,RxJava 有一个 range 操作符)支持并尊重背压.

Actually, applying onBackpressureBuffer makes the source above it disconnect from any backpressure applied by downstream as it is an unbounded-in operator. You don't need it because Flowable.fromIterable (and by the way, RxJava has a range operator) supports and honors backpressure.

为什么当我调用observeOn时,它似乎只是不尊重背压?

Why does it seem to only disrespect backpressure when I sprinkle in calls to observeOn?

在第一个示例中,发生了一种称为调用堆栈阻塞的自然背压.默认情况下,RxJava 是同步的,并且大多数操作符不会引入异步,就像第一个示例中没有的一样.

In the first example, there is a natural backpressure happening called call-stack blocking. RxJava is synchronous by default and most operators don't introduce asynchrony, just like none do in the first example.

observeOn 引入了一个异步边界,因此理论上,阶段可以彼此并行运行.它有一个默认的 128 元素预取缓冲区,可以通过其重载之一进行调整.但是,在您的情况下,buffer(10) 实际上会将预取量放大到 1280,这可能仍会导致一次完全消耗 1000 个元素的长源.

observeOn introduces an asynchronous boundary thus in theory, stages can run in parallel with each other. It has a default 128 element prefetch buffer which can be adjusted via one of its overloads. In your case, however, buffer(10) will actually amplify the prefetch amount to 1280 which may still lead to the complete consumption of your 1000 element long source in one go.

更多推荐

为什么我的 RxJava Flowable 在使用 observeOn 时不尊重背压?

本文发布于:2023-11-25 16:53:59,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1630573.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:RxJava   Flowable   observeOn

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!