如何在没有 isFinite() 和 isOrdered() 方法的情况下安全地使用 Java Streams?

2022-01-22 00:00:00 java java-stream

There is the question on whether java methods should return Collections or Streams, in which Brian Goetz answers that even for finite sequences, Streams should usually be preferred.

But it seems to me that currently many operations on Streams that come from other places cannot be safely performed, and defensive code guards are not possible because Streams do not reveal if they are infinite or unordered.

If parallel was a problem to the operations I want to perform on a Stream(), I can call isParallel() to check or sequential to make sure computation is in parallel (if i remember to).

But if orderedness or finity(sizedness) was relevant to the safety of my program, I cannot write safeguards.

Assuming I consume a library implementing this fictitious interface:

public interface CoordinateServer {
    public Stream<Integer> coordinates();
    // example implementations:
    // finite, ordered, sequential
    // IntStream.range(0, 100).boxed()
    // final AtomicInteger atomic = new AtomicInteger();
    
    // // infinite, unordered, sequential
    // Stream.generate(() -> atomic2.incrementAndGet()) 

    // infinite, unordered, parallel
    // Stream.generate(() -> atomic2.incrementAndGet()).parallel()
    
    // finite, ordered, sequential, should-be-closed
    // Files.lines(Path.path("coordinates.txt")).map(Integer::parseInt)
}

Then what operations can I safely call on this stream to write a correct algorithm?

It seems if I maybe want to do write the elements to a file as a side-effect, I need to be concerned about the stream being parallel:

// if stream is parallel, which order will be written to file?
coordinates().peek(i -> {writeToFile(i)}).count();
// how should I remember to always add sequential() in  such cases?

And also if it is parallel, based on what Threadpool is it parallel?

If I want to sort the stream (or other non-short-circuit operations), I somehow need to be cautious about it being infinite:

coordinates().sorted().limit(1000).collect(toList()); // will this terminate?
coordinates().allMatch(x -> x > 0); // will this terminate?

I can impose a limit before sorting, but which magic number should that be, if I expect a finite stream of unknown size?

Finally maybe I want to compute in parallel to save time and then collect the result:

// will result list maintain the same order as sequential?
coordinates().map(i -> complexLookup(i)).parallel().collect(toList());

But if the stream is not ordered (in that version of the library), then the result might become mangled due to the parallel processing. But how can I guard against this, other than not using parallel (which defeats the performance purpose)?

Collections are explicit about being finite or infinite, about having an order or not, and they do not carry the processing mode or threadpools with them. Those seem like valuable properties for APIs.

Additionally, Streams may sometimes need to be closed, but most commonly not. If I consume a stream from a method (of from a method parameter), should I generally call close?

Also, streams might already have been consumed, and it would be good to be able to handle that case gracefully, so it would be good to check if the stream has already been consumed;

I would wish for some code snippet that can be used to validate assumptions about a stream before processing it, like>

Stream<X> stream = fooLibrary.getStream();
Stream<X> safeStream = StreamPreconditions(
    stream, 
    /*maxThreshold or elements before IllegalArgumentException*/
    10_000,
    /* fail with IllegalArgumentException if not ordered */
    true
    )

解决方案

After looking at things a bit (some experimentation and here) as far as I see, there is no way to know definitely whether a stream is finite or not.

More than that, sometimes even it is not determined except at runtime (such as in java 11 - IntStream.generate(() -> 1).takeWhile(x -> externalCondition(x))).

What you can do is:

  1. You can find out with certainty if it is finite, in a few ways (notice that receiving false on these does not mean it is infinite, only that it may be so):

    1. stream.spliterator().getExactSizeIfKnown() - if this has an known exact size, it is finite, otherwise it will return -1.

    2. stream.spliterator().hasCharacteristics(Spliterator.SIZED) - if it is SIZED will return true.

  2. You can safe-guard yourself, by assuming the worst (depends on your case).

    1. stream.sequential()/stream.parallel() - explicitly set your preferred consumption type.
    2. With potentially infinite stream, assume your worst case on each scenario.

      1. For example assume you want listen to a stream of tweets until you find one by Venkat - it is a potentially infinite operation, but you'd like to wait until such a tweet is found. So in this case, simply go for stream.filter(tweet -> isByVenkat(tweet)).findAny() - it will iterate until such a tweet comes along (or forever).
      2. A different scenario, and probably the more common one, is wanting to do something on all the elements, or only to try a certain amount of time (similar to timeout). For this, I'd recommend always calling stream.limit(x) before calling your operation (collect or allMatch or similar) where x is the amount of tries you're willing to tolerate.

After all this, I'll just mention that I think returning a stream is generally not a good idea, and I'd try to avoid it unless there are large benefits.

相关文章