Understanding sequential vs. parallel stream spliterators in Java 8 and Java 9
A question about spliterators that at first glance is not straightforward.
In streams, .parallel() changes the way the stream is processed. However, I was expecting the spliterators created from sequential and parallel streams to be the same. For example, in sequential streams .trySplit() is typically never invoked, while in parallel streams it is, in order to hand over the split-off spliterator to another thread.
Differences between stream.spliterator() and stream.parallel().spliterator():
They may have different characteristics:
Stream.of(1L, 2L, 3L).limit(2); // ORDERED
Stream.of(1L, 2L, 3L).limit(2).parallel(); // SUBSIZED, SIZED, ORDERED
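To compare the two cases on your own JDK, a small decoder for the characteristics bit mask helps. This is a minimal sketch; the class CharacteristicsDemo and the helper describe are hypothetical names, and the printed sets are deliberately not stated here because they can differ between JDK versions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Stream;

class CharacteristicsDemo {
    // Hypothetical helper: turns the int bit mask returned by
    // Spliterator.characteristics() into readable flag names.
    static List<String> describe(int c) {
        List<String> names = new ArrayList<>();
        if ((c & Spliterator.ORDERED) != 0) names.add("ORDERED");
        if ((c & Spliterator.DISTINCT) != 0) names.add("DISTINCT");
        if ((c & Spliterator.SORTED) != 0) names.add("SORTED");
        if ((c & Spliterator.SIZED) != 0) names.add("SIZED");
        if ((c & Spliterator.NONNULL) != 0) names.add("NONNULL");
        if ((c & Spliterator.IMMUTABLE) != 0) names.add("IMMUTABLE");
        if ((c & Spliterator.CONCURRENT) != 0) names.add("CONCURRENT");
        if ((c & Spliterator.SUBSIZED) != 0) names.add("SUBSIZED");
        return names;
    }

    public static void main(String[] args) {
        Spliterator<Long> seq = Stream.of(1L, 2L, 3L).limit(2).spliterator();
        Spliterator<Long> par = Stream.of(1L, 2L, 3L).limit(2).parallel().spliterator();
        // The exact flags reported depend on the JDK version in use.
        System.out.println("sequential: " + describe(seq.characteristics()));
        System.out.println("parallel:   " + describe(par.characteristics()));
    }
}
```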
This seems to be another case of the nonsensical stream spliterator characteristics policy (in the parallel case they seem better calculated), discussed here: Understanding deeply spliterator characteristics in java 8 and java 9
They may have different behaviour in terms of splitting using .trySplit():
Stream.of(1L, 2L, 3L); // NON NULL
Stream.of(1L, 2L, 3L).limit(2); // NULL
Stream.of(1L, 2L, 3L).limit(2).parallel(); // NON NULL
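A runnable version of the three cases above, checking only whether the first trySplit() call returns null. It is a sketch: the class SplitDemo and the helper splits are hypothetical names, and the result for the parallel pipeline is printed rather than promised, since it may vary between JDK versions.

```java
import java.util.Spliterator;
import java.util.stream.Stream;

class SplitDemo {
    // Hypothetical helper: true if the first trySplit() call hands out a prefix.
    static boolean splits(Spliterator<?> sp) {
        return sp.trySplit() != null;
    }

    public static void main(String[] args) {
        // No chained operation: this is the array-backed source spliterator.
        System.out.println(splits(Stream.of(1L, 2L, 3L).spliterator()));
        // Sequential pipeline with a chained operation: a wrapping spliterator
        // created with splitting disabled.
        System.out.println(splits(Stream.of(1L, 2L, 3L).limit(2).spliterator()));
        // Parallel pipeline: the wrapping spliterator is allowed to split.
        System.out.println(splits(Stream.of(1L, 2L, 3L).limit(2).parallel().spliterator()));
    }
}
```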
Why do the last two behave differently? Why can't I split a sequential stream if I want to? (It could be useful to discard one of the splits for fast processing, for example.)
Big impact when transforming a spliterator back into a stream:
Spliterator<Long> spliterator = Stream.of(1L, 2L, 3L).limit(2).spliterator();
Stream<Long> stream = StreamSupport.stream(spliterator, true); // No parallel processing!
In this case, the spliterator was created from a sequential stream, which disables the ability to split (.trySplit() returns null). When the spliterator later needs to be transformed back into a stream, that stream won't benefit from parallel processing. A shame.
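One way to observe this: feed a non-splittable wrapping spliterator into a parallel stream and record which threads process its elements. This sketch assumes a 100,000-element list (an arbitrary size, large enough that a splittable source would normally be divided) and uses map instead of limit as the chained operation; the class NoSplitDemo and its methods are hypothetical. Because trySplit() returns null, the whole traversal should run in the calling thread.

```java
import java.util.List;
import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

class NoSplitDemo {
    // Hypothetical helper: names of the threads that processed at least one element.
    static Set<String> threadsUsed(Spliterator<Integer> sp) {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        StreamSupport.stream(sp, true) // parallel flag is set here...
                .forEach(x -> threads.add(Thread.currentThread().getName()));
        return threads;
    }

    static List<Integer> data() {
        return IntStream.range(0, 100_000).boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // ...but this spliterator comes from a sequential pipeline with a chained
        // operation, so trySplit() returns null and no work can be forked.
        Spliterator<Integer> sequential = data().stream().map(x -> x).spliterator();
        System.out.println(threadsUsed(sequential));

        // Same pipeline created in parallel mode: splitting is possible, so
        // several worker threads may show up (not guaranteed).
        Spliterator<Integer> parallel = data().stream().parallel().map(x -> x).spliterator();
        System.out.println(threadsUsed(parallel));
    }
}
```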
The big question: As a workaround, what are the major impacts of always transforming a stream to parallel before invoking .spliterator()?
// Supports activation of parallel processing later
public static <T> Stream<T> myOperation(Stream<T> stream) {
    boolean isParallel = stream.isParallel();
    Spliterator<T> spliterator = stream.parallel().spliterator();
    return StreamSupport.stream(new Spliterator<T>() {
        // My implementation of the interface here (omitted for clarity)
    }, isParallel).onClose(stream::close);
}

// Now I have the option to use parallel processing when needed:
myOperation(stream).skip(1).parallel()...
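For something concrete to run, here is a hypothetical pass-through version of myOperation. It is not the omitted implementation from the question: it merely forwards every call to the spliterator obtained after stream.parallel() and restores the caller's original parallel flag, so trySplit() keeps working if the resulting stream is made parallel later. Keep in mind that parallel() switches the mode of the entire upstream pipeline.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class PassThrough {
    // Hypothetical pass-through wrapper: every call is delegated unchanged.
    static <T> Stream<T> myOperation(Stream<T> stream) {
        boolean isParallel = stream.isParallel();
        // Switching to parallel before spliterator() yields a splittable wrapper.
        Spliterator<T> source = stream.parallel().spliterator();
        return StreamSupport.stream(new Spliterator<T>() {
            @Override public boolean tryAdvance(Consumer<? super T> action) {
                return source.tryAdvance(action);
            }
            @Override public void forEachRemaining(Consumer<? super T> action) {
                source.forEachRemaining(action);
            }
            @Override public Spliterator<T> trySplit() {
                return source.trySplit();
            }
            @Override public long estimateSize() {
                return source.estimateSize();
            }
            @Override public int characteristics() {
                return source.characteristics();
            }
            @Override public Comparator<? super T> getComparator() {
                return source.getComparator();
            }
        }, isParallel).onClose(stream::close); // original mode restored for the caller
    }

    public static void main(String[] args) {
        List<Long> result = myOperation(Stream.of(1L, 2L, 3L).limit(2))
                .skip(1)
                .parallel()
                .collect(Collectors.toList());
        System.out.println(result); // [2]
    }
}
```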
Recommended answer
This is not a general property of spliterators, but only of wrapping spliterators encapsulating a stream pipeline.
When you call spliterator() on a stream that has been generated from a spliterator and has no chained operation, you'll get the source spliterator, which may or may not support trySplit, regardless of the stream's parallel state.
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "foo", "bar", "baz");
Spliterator<String> sp1 = list.spliterator(), sp2=list.stream().spliterator();
// true
System.out.println(sp1.getClass()==sp2.getClass());
// not null
System.out.println(sp2.trySplit());
Likewise:
Spliterator<String> sp = Stream.of("foo", "bar", "baz").spliterator();
// not null
System.out.println(sp.trySplit());
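The same holds regardless of the parallel flag. A minimal sketch (the class SourceDemo is a hypothetical name) comparing the spliterator classes of a bare ArrayList stream in sequential and parallel mode; with no chained operation, both are the list's own spliterator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Spliterator;

class SourceDemo {
    static List<String> list() {
        List<String> list = new ArrayList<>();
        Collections.addAll(list, "foo", "bar", "baz");
        return list;
    }

    public static void main(String[] args) {
        Spliterator<String> own = list().spliterator();
        Spliterator<String> seq = list().stream().spliterator();
        Spliterator<String> par = list().stream().parallel().spliterator();
        // No chained operation: the stream hands back the source spliterator as is.
        System.out.println(own.getClass() == seq.getClass()); // true
        System.out.println(own.getClass() == par.getClass()); // true
        // So it can split even though the stream was sequential.
        System.out.println(seq.trySplit() != null); // true
    }
}
```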
But as soon as you chain operations before calling spliterator(), you will get a spliterator wrapping the stream pipeline. It would be possible to implement dedicated spliterators performing the associated operation, like a LimitSpliterator or a MappingSpliterator, but this has not been done, as converting a stream back to a spliterator has been considered a last resort for when the other terminal operations do not fit, not a high-priority use case. Instead, you will always get an instance of a single implementation class that tries to translate the inner workings of the stream pipeline implementation to the spliterator API.
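A quick way to see the single implementation class is to compare the spliterators of pipelines with different chained operations. This sketch (the class WrapperDemo is a hypothetical name) assumes sequential reference streams; primitive streams such as IntStream use their own wrapper classes, and the printed class names are JDK internals that may change.

```java
import java.util.Spliterator;
import java.util.stream.Stream;

class WrapperDemo {
    static Spliterator<String> source() {
        return Stream.of("foo", "bar", "baz").spliterator();
    }

    static Spliterator<String> mapped() {
        return Stream.of("foo", "bar", "baz").map(String::toUpperCase).spliterator();
    }

    static Spliterator<String> filtered() {
        return Stream.of("foo", "bar", "baz").filter(s -> s.startsWith("b")).spliterator();
    }

    public static void main(String[] args) {
        // JDK-internal class names, printed for illustration only.
        System.out.println(source().getClass().getName());
        System.out.println(mapped().getClass().getName());
        // Different operations, yet the same wrapper class.
        System.out.println(mapped().getClass() == filtered().getClass());
    }
}
```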
This can be quite complicated for stateful operations, most notably sorted, distinct, or skip & limit on a non-SIZED stream. For trivial stateless operations like map or filter, it would be much easier to provide support, as has even been remarked in a code comment:
Abstract wrapping spliterator that binds to the spliterator of a pipeline helper on first operation. This spliterator is not late-binding and will bind to the source spliterator when first operated on. A wrapping spliterator produced from a sequential stream cannot be split if there are stateful operations present.
…
// @@@ Detect if stateful operations are present or not
// If not then can split otherwise cannot
/**
* True if this spliterator supports splitting
*/
final boolean isParallel;
But it seems that this detection has not been implemented so far, and all intermediate operations are treated like stateful operations:
Spliterator<String> sp = Stream.of("foo", "bar", "baz").map(x -> x).spliterator();
// null
System.out.println(sp.trySplit());
When you try to work around this by always calling parallel, there will be no impact when the stream pipeline consists of stateless operations only. But when there is a stateful operation, it might change the behavior significantly. E.g., when you have a sorted step, all elements have to be buffered and sorted before you can consume the first element. For a parallel stream, it will likely use a parallelSort, even when you never invoke trySplit.
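The buffering effect of a stateful step can be made visible with peek: count how many source elements have passed upstream of sorted by the time the first sorted element arrives. A minimal sketch (the class SortedBufferDemo and its helper are hypothetical names); it does not reveal whether a parallel sort is used internally, which remains an implementation detail.

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class SortedBufferDemo {
    // Hypothetical helper: number of elements pulled from the source before
    // the first element could be taken from the resulting spliterator.
    static int pulledBeforeFirst(boolean parallel) {
        AtomicInteger pulled = new AtomicInteger();
        Stream<Integer> s = Stream.of(5, 3, 4, 1, 2).peek(x -> pulled.incrementAndGet());
        if (parallel) {
            s = s.parallel();
        }
        Spliterator<Integer> sp = s.sorted().spliterator();
        sp.tryAdvance(x -> System.out.println("first element: " + x));
        return pulled.get();
    }

    public static void main(String[] args) {
        // sorted has to see every element before it can emit the smallest one.
        System.out.println(pulledBeforeFirst(false)); // 5
        System.out.println(pulledBeforeFirst(true));  // 5
    }
}
```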