架构师之RX响应式编程——RxJava2.0操作符原理( 四 )


Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6)).subscribe(new Consumer<Integer>() {@Overridepublic void accept(@NonNull Integer integer) throws Exception {Log.i(TAG, "concat : " + integer + "n");}});复制代码输出结果:

架构师之RX响应式编程——RxJava2.0操作符原理

文章插图
 
4.flatMap@SchedulerSupport(SchedulerSupport.NONE)public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,boolean delayErrors, int maxConcurrency, int bufferSize) {...return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));看看ObservableFlatMap代码
public ObservableFlatMap(ObservableSource<T> source,Function<? super T, ? extends ObservableSource<? extends U>> mapper,boolean delayErrors, int maxConcurrency, int bufferSize) {super(source);this.mapper = mapper;this.delayErrors = delayErrors;this.maxConcurrency = maxConcurrency;this.bufferSize = bufferSize;}@Overridepublic void subscribeActual(Observer<? super U> t) {if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {return;}source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));}是不是和MAP超级像,我们这几看MergeObserver onNext做了什么
@Overridepublic void onNext(T t) {...p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");...subscribeInner(p);}@SuppressWarnings("unchecked")void subscribeInner(ObservableSource<? extends U> p) {for (;;) {if (p instanceof Callable) {} else {InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);addInner(inner);p.subscribe(inner);break;}}}省略了很多代码,我们看主要逻辑,获取到flatMap生成的observableSource,然后 p.subscribe(inner);注意这里的P不是observable 看innerObserver的onNext做了什么
//这里的onNext事件由 p.subscribe(inner)触发@Overridepublic void onNext(U t) {if (fusionMode == QueueDisposable.NONE) {parent.tryEmit(t, this);} else {parent.drain();}}void tryEmit(U value, InnerObserver<T, U> inner) {if (get() == 0 && compareAndSet(0, 1)) {actual.onNext(value);if (decrementAndGet() == 0) {return;}} else {SimpleQueue<U> q = inner.queue;if (q == null) {q = new SpscLinkedArrayQueue<U>(bufferSize);inner.queue = q;}q.offer(value);if (getAndIncrement() != 0) {return;}}drainLoop();}在这里我们终于看到我们定义的observer接收到了onNext事件
Rx操作符的应用场景说了这么多,其实我们最关心的还是Rx操作符的应用场景 。其实只要存在异步的地方,都可以优雅地使用Rx操作符 。比如很多流行的Rx周边开源项目
 
架构师之RX响应式编程——RxJava2.0操作符原理

文章插图




推荐阅读