Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6)).subscribe(new Consumer<Integer>() {@Overridepublic void accept(@NonNull Integer integer) throws Exception {Log.i(TAG, "concat : " + integer + "n");}});复制代码
输出结果:
文章插图
4.flatMap
@SchedulerSupport(SchedulerSupport.NONE)public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,boolean delayErrors, int maxConcurrency, int bufferSize) {...return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
看看ObservableFlatMap代码 public ObservableFlatMap(ObservableSource<T> source,Function<? super T, ? extends ObservableSource<? extends U>> mapper,boolean delayErrors, int maxConcurrency, int bufferSize) {super(source);this.mapper = mapper;this.delayErrors = delayErrors;this.maxConcurrency = maxConcurrency;this.bufferSize = bufferSize;}@Overridepublic void subscribeActual(Observer<? super U> t) {if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {return;}source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));}
是不是和MAP超级像,我们这几看MergeObserver onNext做了什么@Overridepublic void onNext(T t) {...p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");...subscribeInner(p);}@SuppressWarnings("unchecked")void subscribeInner(ObservableSource<? extends U> p) {for (;;) {if (p instanceof Callable) {} else {InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);addInner(inner);p.subscribe(inner);break;}}}
省略了很多代码,我们看主要逻辑,获取到flatMap生成的observableSource,然后 p.subscribe(inner);注意这里的P不是observable 看innerObserver的onNext做了什么//这里的onNext事件由 p.subscribe(inner)触发@Overridepublic void onNext(U t) {if (fusionMode == QueueDisposable.NONE) {parent.tryEmit(t, this);} else {parent.drain();}}void tryEmit(U value, InnerObserver<T, U> inner) {if (get() == 0 && compareAndSet(0, 1)) {actual.onNext(value);if (decrementAndGet() == 0) {return;}} else {SimpleQueue<U> q = inner.queue;if (q == null) {q = new SpscLinkedArrayQueue<U>(bufferSize);inner.queue = q;}q.offer(value);if (getAndIncrement() != 0) {return;}}drainLoop();}
在这里我们终于看到我们定义的observer接收到了onNext事件Rx操作符的应用场景说了这么多,其实我们最关心的还是Rx操作符的应用场景 。其实只要存在异步的地方,都可以优雅地使用Rx操作符 。比如很多流行的Rx周边开源项目
文章插图
推荐阅读
- AMD|为Zen架构CPU的未来 AMD拼了:砸189亿搞研发
- 按摩|明星的高光妆容都出自化妆师之手?高圆圆证实还是自己最了解自己
- Intel|“4nm”工艺首秀:Intel 14代酷睿大小核架构稳了
- Intel|Intel 13代酷睿i5-13600K架构/频率大升级:跑分把AMD 5950X比下去了
- 服务器未响应的解决办法? dns服务器未响应
- 流行|今年流行白发“不染”,这种爆顶染越来越火,白发人群纷纷响应
- 教师|我的教师之梦
- 生科医学|宁夏确诊一例腺鼠疫病例:启动四级应急响应
- 八种常见的业务设计和架构模型 业务设计
- 开源流程引擎Camunda技术架构