RxJava2使用详解

编程入门 行业动态 更新时间:2024-10-26 04:32:28

RxJava2使用<a href=https://www.elefans.com/category/jswz/34/1770044.html style=详解"/>

RxJava2使用详解

RxJava2使用详解

学习RxJava2之前需要了解下观察者模式,参考上一篇博客 Design pattern–观察者模式

RxJava2是什么

一个在Java VM上使用可观测的序列来组成异步的基于事件的程序的库
官网:

Observer/Subscriber观察者

Observable被观察者(主题Subject)

Subscribe订阅

主题(Observable)和观察者(Observer)通过Subscribe方法来完成订阅关系,当主题发生状态变化,会通知每一个观察者更新自身状态。


RxJava2基本用法

用之前,需要在app/build.grade中添加依赖:

dependencies {compile "io.reactivex.rxjava2:rxjava:2.0.0-RC5"compile "io.reactivex.rxjava2:rxandroid:2.0.0-RC1"
}

看一个简单的例子

public class RxjavaActivity extends AppCompatActivity{@Override protected void onCreate(@Nullable Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_rxjava);}// 按钮点击事件public void click(View view) {Observer observer = createObserver();Observable observable = createObservable();// observable订阅observerobservable.subscribe(observer);}// 创建Observable主题对象public Observable<String> createObservable() {return Observable.create(new ObservableOnSubscribe<String>() {@Override public void subscribe(ObservableEmitter<String> e) throws Exception {e.onNext("择天记更新了!!");e.onNext("人民的名义大结局!!");e.onComplete();}});}// 创建Observer观察者对象public Observer<String> createObserver() {return new Observer<String>() {@Override public void onSubscribe(Disposable d) {System.out.println("-- onSubscribe");}@Override public void onNext(String value) {System.out.println("-- onNext:"+value);}@Override public void onError(Throwable e) {System.out.println("-- Throwable");}@Override public void onComplete() {System.out.println("-- onComplete");}};}
}

例子中可以看到,单主题Observeable状态变化的时候可以调用onNext\ onError\ onComplete\ onSubscribe来通知观察者对象,和上一篇博客 Design pattern–观察者模式 中调用主题的change方法通知到每一个观察者对象的update方法是一样的。我们运行一下,看下结果:

04-30 15:16:46.491 25444-25444/com.testdes.des I/System.out: -- onSubscribe
04-30 15:16:46.491 25444-25444/com.testdes.des I/System.out: -- onNext:择天记更新了!!
04-30 15:16:46.491 25444-25444/com.testdes.des I/System.out: -- onNext:人民的名义大结局!!
04-30 15:16:46.491 25444-25444/com.testdes.des I/System.out: -- onComplete

当我收到择天记消息的时候,我希望可以专心看择天记,不想再被别的消息骚扰怎么办?我们可以调用Disposable对象的dispose方法,通过dispose来解除订阅关系,如下:

public Observer<String> createObserver() {return new Observer<String>() {Disposable disposable = null;@Override public void onSubscribe(Disposable d) {disposable = d;System.out.println("-- onSubscribe");}@Override public void onNext(String value) {if (value.contains("择天记")) {disposable.dispose();}System.out.println("-- onNext:"+value);}@Override public void onError(Throwable e) {System.out.println("-- Throwable");}@Override public void onComplete() {System.out.println("-- onComplete");}};

运行一下:

04-30 15:35:49.070 26198-26198/com.testdes.des I/System.out: -- onSubscribe
04-30 15:35:49.071 26198-26198/com.testdes.des I/System.out: -- onNext:择天记更新了!!

我们再看下简单的用法,比如Observer对象可以通过Consumer来构建,Observable通过just来返回,简化后的代码:

public void click(View view) {Observable observable = createObservable();observable.subscribe(new Consumer<String>() {@Override public void accept(String o) throws Exception {System.out.println("-- accept:" + o);}});
}public Observable<String> createObservable() {return Observable.just("择天记", "速度激情8");
}

除了just还有fromArray、fromCallable和fromFuture等,看下运行结果:

04-30 15:50:21.613 26579-26579/com.testdes.des I/System.out: -- accept:择天记
04-30 15:50:21.613 26579-26579/com.testdes.des I/System.out: -- accept:速度激情8

RxJava2线程控制

  • Scheduler.immediate()
    直接运行在当前线程,这是默认的scheduler

  • Scheduler.newThread()
    开一个新的线程,并在新的线程中执行操作

  • Scheduler.io()
    io操作(读写文件、网络交互)所使用的scheduler,和newThead类似,区别是io内部维护了一个没有数量上限的线程池,使用io可以避免不必要的线程创建

  • Schedulerputation()
    计算所用的scheduler,计算指的是cpu密集型计算,如图形的计算,使用固定数量(cpu核心数)的线程池

  • Scheduler.mainThread()
    在android主线程中运行

  public void click(View view) {Observable.create(new ObservableOnSubscribe<String>() {@Override public void subscribe(ObservableEmitter<String> e) throws Exception {Log.d("rxjava2", Thread.currentThread().getName());// 执行网络操作String tvName = getTVNameFromNetwork();e.onNext(tvName);e.onComplete();}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {@Override public void accept(String s) throws Exception {Log.d("rxjava2", "--- accept:"+s);}});}public String getTVNameFromNetwork() {// 假设执行网络操作,获取tvNamereturn "三生三世";}

如果不设置subscribeOn(Schedulers.io()),那subscribe方法是在主线程运行的,执行网络操作会报NetworkOnMainThreadException;设置observeOn(AndroidSchedulers.mainThread())是为了让accept方法运行在主线程中,便于操作view对象,运行一下:

04-30 16:49:08.291 28442-28659/com.testdes.des I/System.out: RxCachedThreadScheduler-1
04-30 16:49:08.292 28442-28659/com.testdes.des I/System.out: --- accept:三生三世

RxJava2常用操作符

RxJava的操作符非常多,功能也非常强大,这里只能介绍几个常用操作符。

  • Map
    对Observable发射的每一项数据应用一个函数,执行变换操作

    相信很多人都看不懂是什么意思,不懂就对了^_^,看个例子
  public void click(View view) {Observable.just(2332182).map(new Function<Integer, String>() {@Override public String apply(Integer integer) throws Exception {return integer + "@qq";}}).subscribe(new Consumer<String>() {@Override public void accept(String s) throws Exception {System.out.println("--- accept:" + s);}}, new Consumer<Throwable>() {@Override public void accept(Throwable throwable) throws Exception {throwable.printStackTrace();}});}

通过map操作符可以轻松的把Integer转成String,例子中把2332182一串数字转成邮箱地址:2332182@qq,运行一下:

04-30 18:46:12.354 1855-1855/com.testdes.des I/System.out: --- accept:2332182@qq
  • flatMap
    flatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放到一个单独的Observable

我们看一个例子,假设需要执行登录操作,登录成功后再获取用户信息,这个过程怎么样通过rxjava来实现?

  public void click(View view) {// 登录获取用户信息Observable.just(new UserParam("jack", "123")).flatMap(new Function<UserParam, ObservableSource<Integer>>() {@Override public ObservableSource<Integer> apply(UserParam userParam) throws Exception {// 登录int uid = login(userParam);return Observable.just(uid);}}).flatMap(new Function<Integer, ObservableSource<User>>() {@Override public ObservableSource<User> apply(Integer uid) throws Exception {// 获取用户信息User user = getUserInfo(uid);return Observable.just(user);}}).subscribe(new Consumer<User>() {@Override public void accept(User user) throws Exception {System.out.println(user.toString());}});}// 为了演示方便,假设这个login 和 getUserInfo都是调用网络操作// 调用登录接口获取uidpublic int login(UserParam param) {int uid = 1;// get uid from networkreturn uid;}// 获取用户信息public User getUserInfo(int uid) {User user = new User("jackson", "22", "man");// get user info from networkreturn user;}

flatMap可以把一个Observable转成多个Observables,并且无缝连接在一起,这个是非常巧妙的;由于是模拟的网络操作,所以没有加线程控制也不会有问题,运行一下:

04-30 19:27:17.192 3364-3364/com.testdes.des I/System.out: 用户信息[name:jackson, age:22, sex:man]
  • Debounce
    检测在过了一段指定的时间还没有发射数据时才发射数据

关键字搜索正好符合Debounce使用场景,开发过关键字搜索的童鞋一定知道,每次输入一个字符都会触发EditText的onTextChanged,意味着每输入一个字符就会调用一次搜索接口,而理想的情况应该是当我输入完成的时候再调用接口。输入完成可以定义为当停止输入的时间超过200ms或500ms的时候。
Debounce帮我们解决这个问题,示例如下:
先改下配置,由于rxjava需要和edittext绑定在一起,因此依赖rebinding,rebinding中包含rxjava

compile 'com.jakewharton.rxbinding:rxbinding:0.4.0'

关键字搜索代码实现如下:

public class RxjavaActivity extends AppCompatActivity{@Override protected void onCreate(@Nullable Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_rxjava);EditText mEditText = (EditText) findViewById(R.id.mInput);RxTextView.textChanges(mEditText).debounce(200, TimeUnit.MILLISECONDS).subscribeOn(AndroidSchedulers.mainThread()).flatMap(new Func1<CharSequence, Observable<List<String>>>() {@Override public Observable<List<String>> call(CharSequence charSequence) {// do searchreturn Observable.just(doSearch(charSequence.toString()));}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<List<String>>() {@Override public void call(List<String> strings) {for (String s : strings) {System.out.println(s + "\n");}}}, new Action1<Throwable>() {@Override public void call(Throwable throwable) {throwable.printStackTrace();}});}// 搜索接口public List<String> doSearch(String key) {// 假设调用网络接口获取搜索结果List<String> list = new ArrayList<>();list.add("abc");return list;}
}
  • throttleFirst
    防止按钮重复点击可以使用该操作符,代码如下:
    RxView.clicks(button).throttleFirst(1, TimeUnit.SECONDS).subscribe(new Observer<Void>() {@Override public void onCompleted() {}@Override public void onError(Throwable e) {}@Override public void onNext(Void aVoid) {System.out.println("按钮被点击了");}});
  • merge
    合并操作符,用于合并两个Observable对象
public class RxjavaActivity extends AppCompatActivity{@Override protected void onCreate(@Nullable Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_rxjava);Button button = (Button) findViewById(R.id.button);Observable.merge(getCouponFromLocal(), getCouponFromServer()).subscribe(new Subscriber<List<String>>() {@Override public void onCompleted() {}@Override public void onError(Throwable e) {}@Override public void onNext(List<String> strings) {for (String s : strings) {System.out.println(s + "\n");}}});}// 本地获取优惠券public Observable<List<String>> getCouponFromLocal() {List<String> list = new ArrayList<>();list.add("满99减10");return Observable.just(list);}// 本地获取优惠券(模拟网络接口调用)public Observable<List<String>> getCouponFromServer() {List<String> list = new ArrayList<>();list.add("满500减100");return Observable.just(list);}}
04-30 23:13:04.249 13568-13568/com.testdes.des I/System.out: 满99减10
04-30 23:13:04.249 13568-13568/com.testdes.des I/System.out: 满500减100

还有很多其他的操作符,这里就不一一介绍了,详细的可以参考官方wiki: .x/javadoc/io/reactivex/Flowable.html#flatMap(io.reactivex.functions.Function)

更多推荐

RxJava2使用详解

本文发布于:2024-02-12 23:10:45,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1689755.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:详解

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!