架构师之RX响应式编程——RxJava2.0操作符原理( 二 )

最终它会调用它的子类ObservableCreate的subscribeActual方法:
@Overrideprotected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<T>(observer);observer.onSubscribe(parent);try {source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}}复制代码在subscribeActual里首先创建了用于发射事件的CreateEmitter对象parent,CreateEmitter实现了接口Emitter和Disposable,并持有observer 。这段代码的关键语句是source.subscribe(parent),这行代码执行后,就会触发事件源进行发射事件,即e.onNext("s")会被调用 。细心的同学也会注意到这行代码之前,parent先被传入了observer的onSubscribe()方法,而在上面我们说过,observer的onSubscribe()方法接受一个Disposable类型的参数,可以用于解除订阅,之所以能够解除订阅,正是因为在触发事件发射之前调用了observer的onSubscribe(),给了我们调用CreateEmitter的解除订阅的方法dispose()的机会 。继续来看CreateEmitter的onNext()方法,它最终是通过调用observer的onNext()方法将事件发射出去的
static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {private static final long serialVersionUID = -3434801548987643227L;final Observer<? super T> observer;CreateEmitter(Observer<? super T> observer) {this.observer = observer;}@Overridepublic void onNext(T t) {if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}// 在真正发射之前,会先判断该CreateEmitter是否已经解除订阅if (!isDisposed()) {observer.onNext(t);}}...}复制代码至此,Rx事件源的创建和订阅的流程就走通了 。
下面我们从map操作符来入手看一下Rx操作符的原理,map方法如下
@CheckReturnValue@SchedulerSupport(SchedulerSupport.NONE)public final <R> Observable<R> map(Function<? super T, ? extends R> mApper) {ObjectHelper.requireNonNull(mapper, "mapper is null");return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));}复制代码map方法接受一个Function类型的参数mapper,返回了一个ObservableMap对象,它也是继承自Observable,而mapper被传给了ObservableMap的成员function,同时当前的源Observable被传给ObservableMap的成员source,进入ObservableMap类
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {final Function<? super T, ? extends U> function;public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {super(source);this.function = function;}@Overridepublic void subscribeActual(Observer<? super U> t) {source.subscribe(new MapObserver<T, U>(t, function));}static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {final Function<? super T, ? extends U> mapper;MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {super(actual);this.mapper = mapper;}@Overridepublic void onNext(T t) {if (done) {return;}if (sourceMode != NONE) {actual.onNext(null);return;}U v;try {v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");} catch (Throwable ex) {fail(ex);return;}actual.onNext(v);}@Overridepublic int requestFusion(int mode) {return transitiveBoundaryFusion(mode);}@Nullable@Overridepublic U poll() throws Exception {T t = qs.poll();return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;}}}复制代码可以看到这里用到了装饰者模式,ObservableMap持有来自它上游的事件源source,MapObserver持有来自它下游的事件接收者和我们实现的转换方法function,在subscribeActual()方法中完成ObservableMap对source的订阅,触发MapObserver的onNext()方法,继而将来自source的原始数据经过函数mapper转换后再发射给下游的事件接收者,从而实现map这一功能 。
现在我们终于能够来总结一下包含多个操作符时的订阅流程了,以下面这段代码为例
Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {e.onNext("holen");}}).map(new Function<String, Integer>() {@Overridepublic Integer apply(@NonNull String s) throws Exception {return s.length();}}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(@NonNull Integer integer) {}@Overridepublic void onError(@NonNull Throwable e) {}@Overridepublic void onComplete() {}});复制代码


推荐阅读