上文中咱们简单提到了JDK9中Flow接口中的静态内部类实现了响应式流的JAVA API,并且提供了一个一个Publisher的实现类SubmissionPublisher 。本文将先梳理一下接口中具体的处理流程,然后再以几个调用者的例子来帮助大家理解 。
JDK9中的实现再放上一下上文中的响应式流的交互流程:
- 订阅者向发布者发送订阅请求 。
- 发布者根据订阅请求生成令牌发送给订阅者 。
- 订阅者根据令牌向发布者发送请求N个数据 。
- 发送者根据订阅者的请求数量返回M(M<=N)个数据
- 重复3,4
- 数据发送完毕后由发布者发送给订阅者结束信号
- 创建发布者
- 创建订阅者
- 订阅令牌交互
- 发送信息
创建发布者对于实现响应流的最开始的步骤,便是创建一个发布者 。之前提到在JDK9中提供了一个发布者的简单实现SubmissionPublisher 。SubmissionPublisher继承自Flow.Publisher,他有三种构造函数:
public SubmissionPublisher() {this(ASYNC_POOL, Flow.defaultBufferSize(), null);}public SubmissionPublisher(Executor executor, int maxBufferCapacity) {this(executor, maxBufferCapacity, null);}public SubmissionPublisher(Executor executor, int maxBufferCapacity,BiConsumer<? super Subscriber<? super T>, ? super Throwable> handler)
SubmissionPublisher将使用Executor作为“线程池”向订阅者发送信息 。如果需要需要设置线程池的话可以自己传入,否则的话再无参的构造函数中将默认使用ForkJoinPool类的commonPool()方法获取,即无餐构造方法中的ASYNC_POOL静态变量 。SubmissionPublisher会为每一个订阅者单独的建立一个缓冲空间,其大小由入参maxBufferCapacity决定 。默认情况下直接使用Flow.defaultBufferSize()来设置,默认为256 。如果缓冲区满了之后会根据发送信息时候的策略确定是阻塞等待还是抛弃数据 。
SubmissionPublisher会在订阅者发生异常的时候(onNext处理中),会调用最后一个参数handler方法,然后才会取消订阅 。默认的时候为null,也就是不会处理异常 。
最简单的创建SubmissionPublisher的方法就是直接使用无参构造方法:
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
上文书说到,因为SubmissionPublisher实现了AutoCloseable接口,所以可以用try来进行资源回收可以省略close()的调用:try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()){}
但是也可以手动的调用close()方法来显示的关闭发布者,关闭后再发送数据就会抛出异常:if (complete)throw new IllegalStateException("Closed");
创建订阅者上文中咱们没有手动创建订阅者,而是直接调用SubmissionPublisher中的consume方法使用其内部的订阅者来消费消息 。在本节可以实现接口Flow.Subscriber<T>创建一个SimpleSubscriber类:public class SimpleSubscriber implements Flow.Subscriber<Integer> {private Flow.Subscription subscription;/*** 订阅者名称*/private String name;/*** 定义最大消费数量*/private final long maxCount;/*** 计数器*/private long counter;public SimpleSubscriber(String name, long maxCount) {this.name = name;this.maxCount = maxCount <= 0 ? 1 : maxCount;}@Overridepublic void onSubscribe(Flow.Subscription subscription) {this.subscription = subscription;System.out.printf("订阅者:%s,最大消费数据: %d 。%n", name, maxCount);// 实际上是等于消费全部数据subscription.request(maxCount);}@Overridepublic void onNext(Integer item) {counter++;System.out.printf("订阅者:%s 接收到数据:%d.%n", name, item);if (counter >= maxCount) {System.out.printf("准备取消订阅者: %s 。已处理数据个数:%d 。%n", name, counter);// 处理完毕,取消订阅subscription.cancel();}}@Overridepublic void onError(Throwable t) {System.out.printf("订阅者: %s,出现异常: %s 。%n", name, t.getMessage());}@Overridepublic void onComplete() {System.out.printf("订阅者: %s 处理完成 。%n", name);}}
SimpleSubscriber是一个简单订阅者类,其逻辑是根据构造参数可以定义其名称name与最大处理数据值maxCount,最少处理一个数据 。当发布者进行一个订阅的时候会生成一个令牌Subscription作为参数调用onSubscribe方法 。在订阅者需要捕获该令牌作为后续与发布者交互的纽带 。一般来说在onSubscribe中至少调用一次request且参数需要>0,否则发布者将无法向订阅者发送任何信息,这也是为什么maxCount需要大于0 。
推荐阅读
- 常见的62种http响应代码整理
- apache-4-请求头和响应头
- CAD安装过程中遇到注册机无响应,怎么办?
- Django内置的响应类
- Linux 应急响应入门:入侵排查应该这样做
- Spring Boot 中如何统一 API 接口响应格式?
- Redis响应严重延迟,如何解决?
- 响应速度与智能化如何平衡,携程酒店搜索实践
- 响应式网页中的高度设计,你认真的吗?
- Spring Boot 使用拦截器优雅打印接口响应时间