RxJava/RxAndroid的操作符使用(二)

编程入门 行业动态 更新时间:2024-10-25 06:33:57

RxJava/RxAndroid的<a href=https://www.elefans.com/category/jswz/34/1770947.html style=操作符使用(二)"/>

RxJava/RxAndroid的操作符使用(二)

文章目录

  • 一、创建操作
    • 1、基本创建
    • 2、快速创建
      • 2.1 empty
      • 2.2 never
      • 2.3 error
      • 2.4 from
      • 2.5 just
    • 3、定时与延时创建操作
      • 3.1 defer
      • 3.2 timer
      • 3.3 interval
      • 3.4 intervalRange
      • 3.5 range
      • 3.6 repeat
  • 二、过滤操作
    • 1、skip/skipLast
    • 2、debounce
    • 3、distinct——去重
    • 4、elementAt——获取指定位置元素
    • 5、filter——过滤
    • 6、first——取第一个数据
    • 7、last——取最后一个
    • 8、ignoreElements & ignoreElement(忽略元素)
    • 9、ofType(过滤类型)
    • 10、sample
    • 11 、take & takeLast
  • 三、组合可观察对象操作符
    • 1、CombineLatest
    • 2、merge
    • 3、zip
    • 4、startWith
    • 5、join
  • 四、变化操作符
    • 1、map
    • 2、flatMap / concatMap
    • 3、scan
    • 4、buffer
    • 5、window
  • 关于RxJava/RxAndroid的全部文章

一、创建操作

1、基本创建

create创建一个基本的被观察者

在使用create()操作符时,最好在被观察者的回调函数subscribe()中加上isDisposed(),以便在观察者断开连接的时候不在执行subscribe()函数中的相关逻辑,避免意想不到的错误出现。

        Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Object> emitter) throws Throwable {try {if(!emitter.isDisposed()){emitter.onNext("a");emitter.onNext("b");}} catch (Exception e) {emitter.onError(e);}}}).subscribe(value -> Log.e(TAG, "onNext: " + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));

2、快速创建

完整&快速创建被观察者、数组、集合遍历

操作符作用
empty创建一个只发送 onComplete 事件的 Observable。
never创建一个不发送任何事件的 Observable。
error创建一个只发送 onError 事件的 Observable。
from操作符用于将其他对象或数据结构转换为 Observable,可发送不同类型的数据流
just操作符将对象或一组对象转换为 Observable,并立即发送这些对象,没有延迟。

2.1 empty

创建一个不发射任何items但正常终止的 Observable——create an Observable that emits no items but terminates normally

        Observable.empty().subscribe(value -> Log.e(TAG, "onNext: "+value ),error -> Log.e(TAG, "Error: "+error),()->Log.e(TAG,"onComplete"));

2.2 never

创建一个不发射任何items且不会终止的 Observable——create an Observable that emits no items and does not terminate

        Observable.never().subscribe(value -> Log.e(TAG, "onNext: " + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));

不发送任何事件

2.3 error

创建一个不发射任何items并以错误终止的 Observable——create an Observable that emits no items and terminates with an error

        Observable.error(new Exception("ERROR")).subscribe(value -> Log.e(TAG, "onNext: " + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));

2.4 from

以fromAray举例:

        Observable.fromArray(1,2,3,4,5).subscribe(value -> Log.e(TAG, "onNext: " + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));

2.5 just

        Observable.just(1,2,3,4,5).subscribe(value -> Log.e(TAG, "onNext: " + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));

通过just()创建传入Integer类型的参数构建Observable被观察者,相当于执行了onNext(1)~onNext(5),通过链式编程订阅观察者。注意just的数据一般不能超过10个

注意,如果将 null 传递给 Just,它将返回一个将 null 作为项目发出的 Observable。不要错误地认为这将返回一个空的 Observable(根本不发出任何项目)

3、定时与延时创建操作

定时操作、周期性操作

操作符作用
defer直到有Observer观察者订阅时,才会通过Observeable的工厂方法动态创建Observeable,并且发送事件
timer用于延时发送,在给定的延迟后发出单个项目
interval它按照指定时间间隔发出整数序列,通常用于定时操作。
intervalRange类似于interval(),快速创建一个被观察者对象,指定时间间隔就发送事件,可以执行发送事件的数量
range它发出一个连续的整数序列,可以指定发送的次数
repeat重复发送指定次数的某个事件流

3.1 defer

直到有Observer观察者订阅时,才会通过Observeable的工厂方法动态创建Observeable,并且发送事件

defer不会立即创建 Observable,而是等到观察者订阅时才动态创建,每个观察者都会得到一个新的 Observable 实例。

defer确保了Observable代码在被订阅后才执行(而不是创建后立即执行)

        Observable<Integer> integerObservable = Observable.defer(new Supplier<ObservableSource<? extends Integer>>() {@Overridepublic ObservableSource<? extends Integer> get() throws Throwable {int randomNumber = (int) (Math.random() * 100);return Observable.just(randomNumber);}});integerObservable.subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Throwable {Log.e(TAG, "第一次" + integer.toString());}});integerObservable.subscribe(integer -> Log.e(TAG, "第二次" + integer.toString()));

3.2 timer

构造方法如下:

 timer(long delay, TimeUnit unit)timer(long delay, TimeUnit unit, Scheduler scheduler)
  • delay:延时的时间,类型为Long;
  • unit:表示时间单位,有TimeUnit.SECONDS等多种类型;
  • scheduler:表示调度器,用于指定线程。

用于延时发送

        final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Log.e(TAG, "timer:当前时间 ==" + dateFormat.format(System.currentTimeMillis()));Observable.timer(5, TimeUnit.SECONDS).subscribe(value -> Log.e(TAG, "timer:onNext ==" + dateFormat.format(System.currentTimeMillis())),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));

表示延迟5s后发送数据

3.3 interval

用于定时发送数据,快速创建Observable被观察者对象,每隔指定的时间就发送相应的事件,事件序列从0开始,无限递增1;

//在指定延迟时间后,每个多少时间发送一次事件
interval(long initialDelay, long period, TimeUnit unit)//在指定的延迟时间后,每隔多少时间发送一次事件,可以指定调度器
interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)//每间隔多少时间发送一次事件,使用默认的线程
Observable<Long> interval(long period, TimeUnit unit)//每间隔多少时间发送一次事件,可以指定调度器
interval(long period, TimeUnit unit, Scheduler scheduler)
  • initialDelay: 表示延迟开始的时间,类型为Long
  • period:距离下一次发送事件的时间间隔,类型Long
  • unit:时间单位,有TimeUnit.SECONDS等多种类型;
  • scheduler:表示调度器,用于指定线程。

它会从0开始,然后每隔 1 秒发射一个递增的整数值

        Observable.interval(1,3,TimeUnit.SECONDS).subscribe(value -> Log.e(TAG, "timer:onNext ==" + dateFormat.format(System.currentTimeMillis())),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));

定时发射指定的结果

// 创建一个每秒发射一个递增整数的 ObservableObservable<Long> intervalObservable = Observable.interval(1, TimeUnit.SECONDS);// 使用 map 操作符将递增的整数值映射为您想要的数据类型Observable<String> customObservable = intervalObservable.map(index -> "Data_" + index); // 映射为字符串 "Data_" + index// 订阅并输出结果customObservable.subscribe(data -> System.out.println("Received: " + data),error -> System.err.println("Error: " + error),() -> System.out.println("Completed"));

3.4 intervalRange

类似于interval(),快速创建一个被观察者对象,指定时间间隔就发送事件,可以执行发送事件的数量,数据依次递增1。

intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
  • start:表示事件开始的数值大小,类型为Long
  • count:表示事件执行的次数,类型为long,不能为负数;
  • initialDelay:表示延迟开始的时间,类型为Long;
  • period:距离下一次发送事件的时间间隔,类型Long;
  • unit:时间单位,有TimeUnit.SECONDS等多种类型;
  • scheduler:表示调度器,用于指定线程。
        Observable.intervalRange(10, 3, 2, 1,TimeUnit.SECONDS,Schedulers.io()).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));

3.5 range

Range 运算符按顺序发出一系列连续整数,可以在其中选择范围的起点及其长度。

它发出一个连续的整数序列,通常不涉及延迟。类似于intervalRange。

 public static Observable<Integer> range(int start, int count)public static Observable<Long> rangeLong(long start, long count)
  • start:事件开始的大小
  • count:发送的事件次数
    Observable.range(10,5).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));

3.6 repeat

repeat操作符可以重复发送指定次数的某个事件流,repeat操作符默认在trampoline调度器上执行,repeat默认重复次数为Long.MAX_VALUE,可使用重载方法指定次数以及使用repeatUntil指定条件。

        //一直重复Observable.fromArray(1, 2, 3, 4).repeat();//重复发送5次Observable.fromArray(1, 2, 3, 4).repeat(5);//重复发送直到符合条件时停止重复Observable.fromArray(1, 2, 3, 4).repeatUntil(new BooleanSupplier() {@Overridepublic boolean getAsBoolean() throws Exception {//自定判断条件,为true即可停止,默认为falsereturn false;}});

二、过滤操作

1、skip/skipLast

可以在Flowable,Observable中使用,表示源发射数据前,跳过多少个。

  1. skip: skip 操作符用于跳过 Observable 开头的一定数量的事件,然后开始发射后续的事件。它忽略序列的头部事件。

    例如,observable.skip(3) 会跳过前面的 3 个事件,然后发射后续的事件。

  2. skipLast: skipLast 操作符用于跳过 Observable 末尾的一定数量的事件,然后发射前面的事件。它忽略序列的末尾事件。

    例如,observable.skipLast(3) 会发射从序列开头到倒数第 3 个事件之前的事件,忽略了最后 3 个事件。

        Observable<Integer> integerObservable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8);integerObservable.skipLast(3).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Throwable {Log.e(TAG, "accept: " + integer);}});

换成skip后结果如下:

2、debounce

仅当特定时间跨度过去而没有发出另一个项目时,才从 Observable 发出一个项目

Observable.create(emitter -> {emitter.onNext(1);Thread.sleep(1_500);emitter.onNext(2);Thread.sleep(500);emitter.onNext(3);Thread.sleep(2000);emitter.onNext(4);emitter.onComplete();
}).subscribeOn(Schedulers.io()).debounce(1,TimeUnit.SECONDS).blockingSubscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete")
);

debounce(1, TimeUnit.SECONDS) 表示将事件流中的事件按照时间窗口的方式进行过滤。具体含义是,如果在连续的 1 秒内没有新的事件发射,那么才会将最后一个事件传递给观察者,否则会丢弃之前的事件。

结合图像理解,红色线条为debounce监听的发射节点,也就是每隔一秒发送一次数据。

在0s时发送了1。

在1s时由于没有数据,就没有发送数据。

在1s—2s期间产生了两次数据,分别是2和3。但是debounce只会将距离2s最近一次的数据发送。因此2被不会发送出来。

3、distinct——去重

可作用于Flowable,Observable,去掉数据源重复的数据。

        Observable.just(1,2,3,1,2,3,4).distinct().subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));

distinctUntilChanged()去掉相邻重复数据。

      Observable.just(1,3,3,2,2,3,4).distinctUntilChanged().subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));//还可以指定重复条件Observable.just(1,3,3,2,2,3,4).distinctUntilChanged(new Function<Integer, Boolean>() {@Overridepublic Boolean apply(Integer integer) throws Throwable {return integer>3;}}).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));

4、elementAt——获取指定位置元素

//获取索引为1的元素,如果不存在返回Error
Observable.just("a","b","c","d","e").elementAt(1,"Error").subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),
);

5、filter——过滤

用于过滤指定的发射元素。

        Observable.just(1, 2, 3, 4, 5, 6).filter(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Throwable {return (integer % 2) != 0;}}).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));

6、first——取第一个数据

      //不存在则返回100
Observable.just(1, 2, 3, 4, 5, 6).first(100).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error));

7、last——取最后一个

last、lastElement、lastOrError与fist、firstElement、firstOrError相对应。

   Observable.just(1, 2, 3, 4, 5, 6).last(100).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error));

8、ignoreElements & ignoreElement(忽略元素)

ignoreElements 作用于FlowableObservableignoreElement作用于MaybeSingle。两者都是忽略掉数据,不发射任何数据,返回完成或者错误时间。

9、ofType(过滤类型)

作用于Flowable、Observable、Maybe,过滤选择类型。

        Observable.just(1, 2, 3, 4.4, 5.5, 6.6).ofType(Integer.class).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error));

10、sample

  1. debounce:它等待一段时间,如果在这段时间内没有新事件到达,它会发射最后一个事件。它用于处理高频率事件流,例如用户输入,以确保只处理用户停止输入后的事件。debounce 等待事件流静止,然后发射最后一个事件。
  2. sample:它按照固定的时间间隔从事件流中抽样一个事件,并发射该事件。它用于定期采样事件流,例如从传感器数据中每隔一段时间获取一次数据。sample 定期获取事件,无论事件流是否活跃。
        Observable<Integer> observable = Observable.create(emitter -> {emitter.onNext(1);Thread.sleep(1_500);emitter.onNext(2);Thread.sleep(500);emitter.onNext(3);Thread.sleep(2000);emitter.onNext(4);emitter.onComplete();});observable.sample(1, TimeUnit.SECONDS).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error));

产生的数据在红线处发送

1在第1s时被发送,2在第2s时被发送,3在第3s时被发送,由于4还未在第5s时就已经onComplete所以4无法被发送

11 、take & takeLast

作用于Flowable、Observable。take发射前n个元素。takeLast发射后n个元素。

        Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);source.take(4).subscribe(value-> Log.e(TAG, "timer:onNext ==" + value));//打印:1 2 3 4source.takeLast(4).subscribe(value-> Log.e(TAG, "timer:onNext ==" + value));//打印:7 8 9 10

三、组合可观察对象操作符

操作符作用
combineLatest用于将多个 Observable 中最新的事件进行组合,并生成一个新的事件。
merge用于将多个 Observable 合并成一个单一的 Observable,按照它们发射事件的顺序合并
zip用于一一配对多个 Observable 发射的事件,只有当所有 Observable 都有事件时才生成新事件
startWith用于在一个 Observable 发射的事件前插入一个或多个初始事件
join用于将两个 Observable 的事件按照时间窗口的方式进行组合。

1、CombineLatest

通过指定的函数将每个 Observable 发出的最新项目组合在一起,并根据该函数的结果发出项目

  • combineLatest 用于将多个 Observable 中最新的事件进行组合,并生成一个新的事件
  • 当任何一个 Observable 发射新数据时,都会生成新的组合事件。
  • 适用于需要及时反应多个数据源最新值变化的情况。
Observable<Integer> source1 = Observable.just(1, 2, 3);
Observable<String> source2 = Observable.just("A", "B", "C");
Observable<Boolean> source3 = Observable.just(true, false, true);Observable<String> combined = ObservablebineLatest(source1,source2,source3,(integer, string, aBoolean) -> integer + " " + string + " " + aBoolean
);combined.subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete")
);

2、merge

  • merge 用于将多个 Observable 合并成一个单一的 Observable,按照它们发射事件的顺序合并
  • merge 不会进行事件的组合,只是合并多个 Observable 的事件。
  • 适用于需要将多个 Observable 的事件合并成一个流的情况。

注意:merge只能合并相同类型的Observable

        Observable<Integer> source1 = Observable.just(1, 2, 3);Observable<Integer> source2 = Observable.just(4,5,6);Observable<Integer> source3 = Observable.just(7,8,9);Observable<Integer> combined = Observable.merge(source1,source2,source3);combined.subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));

3、zip

  • zip 用于一一配对多个 Observable 发射的事件,只有当所有 Observable 都有事件时才生成新事件
  • zip 会等待所有 Observable 都有事件后,才会执行组合函数生成新事件。
  • 适用于需要将多个数据源的事件一一配对的情况。
        Observable<Integer> source1 = Observable.just(1, 2, 3);Observable<String> source2 = Observable.just("A", "B", "C");Observable<Boolean> source3 = Observable.just(true, false, true);Observable<String> combined = Observable.zip(source1,source2,source3,(integer, string, aBoolean) -> integer + " " + string + " " + aBoolean);combined.subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));

4、startWith

  • startWith 用于在一个 Observable 发射的事件前插入一个或多个初始事件
  • 这些初始事件会作为 Observable 的开头。
  • 适用于需要在 Observable 发射事件前添加一些初始数据的情况。

        Observable<Integer> source = Observable.just(1, 2, 3);Observable<Integer> withStart = source.startWithArray(100,200);withStart.subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));

5、join

  • join 用于将两个 Observable 的事件按照时间窗口的方式进行组合
  • 可以为每个 Observable 设置时间窗口,然后在这些窗口内组合事件。
  • 适用于需要在时间窗口内组合两个 Observable 的事件的情况。

时间窗口:

  • 固定时间窗口:定义一个固定的时间段,将在该时间段内的事件分为一个时间窗口。
  • 延时时间窗口:定义一个时间段,但在事件发生后延迟一段时间后才分为时间窗口。
  • 动态时间窗口:根据事件的特定条件动态地定义时间窗口。
        Observable<Integer> left = Observable.just(1, 2, 3);Observable<Integer> right = Observable.just(10, 20, 30);left.join(right,leftDuration -> Observable.timer(1, TimeUnit.SECONDS),rightDuration -> Observable.timer(1, TimeUnit.SECONDS),(leftValue, rightValue) -> "Left: " + leftValue + ", Right: " + rightValue).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value));

在这个示例中,我们定义了以下时间窗口规则:

  • 左边的时间窗口规则:leftDuration -> Observable.timer(1, TimeUnit.SECONDS) 表示在左边的事件后等待 1 秒后生成一个时间窗口。
  • 右边的时间窗口规则:rightDuration -> Observable.timer(2, TimeUnit.SECONDS) 表示在右边的事件后等待 2 秒后生成一个时间窗口。

现在让我们看看时间窗口如何影响事件的组合:

  • 当左边的事件 1 发生时,它会进入左边的时间窗口,并等待 1 秒。在此期间,右边的事件没有机会进入左边的时间窗口。
  • 当右边的事件 10 发生时,它会进入右边的时间窗口,并等待 2 秒。在此期间,左边的事件也没有机会进入右边的时间窗口。

只有在左边和右边的事件都在各自的时间窗口内时,它们才会被组合。在这个示例中,左边的事件会在右边的时间窗口内被组合。所以,在 1 秒后,左边的事件 1 和右边的事件 10 被组合成 “Left: 1, Right: 10”。

四、变化操作符

| 操作符 | 说明 |

map()对数据流的类型进行转换
flatMap()对数据流的类型进行包装成另一个数据流
scan()scan操作符会对发射的数据上一轮发射的数据进行函数处理,并返回的数据供下一轮使用。
buffer()缓存指定大小数据
window()缓存指定大小数据,返回新的integerObservable


​ 对上一轮处理过后的数据流进行函数处理
​ 对所有的数据流进行分组
​ 缓存发射的数据流到一定数量,随后发射出数据流集合
​ 缓存发射的数据流到一定数量,随后发射出新的事件流

1、map

        Observable.just(1,2,3).map(new Function<Integer, Object>() {@Overridepublic Object apply(Integer integer) throws Throwable {return integer * 100;}}).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));

2、flatMap / concatMap

        Observable observable = Observable.just(isLogin("12346")).flatMap(new Function<Boolean, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Boolean aBoolean) throws Throwable {String Login = "登陆失败,帐号秘密错误";if (aBoolean) Login = "登陆成功";return Observable.just(Login).delay(2, TimeUnit.SECONDS);}});observable.subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));private boolean isLogin(String passWord) {if (passWord.equals("123456")) {return true;}return false;}

  1. Observable.just(isLogin("12346")) 创建一个 Observable,它会发射一个布尔值,表示登录是否成功。
  2. .flatMap(new Function<Boolean, ObservableSource<?>>() { ... }:使用 flatMap 操作符将上一步的布尔值结果转换成一个新的 Observable,其中包含登录的结果消息。flatMap 中的 apply 方法根据登录结果 aBoolean 决定返回不同的消息。如果登录成功,返回 “登陆成功” 消息,否则返回 “登陆失败,帐号秘密错误” 消息,并使用 delay 延迟 2 秒发送消息。
  3. observable.subscribe(...):最后,订阅 observable,并设置了三个回调函数,分别处理 onNext、onError、onComplete 事件。

concatMap与flatMap的区别: concatMap是有序的,flatMap是无序的

  1. flatMap():
    • 不保证内部 Observable 的发射顺序,它会尽可能并行地处理内部 Observable,并将它们的发射结果合并到一个单一的 Observable 中。
    • 内部 Observable 可以乱序发射数据,最终结果也可能是乱序的。
  2. concatMap():
    • 保证内部 Observable 的发射顺序,它会按照原始数据的顺序依次处理每个内部 Observable,等待一个内部 Observable 完成后再处理下一个。
    • 内部 Observable 的发射顺序和最终结果的顺序都与原始数据的顺序一致。
        Observable<Integer> source = Observable.just(1, 2, 3);// 使用 flatMapsource.flatMap(num -> Observable.just(num * 2).delay(num, TimeUnit.MILLISECONDS)).subscribe(value -> Log.e(TAG, "timer:flatMapOnNext ==" + value));// 使用 concatMapsource.concatMap(num -> Observable.just(num * 2).delay(num, TimeUnit.MILLISECONDS)).subscribe(value -> Log.e(TAG, "timer:concatMapOnNext ==" + value));

3、scan

scan操作符会对发射的数据上一轮发射的数据进行函数处理,并返回的数据供下一轮使用。

        Observable<Integer> observable = Observable.just(1,2,3,4).scan(new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer integer, Integer integer2) throws Throwable {Log.e(TAG, "integer = " + integer +" integer2 = "+integer2);return integer2-integer;}});observable.subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"));

  1. 初始情况下,前一个累积结果为空(因为没有前一个值),所以第一个数据项 1 直接发射出来,产生的结果是 1。
  2. 接下来,前一个累积结果是 1,当前数据项是 2,所以执行操作 2 - 1,产生的结果是 1。
  3. 再次执行,前一个累积结果是 1,当前数据项是 3,所以执行操作 3 - 1,产生的结果是 2。
  4. 最后,前一个累积结果是 2,当前数据项是 4,执行操作 4 - 2,产生的结果是 2。

4、buffer

buffer操作符可以将发射出来的数据流,在给定的缓存池中进行缓存,当缓存池中的数据项溢满时,则将缓存池的数据项进行输出,重复上述过程,直到将发射出来的数据全部发射出去。

        Observable.just(1,2,3,4,5,6,7,8).buffer(3).subscribe(value -> Log.e(TAG, "timer:onNext ==" + value),error -> Log.e(TAG, "Error: " + error),() -> Log.e(TAG, "onComplete"))

5、window

window操作符和buffer操作符在功能上实现的效果是一样的,但window操作符最大区别在于同样是缓存一定数量的数据项,window操作符最终发射出来的是新的事件流integerObservable,而buffer操作符发射出来的是新的数据流。

也就是说,window操作符发射出来新的事件流中的数据项,还可以经过Rxjava其他操作符进行处理

window 操作符用于将一个 Observable 拆分为多个子 Observable,每个子 Observable 包含一定数量的连续数据项。window 操作符的两个参数的含义如下:

  1. 第一个参数(count):指定每个子 Observable 中包含的数据项的数量。
  2. 第二个参数(skip):指定何时启动新的窗口。它定义了窗口之间的重叠或间隔。如果 skip 等于 count,则窗口之间不重叠。如果 skip 小于 count,则窗口之间有重叠数据。

举个例子来说明:

假设有一个 Observable 发出的数据序列如下:1, 2, 3, 4, 5, 6, 7, 8, 9。

  • 如果你使用 window(3, 2),它的含义是每个窗口包含 3 个数据项,且窗口之间间隔 2 个数据项。那么分割后的子 Observable 将是:
    • 窗口1:1, 2, 3
    • 窗口2:3, 4, 5
    • 窗口3:5, 6, 7
    • 窗口4:7, 8, 9
  • 如果你使用 window(3, 3),窗口之间不重叠,每个窗口包含 3 个数据项。那么分割后的子 Observable 将是:
    • 窗口1:1, 2, 3
    • 窗口2:4, 5, 6
    • 窗口3:7, 8, 9

这里只使用了一个参数,用于指定窗口的大小。然后更具window发射新的事件流integerObservable的特性,对这个事件流进行了去重操作。

        Observable.just(1,1,3,4,6,6,7,8).window(3).subscribe(new Consumer<Observable<Integer>>() {@Overridepublic void accept(Observable<Integer> integerObservable) throws Throwable {integerObservable.distinct().subscribe(value -> Log.e(TAG, "timer:onNext ==" + value.toString()));}});

关于RxJava/RxAndroid的全部文章

RxJava/RxAndroid的基本使用方法(一)

RxJava的操作符使用(二)


参考文档:

官方文档:reactivex

RxJava3 Wiki:Home · ReactiveX/RxJava Wiki · GitHub

RxJava3官方github:What’s different in 3.0 · ReactiveX/RxJava Wiki · GitHub

RxJava2 只看这一篇文章就够了–玉刚说

RxJava2最全面、最详细的讲解–苏火火丶

关于背压(Backpressure)的介绍:关于RxJava最友好的文章——背压(Backpressure)

RXJava3+OKHTTP3+Retrofit2(观察者设计模式)讲解+实战

更多推荐

RxJava/RxAndroid的操作符使用(二)

本文发布于:2023-11-15 17:00:09,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1603127.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:操作   RxJava   RxAndroid

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!