架构师之RX响应式编程——RxJava2.0操作符原理

RxJAVA在Android开发中已经算是无人不知无人不晓了,加之它与Retrofit等流行框架的完美结合,已经成为Android项目开发的必备利器 。随手记作为一个大型项目,引入三方框架一直比较慎重,但也从今年初开始,正式引入了RxJava2.0,并配合Retrofit对项目的网络框架和繁琐的异步逻辑进行重构 。
RxJava的作用:就是异步RxJava的使用,可以使“逻辑复杂的代码”保持极强的阅读性 。
RxAndorid的作用:Android中RxAndorid与RxJava配合使用; RxAndorid 封装了
AndroidSchedulers.mainThread(),Android开发者使用过程中,可以轻松的将任务post Andorid主线程中,执行页面更新操作 。
RxJava的优点简单来讲RxJava是一个简化异步调用的库,但其实它更是一种优雅的编程方式和编程思想,当你熟悉RxJava的使用方式之后,会很容易爱上它 。我总结它的优点主要有两个方面:
  • 简洁,免除传统异步代码逻辑中的callback hell
  • 增加业务逻辑代码的可读性
Rx的操作符有哪些刚接触Rx的人面对一堆各式各样的操作符会觉得不知如何去学习记忆,其实你只需要从整体上了解Rx操作符的类别和掌握一些使用频率较高的操作符就足够了,至于其他的操作符,你只需要知道它的使用场景和掌握如何快速理解一个操作符的方法,就可以在需要的时候快速拿来用了 。下图是我根据官方文档总结的Rx操作符的分类及每个类别下的代表性操作符
 
架构师之RX响应式编程——RxJava2.0操作符原理

文章插图
 
 
Rx操作符的原理要了解操作符的原理,肯定要从源码入手喽 。所以我们先来简单撸一遍Rx的最基本的Create操作符的源码 。Rx的源码目录结构是比较清晰的,我们先从Observable.create方法来分析
Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {e.onNext("s");}}).subscribe(new Observer<String>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {// 创建的Observer中多了一个回调方法onSubscribe,传递参数为Disposable,Disposable相当于RxJava1.x中的Subscription,用于解除订阅 。}@Overridepublic void onNext(@NonNull String s) {}@Overridepublic void onError(@NonNull Throwable e) {}@Overridepublic void onComplete() {}});复制代码create方法如下
@CheckReturnValue@SchedulerSupport(SchedulerSupport.NONE)public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {ObjectHelper.requireNonNull(source, "source is null");return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));}复制代码代码很简单,第一行判空不用管,第二行调用RxJavaPlugins的方法是为了实现Rx的hook功能,我们暂时也无需关注,在一般情况下,第二行代码会直接返回它的入参即ObservableCreate对象,ObservableCreate是Observable的子类,实现了Observable的一些抽象方法比如subscribeActual 。事实上Rx的每个操作符都对应Observable的一个子类 。这里create方法接受的是一个ObservableOnSubscribe的接口实现类:
/** * A functional interface that has a {@code subscribe()} method that receives * an instance of an {@link ObservableEmitter} instance that allows pushing * events in a cancellation-safe manner. * * @param <T> the value type pushed */public interface ObservableOnSubscribe<T> {/*** Called for each Observer that subscribes.* @param e the safe emitter instance, never null* @throws Exception on error*/void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;}复制代码通过注释可以知道这个接口的作用是通过一个subscribe方法接受一个ObservableEmitter类型的实例,俗称发射器 。 Observable.create方法执行时,我们传入的就是一个ObservableOnSubscribe类型的匿名内部类,并实现了它的subscribe方法,然后它又被传入create方法的返回对象ObservableCreate,最终成为ObservableCreate的成员source
public final class ObservableCreate<T> extends Observable<T> {final ObservableOnSubscribe<T> source;public ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;}...复制代码接着我们来看Observable的subscribe方法,它的入参是一个Observer(即观察者,也就是事件接收者)
@SchedulerSupport(SchedulerSupport.NONE)@Overridepublic final void subscribe(Observer<? super T> observer) {ObjectHelper.requireNonNull(observer, "observer is null");try {observer = RxJavaPlugins.onSubscribe(this, observer);ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");subscribeActual(observer);} catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {Exceptions.throwIfFatal(e);// can't call onError because no way to know if a Disposable has been set or not// can't call onSubscribe because the call might have set a Subscription alreadyRxJavaPlugins.onError(e);NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");npe.initCause(e);throw npe;}}复制代码


推荐阅读