Order guarantees using streams and reducing a chain of consumers

2022-01-22 00:00:00 reduce java-8 java java-stream consumer

In the current scenario, we have a set of APIs, as listed below:

Consumer<T> start();
Consumer<T> performDailyAggregates();
Consumer<T> performLastNDaysAggregates();
Consumer<T> repopulateScores();
Consumer<T> updateDataStore();

On top of these, one of our schedulers performs the tasks, e.g.

private void performAllTasks(T data) {
    start().andThen(performDailyAggregates())
            .andThen(performLastNDaysAggregates())
            .andThen(repopulateScores())
            .andThen(updateDataStore())
            .accept(data);
}

While reviewing this, I thought of moving to a more flexible implementation (1) of performing the tasks, which would look like:

// NOOP, here and below, stands for 'anything -> {}'
private void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
    consumerList.reduce(NOOP, Consumer::andThen).accept(data);
}

What strikes me now is that the Javadoc of Stream.reduce clearly states:

accumulator - an associative, non-interfering, stateless function for combining two values

Next, I looked at "How to ensure order of processing in java8 streams?" to make sure the reduction is ordered (processing order the same as encounter order).

Okay, a stream generated from a List is ordered, and unless the stream is made parallel before reduce, the following implementation (2) should work.

private void performAllTasks(List<Consumer<T>> consumerList, T data) {
    consumerList.stream().reduce(NOOP, Consumer::andThen).accept(data);
}

Q. Does assumption (2) hold true? Is it guaranteed to always execute the consumers in the order in which the original code had them?

Q. Is there some way to expose (1) to the callees as well, so that they can perform tasks?

Recommended answer

As Andreas pointed out, Consumer::andThen is an associative function: while the resulting consumer may have a different internal structure, it is still equivalent.
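To make the associativity claim concrete, here is a minimal sketch (the class name AndThenAssociativity and its helper methods are made up for illustration). It composes the same three consumers with two different groupings and records the order in which they fire:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class AndThenAssociativity {
    // Hypothetical helper: a consumer that appends its name to the log it is given.
    static Consumer<List<String>> named(String name) {
        return log -> log.add(name);
    }

    // Grouping ((a, b), c): the shape a sequential left-to-right reduction produces.
    static List<String> leftGrouped() {
        List<String> log = new ArrayList<>();
        named("a").andThen(named("b")).andThen(named("c")).accept(log);
        return log;
    }

    // Grouping (a, (b, c)): a different internal structure over the same operands.
    static List<String> rightGrouped() {
        List<String> log = new ArrayList<>();
        named("a").andThen(named("b").andThen(named("c"))).accept(log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(leftGrouped());  // [a, b, c]
        System.out.println(rightGrouped()); // [a, b, c]
    }
}
```

Both groupings run the leaves in the same left-to-right order, which is what allows reduce to regroup the operands freely as long as it keeps them in encounter order.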

But let's debug it:

import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class SO {
    public static void main(String[] args) {
        performAllTasks(IntStream.range(0, 10)
            .mapToObj(i -> new DebuggableConsumer(""+i)), new Object());
    }
    private static <T> void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
        Consumer<T> reduced = consumerList.reduce(Consumer::andThen).orElse(x -> {});
        reduced.accept(data);
        System.out.println(reduced);
    }
    // A consumer that remembers how it was composed, so toString() can draw the tree.
    static class DebuggableConsumer implements Consumer<Object> {
        private final Consumer<Object> first, second;
        private final boolean leaf;
        DebuggableConsumer(String name) {
            this(x -> System.out.println(name), x -> {}, true);
        }
        DebuggableConsumer(Consumer<Object> a, Consumer<Object> b, boolean l) {
            first = a; second = b;
            leaf = l;
        }
        public void accept(Object t) {
            first.accept(t);
            second.accept(t);
        }
        @Override public Consumer<Object> andThen(Consumer<? super Object> after) {
            return new DebuggableConsumer(this, after, false);
        }
        public @Override String toString() {
            if(leaf) return first.toString();
            return toString(new StringBuilder(200), 0, 0).toString();
        }
        // Appends this node and its children; [preS, preEnd) is the indentation prefix within sb.
        private StringBuilder toString(StringBuilder sb, int preS, int preEnd) {
            int myHandle = sb.length()-2;
            sb.append(leaf? first: "combined").append('\n');
            if(!leaf) {
                int nPreS=sb.length();
                ((DebuggableConsumer)first).toString(
                    sb.append(sb, preS, preEnd).append("\u2502 "), nPreS, sb.length());
                nPreS=sb.length();
                sb.append(sb, preS, preEnd);
                int lastItemHandle=sb.length();
                ((DebuggableConsumer)second).toString(sb.append("  "), nPreS, sb.length());
                sb.setCharAt(lastItemHandle, '\u2514');
            }
            if(myHandle>0) {
                sb.setCharAt(myHandle, '\u251c');
                sb.setCharAt(myHandle+1, '\u2500');
            }
            return sb;
        }
    }
}

The code above will print

0
1
2
3
4
5
6
7
8
9
combined
├─combined
│ ├─combined
│ │ ├─combined
│ │ │ ├─combined
│ │ │ │ ├─combined
│ │ │ │ │ ├─combined
│ │ │ │ │ │ ├─combined
│ │ │ │ │ │ │ ├─combined
│ │ │ │ │ │ │ │ ├─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@378fd1ac
│ │ │ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@49097b5d
│ │ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@6e2c634b
│ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@37a71e93
│ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7e6cbb7a
│ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7c3df479
│ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7106e68e
│ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7eda2dbb
│ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@6576fe71
└─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@76fb509a

Changing the reduction code to

private static <T> void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
    Consumer<T> reduced = consumerList.parallel().reduce(Consumer::andThen).orElse(x -> {});
    reduced.accept(data);
    System.out.println(reduced);
}

prints the following on my machine:

0
1
2
3
4
5
6
7
8
9
combined
├─combined
│ ├─combined
│ │ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@49097b5d
│ │ └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@6e2c634b
│ └─combined
│   ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@37a71e93
│   └─combined
│     ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7e6cbb7a
│     └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7c3df479
└─combined
  ├─combined
  │ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7106e68e
  │ └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7eda2dbb
  └─combined
    ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@6576fe71
    └─combined
      ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@76fb509a
      └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@300ffa5d

This illustrates the point of Andreas' answer, but also highlights an entirely different problem. You may max it out by using, e.g., IntStream.range(0, 100) in the example code.

The result of the parallel evaluation is actually better than that of the sequential evaluation, because the sequential evaluation creates an unbalanced tree whose depth grows linearly with the number of consumers. When accepting an arbitrary stream of consumers, this can be an actual performance issue or even lead to a StackOverflowError when trying to evaluate the resulting consumer.
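The difference in nesting can be observed directly. The following is only a sketch: the class ChainDepth and all its methods are hypothetical, and it uses the length of the current stack trace as a rough proxy for call depth (the absolute numbers depend on the JVM and on how the program is launched, so only the comparison is meaningful).

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ChainDepth {
    // Deepest call stack seen by any leaf during the current measurement.
    private static int maxDepth;

    // A leaf consumer that records how deep the call stack is when it runs.
    static Consumer<Object> leaf() {
        return x -> maxDepth = Math.max(maxDepth,
                Thread.currentThread().getStackTrace().length);
    }

    static List<Consumer<Object>> leaves(int n) {
        return IntStream.range(0, n).mapToObj(i -> leaf()).collect(Collectors.toList());
    }

    // Sequential reduce: ((c0.andThen(c1)).andThen(c2))... adds one nesting level per element.
    static Consumer<Object> leftLeaning(int n) {
        return leaves(n).stream().reduce(Consumer::andThen).orElse(x -> {});
    }

    // Splits the non-empty range [start, end) in the middle, so nesting grows logarithmically.
    static Consumer<Object> balanced(List<Consumer<Object>> l, int start, int end) {
        if (end - start == 1) return l.get(start);
        int mid = (start + end) >>> 1;
        return balanced(l, start, mid).andThen(balanced(l, mid, end));
    }

    // Runs the consumer once and reports the deepest stack any leaf observed.
    static int depthOf(Consumer<Object> c) {
        maxDepth = 0;
        c.accept(new Object());
        return maxDepth;
    }

    public static void main(String[] args) {
        int n = 100;
        System.out.println("left-leaning: " + depthOf(leftLeaning(n)));
        System.out.println("balanced:     " + depthOf(balanced(leaves(n), 0, n)));
    }
}
```

With the default Consumer.andThen, each composition level contributes at least one stack frame, so the left-leaning chain reports a depth that grows with n while the balanced one stays close to log2(n) levels above the baseline.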

For any nontrivial number of consumers, you actually want a balanced consumer tree, but using a parallel stream for that is not the right solution, because a) Consumer::andThen is a cheap operation that gains nothing from parallel evaluation, and b) the balancing would depend on unrelated properties, such as the nature of the stream source and the number of CPU cores, which determine when the reduction falls back to the sequential algorithm.

Of course, the simplest solution would be

private static <T> void performAllTasks(Stream<Consumer<T>> consumers, T data) {
    consumers.forEachOrdered(c -> c.accept(data));
}
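As a quick check that forEachOrdered keeps the encounter order even if the caller hands in a parallel stream, here is a small sketch. The class ForEachOrderedDemo and the helpers appender and runParallel are invented for this illustration; only performAllTasks mirrors the method above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ForEachOrderedDemo {
    static <T> void performAllTasks(Stream<Consumer<T>> consumers, T data) {
        consumers.forEachOrdered(c -> c.accept(data));
    }

    // Hypothetical task: appends its index to the shared log.
    static Consumer<List<Integer>> appender(int i) {
        return log -> log.add(i);
    }

    // Builds the consumers on a parallel stream, then runs them in encounter order.
    static List<Integer> runParallel(int n) {
        // forEachOrdered processes one element at a time with happens-before
        // between elements, so a plain ArrayList is sufficient here.
        List<Integer> log = new ArrayList<>();
        Stream<Consumer<List<Integer>>> consumers =
            IntStream.range(0, n).parallel().mapToObj(i -> appender(i));
        performAllTasks(consumers, log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runParallel(10)); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

No compound consumer is built here at all, so there is no tree whose shape could matter.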

But when you want to construct a compound Consumer for reuse, you may use

private static final int ITERATION_THRESHOLD = 16; // tune yourself

public static <T> Consumer<T> combineAllTasks(Stream<Consumer<T>> consumers) {
    List<Consumer<T>> consumerList = consumers.collect(Collectors.toList());
    if(consumerList.isEmpty()) return t -> {};
    if(consumerList.size() == 1) return consumerList.get(0);
    if(consumerList.size() < ITERATION_THRESHOLD)
        return balancedReduce(consumerList, Consumer::andThen, 0, consumerList.size());
    return t -> consumerList.forEach(c -> c.accept(t));
}
private static <T> T balancedReduce(List<T> l, BinaryOperator<T> f, int start, int end) {
    if(end-start>2) {
        int mid=(start+end)>>>1;
        return f.apply(balancedReduce(l, f, start, mid), balancedReduce(l, f, mid, end));
    }
    T t = l.get(start++);
    if(start<end) t = f.apply(t, l.get(start));
    assert start==end || start+1==end;
    return t;
}

When the number of consumers reaches the threshold, the code provides a single Consumer that just uses a loop. This is the simplest and most efficient solution for larger numbers of consumers; in fact, you could drop all other approaches for the smaller numbers and still get reasonable performance…
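Dropping the balanced reduction entirely, as suggested, might look like the following sketch. The class name LoopCombiner and the counting helpers are assumptions made for the demonstration; only the loop-based consumer is taken from the code above.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class LoopCombiner {
    // Collects the consumers once and returns a reusable consumer that loops over them.
    static <T> Consumer<T> combineAllTasks(Stream<Consumer<T>> consumers) {
        List<Consumer<T>> list = consumers.collect(Collectors.toList());
        return t -> list.forEach(c -> c.accept(t));
    }

    // Hypothetical task: bumps the counter it is given.
    static Consumer<AtomicInteger> increment() {
        return AtomicInteger::incrementAndGet;
    }

    // Builds n counting consumers, combines them, and runs the result `runs` times.
    static int countInvocations(int n, int runs) {
        AtomicInteger counter = new AtomicInteger();
        Stream<Consumer<AtomicInteger>> consumers =
            IntStream.range(0, n).mapToObj(i -> increment());
        Consumer<AtomicInteger> combined = combineAllTasks(consumers);
        for (int r = 0; r < runs; r++) combined.accept(counter);
        return counter.get();
    }

    public static void main(String[] args) {
        // 100,000 consumers, executed twice through the same compound consumer.
        System.out.println(countInvocations(100_000, 2)); // 200000
    }
}
```

The call depth stays constant regardless of the number of consumers, and the list's iteration order keeps the consumers in the encounter order of the stream.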

Note that this still doesn't hinder parallel processing of the stream of consumers, if their construction really benefits from it.
