一文搞懂响应式编程( 二 )


public class ReactiveProcessor extends SubmissionPublisher implements Flow.Subscriber {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println( Thread.currentThread().getName() +" Reactive processor establish connection ");this.subscription = subscription;this.subscription.request(1);}@Overridepublic void onNext(Object item) {System.out.println(Thread.currentThread().getName() + " Reactive processor receive data: "+ item);this.submit(item.toString().toUpperCase());this.subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.out.println("Reactive processor error ");throwable.printStackTrace();this.subscription.cancel();}@Overridepublic void onComplete() {System.out.println(Thread.currentThread().getName() + " Reactive processor receive data complete ");}}如上中间处理器订阅发布者,同时消费者再订阅中间处理器 。中间处理器也可以调节发布订阅的生产消费速率 。
SubmissionPublisher publisher = new SubmissionPublisher<>(); //创建生产者ReactiveProcessor reactiveProcessor = new ReactiveProcessor(); // 创建中间处理器publisher.subscribe(reactiveProcessor); //中间处理器订阅生产者Flow.Subscriber subscriber = new Flow.Subscriber() {...}; //创建消费者reactiveProcessor.subscribe(subscriber); //消费者订阅中间处理器for (int i = 0; i < 10; i++) {publisher.submit("test reactive java : " +i); //生产者生产数据}通过上述生产者-> 中间处理器->消费者,可以将生产者生产的数据全部变成大写,然后再发送给最终的消费者 。
以上式Java中的reactive 编程示例 。Java会不同线程来分别处理消费者与生产者的消息处理
3. ReactorReactor中两个比较关键的对象式Flux和Mono,整个Spring的响应式编程均式基于projectreactor项目 。Reactor是响应式编程的依赖,主要是基于JVM构建非阻塞程序 。
根据Reactor的介绍,此类响应式编程的的三方库(Reactor)主要是解决一些JVM经典异步编程中的一些缺点,并且还可以专注于一些新的特性,如下:

  • 可组合性与可读性 (Composability and readability)
  • 可以使用丰富的运算操作符将数据作为流进行操作
  • 订阅之前,不会有任何事
  • 背压特性(Backpressure ),可以理解为消费者可以向生产者发送产出率过高的信号,从而调整生产速率 。或者消费者可以选择一次性拉去一捆数据进行消费 。
  • 于并发无关的高度抽象的高级功能
其中有这么一段解释,可以形象的说明响应式编程 。
Reactive的程序可以想象成车间的流水线,reactor既是流水线上的传送带,又是处理工作站 。原料从一个原始的生产者出发,最终成为产品被推总给消费者 。
3.1 Flux & Mono下面我们介绍一下Flux和Mono 。
在Reactor中Flux和Mono均是Publisher,即生产者 。两者也有不同 。Flux对象表示0到N个异步的响应序列,而Mono只代表0个(empty)或者1个结果 。
Reactor官网上介绍的Flux示意如下:
一文搞懂响应式编程

文章插图
 
Mono示意如下:
一文搞懂响应式编程

文章插图
 
3.2 Flux Mono创建与使用我们也可以单独引用其依赖 。
使用maven依赖
<dependencies><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId></dependency><dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId><scope>test</scope></dependency></dependencies>Mono创建分别创建空Mono和一个包含一个String的Mono,并由消费者消费打印 。
Mono.empty().subscribe(System.out::println);Mono.just("Hello Mono Java North").subscribe(System.out::print);Flux创建Flux创建有如下的一些方法,
  • just(通过不定参数创建)
  • range(从某个整数开始,往后的整数数量)
  • fromArray,fromIterable,fromStream,从名称上就可以看出来,通过数组,迭代器,Stream流创建Flux
下面式一些Java代码示例
Flux.just(1,2,3,4,5).subscribe(System.out::print);Flux.range(1,20).subscribe(System.out::print);Flux.fromArray(new String[]{"a1","a2","a3","a4","a5","a6"}).skip(2).subscribe(System.out::print);Flux.fromIterable(Arrays.asList(1,2,3,4,5,6,7)).subscribe(System.out::println);Flux.fromStream(Stream.of(Arrays.asList(1,2,3,4,5,6,7))).subscribe(System.out::print);


推荐阅读