最终它会调用它的子类ObservableCreate的subscribeActual方法:
@Overrideprotected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<T>(observer);observer.onSubscribe(parent);try {source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}}复制代码
在subscribeActual里首先创建了用于发射事件的CreateEmitter对象parent,CreateEmitter实现了接口Emitter和Disposable,并持有observer 。这段代码的关键语句是source.subscribe(parent),这行代码执行后,就会触发事件源进行发射事件,即e.onNext("s")会被调用 。细心的同学也会注意到这行代码之前,parent先被传入了observer的onSubscribe()方法,而在上面我们说过,observer的onSubscribe()方法接受一个Disposable类型的参数,可以用于解除订阅,之所以能够解除订阅,正是因为在触发事件发射之前调用了observer的onSubscribe(),给了我们调用CreateEmitter的解除订阅的方法dispose()的机会 。继续来看CreateEmitter的onNext()方法,它最终是通过调用observer的onNext()方法将事件发射出去的
static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {private static final long serialVersionUID = -3434801548987643227L;final Observer<? super T> observer;CreateEmitter(Observer<? super T> observer) {this.observer = observer;}@Overridepublic void onNext(T t) {if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}// 在真正发射之前,会先判断该CreateEmitter是否已经解除订阅if (!isDisposed()) {observer.onNext(t);}}...}复制代码
至此,Rx事件源的创建和订阅的流程就走通了 。
下面我们从map操作符来入手看一下Rx操作符的原理,map方法如下
@CheckReturnValue@SchedulerSupport(SchedulerSupport.NONE)public final <R> Observable<R> map(Function<? super T, ? extends R> mApper) {ObjectHelper.requireNonNull(mapper, "mapper is null");return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));}复制代码
map方法接受一个Function类型的参数mapper,返回了一个ObservableMap对象,它也是继承自Observable,而mapper被传给了ObservableMap的成员function,同时当前的源Observable被传给ObservableMap的成员source,进入ObservableMap类
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {final Function<? super T, ? extends U> function;public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {super(source);this.function = function;}@Overridepublic void subscribeActual(Observer<? super U> t) {source.subscribe(new MapObserver<T, U>(t, function));}static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {final Function<? super T, ? extends U> mapper;MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {super(actual);this.mapper = mapper;}@Overridepublic void onNext(T t) {if (done) {return;}if (sourceMode != NONE) {actual.onNext(null);return;}U v;try {v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");} catch (Throwable ex) {fail(ex);return;}actual.onNext(v);}@Overridepublic int requestFusion(int mode) {return transitiveBoundaryFusion(mode);}@Nullable@Overridepublic U poll() throws Exception {T t = qs.poll();return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;}}}复制代码
可以看到这里用到了装饰者模式,ObservableMap持有来自它上游的事件源source,MapObserver持有来自它下游的事件接收者和我们实现的转换方法function,在subscribeActual()方法中完成ObservableMap对source的订阅,触发MapObserver的onNext()方法,继而将来自source的原始数据经过函数mapper转换后再发射给下游的事件接收者,从而实现map这一功能 。
现在我们终于能够来总结一下包含多个操作符时的订阅流程了,以下面这段代码为例
Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {e.onNext("holen");}}).map(new Function<String, Integer>() {@Overridepublic Integer apply(@NonNull String s) throws Exception {return s.length();}}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(@NonNull Integer integer) {}@Overridepublic void onError(@NonNull Throwable e) {}@Overridepublic void onComplete() {}});复制代码
推荐阅读
- AMD|为Zen架构CPU的未来 AMD拼了:砸189亿搞研发
- 按摩|明星的高光妆容都出自化妆师之手?高圆圆证实还是自己最了解自己
- Intel|“4nm”工艺首秀:Intel 14代酷睿大小核架构稳了
- Intel|Intel 13代酷睿i5-13600K架构/频率大升级:跑分把AMD 5950X比下去了
- 服务器未响应的解决办法? dns服务器未响应
- 流行|今年流行白发“不染”,这种爆顶染越来越火,白发人群纷纷响应
- 教师|我的教师之梦
- 生科医学|宁夏确诊一例腺鼠疫病例:启动四级应急响应
- 八种常见的业务设计和架构模型 业务设计
- 开源流程引擎Camunda技术架构