RxJava的常用操作符
just(T...)
:将传入的参数依次发送出来,快捷创建创建事件队列的方法1
Observable.just("hello","world”);
等同于
1
2
3
4
5
6
7
8Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello");
subscriber.onNext("world");
subscriber.onCompleted();
}
})from(T[])
/from(Iterable)
: 将传入的数组或Iterable
拆分成具体对象后,依次发送出1
2String[] words = new String[]{"hello","world"};
Observable.from(words);等同于
1
2
3
4
5
6
7
8
9
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello");
subscriber.onNext("world");
subscriber.onCompleted();
}
})
Rxjava的知识点
RxJava的基本实现方式:
Observable即被观察者,它决定了什么时候触发事件以及触发怎样的事件。
Observer即观察者,它决定了事件触发的时候将有怎样的行为。
1 | Observable.create(new Observable.OnSubscribe<String>() { |
除了Observer接口之外,RxJava还内置了一个实现了Observer的抽象类:Subscriber.
1 | Observable.create(new Observable.OnSubscribe<String>() { |
两种使用方式是一样的。它们的区别对于使用者来说主要有两点:
- Subscriber新增加了
onStart()
方法,它会在subscribe刚开始事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或者重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作有线程要求(如弹出一个转菊花的ProgressBar,就必须在主线程执行),onStart()
方法就不适用了,因为它总是在subscribe所发生的线程被调用,而不能指定线程。需要在指定线程来做准备工作,可以使用doOnSubscribe()
方法。 - Subscriber实现了另一个方法
unsubscribe()
,这个方法被调用后,Subscriber将不再接收事件。一般这个方法调用前,可以使用isUnsubscribed()
先判断一下状态。unsubscribe()
主要用于解除引用关系,以避免内存泄露的发生。
Action
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29Action1<String> onNextAction = new Action1<String>() {
public void call(String s) {
// next()
Log.d(TAG,s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
public void call(Throwable throwable) {
// ERROR
}
};
Action0 onCompletedAction = new Action0() {
public void call() {
// complete
Log.d(TAG,"onCompleted");
}
};
// 自动创建Subscriber,并使用onNextAction来定义onNext();
observable.subscribe(onNextAction);
// 自动创建Subscriber,并使用onNextAction onErrorAction 来定义onNext() onError()
observable.subscribe(onNextAction,onErrorAction);
// 自动创建Subscriber,并使用onNextAction onErrorAction onCompletedAction来定义onNext() onError() onCompleted()
observable.subscribe(onNextAction,onErrorAction,onCompletedAction);Action0是RxJava的一个接口,它只有一个方法
call()
,这个方法是无参数无返回值的;由于onCompleted()
方法也是无参数无返回值得,因此Action0可以被当成一个包装对象,将onCompleted()
的内容打包起来将自己作为一个参数传入subscribe()
以实现不完整定义的回调。Action1也是一个接口,它同样只有一个方法call(T t)
,这个方法也无返回值,但是有一个参数;与Action0同理,由于onNext(T t)
和onError(Throwable error)
也是单参数无返回值的,因此Action1可以将onNext(t)
和onError(error)
打包。 RxJava提供了多个ActionX形式的接口,他们可以用以包装不同的无返回值的方法。ActionX的方法是无返回值的。例如,将字符串数组names中的所有字符串依次打印出来:
1
2
3
4
5
6
7
8String[] names = new String[]{"John","Jim","Tom","Alexander"};
Observable.from(names)
.subscribe(new Action1<String>() {
public void call(String s) {
Log.d(TAG,s);
}
});线程控制 —— Scheduler
在不指定线程的情况下,RxJava遵循的是线程不变的原则,即:在哪个线程调用
subscribe()
,就在哪个线程产生事件;在哪个线程产生事件就在哪个线程消费事件。如果需要切换线程,就需要用到Scheduler(调度器)。RxJava通过它来指定每一段代码应该运行在什么样的线程。RxJava已经内置了几个Scheduler:Schedulers.immediate()
:直接在当前线程中运行,相当于不指定线程。这是默认的Scheduler。Schedulers.newThread()
:总是启用新线程,并在新线程执行操作。Schedulers.io()
:I/O操作(读写文件,读写数据库,联网等)所使用的Scheduler。行为模式跟newThread()的差不多,区别在于io()的内部实现是一个无数量上线的线程池,可以重用空闲的线程,因此多数情况下io()比newThread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。Schedulers.computation()
:计算所使用的Scheduler。这个计算指的是cpu密集型计算,即不会被I/O等操作限制性能的操作,例如图形的计算。这个Scheduler使用的是固定的线程池,大小为CPU核心数。不要把I/O操作放在computation()中,否则I/O操作的等待时间会浪费cpu。- 另外,Android还有一个专用的
AndroidSchedulers.mainThread()
,它制定的操作将在Android主线程运行。
有了这几个Scheduler,就可以使用
subscribeOn()
和observeOn()
两个方法来对线程进行控制。subscribeOn()
指定subscribe()所发生的线程,即Observable.OnSubscribe被激活时所处的线程,或者叫做事件产生的线程。observeOn()
指定Subscriber所运行在的线程。或者叫做事件的消费的线程。1
2
3
4
5
6
7
8
9
10
11
12String[] names = new String[]{"John","Jim","Tom","Alexander"};
Observable.from(names)
// 指定names发生的线程在io()线程,被创建的事件的内容names会在io线程发出
.subscribeOn(Schedulers.io())
// 指定Subscriber的回调发生在主线程,故打印将发生在主线程。
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
public void call(String s) {
Log.d(TAG,s);
}
});事实上,这种在
subscribe()
之前写上两句subscribeOn(Scheduler.io())
和observeOn(AndroidSchedulers.mainThread())
的使用方式非常常见,它适用于多数的“后台线程取数据,主线程显示”的程序策略。变换
所谓变化,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。
observeOn()
指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次observeOn()
即可。不过,不同于observeOn()
,subscribeOn()
的位置放在哪里都可以,但它是只能调用一次的。当使用了多个subscribeOn()
的时候,只有第一个subscribeOn()
起作用。然而,虽然超过一个的
subscribeOn()
对事件处理的流程没有影响,但在流程之前确是可以利用的。那就是与Subscriber.onStart()
方法相对应的Observable.doOnSubscribe()
方法,它和Subscriber.onStart()
同样是在subscribe()
调用后且在事件发送前执行,但区别在于它可以指定线程。默认情况下,doOnSubscribe()
执行在subscribe()
发生的线程。如果在doOnSubscribe()
之后有subscribeOn()
方法,则它将执行离他最近的subScribeOn()
所指定的线程。如下,在doOnSubscribe()
后面跟一个subscribeOn(),就能指定准备工作的线程了。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18Observable.just("hello","world")
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
public void call() {
// 需要在主线程执行
showProgressBar();
}
})
// 指定主线程
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
public void call(String s) {
Log.d(TAG,s);
}
});Func
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15Observable.just("images/sky.png")
.map(new Func1<String, Bitmap>() {
public Bitmap call(String s) {
return getBitmapFromFile(s);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
public void call(Bitmap bitmap) {
iv.setImageBitmap(bitmap);
}
});这里出现了一个叫做Func1的类。它和Action1非常相似,也是RxJava的一个接口,用于包装含有一个参数的方法。Func跟Action的区别在于,Func包装的是有返回值的方法。另外,和ActionX一样,FuncX也有多个,用于不同参数个数的方法。
- map():事件对象的直接变换。它是RxJava最常用的变换。
- flatMap():它也是把传入的参数转化之后返回另一个对象。但是跟map()不同的是,flatMap()返回的是个Observable对象,并且这个Observable对象并不是被直接发送到了Subscriber的回调方法中。
- throttleFirst():在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤,例如按钮的点击监听事件。
———————————♥︎举个栗子♥︎————————————
假如我现在需要联网加载百度的页面,然后将返回的内容显示在一个TextView上,就可以这么写:
1 | public class MainActivity extends AppCompatActivity { |
运行效果:
由于网速较快,所以ProgressDialog一闪而过,gif图上面看不出来。