隐藏|并行流ParallelStream中隐藏的陷阱
本文插图
前提
这篇文章介绍一下日常开发中并行流ParallelStream中隐藏的陷阱 , 这个问题其实离我们很近 , 特别是喜欢使用JDK1.8+的流式编程的伙伴 , 应该会深有感触 。 标题中所谓的"陷阱" , 其实并不是ParallelStream自身的陷阱 , 而一般是开发者错误使用ParallelStream给自己埋下的陷阱 。
一个故意而为的例子 下面举一个故意而为的例子 , 实际上应该不会有类似的业务代码:
【隐藏|并行流ParallelStream中隐藏的陷阱】public class ParallelStreamMain { public static void main(String[] args) throws Exception { List<List> array = new ArrayList<>() List item1 = new ArrayList<>() List item2 = new ArrayList<>() List target = new ArrayList<>(100) array.add(item1) array.add(item2) array.parallelStream().forEach(x -> { for (int i = 0 i < 100000 i++) { target.add(i) } }) System.out.println(target.size()) }}
某一次执行结果为:163913 。 如果不停地执行这个main方法 , 最终都会得到一个非200000的结果 , 这里的问题就在于使用了并行流parallelStream()方法 。 ParallelStream底层使用了Fork/Join框架实现 , 也就是应用了线程池ForkJoinPool把并行流中的节点抽象为ForkJoinTask进行计算 , 背后用到的"任务窃取"等原理这里就不进行展开 , 只需要明确:
- ForkJoinPool一般使用Runtime.getRuntime().availableProcessors()(此值一般认为是物理机器的逻辑核心数量)作为并行度(parallelism) , 简单认为是可并发执行的任务数 , 并不是工作线程数 。
- 多核机器中 , 使用ParallelStream在流的节点中的所有操作都相当于在「一个多线程环境中」进行操作 , 里面的所有操作都会产生不可预期的结果 , 例如可能会数组越界、添加元素丢失、部分下标index的引用为NULL等等 。
@Dataprivate static class OrderDTO { private String orderId private OrderStatus orderStatus private BigDecimal amount private Long customerId}@Dataprivate static class Order { private Long id private String orderId private Integer orderStatus private BigDecimal amount private Long customerId private OffsetDateTime createTime private OffsetDateTime editTime}public void groupByOrderStatus(Long customerId) { List orders = orderDao.selectByCustomerId(customerId) List orderDTOList = new ArrayList<>() orders.parallelStream().forEach(order -> { OrderDTO dto = new OrderDTO() ...... orderDTOList.add(dto) }) Map<String, List> collect= orderDTOList.stream().collect(Collectors.groupingBy(item -> item.getOrderStatus().getCode())) ......} 该方法的功能是通过客户ID查询订单列表 , 然后把订单列表转化为OrderDTO列表 , 然后再按照订单状态字段进行分组 。 通过生产日志和测试回归发现 , 上面的代码段中groupByOrderStatus()方法会偶发空指针异常 。
初次出现问题的时候 , 由于开发者通过Lambda表达式把多处代码压缩为1行 , 所以从异常栈比较难排查具体发生问题的代码 , 后面把Lambda表达式以句点起点拆分为多行上线后观察一段时间 , 最终定位到发生空指针异常的代码段为Collectors.groupingBy(item -> item.getOrderStatus().getCode()) , 也就是OrderDTO实例中的orderStatus为空对象 。 这里显然 , groupByOrderStatus()方法其实是被封闭在线程栈中调用 , 本不应该有多个线程去并发修改其中的内容 , 这里只剩下一个疑点:使用了parallelStream() 。 后来直接把parallelStream()修改为stream()重新上线 , 该空指针问题不再复现 。
Lambda/Stream其实并不是天然线程安全的 , 线程安全的前提是它们本身被线程封闭调用 , 并且不引入多线程环境 , 像使用了并行流 , 本质就是引入了多线程环境 。 所以 , 在开发功能的时候 , 需要仔细思考一下:
- 是否真的有必要使用Lambda和流式编程?
- 是否真的有必要用到并行流?如果使用了并行流 , 是否需要考虑引入额外的同步机制 , 例如锁?
- 如果引入了额外的同步机制 , 是否考虑是强行使用并行流 , 违反了并行流设计的初衷?
- 其实并发并不能提高性能 , 只能提高吞吐量 , 应该着重去发现和优化性能瓶颈 , 而不是拼命地把上游改造成并发调用 。
笔者有代码洁癖 , 当时还发现了上面的代码存在映射操作 , 正确来说应该使用map()函数 , 而不是forEach()去遍历元素重新装进去另一个列表 , 方法中的逻辑体现了原开发者其实对Lambda一知半解 。
?
小结 回到最初那个问题 , 其实使用并行流也可以保证执行结果和预期一致 , 不过一定需要引入额外的同步机制 , 例如这里使用「监视器」进行同步:
public class ParallelStreamMain { public static void main(String[] args) throws Exception { List<List> array = new ArrayList<>() List item1 = new ArrayList<>() List item2 = new ArrayList<>() List target = new ArrayList<>(100) array.add(item1) array.add(item2) final Object monitor = new Object() array.parallelStream().forEach(x -> { synchronized (monitor) { for (int i = 0 i < 100000 i++) { target.add(i) } } }) System.out.println(target.size()) }} 上面的方法无论执行多少次 , 最终都只会输出:200000 。 这里在并行流中添加同步代码块的逻辑看起来确实比较滑稽 , 仅仅是为了说明如果在多线程环境下 , 对一个容器进行元素增加或者修改 , 只有添加额外的同步机制 , 才能保证最终的结果是符合预期的 。 ParallelStream是一个十分优秀的设计 , 但是需要考量其适用的场景 , 避免踏进自己为自己埋下的并发陷阱 。
推荐阅读
- 净流入|每年超10万人口逃离北京,超20万人口奔赴广深,为何?
- 央视新闻客户端|甘肃陇南文县泥石流灾害 堰塞湖已打开泄水口
- 喜欢走走停停|海拔1800米的地方游览南天湖!为啥重庆丰都漂流风景区美?
- 珠海生活资讯交流|推动琴澳旅游合作, 大横琴文旅展示中心18日在横琴口岸正式开放
- 回流|阳泉:为体制内人才回流畅通渠道
- 老车部落格|造型流畅的捷恩斯电动车首次曝光 外观设计非常有运动感
- 流动|“创文”路上 一起精彩丨让文明出行成为流动风景
- 喵家影视|求指导~河流退水后,如何能够钓到大草鱼?
- |王者荣耀:推塔流阿古朵压制力,教你8分钟优势一面倒堵住高地
- 主流纯电三厢家轿 外观设计感强 智能配置豪华 空间大充电快
