001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.lang3.stream;
018
019import java.lang.reflect.Array;
020import java.util.ArrayList;
021import java.util.Collection;
022import java.util.Collections;
023import java.util.List;
024import java.util.Set;
025import java.util.function.BiConsumer;
026import java.util.function.BinaryOperator;
027import java.util.function.Consumer;
028import java.util.function.Function;
029import java.util.function.Predicate;
030import java.util.function.Supplier;
031import java.util.stream.Collector;
032import java.util.stream.Collectors;
033import java.util.stream.Stream;
034
035import org.apache.commons.lang3.function.Failable;
036import org.apache.commons.lang3.function.FailableConsumer;
037import org.apache.commons.lang3.function.FailableFunction;
038import org.apache.commons.lang3.function.FailablePredicate;
039
/**
 * Provides utility functions and classes for working with the
 * {@code java.util.stream} package, or more generally, with Java 8 lambdas. More
 * specifically, it attempts to address the fact that lambdas are supposed
 * not to throw Exceptions, at least not checked Exceptions, AKA instances
 * of {@link Exception}. This enforces the use of constructs like
 * <pre>
 *     Consumer&lt;java.lang.reflect.Method&gt; consumer = m -&gt; {
 *         try {
 *             m.invoke(o, args);
 *         } catch (Throwable t) {
 *             throw Failable.rethrow(t);
 *         }
 *     };
 *     stream.forEach(consumer);
 * </pre>
 * Using a {@link FailableStream}, this can be rewritten as follows:
 * <pre>
 *     Streams.stream(stream).forEach((m) -&gt; m.invoke(o, args));
 * </pre>
 * Obviously, the second version is much more concise and the spirit of
 * Lambda expressions is met better than in the first version.
 *
 * @see Stream
 * @see Failable
 * @since 3.11
 */
067public class Streams {
068
069    /**
070     * A Collector type for arrays.
071     *
072     * @param <O> The array type.
073     */
074    public static class ArrayCollector<O> implements Collector<O, List<O>, O[]> {
075        private static final Set<Characteristics> characteristics = Collections.emptySet();
076        private final Class<O> elementType;
077
078        /**
079         * Constructs a new instance for the given element type.
080         *
081         * @param elementType The element type.
082         */
083        public ArrayCollector(final Class<O> elementType) {
084            this.elementType = elementType;
085        }
086
087        @Override
088        public BiConsumer<List<O>, O> accumulator() {
089            return List::add;
090        }
091
092        @Override
093        public Set<Characteristics> characteristics() {
094            return characteristics;
095        }
096
097        @Override
098        public BinaryOperator<List<O>> combiner() {
099            return (left, right) -> {
100                left.addAll(right);
101                return left;
102            };
103        }
104
105        @Override
106        public Function<List<O>, O[]> finisher() {
107            return list -> {
108                @SuppressWarnings("unchecked")
109                final O[] array = (O[]) Array.newInstance(elementType, list.size());
110                return list.toArray(array);
111            };
112        }
113
114        @Override
115        public Supplier<List<O>> supplier() {
116            return ArrayList::new;
117        }
118    }
119
120    /**
121     * A reduced, and simplified version of a {@link Stream} with failable method signatures.
122     *
123     * @param <O> The streams element type.
124     */
125    public static class FailableStream<O extends Object> {
126
127        private Stream<O> stream;
128        private boolean terminated;
129
130        /**
131         * Constructs a new instance with the given {@code stream}.
132         *
133         * @param stream The stream.
134         */
135        public FailableStream(final Stream<O> stream) {
136            this.stream = stream;
137        }
138
139        /**
140         * Returns whether all elements of this stream match the provided predicate. May not evaluate the predicate on
141         * all elements if not necessary for determining the result. If the stream is empty then {@code true} is
142         * returned and the predicate is not evaluated.
143         *
144         * <p>
145         * This is a short-circuiting terminal operation.
146         *
147         * Note This method evaluates the <em>universal quantification</em> of the predicate over the elements of
148         * the stream (for all x P(x)). If the stream is empty, the quantification is said to be <em>vacuously
149         * satisfied</em> and is always {@code true} (regardless of P(x)).
150         *
151         * @param predicate A non-interfering, stateless predicate to apply to elements of this stream
152         * @return {@code true} If either all elements of the stream match the provided predicate or the stream is
153         *         empty, otherwise {@code false}.
154         */
155        public boolean allMatch(final FailablePredicate<O, ?> predicate) {
156            assertNotTerminated();
157            return stream().allMatch(Failable.asPredicate(predicate));
158        }
159
160        /**
161         * Returns whether any elements of this stream match the provided predicate. May not evaluate the predicate on
162         * all elements if not necessary for determining the result. If the stream is empty then {@code false} is
163         * returned and the predicate is not evaluated.
164         *
165         * <p>
166         * This is a short-circuiting terminal operation.
167         *
168         * Note This method evaluates the <em>existential quantification</em> of the predicate over the elements of
169         * the stream (for some x P(x)).
170         *
171         * @param predicate A non-interfering, stateless predicate to apply to elements of this stream
172         * @return {@code true} if any elements of the stream match the provided predicate, otherwise {@code false}
173         */
174        public boolean anyMatch(final FailablePredicate<O, ?> predicate) {
175            assertNotTerminated();
176            return stream().anyMatch(Failable.asPredicate(predicate));
177        }
178
179        protected void assertNotTerminated() {
180            if (terminated) {
181                throw new IllegalStateException("This stream is already terminated.");
182            }
183        }
184
185        /**
186         * Performs a mutable reduction operation on the elements of this stream using a {@code Collector}. A
187         * {@code Collector} encapsulates the functions used as arguments to
188         * {@link #collect(Supplier, BiConsumer, BiConsumer)}, allowing for reuse of collection strategies and
189         * composition of collect operations such as multiple-level grouping or partitioning.
190         *
191         * <p>
192         * If the underlying stream is parallel, and the {@code Collector} is concurrent, and either the stream is
193         * unordered or the collector is unordered, then a concurrent reduction will be performed (see {@link Collector}
194         * for details on concurrent reduction.)
195         *
196         * <p>
197         * This is a terminal operation.
198         *
199         * <p>
200         * When executed in parallel, multiple intermediate results may be instantiated, populated, and merged so as to
201         * maintain isolation of mutable data structures. Therefore, even when executed in parallel with non-thread-safe
202         * data structures (such as {@code ArrayList}), no additional synchronization is needed for a parallel
203         * reduction.
204         *
205         * Note The following will accumulate strings into an ArrayList:
206         *
207         * <pre>
208         *     {@code
209         *     List<String> asList = stringStream.collect(Collectors.toList());
210         * }
211         * </pre>
212         *
213         * <p>
214         * The following will classify {@code Person} objects by city:
215         *
216         * <pre>
217         *     {@code
218         *     Map<String, List<Person>> peopleByCity = personStream.collect(Collectors.groupingBy(Person::getCity));
219         * }
220         * </pre>
221         *
222         * <p>
223         * The following will classify {@code Person} objects by state and city, cascading two {@code Collector}s
224         * together:
225         *
226         * <pre>
227         *     {@code
228         *     Map<String, Map<String, List<Person>>> peopleByStateAndCity = personStream
229         *         .collect(Collectors.groupingBy(Person::getState, Collectors.groupingBy(Person::getCity)));
230         * }
231         * </pre>
232         *
233         * @param <R> the type of the result
234         * @param <A> the intermediate accumulation type of the {@code Collector}
235         * @param collector the {@code Collector} describing the reduction
236         * @return the result of the reduction
237         * @see #collect(Supplier, BiConsumer, BiConsumer)
238         * @see Collectors
239         */
240        public <A, R> R collect(final Collector<? super O, A, R> collector) {
241            makeTerminated();
242            return stream().collect(collector);
243        }
244
245        /**
246         * Performs a mutable reduction operation on the elements of this FailableStream. A mutable reduction is one in
247         * which the reduced value is a mutable result container, such as an {@code ArrayList}, and elements are
248         * incorporated by updating the state of the result rather than by replacing the result. This produces a result
249         * equivalent to:
250         *
251         * <pre>
252         * {@code
253         *     R result = supplier.get();
254         *     for (T element : this stream)
255         *         accumulator.accept(result, element);
256         *     return result;
257         * }
258         * </pre>
259         *
260         * <p>
261         * Like {@link #reduce(Object, BinaryOperator)}, {@code collect} operations can be parallelized without
262         * requiring additional synchronization.
263         *
264         * <p>
265         * This is a terminal operation.
266         *
267         * Note There are many existing classes in the JDK whose signatures are well-suited for use with method
268         * references as arguments to {@code collect()}. For example, the following will accumulate strings into an
269         * {@code ArrayList}:
270         *
271         * <pre>
272         *     {@code
273         *     List<String> asList = stringStream.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
274         * }
275         * </pre>
276         *
277         * <p>
278         * The following will take a stream of strings and concatenates them into a single string:
279         *
280         * <pre>
281         *     {@code
282         *     String concat = stringStream.collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
283         *         .toString();
284         * }
285         * </pre>
286         *
287         * @param <R> type of the result
288         * @param <A> Type of the accumulator.
289         * @param pupplier a function that creates a new result container. For a parallel execution, this function may
290         *        be called multiple times and must return a fresh value each time.
291         * @param accumulator An associative, non-interfering, stateless function for incorporating an additional
292         *        element into a result
293         * @param combiner An associative, non-interfering, stateless function for combining two values, which must be
294         *        compatible with the accumulator function
295         * @return The result of the reduction
296         */
297        public <A, R> R collect(final Supplier<R> pupplier, final BiConsumer<R, ? super O> accumulator,
298            final BiConsumer<R, R> combiner) {
299            makeTerminated();
300            return stream().collect(pupplier, accumulator, combiner);
301        }
302
303        /**
304         * Returns a FailableStream consisting of the elements of this stream that match the given FailablePredicate.
305         *
306         * <p>
307         * This is an intermediate operation.
308         *
309         * @param predicate a non-interfering, stateless predicate to apply to each element to determine if it should be
310         *        included.
311         * @return the new stream
312         */
313        public FailableStream<O> filter(final FailablePredicate<O, ?> predicate) {
314            assertNotTerminated();
315            stream = stream.filter(Failable.asPredicate(predicate));
316            return this;
317        }
318
319        /**
320         * Performs an action for each element of this stream.
321         *
322         * <p>
323         * This is a terminal operation.
324         *
325         * <p>
326         * The behavior of this operation is explicitly nondeterministic. For parallel stream pipelines, this operation
327         * does <em>not</em> guarantee to respect the encounter order of the stream, as doing so would sacrifice the
328         * benefit of parallelism. For any given element, the action may be performed at whatever time and in whatever
329         * thread the library chooses. If the action accesses shared state, it is responsible for providing the required
330         * synchronization.
331         *
332         * @param action a non-interfering action to perform on the elements
333         */
334        public void forEach(final FailableConsumer<O, ?> action) {
335            makeTerminated();
336            stream().forEach(Failable.asConsumer(action));
337        }
338
339        protected void makeTerminated() {
340            assertNotTerminated();
341            terminated = true;
342        }
343
344        /**
345         * Returns a stream consisting of the results of applying the given function to the elements of this stream.
346         *
347         * <p>
348         * This is an intermediate operation.
349         *
350         * @param <R> The element type of the new stream
351         * @param mapper A non-interfering, stateless function to apply to each element
352         * @return the new stream
353         */
354        public <R> FailableStream<R> map(final FailableFunction<O, R, ?> mapper) {
355            assertNotTerminated();
356            return new FailableStream<>(stream.map(Failable.asFunction(mapper)));
357        }
358
359        /**
360         * Performs a reduction on the elements of this stream, using the provided identity value and an associative
361         * accumulation function, and returns the reduced value. This is equivalent to:
362         *
363         * <pre>
364         * {@code
365         *     T result = identity;
366         *     for (T element : this stream)
367         *         result = accumulator.apply(result, element)
368         *     return result;
369         * }
370         * </pre>
371         *
372         * but is not constrained to execute sequentially.
373         *
374         * <p>
375         * The {@code identity} value must be an identity for the accumulator function. This means that for all
376         * {@code t}, {@code accumulator.apply(identity, t)} is equal to {@code t}. The {@code accumulator} function
377         * must be an associative function.
378         *
379         * <p>
380         * This is a terminal operation.
381         *
382         * Note Sum, min, max, average, and string concatenation are all special cases of reduction. Summing a
383         * stream of numbers can be expressed as:
384         *
385         * <pre>
386         *     {@code
387         *     Integer sum = integers.reduce(0, (a, b) -> a + b);
388         * }
389         * </pre>
390         *
391         * or:
392         *
393         * <pre>
394         *     {@code
395         *     Integer sum = integers.reduce(0, Integer::sum);
396         * }
397         * </pre>
398         *
399         * <p>
400         * While this may seem a more roundabout way to perform an aggregation compared to simply mutating a running
401         * total in a loop, reduction operations parallelize more gracefully, without needing additional synchronization
402         * and with greatly reduced risk of data races.
403         *
404         * @param identity the identity value for the accumulating function
405         * @param accumulator an associative, non-interfering, stateless function for combining two values
406         * @return the result of the reduction
407         */
408        public O reduce(final O identity, final BinaryOperator<O> accumulator) {
409            makeTerminated();
410            return stream().reduce(identity, accumulator);
411        }
412
413        /**
414         * Converts the FailableStream into an equivalent stream.
415         *
416         * @return A stream, which will return the same elements, which this FailableStream would return.
417         */
418        public Stream<O> stream() {
419            return stream;
420        }
421    }
422
423    /**
424     * Converts the given {@link Collection} into a {@link FailableStream}. This is basically a simplified, reduced
425     * version of the {@link Stream} class, with the same underlying element stream, except that failable objects, like
426     * {@link FailablePredicate}, {@link FailableFunction}, or {@link FailableConsumer} may be applied, instead of
427     * {@link Predicate}, {@link Function}, or {@link Consumer}. The idea is to rewrite a code snippet like this:
428     *
429     * <pre>
430     * final List&lt;O&gt; list;
431     * final Method m;
432     * final Function&lt;O, String&gt; mapper = (o) -&gt; {
433     *     try {
434     *         return (String) m.invoke(o);
435     *     } catch (Throwable t) {
436     *         throw Failable.rethrow(t);
437     *     }
438     * };
439     * final List&lt;String&gt; strList = list.stream().map(mapper).collect(Collectors.toList());
440     * </pre>
441     *
442     * as follows:
443     *
444     * <pre>
445     * final List&lt;O&gt; list;
446     * final Method m;
447     * final List&lt;String&gt; strList = Failable.stream(list.stream()).map((o) -&gt; (String) m.invoke(o))
448     *     .collect(Collectors.toList());
449     * </pre>
450     *
451     * While the second version may not be <em>quite</em> as efficient (because it depends on the creation of
452     * additional, intermediate objects, of type FailableStream), it is much more concise, and readable, and meets the
453     * spirit of Lambdas better than the first version.
454     *
455     * @param <O> The streams element type.
456     * @param stream The stream, which is being converted.
457     * @return The {@link FailableStream}, which has been created by converting the stream.
458     */
459    public static <O> FailableStream<O> stream(final Collection<O> stream) {
460        return stream(stream.stream());
461    }
462
463    /**
464     * Converts the given {@link Stream stream} into a {@link FailableStream}. This is basically a simplified, reduced
465     * version of the {@link Stream} class, with the same underlying element stream, except that failable objects, like
466     * {@link FailablePredicate}, {@link FailableFunction}, or {@link FailableConsumer} may be applied, instead of
467     * {@link Predicate}, {@link Function}, or {@link Consumer}. The idea is to rewrite a code snippet like this:
468     *
469     * <pre>
470     * final List&lt;O&gt; list;
471     * final Method m;
472     * final Function&lt;O, String&gt; mapper = (o) -&gt; {
473     *     try {
474     *         return (String) m.invoke(o);
475     *     } catch (Throwable t) {
476     *         throw Failable.rethrow(t);
477     *     }
478     * };
479     * final List&lt;String&gt; strList = list.stream().map(mapper).collect(Collectors.toList());
480     * </pre>
481     *
482     * as follows:
483     *
484     * <pre>
485     * final List&lt;O&gt; list;
486     * final Method m;
487     * final List&lt;String&gt; strList = Failable.stream(list.stream()).map((o) -&gt; (String) m.invoke(o))
488     *     .collect(Collectors.toList());
489     * </pre>
490     *
491     * While the second version may not be <em>quite</em> as efficient (because it depends on the creation of
492     * additional, intermediate objects, of type FailableStream), it is much more concise, and readable, and meets the
493     * spirit of Lambdas better than the first version.
494     *
495     * @param <O> The streams element type.
496     * @param stream The stream, which is being converted.
497     * @return The {@link FailableStream}, which has been created by converting the stream.
498     */
499    public static <O> FailableStream<O> stream(final Stream<O> stream) {
500        return new FailableStream<>(stream);
501    }
502
503    /**
504     * Returns a {@code Collector} that accumulates the input elements into a new array.
505     *
506     * @param pElementType Type of an element in the array.
507     * @param <O> the type of the input elements
508     * @return a {@code Collector} which collects all the input elements into an array, in encounter order
509     */
510    public static <O extends Object> Collector<O, ?, O[]> toArray(final Class<O> pElementType) {
511        return new ArrayCollector<>(pElementType);
512    }
513}