不过,真的是吗这样?我们来试一下带有parallel(), sorted()的删除操作:
list.stream()
.sorted()
.parallel()
.peek(list::remove)
.forEach(System.out::println);
这个会输出 :
7 6 2 5 8 4 1 0 9 3 |
现在列表里包含:
[8]
唉呀。居然没有删完所有的元素?!谁能解决这个问题,我免费请他喝酒!
这些行为看起来都是不确定的,我只能建议你在使用流的时候不要去修改它内部的数据集合。这样做是没用的。
忘了去消费流
你觉得下面这个流在做什么?
IntStream.range(1, 5)
.peek(System.out::println)
.peek(i -> {
if (i == 5)
throw new RuntimeException("bang");
});
看完这段代码,你觉得应该会输出(1 2 3 4 5)然后抛出一个异常。不过并不是这样。它什么也不会做。这个流并没有被消费掉,它只是静静的待在那里。
正如别的流API或者DSL那样,你可能会忘了调用这个终止操作。当你使用peek()的时候也是这样的,因为peek有点类似于forEach()。
在jOOQ中也存在这样的情况,如果你忘了去调用 execute()或者fetch():
DSL.using(configuration)
.update(TABLE)
.set(TABLE.COL1, 1)
.set(TABLE.COL2, "abc")
.where(TABLE.ID.eq(3));
杯具。忘了调用execute方法了。
并行流死锁
终于快讲完了~
如果你没有正确地进行同步的话,所有的并发系统都可能碰到死锁。现实中的例子可能不那么明显,不过如果你想自己创造一个场景的话倒是很容易。下面这个parallel()流肯定会造成死锁:
Object[] locks = { new Object(), new Object() }; IntStream .range(1, 5) .parallel() .peek(Unchecked.intConsumer(i -> { synchronized (locks[i % locks.length]) { Thread.sleep(100); synchronized (locks[(i + 1) % locks.length]) { Thread.sleep(50); } } })) .forEach(System.out::println); |
注意这里Unchecked.intConsumer()的使用,它把IntConsumer接口转化成了 org.jooq.lambda.fi.util.function.CheckedIntConsumer,这样你才可以抛出已检查异常。
好吧。这下你的机器倒霉了。这些线程会一直阻塞下去:-)
不过好消息就是,在Java里面要写出一个这种教科书上的死锁可不是那么容易。
想进一步了解的话,可以看下Brian Goetz在StackOverflow上的一个回答。
结论
引入了流和函数式编程之后,我们开始会碰到许多新的难以发现的BUG。这些BUG很难避免,除非你见过并且还时刻保持警惕。你必须去考虑操作的顺序,还得注意流是不是无限的。
流是一个非常强大的工具,但也是一个首先得去熟练掌握的工具。