001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.lang3.stream;
018
019import java.lang.reflect.Array;
020import java.util.ArrayList;
021import java.util.Collection;
022import java.util.Collections;
023import java.util.List;
024import java.util.Set;
025import java.util.function.BiConsumer;
026import java.util.function.BinaryOperator;
027import java.util.function.Consumer;
028import java.util.function.Function;
029import java.util.function.Predicate;
030import java.util.function.Supplier;
031import java.util.stream.Collector;
032import java.util.stream.Collectors;
033import java.util.stream.Stream;
034
035import org.apache.commons.lang3.function.Failable;
036import org.apache.commons.lang3.function.FailableConsumer;
037import org.apache.commons.lang3.function.FailableFunction;
038import org.apache.commons.lang3.function.FailablePredicate;
039
040/**
 * Provides utility functions and classes for working with the
 * {@code java.util.stream} package, or more generally, with Java 8 lambdas. More
 * specifically, it attempts to address the fact that lambdas are not supposed
 * to throw checked exceptions, that is, instances of {@link Exception} that are
 * not {@link RuntimeException}s. This forces the use of constructs like
046 * <pre>
047 *     Consumer&lt;java.lang.reflect.Method&gt; consumer = (m) -&gt; {
048 *         try {
049 *             m.invoke(o, args);
050 *         } catch (Throwable t) {
051 *             throw Failable.rethrow(t);
052 *         }
053 *    };
054 *    stream.forEach(consumer);
055 * </pre>
056 * Using a {@link FailableStream}, this can be rewritten as follows:
057 * <pre>
 *     Streams.stream(stream).forEach((m) -&gt; m.invoke(o, args));
059 * </pre>
060 * Obviously, the second version is much more concise and the spirit of
061 * Lambda expressions is met better than in the first version.
062 *
063 * @see Stream
064 * @see Failable
065 * @since 3.11
066 */
067public class Streams {
068
069    public static class ArrayCollector<O> implements Collector<O, List<O>, O[]> {
070        private static final Set<Characteristics> characteristics = Collections.emptySet();
071        private final Class<O> elementType;
072
073        public ArrayCollector(final Class<O> elementType) {
074            this.elementType = elementType;
075        }
076
077        @Override
078        public BiConsumer<List<O>, O> accumulator() {
079            return List::add;
080        }
081
082        @Override
083        public Set<Characteristics> characteristics() {
084            return characteristics;
085        }
086
087        @Override
088        public BinaryOperator<List<O>> combiner() {
089            return (left, right) -> {
090                left.addAll(right);
091                return left;
092            };
093        }
094
095        @Override
096        public Function<List<O>, O[]> finisher() {
097            return list -> {
098                @SuppressWarnings("unchecked")
099                final O[] array = (O[]) Array.newInstance(elementType, list.size());
100                return list.toArray(array);
101            };
102        }
103
104        @Override
105        public Supplier<List<O>> supplier() {
106            return ArrayList::new;
107        }
108    }
109
110    /**
111     * A reduced, and simplified version of a {@link Stream} with failable method signatures.
112     *
113     * @param <O> The streams element type.
114     */
115    public static class FailableStream<O extends Object> {
116
117        private Stream<O> stream;
118        private boolean terminated;
119
120        /**
121         * Constructs a new instance with the given {@code stream}.
122         *
123         * @param stream The stream.
124         */
125        public FailableStream(final Stream<O> stream) {
126            this.stream = stream;
127        }
128
129        /**
130         * Returns whether all elements of this stream match the provided predicate. May not evaluate the predicate on
131         * all elements if not necessary for determining the result. If the stream is empty then {@code true} is
132         * returned and the predicate is not evaluated.
133         *
134         * <p>
135         * This is a short-circuiting terminal operation.
136         *
137         * Note This method evaluates the <em>universal quantification</em> of the predicate over the elements of
138         * the stream (for all x P(x)). If the stream is empty, the quantification is said to be <em>vacuously
139         * satisfied</em> and is always {@code true} (regardless of P(x)).
140         *
141         * @param predicate A non-interfering, stateless predicate to apply to elements of this stream
142         * @return {@code true} If either all elements of the stream match the provided predicate or the stream is
143         *         empty, otherwise {@code false}.
144         */
145        public boolean allMatch(final FailablePredicate<O, ?> predicate) {
146            assertNotTerminated();
147            return stream().allMatch(Failable.asPredicate(predicate));
148        }
149
150        /**
151         * Returns whether any elements of this stream match the provided predicate. May not evaluate the predicate on
152         * all elements if not necessary for determining the result. If the stream is empty then {@code false} is
153         * returned and the predicate is not evaluated.
154         *
155         * <p>
156         * This is a short-circuiting terminal operation.
157         *
158         * Note This method evaluates the <em>existential quantification</em> of the predicate over the elements of
159         * the stream (for some x P(x)).
160         *
161         * @param predicate A non-interfering, stateless predicate to apply to elements of this stream
162         * @return {@code true} if any elements of the stream match the provided predicate, otherwise {@code false}
163         */
164        public boolean anyMatch(final FailablePredicate<O, ?> predicate) {
165            assertNotTerminated();
166            return stream().anyMatch(Failable.asPredicate(predicate));
167        }
168
169        protected void assertNotTerminated() {
170            if (terminated) {
171                throw new IllegalStateException("This stream is already terminated.");
172            }
173        }
174
175        /**
176         * Performs a mutable reduction operation on the elements of this stream using a {@code Collector}. A
177         * {@code Collector} encapsulates the functions used as arguments to
178         * {@link #collect(Supplier, BiConsumer, BiConsumer)}, allowing for reuse of collection strategies and
179         * composition of collect operations such as multiple-level grouping or partitioning.
180         *
181         * <p>
182         * If the underlying stream is parallel, and the {@code Collector} is concurrent, and either the stream is
183         * unordered or the collector is unordered, then a concurrent reduction will be performed (see {@link Collector}
184         * for details on concurrent reduction.)
185         *
186         * <p>
187         * This is a terminal operation.
188         *
189         * <p>
190         * When executed in parallel, multiple intermediate results may be instantiated, populated, and merged so as to
191         * maintain isolation of mutable data structures. Therefore, even when executed in parallel with non-thread-safe
192         * data structures (such as {@code ArrayList}), no additional synchronization is needed for a parallel
193         * reduction.
194         *
195         * Note The following will accumulate strings into an ArrayList:
196         *
197         * <pre>
198         *     {@code
199         *     List<String> asList = stringStream.collect(Collectors.toList());
200         * }
201         * </pre>
202         *
203         * <p>
204         * The following will classify {@code Person} objects by city:
205         *
206         * <pre>
207         *     {@code
208         *     Map<String, List<Person>> peopleByCity = personStream.collect(Collectors.groupingBy(Person::getCity));
209         * }
210         * </pre>
211         *
212         * <p>
213         * The following will classify {@code Person} objects by state and city, cascading two {@code Collector}s
214         * together:
215         *
216         * <pre>
217         *     {@code
218         *     Map<String, Map<String, List<Person>>> peopleByStateAndCity = personStream
219         *         .collect(Collectors.groupingBy(Person::getState, Collectors.groupingBy(Person::getCity)));
220         * }
221         * </pre>
222         *
223         * @param <R> the type of the result
224         * @param <A> the intermediate accumulation type of the {@code Collector}
225         * @param collector the {@code Collector} describing the reduction
226         * @return the result of the reduction
227         * @see #collect(Supplier, BiConsumer, BiConsumer)
228         * @see Collectors
229         */
230        public <A, R> R collect(final Collector<? super O, A, R> collector) {
231            makeTerminated();
232            return stream().collect(collector);
233        }
234
235        /**
236         * Performs a mutable reduction operation on the elements of this FailableStream. A mutable reduction is one in
237         * which the reduced value is a mutable result container, such as an {@code ArrayList}, and elements are
238         * incorporated by updating the state of the result rather than by replacing the result. This produces a result
239         * equivalent to:
240         *
241         * <pre>
242         * {@code
243         *     R result = supplier.get();
244         *     for (T element : this stream)
245         *         accumulator.accept(result, element);
246         *     return result;
247         * }
248         * </pre>
249         *
250         * <p>
251         * Like {@link #reduce(Object, BinaryOperator)}, {@code collect} operations can be parallelized without
252         * requiring additional synchronization.
253         *
254         * <p>
255         * This is a terminal operation.
256         *
257         * Note There are many existing classes in the JDK whose signatures are well-suited for use with method
258         * references as arguments to {@code collect()}. For example, the following will accumulate strings into an
259         * {@code ArrayList}:
260         *
261         * <pre>
262         *     {@code
263         *     List<String> asList = stringStream.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
264         * }
265         * </pre>
266         *
267         * <p>
268         * The following will take a stream of strings and concatenates them into a single string:
269         *
270         * <pre>
271         *     {@code
272         *     String concat = stringStream.collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
273         *         .toString();
274         * }
275         * </pre>
276         *
277         * @param <R> type of the result
278         * @param <A> Type of the accumulator.
279         * @param pupplier a function that creates a new result container. For a parallel execution, this function may
280         *        be called multiple times and must return a fresh value each time.
281         * @param accumulator An associative, non-interfering, stateless function for incorporating an additional
282         *        element into a result
283         * @param combiner An associative, non-interfering, stateless function for combining two values, which must be
284         *        compatible with the accumulator function
285         * @return The result of the reduction
286         */
287        public <A, R> R collect(final Supplier<R> pupplier, final BiConsumer<R, ? super O> accumulator,
288            final BiConsumer<R, R> combiner) {
289            makeTerminated();
290            return stream().collect(pupplier, accumulator, combiner);
291        }
292
293        /**
294         * Returns a FailableStream consisting of the elements of this stream that match the given FailablePredicate.
295         *
296         * <p>
297         * This is an intermediate operation.
298         *
299         * @param predicate a non-interfering, stateless predicate to apply to each element to determine if it should be
300         *        included.
301         * @return the new stream
302         */
303        public FailableStream<O> filter(final FailablePredicate<O, ?> predicate) {
304            assertNotTerminated();
305            stream = stream.filter(Failable.asPredicate(predicate));
306            return this;
307        }
308
309        /**
310         * Performs an action for each element of this stream.
311         *
312         * <p>
313         * This is a terminal operation.
314         *
315         * <p>
316         * The behavior of this operation is explicitly nondeterministic. For parallel stream pipelines, this operation
317         * does <em>not</em> guarantee to respect the encounter order of the stream, as doing so would sacrifice the
318         * benefit of parallelism. For any given element, the action may be performed at whatever time and in whatever
319         * thread the library chooses. If the action accesses shared state, it is responsible for providing the required
320         * synchronization.
321         *
322         * @param action a non-interfering action to perform on the elements
323         */
324        public void forEach(final FailableConsumer<O, ?> action) {
325            makeTerminated();
326            stream().forEach(Failable.asConsumer(action));
327        }
328
329        protected void makeTerminated() {
330            assertNotTerminated();
331            terminated = true;
332        }
333
334        /**
335         * Returns a stream consisting of the results of applying the given function to the elements of this stream.
336         *
337         * <p>
338         * This is an intermediate operation.
339         *
340         * @param <R> The element type of the new stream
341         * @param mapper A non-interfering, stateless function to apply to each element
342         * @return the new stream
343         */
344        public <R> FailableStream<R> map(final FailableFunction<O, R, ?> mapper) {
345            assertNotTerminated();
346            return new FailableStream<>(stream.map(Failable.asFunction(mapper)));
347        }
348
349        /**
350         * Performs a reduction on the elements of this stream, using the provided identity value and an associative
351         * accumulation function, and returns the reduced value. This is equivalent to:
352         *
353         * <pre>
354         * {@code
355         *     T result = identity;
356         *     for (T element : this stream)
357         *         result = accumulator.apply(result, element)
358         *     return result;
359         * }
360         * </pre>
361         *
362         * but is not constrained to execute sequentially.
363         *
364         * <p>
365         * The {@code identity} value must be an identity for the accumulator function. This means that for all
366         * {@code t}, {@code accumulator.apply(identity, t)} is equal to {@code t}. The {@code accumulator} function
367         * must be an associative function.
368         *
369         * <p>
370         * This is a terminal operation.
371         *
372         * Note Sum, min, max, average, and string concatenation are all special cases of reduction. Summing a
373         * stream of numbers can be expressed as:
374         *
375         * <pre>
376         *     {@code
377         *     Integer sum = integers.reduce(0, (a, b) -> a + b);
378         * }
379         * </pre>
380         *
381         * or:
382         *
383         * <pre>
384         *     {@code
385         *     Integer sum = integers.reduce(0, Integer::sum);
386         * }
387         * </pre>
388         *
389         * <p>
390         * While this may seem a more roundabout way to perform an aggregation compared to simply mutating a running
391         * total in a loop, reduction operations parallelize more gracefully, without needing additional synchronization
392         * and with greatly reduced risk of data races.
393         *
394         * @param identity the identity value for the accumulating function
395         * @param accumulator an associative, non-interfering, stateless function for combining two values
396         * @return the result of the reduction
397         */
398        public O reduce(final O identity, final BinaryOperator<O> accumulator) {
399            makeTerminated();
400            return stream().reduce(identity, accumulator);
401        }
402
403        /**
404         * Converts the FailableStream into an equivalent stream.
405         *
406         * @return A stream, which will return the same elements, which this FailableStream would return.
407         */
408        public Stream<O> stream() {
409            return stream;
410        }
411    }
412
413    /**
414     * Converts the given {@link Collection} into a {@link FailableStream}. This is basically a simplified, reduced
415     * version of the {@link Stream} class, with the same underlying element stream, except that failable objects, like
416     * {@link FailablePredicate}, {@link FailableFunction}, or {@link FailableConsumer} may be applied, instead of
417     * {@link Predicate}, {@link Function}, or {@link Consumer}. The idea is to rewrite a code snippet like this:
418     *
419     * <pre>
420     * final List&lt;O&gt; list;
421     * final Method m;
422     * final Function&lt;O, String&gt; mapper = (o) -&gt; {
423     *     try {
424     *         return (String) m.invoke(o);
425     *     } catch (Throwable t) {
426     *         throw Failable.rethrow(t);
427     *     }
428     * };
429     * final List&lt;String&gt; strList = list.stream().map(mapper).collect(Collectors.toList());
430     * </pre>
431     *
432     * as follows:
433     *
434     * <pre>
435     * final List&lt;O&gt; list;
436     * final Method m;
437     * final List&lt;String&gt; strList = Failable.stream(list.stream()).map((o) -&gt; (String) m.invoke(o))
438     *     .collect(Collectors.toList());
439     * </pre>
440     *
441     * While the second version may not be <em>quite</em> as efficient (because it depends on the creation of
442     * additional, intermediate objects, of type FailableStream), it is much more concise, and readable, and meets the
443     * spirit of Lambdas better than the first version.
444     *
445     * @param <O> The streams element type.
446     * @param stream The stream, which is being converted.
447     * @return The {@link FailableStream}, which has been created by converting the stream.
448     */
449    public static <O> FailableStream<O> stream(final Collection<O> stream) {
450        return stream(stream.stream());
451    }
452
453    /**
454     * Converts the given {@link Stream stream} into a {@link FailableStream}. This is basically a simplified, reduced
455     * version of the {@link Stream} class, with the same underlying element stream, except that failable objects, like
456     * {@link FailablePredicate}, {@link FailableFunction}, or {@link FailableConsumer} may be applied, instead of
457     * {@link Predicate}, {@link Function}, or {@link Consumer}. The idea is to rewrite a code snippet like this:
458     *
459     * <pre>
460     * final List&lt;O&gt; list;
461     * final Method m;
462     * final Function&lt;O, String&gt; mapper = (o) -&gt; {
463     *     try {
464     *         return (String) m.invoke(o);
465     *     } catch (Throwable t) {
466     *         throw Failable.rethrow(t);
467     *     }
468     * };
469     * final List&lt;String&gt; strList = list.stream().map(mapper).collect(Collectors.toList());
470     * </pre>
471     *
472     * as follows:
473     *
474     * <pre>
475     * final List&lt;O&gt; list;
476     * final Method m;
477     * final List&lt;String&gt; strList = Failable.stream(list.stream()).map((o) -&gt; (String) m.invoke(o))
478     *     .collect(Collectors.toList());
479     * </pre>
480     *
481     * While the second version may not be <em>quite</em> as efficient (because it depends on the creation of
482     * additional, intermediate objects, of type FailableStream), it is much more concise, and readable, and meets the
483     * spirit of Lambdas better than the first version.
484     *
485     * @param <O> The streams element type.
486     * @param stream The stream, which is being converted.
487     * @return The {@link FailableStream}, which has been created by converting the stream.
488     */
489    public static <O> FailableStream<O> stream(final Stream<O> stream) {
490        return new FailableStream<>(stream);
491    }
492
493    /**
494     * Returns a {@code Collector} that accumulates the input elements into a new array.
495     *
496     * @param pElementType Type of an element in the array.
497     * @param <O> the type of the input elements
498     * @return a {@code Collector} which collects all the input elements into an array, in encounter order
499     */
500    public static <O extends Object> Collector<O, ?, O[]> toArray(final Class<O> pElementType) {
501        return new ArrayCollector<>(pElementType);
502    }
503}