ReactiveX编程简称Rx编程,又叫响应式编程、响应式扩展,英文为Reactive Extensions。可以查看官方网站www.reactive.io,就像其网站说的”Expertise makes better software.”,响应式编程的目标是提供一致的编程接口, 帮助开发者更方便的处理异步数据流,使软件开发更高效、更简洁。Rx是一个多语言的实现,已经支持多种语言包括Java、Swift、C++、.NET、JavaScript、Ruby、Groovy、Scala等等,支持的库包括:RxJavaRxSwift、Rx.NET、RxJS、RXRuby等等,真是屌炸天。在Android上我们添加 RxAndroid 库就可以,RxAndroid 是对 RxJava 一种更接地气的扩展。下面让我们通过 RxAndroid 去使用、理解 RxJava 吧。

Rx使用观察者模式

创建:Rx可以方便的创建事件流和数据流
组合:Rx使用查询式的操作符组合和变换数据流
监听:Rx可以订阅任何可观察的数据流并执行操作

Rx使代码简化

函数式风格:对可观察数据流使用无副作用的输入输出函数,避免了程序里错综复杂的状态
简化代码:Rx的操作符通通常可以将复杂的难题简化为很少的几行代码
异步错误处理:传统的try/catch没办法处理异步计算,Rx提供了合适的错误处理机制
轻松使用并发:Rx的Observables和Schedulers让开发者可以摆脱底层的线程同步和各种并发问题

先看简单的例子,通过RxJava将Integer类型转成String类型

    private void funcDemo() {
        Observable.OnSubscribe<Integer> onSubscribe1 = new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(100);
            }
        };

        Func1<Integer, String> func1 = new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return String.valueOf(integer);
            }
        };

        Subscriber<String> subscriber1 = new Subscriber<String>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(String s) {
                Log.d("onNext(): ", s);
            }
        };

        Observable.create(onSubscribe1)
                .map(func1)
                .subscribe(subscriber1);

        // 将上面分解成三步执行
        // Observable<Integer> observable1 = Observable.create(onSubscribe1);
        // Observable<String> observable2 = observable1.map(func1);
        // observable2.subscribe(subscriber1);
    }

主要关注这里面生成的四个对象:observable1、onSubscribe1、func1、subscriber1,如果对相应的单词不清晰,移步下面的备注。
这是典型的的被观察者与观察者的关系,或者叫被订阅者与订阅者的关系;下面理一下他们的角色:

    observable1:被观察者,是subscriber1要订阅的对象。  
    onSubscribe1:被观察者的行为,是subscriber1要订阅的行为,subscribe()时被执行。  
    subscriber1:订阅者,是抽象类实现了Observer接口,可以叫观察者,其实就是观察者的角色。

    // 分解成三步执行的代码
    Observable<Integer> observable1 = Observable.create(onSubscribe1);
    Observable<String> observable2 = observable1.map(func1);
    observable2.subscribe(subscriber1);

这段代码的意思就是:创建一个被观察者observable1,给被观察者指定其所发布的行为(onSubscribe1来实现); 指定观察者时,只需指定相应的观察回调即可;在完成订阅的操作时,是先调用subscriber1的onStart方法, 之后通过订阅行为onSubscribe1来调用subscriber1完成相应的订阅操作;最后若出现异常则会回调subscriber1的onError方法。

换句话说就是:被观察者发布的行为是传递Integer类型的数值100,map()变换后,观察者收到了String类型的字符串”100”。

简单点说就是:观察者订阅了被观察者要发布的行为。

源码参考个人作品【图灵机器人】

现在结合源码,一步步看它到底是怎么执行的!

第一步 Observable.create(onSubscribe1)

    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(hook.onCreate(f));
    }

    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

这里面泛型比较多,先忽略泛型符号;hook又是什么东东?翻译过来就是“钩子”,到底是什么钩子呢,查看源码后知道, `hook` 是一个 proxy 对象, 仅仅用作调试的时候可以插入一些测试代码的,那也先忽略。

所以Observable.create(onSubscribe1)干了两件事,第一 new 一个 observable1 对象,第二将 new 出的 onSubscribe1 对象通过 Observable 的构造函数赋值给它的成员变量 onSubscribe。

第二步 observable1.map(func1)

先看func1对象,RxJava有一系列的(Func+数字)的接口和一系列(Action+数字)接口,这些接口中都只有一个call方法,其中(Func+数字)接口的call方法都有返回值,而(Action+数字)接口的call方法都没有返回值,后面的那个数字表示call方法接受几个泛型类型的参数。看一下Func1和Action1的源码:

    /**
     * Represents a function with one argument.
     */
    public interface Func1<T, R> extends Function {
        R call(T t);
    }

    /**
     * A one-argument action.
     */
    public interface Action1<T> extends Action {
        void call(T t);
    }

Func1 和 Action(Action1继承Action) 都继承 Function 接口,Func1 接收一个 T 泛型类型的参数,call 回调后,返回一个 R 泛型类型的值,是一种“变换”函数接口,我们可以在 call 回调中处理这种“变换”的需求。
接下来看看 map(func1) 干了神马,上源码。。。

    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }

    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                // 核心代码
                Subscriber<? super T> subscriber2 = hook.onLift(operator).call(o);
                subscriber2.onStart();
                onSubscribe1.call(subscriber2);
            }
        });
    }

map()函数里直接调用的lift()函数,先看看OperatorMap和lift()函数的参数Operator是什么玩意,再上源码。。。

    /**
     * Operator function for lifting into an Observable.
     */
    public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
        // cover for generics insanity
    }

    public final class OperatorMap<T, R> implements Operator<R, T> {

        final Func1<? super T, ? extends R> transformer;

        public OperatorMap(Func1<? super T, ? extends R> transformer) {
            this.transformer = transformer;
        }

        @Override
        public Subscriber<? super T> call(final Subscriber<? super R> o) {
            return new Subscriber<T>(o) {
                @Override
                public void onCompleted() {
                    o.onCompleted();
                }
                @Override
                public void onError(Throwable e) {
                    o.onError(e);
                }
                @Override
                public void onNext(T t) {
                    o.onNext(transformer.call(t));
                }
            };
        }
    }

Operator 继承 Func1,Operator 和 Func1 都是一种“变换”接口,比如输入Integer类型参数经过处理返回String类型值, OperatorMap 继承 Operator,但它的参数又和Operator相反,难道经过OperatorMap又把String类型变换成Integer类型值啦, 其实 OperatorMap 类有个Func1属性transformer(transformer就是func1),执行o.onNext(transformer.call(t))就将func1变换的结果传递下去了。

lift()返回的是个新的被观察者对象observable2,同时创建一个新的OnSubscribe对象,暂时标记为onSubscribe2; 在onSubscribe2回调中调用hook.onLift(operator).call(o),变换后并生成新的观察者subscriber2对象, 新的观察者subscriber2对象被绑定到原来创建onSubscribe1对象上,可以理解为subscriber2已经订阅到经过变换后的 observable1要发布的行为,最后这种变化后的行为继续被发送到subscriber1观察者那里。

最后一步 subscribe(subscriber1)

subscriber1开始订阅被观察者的行为,也可以说被观察者的行为subscribe()时被执行;我们可以在观察者的回调中处理我们最终得到的结果; 在执行订阅后返回了Subscription对象,里面包含两个方法:

    public interface Subscription {
        // 取消订阅
        void unsubscribe();
        // 订阅是否被取消
        boolean isUnsubscribed();
    }

最后总结下吧,为了了解RxJava的运行机制;我使用一个简单的函数,实现的功能就是通过RxJava将Integer类型转成String类型并打印出来,并结合源码理解其执行过程;这里面泛型、接口的回调和各种角色的变换确实不好理解;接下来我会将RxJava和Retrofit2结合起来使用, 所以【图灵机器人】就这样产生啦。

参考文章:

彻底搞懂 RxJava — 基础篇
彻底搞懂 RxJava — 中级篇
彻底搞懂 RxJava — 高级篇

大头鬼Bruce
RxJava基本流程和lift源码分析

扔物线
给 Android 开发者的 RxJava 详解

备注:

observable:
adj. 显著的;觉察得到的;看得见的
n. [物] 可观察量;感觉到的事物

observer:
n. 观察者;[天] 观测者;遵守者

subscribe:
vi. 订阅;捐款;认购;赞成;签署
vt. 签署;赞成;捐助

subscriber:
n. 订户;签署者;捐献者