Nested Java 8 parallel forEach loop performs poorly. Is this behavior expected?

Note: I already raised this problem in another SO post, "Using a semaphore inside a nested Java 8 parallel stream action may DEADLOCK. Is this a bug?", but the title of that post suggested that the problem is related to the use of a semaphore, which somewhat distracted the discussion. I am creating this one to stress that nested loops might have a performance issue, although both problems likely have a common cause (and maybe because it took me a lot of time to figure out this problem). (I don't see it as a duplicate, because it stresses another symptom, but if you do, just delete it.)

Problem: If you nest two Java 8 stream.parallel().forEach loops and all tasks are independent, stateless, etc. (except for being submitted to the common FJ pool), then nesting a parallel loop inside a parallel loop performs much worse than nesting a sequential loop inside a parallel loop. Even worse: if the operation containing the inner loop is synchronized, you will get a DEADLOCK.

Demonstration of the performance issue

Without the 'synchronized' you can still observe a performance problem. You can find demo code for this at: http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachTest.java (see the JavaDoc there for a more detailed description).

Our setup here is as follows: We have a nested stream.parallel().forEach().

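  • The inner loop is independent (stateless, no interference, etc., except for the use of a common pool) and consumes 1 second in total in the worst case, namely if processed sequentially.
  • Half of the tasks of the outer loop consume 10 seconds before that loop.
  • The other half consume 10 seconds after that loop.
  • Hence every outer task consumes 11 seconds in total (worst case).
  • We have a boolean which allows switching the inner loop from parallel() to sequential().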
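The setup above can be sketched roughly as follows. This is a scaled-down illustration and not the linked demo code: the class name `NestedLoopSetupSketch`, the method `run`, and the shortened sleep times (1 s instead of 10 s per outer task, 100 x 1 ms for the inner loop) are assumptions made to keep the example short.

```java
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class NestedLoopSetupSketch {

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    /** Runs the nested loops and returns the elapsed wall-clock time in milliseconds. */
    static long run(boolean innerParallel, int outerTasks, int innerTasks,
                    long outerMillis, long innerMillis) {
        long start = System.nanoTime();
        IntStream.range(0, outerTasks).parallel().forEach(i -> {
            // First half of the outer tasks: long work BEFORE the inner loop.
            if (i < outerTasks / 2) sleep(outerMillis);

            // The boolean switches the inner loop between parallel() and sequential().
            IntStream inner = IntStream.range(0, innerTasks);
            (innerParallel ? inner.parallel() : inner.sequential())
                    .forEach(j -> sleep(innerMillis));

            // Second half of the outer tasks: long work AFTER the inner loop.
            if (i >= outerTasks / 2) sleep(outerMillis);
        });
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("inner sequential: " + run(false, 24, 100, 1000, 1) + " ms");
        System.out.println("inner parallel:   " + run(true, 24, 100, 1000, 1) + " ms");
    }
}
```

The timings you see depend on the Java version and on the parallelism of the common pool, so the sketch prints measurements rather than asserting any particular ratio.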

Now: submitting 24 outer-loop tasks to a pool with parallelism 8, we would expect 24/8 * 11 = 33 seconds at best (on a machine with 8 or more cores).
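The 33-second figure is simply the number of "rounds" of outer tasks times the worst-case time per outer task. A small sketch of that arithmetic (the class and method names are made up for illustration):

```java
public class ExpectedRuntimeSketch {

    /**
     * Best-case wall-clock time in seconds when each outer task takes
     * (outerSeconds + innerSeconds) and at most `parallelism` outer tasks
     * run at the same time.
     */
    static long bestCaseSeconds(int outerTasks, int parallelism,
                                long outerSeconds, long innerSeconds) {
        long rounds = (outerTasks + parallelism - 1) / parallelism; // ceil(outerTasks / parallelism)
        return rounds * (outerSeconds + innerSeconds);
    }

    public static void main(String[] args) {
        // 24 outer tasks, parallelism 8, 10 s outer work + 1 s inner loop (sequential worst case)
        System.out.println(bestCaseSeconds(24, 8, 10, 1)); // prints 33
    }
}
```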

The result is:

  • With the inner loop sequential: 33 seconds.
  • With the inner loop parallel: >80 seconds (I had 92 seconds).

Question: Can you confirm this behavior? Is this something one would expect from the framework? (I am a bit more careful now about claiming that this is a bug, but I personally believe that it is due to a bug in the implementation of ForkJoinTask. Remark: I have posted this to concurrency-interest (see http://cs.oswego.edu/pipermail/concurrency-interest/2014-May/012652.html ), but so far I have not received confirmation from there.)

Demonstration of the deadlock

The following code will DEADLOCK:

    // Outer loop
    IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {
        doWork();
        synchronized(this) {
            // Inner loop
            IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
                doWork();
            });
        }
    });

where numberOfTasksInOuterLoop = 24, numberOfTasksInInnerLoop = 240, outerLoopOverheadFactor = 10000, and doWork is some stateless CPU burner.

You can find the complete demo code at http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachAndSynchronization.java (see the JavaDoc there for a more detailed description).

Is this behavior expected? Note that the documentation on Java parallel streams does not mention any issue with nesting or synchronization. Also, the fact that both loops use a common fork-join pool is not mentioned.
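That both loops share one pool can be observed by recording which threads execute the outer and inner tasks. The following is a minimal sketch, assuming it is started from an ordinary thread (not from inside another ForkJoinPool); the class name `CommonPoolSketch` and the method `threadNames` are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CommonPoolSketch {

    /** Collects the names of all threads that execute an outer or an inner task. */
    static Set<String> threadNames(int outerTasks, int innerTasks) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, outerTasks).parallel().forEach(i -> {
            names.add(Thread.currentThread().getName());
            IntStream.range(0, innerTasks).parallel().forEach(j ->
                    names.add(Thread.currentThread().getName()));
        });
        return names;
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
        // Expect the calling thread plus workers of the common pool.
        threadNames(24, 240).forEach(System.out::println);
    }
}
```

When run this way, the names are those of the calling thread and of common-pool workers (named `ForkJoinPool.commonPool-worker-N` in the JDK builds I am aware of), for outer and inner tasks alike.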

Update

Another test of the performance issue can be found at http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachBenchmark.java - this test comes without any blocking operation (no Thread.sleep and no synchronized). I compiled some more remarks here: http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html

Update 2

It appears that this problem and the more severe DEADLOCK with semaphores have been fixed in Java 8 u40.

Recommended answer

The problem is that the rather limited parallelism you have configured is eaten up by the outer stream processing: if you say that you want eight threads and process a stream of more than eight items with parallel(), it will create eight worker threads and let them process items.

Then, within your consumer, you are processing another stream using parallel(), but there are no worker threads left. Since the worker threads are blocked waiting for the end of the inner stream processing, the ForkJoinPool has to create new worker threads, which violates your configured parallelism. It seems to me that it does not recycle these extra threads but lets them die right after processing. So within your inner processing, new threads are created and disposed of, which is an expensive operation.

You might see it as a flaw that the initiating threads do not contribute to the computation of a parallel stream but just wait for the result. But even if that were fixed, you would still have a general problem that is hard (if at all possible) to fix:

Whenever the ratio of worker threads to outer stream items is low, the implementation will use all of them for the outer stream, as it doesn't know that the stream is an outer stream. So executing an inner stream in parallel requests more worker threads than are available. Letting the caller thread contribute to the computation could fix it to the extent that the performance equals that of the serial computation, but gaining an advantage from parallel execution here does not fit well with the concept of a fixed number of worker threads.

Note that you are only scratching the surface of this problem here, as you have rather balanced processing times for the items. If the processing times of both inner items and outer items diverge (compared to items on the same level), the problem will be even worse.

Update: by profiling and looking at the code, it seems that the ForkJoinPool does attempt to use the waiting thread for "work stealing", but uses different code depending on whether the thread is a worker thread or some other thread. As a result, a worker thread is actually waiting about 80% of the time and doing very little to no work, while other threads really contribute to the computation…

Update 2: for completeness, here is the simple parallel execution approach described in the comments. Since it enqueues every item, it is expected to have too much overhead when the execution time for a single item is rather small. So it's not a sophisticated solution but rather a demonstration that it is possible to handle long-running tasks without much magic…

import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.*;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class NestedParallelForEachTest1 {
    static final boolean isInnerStreamParallel = true;

    // Setup: Inner loop task 0.01 sec in worst case. Outer loop task: 10 sec + inner loop. This setup: (100 * 0.01 sec + 10 sec) * 24/8 = 33 sec.
    static final int numberOfTasksInOuterLoop = 24;  // In real applications this can be a large number (e.g. > 1000).
    static final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (e.g. > 1000).
    static final int concurrentExecutionsLimitForStreams = 8;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.out.println(System.getProperty("java.version")+" "+System.getProperty("java.home"));
        new NestedParallelForEachTest1().testNestedLoops();
        E.shutdown();
    }

    final static ThreadPoolExecutor E = new ThreadPoolExecutor(
        concurrentExecutionsLimitForStreams, concurrentExecutionsLimitForStreams,
        2, TimeUnit.MINUTES, new SynchronousQueue<>(), (r,e)->r.run() );

    public static void parallelForEach(IntStream s, IntConsumer c) {
        s.mapToObj(i->E.submit(()->c.accept(i))).collect(Collectors.toList())
         .forEach(NestedParallelForEachTest1::waitOrHelp);
    }
    static void waitOrHelp(Future<?> f) {
        while(!f.isDone()) {
            Runnable r=E.getQueue().poll();
            if(r!=null) r.run();
        }
        try { f.get(); }
        catch(InterruptedException ex) { throw new RuntimeException(ex); }
        catch(ExecutionException eex) {
            Throwable t=eex.getCause();
            if(t instanceof RuntimeException) throw (RuntimeException)t;
            if(t instanceof Error) throw (Error)t;
            throw new UndeclaredThrowableException(t);
        }
    }
    public void testNestedLoops(NestedParallelForEachTest1 this) {
        long start = System.nanoTime();
        // Outer loop
        parallelForEach(IntStream.range(0,numberOfTasksInOuterLoop), i -> {
            if(i < 10) sleep(10 * 1000);
            if(isInnerStreamParallel) {
                // Inner loop as parallel: worst case (sequential) it takes 10 * numberOfTasksInInnerLoop millis
                parallelForEach(IntStream.range(0,numberOfTasksInInnerLoop), j -> sleep(10));
            }
            else {
                // Inner loop as sequential
                IntStream.range(0,numberOfTasksInInnerLoop).sequential().forEach(j -> sleep(10));
            }
            if(i >= 10) sleep(10 * 1000);
        });
        long end = System.nanoTime();
        System.out.println("Done in "+TimeUnit.NANOSECONDS.toSeconds(end-start)+" sec.");
    }
    static void sleep(int milli) {
        try {
            Thread.sleep(milli);
        } catch (InterruptedException ex) {
            throw new AssertionError(ex);
        }
    }
}
