Java 8 stream serial vs parallel performance

2022-01-22 00:00:00 performance java-8 java java-stream

On my machine, the program below prints:

OptionalLong[134043]
 PARALLEL took 127869 ms
OptionalLong[134043]
 SERIAL took 60594 ms

It's not clear to me why executing the program serially is faster than executing it in parallel. I've given both programs -Xms2g -Xmx2g on an 8 GB box that's relatively quiet. Can someone clarify what's going on?

import java.util.stream.LongStream;
import java.util.stream.LongStream.Builder;

public class Problem47 {

    public static void main(String[] args) {

        final long startTime = System.currentTimeMillis();
        System.out.println(LongStream.iterate(1, n -> n + 1).parallel().limit(1000000).filter(n -> fourConsecutives(n)).findFirst());
        final long endTime = System.currentTimeMillis();
        System.out.println(" PARALLEL took " +(endTime - startTime) + " ms");

        final long startTime2 = System.currentTimeMillis();
        System.out.println(LongStream.iterate(1, n -> n + 1).limit(1000000).filter(n -> fourConsecutives(n)).findFirst());
        final long endTime2 = System.currentTimeMillis();
        System.out.println(" SERIAL took " +(endTime2 - startTime2) + " ms");
    }

    static boolean fourConsecutives(final long n) {
        return distinctPrimeFactors(n).count() == 4 &&
                distinctPrimeFactors(n + 1).count() == 4 &&
                distinctPrimeFactors(n + 2).count() == 4 &&
                distinctPrimeFactors(n + 3).count() == 4;
    }

    static LongStream distinctPrimeFactors(long number) {
        final Builder builder = LongStream.builder();
        final long limit = number / 2;
        long n = number;
        for (long i = 2; i <= limit; i++) {
            while (n % i == 0) {
                builder.accept(i);
                n /= i;
            }
        }
        return builder.build().distinct();
    }

}

Recommended answer

While Brian Goetz is right about your setup, e.g. that you should use .range(1, 1000000) rather than .iterate(1, n -> n + 1).limit(1000000) and that your benchmark method is very simplistic, I want to emphasize the important point:

Even after fixing these issues, and even using just a wall clock and the Task Manager, you can see that something is wrong. On my machine the operation takes about half a minute, and you can see the parallelism drop to a single core after about two seconds. Even if a specialized benchmark tool produced different results, it wouldn’t matter unless you want to run your final application inside a benchmark tool all the time…
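Regarding the first of Brian Goetz's points above (range instead of iterate/limit), here is a minimal sketch of what differs, assuming only the standard java.util.Spliterator API. A range source reports its exact size up front, which lets the Fork/Join framework split it into balanced halves; iterate(...).limit(...) starts from an infinite source whose size the splitting logic does not know. rangeClosed(1, 1_000_000) yields exactly the same elements as iterate(1, n -> n + 1).limit(1_000_000), while range(1, 1000000) stops at 999999. The class name SourceShapes is hypothetical.

```java
import java.util.Spliterator;
import java.util.stream.LongStream;

public class SourceShapes {
    // True if the stream's spliterator reports an exact element count.
    // Note: calling spliterator() is a terminal operation, so the stream is consumed.
    static boolean isSized(LongStream s) {
        return s.spliterator().hasCharacteristics(Spliterator.SIZED);
    }

    public static void main(String[] args) {
        // Infinite source truncated by limit(): the size is not known to the splitter.
        System.out.println("iterate+limit SIZED: "
                + isSized(LongStream.iterate(1, n -> n + 1).limit(1_000_000)));
        // Range source: the exact size is known before any element is produced.
        System.out.println("rangeClosed SIZED:   "
                + isSized(LongStream.rangeClosed(1, 1_000_000)));
        // Both pipelines produce the same elements 1..1_000_000.
        System.out.println(LongStream.iterate(1, n -> n + 1).limit(1_000_000).sum()
                == LongStream.rangeClosed(1, 1_000_000).sum());
    }
}
```

Running it should print false, then true, then true: same elements, but only the range source tells the framework how to split the work evenly.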

Now we could try to pick apart your setup further, or tell you that you should learn the special details of the Fork/Join framework, as the implementors did on the discussion list.

Or we could try an alternative implementation:

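import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;

// Goes in main(); fourConsecutives(long) is the method from the question.
// One worker thread per CPU core.
ExecutorService es=Executors.newFixedThreadPool(
                       Runtime.getRuntime().availableProcessors());
// Smallest n found so far; Long.MAX_VALUE means "nothing found yet".
AtomicLong found=new AtomicLong(Long.MAX_VALUE);
// Submit one task per candidate in increasing order; stop submitting once a hit exists.
LongStream.range(1, 1000000).filter(n -> found.get()==Long.MAX_VALUE)
    .forEach(n -> es.submit(()->{
        // Skip n if a smaller hit is already known; otherwise CAS n in as the new minimum.
        if(found.get()>n && fourConsecutives(n)) for(;;) {
            long x=found.get();
            if(x<n || found.compareAndSet(x, n)) break;
        }
    }));
es.shutdown();
// Tasks submitted before shutdown() still run; wait for all of them.
try { es.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); }
catch (InterruptedException ex) {throw new AssertionError(ex); }
long result=found.get();
System.out.println(result==Long.MAX_VALUE? "not found": result);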

On my machine it does what I would expect from parallel execution, taking only slightly more than ⟨sequential time⟩/⟨number of CPU cores⟩, without changing anything in your fourConsecutives implementation.

The bottom line is that, at least when processing a single item takes significant time, the current Stream implementation (or the underlying Fork/Join framework) has problems, as already discussed in this related question. If you want reliable parallelism, I would recommend using proven and tested ExecutorServices. As my example shows, this does not mean dropping the Java 8 features; they fit together well. Only the automatic parallelism introduced with Stream.parallel should be used with care (given the current implementation).
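To illustrate how ExecutorServices and Java 8 features fit together, here is a different sketch (not the answerer's code): a fixed pool runs one Callable per worker, the tasks are built with IntStream/LongStream, and invokeAll collects the results. Each worker scans an interleaved stride (k, k+P, k+2P, ...), so that when the cost per item grows with n, as it does for distinctPrimeFactors, every worker gets a similar mix of cheap and expensive items. The class StridedSearch, the method smallestMatch and the demo predicate are hypothetical; with the question's code one could pass Problem47::fourConsecutives as the predicate.

```java
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.LongPredicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

public class StridedSearch {
    /** Smallest n in [from, to) satisfying test, or -1 if there is none. */
    static long smallestMatch(long from, long to, int workers, LongPredicate test) {
        ExecutorService es = Executors.newFixedThreadPool(workers);
        try {
            // Task k scans from+k, from+k+workers, from+k+2*workers, ... sequentially,
            // so findFirst() yields the smallest match within that stride.
            List<Callable<OptionalLong>> tasks = IntStream.range(0, workers)
                    .mapToObj(k -> (Callable<OptionalLong>) () ->
                            LongStream.range(0, (to - from - k + workers - 1) / workers)
                                    .map(i -> from + k + i * workers)
                                    .filter(test)
                                    .findFirst())
                    .collect(Collectors.toList());
            // invokeAll blocks until every task has completed;
            // the overall answer is the minimum over the per-stride results.
            long best = -1;
            for (Future<OptionalLong> f : es.invokeAll(tasks)) {
                OptionalLong r = f.get();
                if (r.isPresent() && (best < 0 || r.getAsLong() < best)) {
                    best = r.getAsLong();
                }
            }
            return best;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a search task failed", e.getCause());
        } finally {
            // Let the pool's non-daemon threads exit.
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Hypothetical cheap predicate standing in for fourConsecutives.
        System.out.println(smallestMatch(1, 1_000_000, cores, n -> n % 9973 == 0));
    }
}
```

The demo prints 9973 regardless of the core count. Unlike the per-item submission above, this version does not stop early once a hit is found; it trades that for only P task objects and evenly loaded workers.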

相关文章