使用 reduce 和 collect 求平均值

我正在尝试了解新的 Java 8 Stream API.

I am trying to understand the new Java 8 Stream APIs.

http://docs.oracle.com/javase/tutorial/集合/流/reduction.html

我找到了使用 collect API 查找数字平均值的示例.但我觉得,同样可以使用 reduce() 来完成.

I found the example of finding average of numbers using collect API. But I felt that, the same can be done using reduce() also.

public class Test {

    public static void main(String[] args) {
        // Using collect
        System.out.println(Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .collect(Averager::new, Averager::accept, Averager::combine)
            .average());

        // Using reduce
        System.out.println(Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .reduce(new Averager(), (t, u) -> {
                t.accept(u);
                return t;
            }, (t, u) -> {
                t.combine(u);
                return t;
            }).average());
    }

    private static class Averager {
        private int total = 0;
        private int count = 0;

        public Averager() {
            // System.out.println("Creating averager");
        }

        public double average() {
            // System.out.println("Finding average");
            return count > 0 ? ((double) total) / count : 0;
        }

        public void accept(int i) {
            // System.out.println("Accepting " + i);
            total += i;
            count++;
        }

        public void combine(Averager other) {
            // System.out.println("Combining the averager : " + other);
            total += other.total;
            count += other.count;
        }

        @Override
        public String toString() {
            return "[total : " + total + ", count: " + count + "]";
        }
    }
}

1) 有什么理由我应该在这里使用 collect 而不是 reduce 吗?
2)如果我启用所有调试系统输出,我可以看到在收集和减少之间执行的操作完全相同.在这两种情况下,组合器都没有被使用.
3)如果我让流并行,收集总是返回正确的结果.reduce() 每次都给我不同的结果.
4) 我不应该在并行流中使用reduce吗?

1) Is there any reason, that I should use collect instead of reduce here?
2) If I enable all the debug sysouts, I can see that the operations perfomed are exactly the same between, collect and reduce. And the combiner was not being used at all, in both cases.
3) If I make the streams parallel, the collect is always returning me correct result. The reduce() is giving me different results each time.
4) Should I not use reduce, in parallel streams?

谢谢,
保罗

推荐答案

reducecollect的区别在于collect是增强的可以并行处理可变对象的归约形式.collect 算法线程限制了各种结果对象,因此即使它们不是线程安全的,它们也可以安全地进行变异.这就是 Averager 使用 collect 工作的原因.对于使用 reduce 的顺序计算,这通常无关紧要,但对于并行计算,它会给出不正确的结果,正如您所观察到的.

The difference between reduce and collect is that collect is an enhanced form of reduction that can deal with mutable objects in parallel. The collect algorithm thread-confines the various result objects, so that they can be mutated safely, even if they aren't thread-safe. That's why Averager works using collect. For sequential computation using reduce this doesn't usually matter, but for parallel computation it will give incorrect results, as you observed.

关键点是 reduce 只要处理 values 而不是可变对象就可以工作.您可以通过查看 reduce 的第一个参数来了解这一点.示例代码传递了 new Averager(),它是一个 单个对象,在并行归约中被多个线程用作标识值.并行流的工作方式是将工作负载分成由各个线程处理的段.如果多个线程正在改变同一个(非线程安全的)对象,那么应该清楚为什么这会导致错误的结果.

A key point is that reduce works as long as it is dealing with values but not mutable objects. You can see this by looking at the first argument to reduce. The example code passes new Averager() which is a single object that's used as the identity value by multiple threads in the parallel reduction. The way parallel streams work is that the workload is split into segments that are processed by individual threads. If multiple threads are mutating the same (non-thread-safe) object, it should be clear why this will lead to incorrect results.

可以使用 reduce 来计算平均值,但您需要使您的累积对象是不可变的.考虑一个对象 ImmutableAverager:

It is possible to use reduce to compute an average, but you need to make your accumulation object be immutable. Consider an object ImmutableAverager:

static class ImmutableAverager {
    private final int total;
    private final int count;

    public ImmutableAverager() {
        this.total = 0;
        this.count = 0;
    }
    
    public ImmutableAverager(int total, int count) {
        this.total = total;
        this.count = count;
    }

    public double average() {
        return count > 0 ? ((double) total) / count : 0;
    }

    public ImmutableAverager accept(int i) {
        return new ImmutableAverager(total + i, count + 1);
    }

    public ImmutableAverager combine(ImmutableAverager other) {
        return new ImmutableAverager(total + other.total, count + other.count);
    }
}

请注意,我已调整 acceptcombine 的签名以返回新的 ImmutableAverager 而不是变异 this.(这些更改还使方法与 reduce 的函数参数相匹配,因此我们可以使用方法引用.)您可以像这样使用 ImmutableAverager:

Note that I've adjusted the signatures of accept and combine to return a new ImmutableAverager instead of mutating this. (These changes also make the methods match the function arguments to reduce so we can use method references.) You'd use ImmutableAverager like this:

    double average = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .parallel()
            .reduce(new ImmutableAverager(), 
                    ImmutableAverager::accept,
                    ImmutableAverager::combine)
            .average();
    System.out.println("Average: "+average);

将不可变值对象与 reduce 一起使用应该可以并行得出正确的结果.

Using immutable value objects with reduce should give the correct results in parallel.

最后,请注意 IntStreamDoubleStreamsummaryStatistics() 方法,而 CollectorsaveragingDoubleaveragingIntaveragingLong 方法可以为您完成这些计算.但是,我认为问题更多是关于收集和归约的机制,而不是关于如何最简洁地进行平均.

Finally, note that IntStream and DoubleStream have summaryStatistics() methods and Collectors has averagingDouble, averagingInt, and averagingLong methods that can do these computations for you. However, I think the question is more about the mechanics of collection and reduction than about how to do averaging most concisely.

相关文章