ReactiveX编程简称Rx编程,又叫响应式编程、响应式扩展,英文为Reactive Extensions。可以查看官方网站www.reactive.io,就像其网站说的”Expertise makes better software.”,响应式编程的目标是提供一致的编程接口, 帮助开发者更方便的处理异步数据流,使软件开发更高效、更简洁。Rx是一个多语言的实现,已经支持多种语言包括Java、Swift、C++、.NET、JavaScript、Ruby、Groovy、Scala等等,支持的库包括:RxJava 、RxSwift、Rx.NET、RxJS、RXRuby等等,真是屌炸天。在Android上我们添加 RxAndroid 库就可以,RxAndroid 是对 RxJava 一种更接地气的扩展。下面让我们通过 RxAndroid 去使用、理解 RxJava 吧。
Rx使用观察者模式
创建:Rx可以方便的创建事件流和数据流
组合:Rx使用查询式的操作符组合和变换数据流
监听:Rx可以订阅任何可观察的数据流并执行操作
Rx使代码简化
函数式风格:对可观察数据流使用无副作用的输入输出函数,避免了程序里错综复杂的状态
简化代码:Rx的操作符通通常可以将复杂的难题简化为很少的几行代码
异步错误处理:传统的try/catch没办法处理异步计算,Rx提供了合适的错误处理机制
轻松使用并发:Rx的Observables和Schedulers让开发者可以摆脱底层的线程同步和各种并发问题
先看简单的例子,通过RxJava将Integer类型转成String类型
private void funcDemo() {
Observable.OnSubscribe<Integer> onSubscribe1 = new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(100);
}
};
Func1<Integer, String> func1 = new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return String.valueOf(integer);
}
};
Subscriber<String> subscriber1 = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.d("onNext(): ", s);
}
};
Observable.create(onSubscribe1)
.map(func1)
.subscribe(subscriber1);
// 将上面分解成三步执行
// Observable<Integer> observable1 = Observable.create(onSubscribe1);
// Observable<String> observable2 = observable1.map(func1);
// observable2.subscribe(subscriber1);
}
主要关注这里面生成的四个对象:observable1、onSubscribe1、func1、subscriber1,如果对相应的单词不清晰,移步下面的备注。
这是典型的的被观察者与观察者的关系,或者叫被订阅者与订阅者的关系;下面理一下他们的角色:
observable1:被观察者,是subscriber1要订阅的对象。
onSubscribe1:被观察者的行为,是subscriber1要订阅的行为,subscribe()时被执行。
subscriber1:订阅者,是抽象类实现了Observer接口,可以叫观察者,其实就是观察者的角色。
// 分解成三步执行的代码
Observable<Integer> observable1 = Observable.create(onSubscribe1);
Observable<String> observable2 = observable1.map(func1);
observable2.subscribe(subscriber1);
这段代码的意思就是:创建一个被观察者observable1,给被观察者指定其所发布的行为(onSubscribe1来实现); 指定观察者时,只需指定相应的观察回调即可;在完成订阅的操作时,是先调用subscriber1的onStart方法, 之后通过订阅行为onSubscribe1来调用subscriber1完成相应的订阅操作;最后若出现异常则会回调subscriber1的onError方法。
换句话说就是:被观察者发布的行为是传递Integer类型的数值100,map()变换后,观察者收到了String类型的字符串”100”。
简单点说就是:观察者订阅了被观察者要发布的行为。
现在结合源码,一步步看它到底是怎么执行的!
第一步 Observable.create(onSubscribe1)
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
这里面泛型比较多,先忽略
所以Observable.create(onSubscribe1)干了两件事,第一 new 一个 observable1 对象,第二将 new 出的 onSubscribe1 对象通过 Observable 的构造函数赋值给它的成员变量 onSubscribe。
第二步 observable1.map(func1)
先看func1对象,RxJava有一系列的(Func+数字)的接口和一系列(Action+数字)接口,这些接口中都只有一个call方法,其中(Func+数字)接口的call方法都有返回值,而(Action+数字)接口的call方法都没有返回值,后面的那个数字表示call方法接受几个泛型类型的参数。看一下Func1和Action1的源码:
/**
* Represents a function with one argument.
*/
public interface Func1<T, R> extends Function {
R call(T t);
}
/**
* A one-argument action.
*/
public interface Action1<T> extends Action {
void call(T t);
}
Func1 和 Action(Action1继承Action) 都继承 Function 接口,Func1 接收一个 T 泛型类型的参数,call 回调后,返回一个 R 泛型类型的值,是一种“变换”函数接口,我们可以在 call 回调中处理这种“变换”的需求。
接下来看看 map(func1) 干了神马,上源码。。。
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
// 核心代码
Subscriber<? super T> subscriber2 = hook.onLift(operator).call(o);
subscriber2.onStart();
onSubscribe1.call(subscriber2);
}
});
}
map()函数里直接调用的lift()函数,先看看OperatorMap和lift()函数的参数Operator是什么玩意,再上源码。。。
/**
* Operator function for lifting into an Observable.
*/
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
// cover for generics insanity
}
public final class OperatorMap<T, R> implements Operator<R, T> {
final Func1<? super T, ? extends R> transformer;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
o.onNext(transformer.call(t));
}
};
}
}
Operator 继承 Func1,Operator 和 Func1 都是一种“变换”接口,比如输入Integer类型参数经过处理返回String类型值, OperatorMap 继承 Operator,但它的参数又和Operator相反,难道经过OperatorMap又把String类型变换成Integer类型值啦, 其实 OperatorMap 类有个Func1属性transformer(transformer就是func1),执行o.onNext(transformer.call(t))就将func1变换的结果传递下去了。
lift()返回的是个新的被观察者对象observable2,同时创建一个新的OnSubscribe对象,暂时标记为onSubscribe2; 在onSubscribe2回调中调用hook.onLift(operator).call(o),变换后并生成新的观察者subscriber2对象, 新的观察者subscriber2对象被绑定到原来创建onSubscribe1对象上,可以理解为subscriber2已经订阅到经过变换后的 observable1要发布的行为,最后这种变化后的行为继续被发送到subscriber1观察者那里。
最后一步 subscribe(subscriber1)
subscriber1开始订阅被观察者的行为,也可以说被观察者的行为subscribe()时被执行;我们可以在观察者的回调中处理我们最终得到的结果; 在执行订阅后返回了Subscription对象,里面包含两个方法:
public interface Subscription {
// 取消订阅
void unsubscribe();
// 订阅是否被取消
boolean isUnsubscribed();
}
最后总结下吧,为了了解RxJava的运行机制;我使用一个简单的函数,实现的功能就是通过RxJava将Integer类型转成String类型并打印出来,并结合源码理解其执行过程;这里面泛型、接口的回调和各种角色的变换确实不好理解;接下来我会将RxJava和Retrofit2结合起来使用, 所以【图灵机器人】就这样产生啦。
参考文章:
彻底搞懂 RxJava — 基础篇
彻底搞懂 RxJava — 中级篇
彻底搞懂 RxJava — 高级篇
备注:
observable:
adj. 显著的;觉察得到的;看得见的
n. [物] 可观察量;感觉到的事物
observer:
n. 观察者;[天] 观测者;遵守者
subscribe:
vi. 订阅;捐款;认购;赞成;签署
vt. 签署;赞成;捐助
subscriber:
n. 订户;签署者;捐献者