Finding an average using reduce and collect
I am trying to understand the new Java 8 Stream APIs.
http://docs.oracle.com/javase/tutorial/collections/streams/reduction.html
I found an example there that computes the average of a set of numbers using the collect API, but I felt the same could be done using reduce() as well.
import java.util.stream.Stream;

public class Test {

    public static void main(String[] args) {
        // Using collect
        System.out.println(Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .collect(Averager::new, Averager::accept, Averager::combine)
                .average());

        // Using reduce
        System.out.println(Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .reduce(new Averager(), (t, u) -> {
                    t.accept(u);
                    return t;
                }, (t, u) -> {
                    t.combine(u);
                    return t;
                }).average());
    }

    private static class Averager {
        private int total = 0;
        private int count = 0;

        public Averager() {
            // System.out.println("Creating averager");
        }

        public double average() {
            // System.out.println("Finding average");
            return count > 0 ? ((double) total) / count : 0;
        }

        public void accept(int i) {
            // System.out.println("Accepting " + i);
            total += i;
            count++;
        }

        public void combine(Averager other) {
            // System.out.println("Combining the averager : " + other);
            total += other.total;
            count += other.count;
        }

        @Override
        public String toString() {
            return "[total : " + total + ", count: " + count + "]";
        }
    }
}
1) Is there any reason I should use collect instead of reduce here?
2) If I enable all the debug sysouts, I can see that the operations performed are exactly the same for collect and reduce, and the combiner is not used at all in either case.
3) If I make the streams parallel, collect always returns the correct result, whereas reduce() gives me different results each time.
4) Should I not use reduce in parallel streams?
Thanks,
Paul
Recommended answer
The difference between reduce and collect is that collect is an enhanced form of reduction that can deal with mutable objects in parallel. The collect algorithm thread-confines the various result objects, so that they can be mutated safely, even if they aren't thread-safe. That's why Averager works using collect. For sequential computation using reduce, this doesn't usually matter, but for parallel computation it will give incorrect results, as you observed.
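As a rough illustration of that thread confinement, the sketch below counts how often collect asks its supplier for a fresh container. The names CollectConfinementSketch, Acc, and containersCreated are made up for this example, and IntStream is used instead of the question's Stream<Integer> only to avoid boxing. How many containers get created in parallel depends on how the runtime splits the work, so the code prints the count rather than assuming a value.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class CollectConfinementSketch {

    // Hypothetical mutable, non-thread-safe accumulator, similar in spirit to Averager.
    static final class Acc {
        long total;
        long count;

        void accept(int i) { total += i; count++; }

        void combine(Acc other) { total += other.total; count += other.count; }

        double average() { return count > 0 ? (double) total / count : 0; }
    }

    // Averages 1..n in parallel; containersCreated records every supplier call.
    static double parallelAverage(int n, AtomicInteger containersCreated) {
        return IntStream.rangeClosed(1, n)
                .parallel()
                .collect(() -> {
                             containersCreated.incrementAndGet();
                             return new Acc(); // a fresh container each time the supplier is called
                         },
                         Acc::accept,   // mutates one container at a time
                         Acc::combine)  // merges partial containers
                .average();
    }

    public static void main(String[] args) {
        AtomicInteger containersCreated = new AtomicInteger();
        System.out.println("Average: " + parallelAverage(1000, containersCreated));
        System.out.println("Containers created: " + containersCreated.get());
    }
}
```

Because each partial result lives in its own Acc, the plain long fields need no synchronization; the partial containers only meet in combine.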
A key point is that reduce works as long as it is dealing with values rather than mutable objects. You can see this by looking at the first argument to reduce. The example code passes new Averager(), which is a single object that's used as the identity value by multiple threads in the parallel reduction. The way parallel streams work is that the workload is split into segments that are processed by individual threads. If multiple threads are mutating the same (non-thread-safe) object, it should be clear why this will lead to incorrect results.
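The single-object problem can be seen even without parallelism. The following is a minimal sketch, using a trimmed-down stand-in for the question's Averager and a hypothetical helper reduceInto: after a sequential reduce, the "identity" object has itself been mutated and returned, so it no longer behaves like an identity value.

```java
import java.util.stream.Stream;

class SharedIdentitySketch {

    // Mutable accumulator, a trimmed-down stand-in for the question's Averager.
    static final class Averager {
        int total;
        int count;

        void accept(int i) { total += i; count++; }

        void combine(Averager other) { total += other.total; count += other.count; }
    }

    // Runs the question's style of reduce() sequentially against the given identity object.
    static Averager reduceInto(Averager identity) {
        return Stream.of(1, 2, 3)
                .reduce(identity,
                        (t, u) -> { t.accept(u); return t; },
                        (t, u) -> { t.combine(u); return t; });
    }

    public static void main(String[] args) {
        Averager identity = new Averager();
        Averager result = reduceInto(identity);

        System.out.println(result == identity); // true: the result is the identity object itself
        System.out.println(identity.count);     // 3: the "identity" now carries state

        reduceInto(identity);
        System.out.println(identity.count);     // 6: reusing it accumulates on top of old state
    }
}
```

In a parallel reduction the same instance would be handed to several threads at once, which is where the varying results in the question come from.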
It is possible to use reduce to compute an average, but you need to make your accumulation object immutable. Consider a class ImmutableAverager:
static class ImmutableAverager {
    private final int total;
    private final int count;

    public ImmutableAverager() {
        this.total = 0;
        this.count = 0;
    }

    public ImmutableAverager(int total, int count) {
        this.total = total;
        this.count = count;
    }

    public double average() {
        return count > 0 ? ((double) total) / count : 0;
    }

    public ImmutableAverager accept(int i) {
        return new ImmutableAverager(total + i, count + 1);
    }

    public ImmutableAverager combine(ImmutableAverager other) {
        return new ImmutableAverager(total + other.total, count + other.count);
    }
}
Note that I've adjusted the signatures of accept and combine to return a new ImmutableAverager instead of mutating this. (These changes also make the methods match the function arguments to reduce, so we can use method references.) You'd use ImmutableAverager like this:
double average = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .parallel()
        .reduce(new ImmutableAverager(),
                ImmutableAverager::accept,
                ImmutableAverager::combine)
        .average();
System.out.println("Average: " + average);
Using immutable value objects with reduce should give the correct results in parallel.
Finally, note that IntStream and DoubleStream have summaryStatistics() methods, and Collectors has averagingDouble, averagingInt, and averagingLong methods that can do these computations for you. However, I think the question is more about the mechanics of collection and reduction than about how to do averaging most concisely.
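For completeness, here is a short sketch of two of those built-in routes. The class name BuiltInAverages and its method names are placeholders chosen for this example; the library calls themselves (Collectors.averagingInt and IntStream.summaryStatistics) are the ones mentioned above.

```java
import java.util.IntSummaryStatistics;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class BuiltInAverages {

    // Average of a Stream<Integer> using the ready-made averaging collector.
    static double viaCollector() {
        return Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .parallel()
                .collect(Collectors.averagingInt(Integer::intValue));
    }

    // Average of a primitive IntStream via its summary statistics.
    static double viaSummaryStatistics() {
        IntSummaryStatistics stats = IntStream.rangeClosed(1, 10)
                .parallel()
                .summaryStatistics();
        return stats.getAverage();
    }

    public static void main(String[] args) {
        System.out.println(viaCollector());         // 5.5
        System.out.println(viaSummaryStatistics()); // 5.5
    }
}
```

Both variants are built on collect-style mutable reduction, so they are safe to run on parallel streams.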