How can I stream a tree node's descendants efficiently and elegantly in Java?

2022-01-22 00:00:00 algorithm java-8 java java-stream

Assume we have a collection of objects that are identified by unique Strings, along with a class Tree that defines a hierarchy on them. That class is implemented using a Map from nodes (represented by their IDs) to Collections of their respective children's IDs.

class Tree {
  private Map<String, Collection<String>> edges;

  // ...

  public Stream<String> descendants(String node) {
    // To be defined.
  }
}

I would like to enable streaming a node's descendants. A simple solution is this:

private Stream<String> children(String node) {
    return edges.getOrDefault(node, Collections.emptyList()).stream();
}

public Stream<String> descendants(String node) {
    return Stream.concat(
        Stream.of(node),
        children(node).flatMap(this::descendants)
    );
}
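
To make the snippet above easy to try out, here is a minimal self-contained sketch. The class name TreeDemo, the addChildren helper and the sample tree are assumptions made up for illustration; only children and descendants come from the question.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class TreeDemo {
    // Parent ID -> child IDs; leaves simply have no entry.
    private final Map<String, Collection<String>> edges = new HashMap<>();

    // Hypothetical helper, only used to build the sample tree.
    TreeDemo addChildren(String parent, String... children) {
        edges.put(parent, Arrays.asList(children));
        return this;
    }

    private Stream<String> children(String node) {
        return edges.getOrDefault(node, Collections.emptyList()).stream();
    }

    // Pre-order: the node itself, followed by the descendants of each child.
    public Stream<String> descendants(String node) {
        return Stream.concat(
            Stream.of(node),
            children(node).flatMap(this::descendants)
        );
    }

    // Sample tree: a -> (b, c), b -> (d, e), c -> (f)
    static TreeDemo sample() {
        return new TreeDemo()
            .addChildren("a", "b", "c")
            .addChildren("b", "d", "e")
            .addChildren("c", "f");
    }

    public static void main(String[] args) {
        List<String> order = sample().descendants("a").collect(Collectors.toList());
        System.out.println(order); // prints [a, b, d, e, c, f]
    }
}
```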

Before continuing, I would like to make the following assertions about this solution. (Am I correct about these?)

  1. Walking the Stream returned from descendants consumes resources (time and memory) – relative to the size of the tree – in the same order of complexity as hand-coding the recursion would. In particular, the intermediate objects representing the iteration state (Streams, Spliterators, ...) form a stack and therefore the memory requirement at any given time is in the same order of complexity as the tree's depth.

  2. As I understand this, as soon as I perform a terminal operation on the Stream returned from descendants, the root-level call to flatMap will cause all contained Streams – one for each (recursive) call to descendants – to be realized immediately. Thus, the resulting Stream is only lazy on the first level of recursion, but not beyond. (Edited according to Tagir Valeev's answer.)
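
For comparison, the hand-coded recursion that point 1 refers to might look roughly like the sketch below. The class name RecursiveWalk and the choice to pass the edges map as a parameter are assumptions for illustration. The call stack holds one frame per level, so its memory use is proportional to the depth of the tree, but it is an eager, push-style traversal with no way for the caller to stop early.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class RecursiveWalk {
    // Visits the node first, then recurses into each child (pre-order).
    static void walk(Map<String, Collection<String>> edges,
                     String node, Consumer<String> action) {
        action.accept(node);
        for (String child : edges.getOrDefault(node, Collections.emptyList())) {
            walk(edges, child, action);
        }
    }

    // Convenience wrapper collecting the whole traversal into a list.
    static List<String> descendants(Map<String, Collection<String>> edges, String root) {
        List<String> out = new ArrayList<>();
        walk(edges, root, out::add);
        return out;
    }
}
```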

If I understood these points correctly, my question is this: How can I define descendants so that the resulting Stream is lazy?

I would like the solution to be as elegant as possible, in the sense that I prefer a solution which leaves the iteration state implicit. (To clarify what I mean by that: I know that I could write a Spliterator that walks the tree while maintaining an explicit stack of Spliterators on each level. I would like to avoid that.)
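
For concreteness, the explicit-stack approach mentioned above could be sketched as follows. This is a variation that keeps a stack of Iterators and wraps the result with Spliterators.spliteratorUnknownSize, rather than a hand-written Spliterator; the class name StackWalk and the edges parameter are assumptions for illustration.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class StackWalk {
    static Stream<String> descendants(Map<String, Collection<String>> edges, String root) {
        Iterator<String> it = new Iterator<String>() {
            // One iterator per level currently being traversed.
            private final Deque<Iterator<String>> stack = new ArrayDeque<>();
            { stack.push(Collections.singletonList(root).iterator()); }

            @Override
            public boolean hasNext() {
                // Drop exhausted levels until one has an element left.
                while (!stack.isEmpty() && !stack.peek().hasNext()) {
                    stack.pop();
                }
                return !stack.isEmpty();
            }

            @Override
            public String next() {
                if (!hasNext()) throw new NoSuchElementException();
                String node = stack.peek().next();
                // Children are looked up only when their parent is returned.
                stack.push(edges.getOrDefault(node, Collections.emptyList()).iterator());
                return node;
            }
        };
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
    }
}
```

Because a node's children are only looked up when next() returns that node, a short-circuiting operation such as limit(n) stops the traversal early. The price is exactly the explicit iteration state the question hopes to avoid.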

(Is there possibly a way in Java to formulate this as a producer-consumer workflow, like one could use in languages like Julia and Go?)

Recommended answer

To me, your solution is already as elegant as possible, and its limited laziness is not your fault. The simplest solution is to wait until it gets fixed by the JRE developers. This has been done in Java 10.

However, if this limited laziness of today's implementation really is a concern, it's perhaps time to solve this in a general way. Well, it is about implementing a Spliterator, but not one specific to your task. Instead, it's a re-implementation of the flatMap operation, serving all cases where the limited laziness of the original implementation matters:

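import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class FlatMappingSpliterator<E,S> extends Spliterators.AbstractSpliterator<E>
implements Consumer<S> {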

    static final boolean USE_ORIGINAL_IMPL
        = Boolean.getBoolean("stream.flatmap.usestandard");

    public static <T,R> Stream<R> flatMap(
        Stream<T> in, Function<? super T,? extends Stream<? extends R>> mapper) {

        if(USE_ORIGINAL_IMPL)
            return in.flatMap(mapper);

        Objects.requireNonNull(in);
        Objects.requireNonNull(mapper);
        return StreamSupport.stream(
            new FlatMappingSpliterator<>(sp(in), mapper), in.isParallel()
        ).onClose(in::close);
    }

    final Spliterator<S> src;
    final Function<? super S, ? extends Stream<? extends E>> f;
    Stream<? extends E> currStream;
    Spliterator<E> curr;

    private FlatMappingSpliterator(
        Spliterator<S> src, Function<? super S, ? extends Stream<? extends E>> f) {
        // actually, the mapping function can change the size to anything,
        // but it seems, with the current stream implementation, we are
        // better off with an estimate being wrong by magnitudes than with
        // reporting unknown size
        super(src.estimateSize()+100, src.characteristics()&ORDERED);
        this.src = src;
        this.f = f;
    }

    private void closeCurr() {
        try { currStream.close(); } finally { currStream=null; curr=null; }
    }

    public void accept(S s) {
        curr=sp(currStream=f.apply(s));
    }

    @Override
    public boolean tryAdvance(Consumer<? super E> action) {
        do {
            if(curr!=null) {
                if(curr.tryAdvance(action))
                    return true;
                closeCurr();
            }
        } while(src.tryAdvance(this));
        return false;
    }

    @Override
    public void forEachRemaining(Consumer<? super E> action) {
        if(curr!=null) {
            curr.forEachRemaining(action);
            closeCurr();
        }
        src.forEachRemaining(s->{
            try(Stream<? extends E> str=f.apply(s)) {
                if(str!=null) str.spliterator().forEachRemaining(action);
            }
        });
    }

    @SuppressWarnings("unchecked")
    private static <X> Spliterator<X> sp(Stream<? extends X> str) {
        return str!=null? ((Stream<X>)str).spliterator(): null;
    }

    @Override
    public Spliterator<E> trySplit() {
        Spliterator<S> split = src.trySplit();
        if(split==null) {
            Spliterator<E> prefix = curr;
            while(prefix==null && src.tryAdvance(s->curr=sp(f.apply(s))))
                prefix=curr;
            curr=null;
            return prefix;
        }
        FlatMappingSpliterator<E,S> prefix=new FlatMappingSpliterator<>(split, f);
        if(curr!=null) {
            prefix.curr=curr;
            curr=null;
        }
        return prefix;
    }
}

All you need to do to use it is add an import static of the flatMap method to your code and change expressions of the form stream.flatMap(function) to flatMap(stream, function).

That is, in your code:

public Stream<String> descendants(String node) {
    return Stream.concat(
        Stream.of(node),
        flatMap(children(node), this::descendants)
    );
}

With this change, you get fully lazy behavior. I tested it even with infinite streams…

Note that I added a toggle to allow turning back to the original implementation, e.g. when specifying -Dstream.flatmap.usestandard=true on the command line.
