1. 前言了解响应式编程,首先我们需要了解函数式操作和Stream的操作,下面我们简单的复习一下喽 。
1.1 常用函数式编程函数式接口中
我们先来回顾一下JAVA中的函数式接口 。常见的有以下几种
- Consumer 一个输入,无输出
- Supplier 无输入,有输出
- Function<T,R> 输入T,输出R
- BiFunction<T,U,R> 输入T,U 输出R
- Predicate 有输入,输出boolean类型
Consumer consumer = (i)-> System.out.println("this is " + i);consumer.accept("consumer");Supplier supplier= () -> "this is supplier";System.out.println(supplier.get());Function<Integer,Integer> function = (i) -> i*i;System.out.println(function.Apply(8));BiFunction<Integer,Integer,String> biFunction = (i,j)-> i+"*"+j+"="+i*j;System.out.println(biFunction.apply(8,8));Predicate<Integer> predicate = (i) -> i.intValue()>3;System.out.println(predicate.test(5));
其执行结果如下:this is consumerthis is supplier648*8=64true
1.2 Stream操作对Stream进行操作,主要有几个关键点:- 生成流
- 流的中间操作 其中中间操作可以有多个,中间操作会返回一个新的流(如 map ,filter,sorted等),然后交给下一个流方法使用 。
- 流的终结操作 终结操作只有一个 。终结操作执行后,流就到了终止状态,无法被操作 (如forEach,toArray , findFirst 等) 。
String[] strArray = {"ss","ss","","sdffg"};Arrays.stream(strArray).forEach(System.out::println);Arrays.asList(strArray).stream().forEach(System.out::println);Stream.of(strArray).forEach(System.out::println);Stream.iterate(1,(i) -> i+1).limit(10).forEach(System.out::println);Stream.generate(() -> new Random().nextInt(10)).limit(10).forEach(System.out::println);
简单的流处理示例:String[] strArray1 = {"ss","ss","","sdffg","bca-de","fff"};String collect = Stream.of(strArray1).filter(i -> !i.isEmpty())//过滤空字符串.sorted() //排序.limit(1) //只取第一个元素.map(i -> i.replace("-", ""))//替换 "-".flatMap(i -> Stream.of(i.split("")))//将字符拆成字符数组.sorted() //排序.collect(Collectors.joining());//将字符拼接组合到一起System.out.println(collect);//最后输出abcde
2. Java响应式编程响应式编程会用到一个发布者和一个订阅者,然后通过订阅关系完成数据流的传输 。订阅关系中可以处理一些背压问题,即调节消费者与生产者之间的供需平衡,让整个程序达到最大效率 。文章插图
Java9中java.util.concurrent.Flow接口提供响应式流编程类似的功能 。
下面我们实现一个基于Java 响应式编程的示例:
其中有三个简单步骤:
- 建立生产者
- 构建消费者
- 消费者订阅生产者
- 生产者生产内容
SubmissionPublisher publisher = new SubmissionPublisher<>();//建立生产者Flow.Subscriber subscriber = new Flow.Subscriber() {...};//建立消费者 (其中的实现放到下面)publisher.subscribe(subscriber);//订阅关系for (int i = 0; i < 10; i++) { publisher.submit("test reactive java : " +i); //生产者生产内容}
消费者全部代码如下:Flow.Subscriber subscriber = new Flow.Subscriber() {Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("Subscription establish first ");this.subscription = subscription;this.subscription.request(1);}@Overridepublic void onNext(Object item) {subscription.request(10);System.out.println("receive :"+ item);}@Overridepublic void onError(Throwable throwable) {System.out.println(" onError ");}@Overridepublic void onComplete() {System.out.println(" onComplete ");}};
其中onSubscribe方法表示建立订阅关系onNext接受数据,并请求生产者的数据 。
onError,onComplete则是error或者完成之后的处理方法 。
带有中间处理器的响应式流Reactive Stream 通常会基于如下的模型:
文章插图
下面我们实现一个带有中间处理功能的响应式模型:
下面的Processor 既有发布者,又有订阅者:
推荐阅读
- 理想|不怕库里南的理想L9明日开定:怎么买、有啥权益 一文看懂
- Notifier 一文搞懂Linux内核通知链
- 二级响应是什么意思?
- MacBook|苹果最新自研芯片性能如何?一文了解M2版MacBook Air跑分详情
- 艺术品|和田玉最主要的分清点,搞懂什么是商品,什么是可观赏的艺术品
- 釉里红|一次搞懂12种陶瓷类型
- 拍照|2022年最大超级月亮今晚上演:手机如何拍月亮 一文看懂
- 一文打尽NMS技术
- 一文搞懂shell脚本
- 烫发|染发真的会致癌吗?染发和烫发哪个危害大?一文告诉你