一文搞懂响应式编程

1. 前言了解响应式编程,首先我们需要了解函数式操作和Stream的操作,下面我们简单的复习一下喽 。
1.1 常用函数式编程函数式接口中
我们先来回顾一下JAVA中的函数式接口 。常见的有以下几种

  • Consumer 一个输入,无输出
  • Supplier 无输入,有输出
  • Function<T,R> 输入T,输出R
  • BiFunction<T,U,R> 输入T,U 输出R
  • Predicate 有输入,输出boolean类型
上面的简单函数式接口示例如下:
Consumer consumer = (i)-> System.out.println("this is " + i);consumer.accept("consumer");Supplier supplier= () -> "this is supplier";System.out.println(supplier.get());Function<Integer,Integer> function = (i) -> i*i;System.out.println(function.Apply(8));BiFunction<Integer,Integer,String> biFunction = (i,j)-> i+"*"+j+"="+i*j;System.out.println(biFunction.apply(8,8));Predicate<Integer> predicate = (i) -> i.intValue()>3;System.out.println(predicate.test(5));其执行结果如下:
this is consumerthis is supplier648*8=64true1.2 Stream操作对Stream进行操作,主要有几个关键点:
  • 生成流
  • 流的中间操作 其中中间操作可以有多个,中间操作会返回一个新的流(如 map ,filter,sorted等),然后交给下一个流方法使用 。
  • 流的终结操作 终结操作只有一个 。终结操作执行后,流就到了终止状态,无法被操作 (如forEach,toArray , findFirst 等) 。
创建流的示例:
String[] strArray = {"ss","ss","","sdffg"};Arrays.stream(strArray).forEach(System.out::println);Arrays.asList(strArray).stream().forEach(System.out::println);Stream.of(strArray).forEach(System.out::println);Stream.iterate(1,(i) -> i+1).limit(10).forEach(System.out::println);Stream.generate(() -> new Random().nextInt(10)).limit(10).forEach(System.out::println);简单的流处理示例:
String[] strArray1 = {"ss","ss","","sdffg","bca-de","fff"};String collect = Stream.of(strArray1).filter(i -> !i.isEmpty())//过滤空字符串.sorted() //排序.limit(1) //只取第一个元素.map(i -> i.replace("-", ""))//替换 "-".flatMap(i -> Stream.of(i.split("")))//将字符拆成字符数组.sorted() //排序.collect(Collectors.joining());//将字符拼接组合到一起System.out.println(collect);//最后输出abcde2. Java响应式编程响应式编程会用到一个发布者和一个订阅者,然后通过订阅关系完成数据流的传输 。订阅关系中可以处理一些背压问题,即调节消费者与生产者之间的供需平衡,让整个程序达到最大效率 。
一文搞懂响应式编程

文章插图
 
Java9中java.util.concurrent.Flow接口提供响应式流编程类似的功能 。
下面我们实现一个基于Java 响应式编程的示例:
其中有三个简单步骤:
  1. 建立生产者
  2. 构建消费者
  3. 消费者订阅生产者
  4. 生产者生产内容
SubmissionPublisher publisher = new SubmissionPublisher<>();//建立生产者Flow.Subscriber subscriber = new Flow.Subscriber() {...};//建立消费者 (其中的实现放到下面)publisher.subscribe(subscriber);//订阅关系for (int i = 0; i < 10; i++) { publisher.submit("test reactive java : " +i); //生产者生产内容}消费者全部代码如下:
Flow.Subscriber subscriber = new Flow.Subscriber() {Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("Subscription establish first ");this.subscription = subscription;this.subscription.request(1);}@Overridepublic void onNext(Object item) {subscription.request(10);System.out.println("receive :"+ item);}@Overridepublic void onError(Throwable throwable) {System.out.println(" onError ");}@Overridepublic void onComplete() {System.out.println(" onComplete ");}};其中onSubscribe方法表示建立订阅关系
onNext接受数据,并请求生产者的数据 。
onError,onComplete则是error或者完成之后的处理方法 。
带有中间处理器的响应式流Reactive Stream 通常会基于如下的模型:
一文搞懂响应式编程

文章插图
 
下面我们实现一个带有中间处理功能的响应式模型:
下面的Processor 既有发布者,又有订阅者:


推荐阅读