环境:projectreactor2020.0.14
1. 前言在响应式编程中,Project Reactor提供了两个核心的概念:Mono和Flux 。Mono和Flux都是Reactor中的Publisher,它们可以产生并发布数据,然后可以被订阅和消费 。这两个概念在WebFlux中有着广泛的应用,帮助我们实现异步和非阻塞的编程模型 。
在这个主题中,我们将深入探讨Mono和Flux的基本使用 。我们将了解它们如何被创建,如何订阅它们的事件 , 以及如何处理错误和完成通知 。通过学习这些内容,你将能够更好地理解WebFlux的响应式编程模型,并能够在你的项目中有效地使用Mono和Flux 。
让我们开始吧!
2. 环境依赖<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId></dependency><dependencyManagement><dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-bom</artifactId><version>${reactor.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement>
3. Mono & Flux介绍Flux【新一代WebFlux框架核心技术Reactor响应式编程基本用法】Flux表示了0到N个元素序列,下图展示了Flux如何转换元素
文章插图
Flux
一个Flux<T>是一个标准的Publisher<T> , 它表示一个由0到N个发射项目组成的异步序列,可选地由一个完成信号或一个错误终止 。在响应式流规范中,这三种类型的信号转换为对下游订阅者的onNext、onComplete和onError方法的调用 。
由于可能信号的范围很大,Flux是通用的反应式类型 。请注意,所有事件,甚至是终止事件,都是可选的:只有onComplete事件才能表示一个空的有限序列,但删除onComplete事件就会得到一个无限的空序列(没什么用处 , 除了关于取消的测试) 。类似地 , 无限序列不一定是空的 。例如,Flux.interval(Duration)产生一个无限长的Flux<Long>,并从时钟发出规则的时标 。
MonoMono表示了0个或1个元素序列,下图展示了Mono如何转换元素
文章插图
Mono
Mono<T>是一个专门的发布者<T>,它通过onNext信号发出最多一个项目,然后以onComplete信号终止(Mono成功,有或没有值) , 或只发出一个onError信号(Mono失败) 。
大多数Mono实现都希望在调用onNext之后立即对其订阅者调用onComplete 。Mono.never()是一个异常值:它不会发出任何信号 , 这在技术上并没有被禁止,但在测试之外并不是特别有用 。另一方面,onNext和onError的组合是明确禁止的 。
Mono只提供了可用于` Flux `的操作符子集,有些操作符(特别是那些将Mono与另一个`Publisher`结合的操作符)会切换到`Flux` 。例如,Mono#concatWith(Publisher)返回一个Flux,而Mono#then(Mono)返回另一个Mono 。
注意 , 你可以使用Mono来表示只有完成概念的无值异步进程(类似于Runnable) 。要创建一个,可以使用一个空的Mono<Void> 。
4. Mono & Flux常用操作Mono常用操作
- 创建元素
Mono.just(10).subscribe(System.out::println) ;
Mono.empty()方法:创建一个空的Mono对象,即不包含任何元素 。Mono.justOrEmpty(T value)方法:如果指定值不为null,则创建一个包含该值的Mono对象;否则创建一个空的Mono对象 。
// 输出10Mono.justOrEmpty(10).subscribe(System.out::println) ;// 如果值为null,没有任何输出Mono.justOrEmpty(null).subscribe(System.out::println) ;
文章插图
图片
Mono.fromCallable(Callable<? extends T> supplier)方法:创建一个Mono对象,该对象包含通过调用给定Callable对象的call()方法得到的返回值 。
// 通过Callable方式,我们可以在内部执行其它一些动作Mono.fromCallable(() -> 666).subscribe(System.out::println) ;
文章插图
Mono.fromSupplier(Supplier<? extends T> supplier)方法:创建一个Mono对象,该对象包含通过调用给定Supplier对象的get()方法得到的返回值 。
Mono.fromSupplier(() -> 666).subscribe(System.out::println) ;
推荐阅读
- XXL-JOB真的要凉了?出现了一个王炸级别的分布式任务调度与计算框架?
- Astro,这个前端框架有点不一样!
- 10大Web开发框架有哪些?
- 无缝集成GORM与Go Web框架
- 前端框架的演进与未来展望
- 为什么RPC框架数十年还在造轮子?EJB骨灰都快找不到了!
- .NET Core下有热门的ORM框架使用方法
- .NET Core中一些优秀的项目和框架
- 常用消息队列框架与技术选型
- Jest:目前最广泛使用的前端 JavaScript 测试框架