About Learn RxJava

  RxJava其实出来已经很久了,一直以来只是知道,但是并没有去认真的学习过,最近看了一些相关的RxJava相关的资料,亲手把他们Coding了一遍,发现确实用起来感觉很爽。于是我打算在这里记录一下自己学习RxJava的一些笔记和资料供自己Mark一下,方便以后查阅。

基本概念

  • 观察/订阅者:Observer/Subscriber,它决定事件触发的时候将有怎样的行为(事件接收方)
  • 被观察者:Observable,它决定什么时候触发事件以及触发怎样的事件(事件发送方)
  • 订阅:Subscribe,用subscribe()方法将观察者被观察者关联起来。

Subscribe代码形式:

1
2
3
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);

线程控制

  • subscribeOn(): 指定 subscribe() 所发生的线程,即Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程,可以从事件发出的开端就造成影响,只能调用一次(可多次调用,只有第一次有作用),默认情况下,链上的操作符将会在调用.subsribeOn()的这个线程上执行任务

  • observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程,指定的是它之后的操作所在的线程,可多次调用完成线程切换

典型线程变幻形式:

1
2
3
4
5
6
7
8
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线

doOnSubscribe()

  SubscriberonStart()方法由于在subscribe()发生时就被调用了,因此不能指定线程,而是只能执行在subscribe()被调用时的线程。这就导致如果onStart()中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测subscribe()将会在什么线程执行。

  而与Subscriber.onStart()相对应的,有一个方法Observable.doOnSubscribe()。它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。

典型流程代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
//通常处理一些耗时操作任务 eg. network request
//取决于第一个subscribeOn()调用的线程
}
}).subscribeOn(Schedulers.io()).doOnSubscribe(new Action0() {
@Override
public void call() {
//通常做预处理逻辑 eg. showProgressBar
//取决于之后第一个subscribeOn()的线程
}
}).subscribeOn(AndroidSchedulers.mainThread()) //只会影响到doOnSubscribe()的线程
.observeOn(AndroidSchedulers.mainThread()) //切换到UI线程
.subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
//do in UI thread
}

@Override
public void onError(Throwable e) {
//do in UI thread
}

@Override
public void onNext(Object o) {
//do in UI thread
}
});

资料参考

RxJava  RxAndroid  RxBinding

给 Android 开发者的 RxJava 详解

在正确的线程上观察Observable

Awesome-RxJava