Parallel stream from a HashSet doesn't run in parallel

I have a collection of elements that I want to process in parallel. When I use a List, parallelism works. However, when I use a Set, it does not run in parallel.

I wrote a code sample that shows the problem:

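import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class ParallelTest {

    public static void main(String[] args) {
        ParallelTest test = new ParallelTest();

        List<Integer> list = Arrays.asList(1, 2);
        Set<Integer> set = new HashSet<>(list);

        // Custom pool with a parallelism of 4
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);

        System.out.println("set print");
        try {
            forkJoinPool.submit(() ->
                set.parallelStream().forEach(test::print)
            ).get();
        } catch (Exception e) {
            return;
        }

        System.out.println("\n\nlist print");
        try {
            forkJoinPool.submit(() ->
                list.parallelStream().forEach(test::print)
            ).get();
        } catch (Exception e) {
            return;
        }
    }

    // Logs start and end around a one-second sleep so that overlap is visible
    private void print(int i) {
        System.out.println("start: " + i);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it
            Thread.currentThread().interrupt();
        }
        System.out.println("end: " + i);
    }
}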

This is the output I get on Windows 7:

set print
start: 1
end: 1
start: 2
end: 2

list print
start: 2
start: 1
end: 1
end: 2

We can see that the first element from the Set had to finish before the second element was processed. For the List, the second element starts before the first element finishes.

Can you tell me what causes this issue, and how to avoid it using a Set collection?

Recommended answer

I can reproduce the behavior you see, where the observed parallelism doesn't match the parallelism of the fork-join pool you've specified. After setting the fork-join pool parallelism to 10 and increasing the number of elements in the collection to 50, I see the parallelism of the list-based stream rising only to 6, whereas the parallelism of the set-based stream never gets above 2.
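A rough way to observe this is to count how many elements are being processed at the same moment. The following is only a sketch of such a measurement: the class name ParallelismProbe and the method peakConcurrency are made up for illustration, the 100 ms sleep stands in for real work, and the numbers it prints will vary with the JDK version and the machine.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of the original question or answer
public class ParallelismProbe {

    // Runs a parallel forEach over source inside pool and returns the
    // highest number of elements that were in flight at the same time.
    static int peakConcurrency(Collection<Integer> source, ForkJoinPool pool) {
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            pool.submit(() ->
                source.parallelStream().forEach(i -> {
                    int now = active.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        TimeUnit.MILLISECONDS.sleep(100); // simulated work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        active.decrementAndGet();
                    }
                })
            ).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 50; i++) {
            list.add(i);
        }
        ForkJoinPool pool = new ForkJoinPool(10);
        try {
            System.out.println("list peak: " + peakConcurrency(list, pool));
            System.out.println("set peak:  " + peakConcurrency(new HashSet<>(list), pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

The peak is a lower bound on the parallelism actually available, since it only records what happened in one run.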

Note, however, that this technique of submitting a task to a fork-join pool to run the parallel stream in that pool is an implementation "trick" and is not guaranteed to work. Indeed, the threads or thread pool that is used for execution of parallel streams is unspecified. By default, the common fork-join pool is used, but in different environments, different thread pools might end up being used. (Consider a container within an application server.)

In the java.util.stream.AbstractTask class, the LEAF_TARGET field determines the amount of splitting that is done, which in turn determines the amount of parallelism that can be achieved. The value of this field is based on ForkJoinPool.getCommonPoolParallelism(), which of course uses the parallelism of the common pool, not that of whatever pool happens to be running the tasks.
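To make the dependency on the common pool concrete, here is a small sketch of the arithmetic as I read it in the JDK 8 sources. AbstractTask is internal and not public API, so treat both formulas as assumptions about one implementation: four leaf tasks per common-pool thread, and a chunk size of the size estimate divided by that target, with a floor of 1. The class and method names are invented for this example, and the guard against a zero target is mine.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical illustration of JDK 8 internals; not the actual JDK code
public class LeafTargetSketch {

    // Assumption: JDK 8 aims for four leaf tasks per common-pool thread
    static int leafTarget(int commonPoolParallelism) {
        return commonPoolParallelism << 2;
    }

    // Assumption: a chunk is split further only while its estimated
    // size is larger than the value returned here.
    static long targetChunkSize(long sizeEstimate, int commonPoolParallelism) {
        // Guard added for this sketch so a parallelism of 0 cannot divide by zero
        int target = Math.max(1, leafTarget(commonPoolParallelism));
        long est = sizeEstimate / target;
        return est > 0L ? est : 1L;
    }

    public static void main(String[] args) {
        int common = ForkJoinPool.getCommonPoolParallelism();
        System.out.println("common pool parallelism: " + common);
        System.out.println("leaf target: " + leafTarget(common));
        System.out.println("chunk size for 50 elements: " + targetChunkSize(50, common));
    }
}
```

Note that nothing in this calculation refers to the pool the task was submitted to, which is the point made above: a custom pool of 10 threads does not change how finely the source is split.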

Arguably this is a bug (see OpenJDK issue JDK-8190974), but this entire area is unspecified anyway. It definitely needs development, for example in terms of splitting policy, the amount of parallelism available, and dealing with blocking tasks. A future release of the JDK may address some of these issues.

Meanwhile, it is possible to control the parallelism of the common fork-join pool through the use of system properties. If you add this line to your program,

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");

and you run the streams in the common pool (or if you submit them to your own pool that has a sufficiently high level of parallelism set) you will observe that many more tasks are run in parallel.

You can also set this property on the command line using the -D option.
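As a minimal sketch of the programmatic variant, the property can be set at the very start of the program and the result checked with ForkJoinPool.getCommonPoolParallelism(). The class name CommonPoolConfig and the method configureCommonPool are hypothetical. The one thing to watch is ordering: the common pool is created when the ForkJoinPool class is initialized, so the property has no effect if anything has already used ForkJoinPool or a parallel stream.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper, shown only to illustrate the ordering requirement
public class CommonPoolConfig {

    static final String PARALLELISM_KEY =
        "java.util.concurrent.ForkJoinPool.common.parallelism";

    // Sets the property, then returns the parallelism the common pool reports.
    // The two values differ if the common pool was already initialized.
    static int configureCommonPool(int parallelism) {
        System.setProperty(PARALLELISM_KEY, Integer.toString(parallelism));
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        // Call this before any parallel stream or ForkJoinPool use
        int reported = configureCommonPool(10);
        System.out.println("common pool parallelism: " + reported);
    }
}
```

The command-line form of the same setting would be java -Djava.util.concurrent.ForkJoinPool.common.parallelism=10 followed by the main class, which avoids the ordering problem entirely.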

Again, this is not guaranteed behavior, and it may change in the future. But this technique will probably work for JDK 8 implementations for the foreseeable future.

UPDATE 2019-06-12: The bug JDK-8190974 was fixed in JDK 10, and the fix has been backported to an upcoming JDK 8u release (8u222).
