详解"/>
RxJava2使用详解
RxJava2使用详解
学习RxJava2之前需要了解下观察者模式,参考上一篇博客 Design pattern–观察者模式
RxJava2是什么
一个在Java VM上使用可观测的序列来组成异步的基于事件的程序的库
官网:
Observer/Subscriber观察者
Observable被观察者(主题Subject)
Subscribe订阅
主题(Observable)和观察者(Observer)通过Subscribe方法来完成订阅关系,当主题发生状态变化,会通知每一个观察者更新自身状态。
RxJava2基本用法
用之前,需要在app/build.grade中添加依赖:
dependencies {compile "io.reactivex.rxjava2:rxjava:2.0.0-RC5"compile "io.reactivex.rxjava2:rxandroid:2.0.0-RC1"
}
看一个简单的例子
public class RxjavaActivity extends AppCompatActivity{@Override protected void onCreate(@Nullable Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_rxjava);}// 按钮点击事件public void click(View view) {Observer observer = createObserver();Observable observable = createObservable();// observable订阅observerobservable.subscribe(observer);}// 创建Observable主题对象public Observable<String> createObservable() {return Observable.create(new ObservableOnSubscribe<String>() {@Override public void subscribe(ObservableEmitter<String> e) throws Exception {e.onNext("择天记更新了!!");e.onNext("人民的名义大结局!!");e.onComplete();}});}// 创建Observer观察者对象public Observer<String> createObserver() {return new Observer<String>() {@Override public void onSubscribe(Disposable d) {System.out.println("-- onSubscribe");}@Override public void onNext(String value) {System.out.println("-- onNext:"+value);}@Override public void onError(Throwable e) {System.out.println("-- Throwable");}@Override public void onComplete() {System.out.println("-- onComplete");}};}
}
例子中可以看到,单主题Observeable状态变化的时候可以调用onNext\ onError\ onComplete\ onSubscribe来通知观察者对象,和上一篇博客 Design pattern–观察者模式 中调用主题的change方法通知到每一个观察者对象的update方法是一样的。我们运行一下,看下结果:
04-30 15:16:46.491 25444-25444/com.testdes.des I/System.out: -- onSubscribe
04-30 15:16:46.491 25444-25444/com.testdes.des I/System.out: -- onNext:择天记更新了!!
04-30 15:16:46.491 25444-25444/com.testdes.des I/System.out: -- onNext:人民的名义大结局!!
04-30 15:16:46.491 25444-25444/com.testdes.des I/System.out: -- onComplete
当我收到择天记消息的时候,我希望可以专心看择天记,不想再被别的消息骚扰怎么办?我们可以调用Disposable对象的dispose方法,通过dispose来解除订阅关系,如下:
public Observer<String> createObserver() {return new Observer<String>() {Disposable disposable = null;@Override public void onSubscribe(Disposable d) {disposable = d;System.out.println("-- onSubscribe");}@Override public void onNext(String value) {if (value.contains("择天记")) {disposable.dispose();}System.out.println("-- onNext:"+value);}@Override public void onError(Throwable e) {System.out.println("-- Throwable");}@Override public void onComplete() {System.out.println("-- onComplete");}};
运行一下:
04-30 15:35:49.070 26198-26198/com.testdes.des I/System.out: -- onSubscribe
04-30 15:35:49.071 26198-26198/com.testdes.des I/System.out: -- onNext:择天记更新了!!
我们再看下简单的用法,比如Observer对象可以通过Consumer来构建,Observable通过just来返回,简化后的代码:
public void click(View view) {Observable observable = createObservable();observable.subscribe(new Consumer<String>() {@Override public void accept(String o) throws Exception {System.out.println("-- accept:" + o);}});
}public Observable<String> createObservable() {return Observable.just("择天记", "速度激情8");
}
除了just还有fromArray、fromCallable和fromFuture等,看下运行结果:
04-30 15:50:21.613 26579-26579/com.testdes.des I/System.out: -- accept:择天记
04-30 15:50:21.613 26579-26579/com.testdes.des I/System.out: -- accept:速度激情8
RxJava2线程控制
Scheduler.immediate()
直接运行在当前线程,这是默认的schedulerScheduler.newThread()
开一个新的线程,并在新的线程中执行操作Scheduler.io()
io操作(读写文件、网络交互)所使用的scheduler,和newThead类似,区别是io内部维护了一个没有数量上限的线程池,使用io可以避免不必要的线程创建Schedulerputation()
计算所用的scheduler,计算指的是cpu密集型计算,如图形的计算,使用固定数量(cpu核心数)的线程池Scheduler.mainThread()
在android主线程中运行
public void click(View view) {Observable.create(new ObservableOnSubscribe<String>() {@Override public void subscribe(ObservableEmitter<String> e) throws Exception {Log.d("rxjava2", Thread.currentThread().getName());// 执行网络操作String tvName = getTVNameFromNetwork();e.onNext(tvName);e.onComplete();}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {@Override public void accept(String s) throws Exception {Log.d("rxjava2", "--- accept:"+s);}});}public String getTVNameFromNetwork() {// 假设执行网络操作,获取tvNamereturn "三生三世";}
如果不设置subscribeOn(Schedulers.io()),那subscribe方法是在主线程运行的,执行网络操作会报NetworkOnMainThreadException;设置observeOn(AndroidSchedulers.mainThread())是为了让accept方法运行在主线程中,便于操作view对象,运行一下:
04-30 16:49:08.291 28442-28659/com.testdes.des I/System.out: RxCachedThreadScheduler-1
04-30 16:49:08.292 28442-28659/com.testdes.des I/System.out: --- accept:三生三世
RxJava2常用操作符
RxJava的操作符非常多,功能也非常强大,这里只能介绍几个常用操作符。
- Map
对Observable发射的每一项数据应用一个函数,执行变换操作
相信很多人都看不懂是什么意思,不懂就对了^_^,看个例子
public void click(View view) {Observable.just(2332182).map(new Function<Integer, String>() {@Override public String apply(Integer integer) throws Exception {return integer + "@qq";}}).subscribe(new Consumer<String>() {@Override public void accept(String s) throws Exception {System.out.println("--- accept:" + s);}}, new Consumer<Throwable>() {@Override public void accept(Throwable throwable) throws Exception {throwable.printStackTrace();}});}
通过map操作符可以轻松的把Integer转成String,例子中把2332182一串数字转成邮箱地址:2332182@qq,运行一下:
04-30 18:46:12.354 1855-1855/com.testdes.des I/System.out: --- accept:2332182@qq
- flatMap
flatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放到一个单独的Observable
我们看一个例子,假设需要执行登录操作,登录成功后再获取用户信息,这个过程怎么样通过rxjava来实现?
public void click(View view) {// 登录获取用户信息Observable.just(new UserParam("jack", "123")).flatMap(new Function<UserParam, ObservableSource<Integer>>() {@Override public ObservableSource<Integer> apply(UserParam userParam) throws Exception {// 登录int uid = login(userParam);return Observable.just(uid);}}).flatMap(new Function<Integer, ObservableSource<User>>() {@Override public ObservableSource<User> apply(Integer uid) throws Exception {// 获取用户信息User user = getUserInfo(uid);return Observable.just(user);}}).subscribe(new Consumer<User>() {@Override public void accept(User user) throws Exception {System.out.println(user.toString());}});}// 为了演示方便,假设这个login 和 getUserInfo都是调用网络操作// 调用登录接口获取uidpublic int login(UserParam param) {int uid = 1;// get uid from networkreturn uid;}// 获取用户信息public User getUserInfo(int uid) {User user = new User("jackson", "22", "man");// get user info from networkreturn user;}
flatMap可以把一个Observable转成多个Observables,并且无缝连接在一起,这个是非常巧妙的;由于是模拟的网络操作,所以没有加线程控制也不会有问题,运行一下:
04-30 19:27:17.192 3364-3364/com.testdes.des I/System.out: 用户信息[name:jackson, age:22, sex:man]
- Debounce
检测在过了一段指定的时间还没有发射数据时才发射数据
关键字搜索正好符合Debounce使用场景,开发过关键字搜索的童鞋一定知道,每次输入一个字符都会触发EditText的onTextChanged,意味着每输入一个字符就会调用一次搜索接口,而理想的情况应该是当我输入完成的时候再调用接口。输入完成可以定义为当停止输入的时间超过200ms或500ms的时候。
Debounce帮我们解决这个问题,示例如下:
先改下配置,由于rxjava需要和edittext绑定在一起,因此依赖rebinding,rebinding中包含rxjava
compile 'com.jakewharton.rxbinding:rxbinding:0.4.0'
关键字搜索代码实现如下:
public class RxjavaActivity extends AppCompatActivity{@Override protected void onCreate(@Nullable Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_rxjava);EditText mEditText = (EditText) findViewById(R.id.mInput);RxTextView.textChanges(mEditText).debounce(200, TimeUnit.MILLISECONDS).subscribeOn(AndroidSchedulers.mainThread()).flatMap(new Func1<CharSequence, Observable<List<String>>>() {@Override public Observable<List<String>> call(CharSequence charSequence) {// do searchreturn Observable.just(doSearch(charSequence.toString()));}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<List<String>>() {@Override public void call(List<String> strings) {for (String s : strings) {System.out.println(s + "\n");}}}, new Action1<Throwable>() {@Override public void call(Throwable throwable) {throwable.printStackTrace();}});}// 搜索接口public List<String> doSearch(String key) {// 假设调用网络接口获取搜索结果List<String> list = new ArrayList<>();list.add("abc");return list;}
}
- throttleFirst
防止按钮重复点击可以使用该操作符,代码如下:
RxView.clicks(button).throttleFirst(1, TimeUnit.SECONDS).subscribe(new Observer<Void>() {@Override public void onCompleted() {}@Override public void onError(Throwable e) {}@Override public void onNext(Void aVoid) {System.out.println("按钮被点击了");}});
- merge
合并操作符,用于合并两个Observable对象
public class RxjavaActivity extends AppCompatActivity{@Override protected void onCreate(@Nullable Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_rxjava);Button button = (Button) findViewById(R.id.button);Observable.merge(getCouponFromLocal(), getCouponFromServer()).subscribe(new Subscriber<List<String>>() {@Override public void onCompleted() {}@Override public void onError(Throwable e) {}@Override public void onNext(List<String> strings) {for (String s : strings) {System.out.println(s + "\n");}}});}// 本地获取优惠券public Observable<List<String>> getCouponFromLocal() {List<String> list = new ArrayList<>();list.add("满99减10");return Observable.just(list);}// 本地获取优惠券(模拟网络接口调用)public Observable<List<String>> getCouponFromServer() {List<String> list = new ArrayList<>();list.add("满500减100");return Observable.just(list);}}
04-30 23:13:04.249 13568-13568/com.testdes.des I/System.out: 满99减10
04-30 23:13:04.249 13568-13568/com.testdes.des I/System.out: 满500减100
还有很多其他的操作符,这里就不一一介绍了,详细的可以参考官方wiki: .x/javadoc/io/reactivex/Flowable.html#flatMap(io.reactivex.functions.Function)
更多推荐
RxJava2使用详解
发布评论