001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.commons.io.function;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Iterator;
024import java.util.List;
025import java.util.NoSuchElementException;
026import java.util.Objects;
027import java.util.Optional;
028import java.util.Spliterator;
029import java.util.Spliterators;
030import java.util.concurrent.atomic.AtomicInteger;
031import java.util.concurrent.atomic.AtomicReference;
032import java.util.function.BiFunction;
033import java.util.function.IntFunction;
034import java.util.function.ToDoubleFunction;
035import java.util.function.ToIntFunction;
036import java.util.function.ToLongFunction;
037import java.util.function.UnaryOperator;
038import java.util.stream.Collector;
039import java.util.stream.DoubleStream;
040import java.util.stream.IntStream;
041import java.util.stream.LongStream;
042import java.util.stream.Stream;
043import java.util.stream.StreamSupport;
044
045import org.apache.commons.io.IOExceptionList;
046
047/**
048 * Like {@link Stream} but throws {@link IOException}.
049 *
050 * @param <T> the type of the stream elements.
051 * @since 2.12.0
052 */
053public interface IOStream<T> extends IOBaseStream<T, IOStream<T>, Stream<T>> {
054
055    /**
056     * Constructs a new IOStream for the given Stream.
057     *
058     * @param <T> the type of the stream elements.
059     * @param stream The stream to delegate.
060     * @return a new IOStream.
061     */
062    static <T> IOStream<T> adapt(final Stream<T> stream) {
063        return IOStreamAdapter.adapt(stream);
064    }
065
066    /**
067     * This class' version of {@link Stream#empty()}.
068     *
069     * @param <T> the type of the stream elements
070     * @return an empty sequential {@code IOStreamImpl}.
071     * @see Stream#empty()
072     */
073    static <T> IOStream<T> empty() {
074        return IOStreamAdapter.adapt(Stream.empty());
075    }
076
077    /**
078     * Like {@link Stream#iterate(Object, UnaryOperator)} but for IO.
079     *
080     * @param <T> the type of stream elements.
081     * @param seed the initial element.
082     * @param f a function to be applied to the previous element to produce a new element.
083     * @return a new sequential {@code IOStream}.
084     */
085    static <T> IOStream<T> iterate(final T seed, final IOUnaryOperator<T> f) {
086        Objects.requireNonNull(f);
087        final Iterator<T> iterator = new Iterator<T>() {
088            @SuppressWarnings("unchecked")
089            T t = (T) IOStreams.NONE;
090
091            @Override
092            public boolean hasNext() {
093                return true;
094            }
095
096            @Override
097            public T next() throws NoSuchElementException {
098                try {
099                    return t = t == IOStreams.NONE ? seed : f.apply(t);
100                } catch (final IOException e) {
101                    final NoSuchElementException nsee = new NoSuchElementException();
102                    nsee.initCause(e);
103                    throw nsee;
104                }
105            }
106        };
107        return adapt(StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED | Spliterator.IMMUTABLE), false));
108    }
109
110    /**
111     * Null-safe version of {@link StreamSupport#stream(java.util.Spliterator, boolean)}.
112     *
113     * Copied from Apache Commons Lang.
114     *
115     * @param <T> the type of stream elements.
116     * @param values the elements of the new stream, may be {@code null}.
117     * @return the new stream on {@code values} or {@link Stream#empty()}.
118     */
119    @SuppressWarnings("resource") // call to #empty()
120    static <T> IOStream<T> of(final Iterable<T> values) {
121        return values == null ? empty() : adapt(StreamSupport.stream(values.spliterator(), false));
122    }
123
124    /**
125     * Null-safe version of {@link Stream#of(Object[])} for an IO stream.
126     *
127     * @param <T> the type of stream elements.
128     * @param values the elements of the new stream, may be {@code null}.
129     * @return the new stream on {@code values} or {@link Stream#empty()}.
130     */
131    @SuppressWarnings("resource")
132    @SafeVarargs // Creating a stream from an array is safe
133    static <T> IOStream<T> of(final T... values) {
134        return values == null || values.length == 0 ? empty() : adapt(Arrays.stream(values));
135    }
136
137    /**
138     * Returns a sequential {@code IOStreamImpl} containing a single element.
139     *
140     * @param t the single element
141     * @param <T> the type of stream elements
142     * @return a singleton sequential stream
143     */
144    static <T> IOStream<T> of(final T t) {
145        return adapt(Stream.of(t));
146    }
147
148    /**
149     * Like {@link Stream#allMatch(java.util.function.Predicate)} but throws {@link IOException}.
150     *
151     * @param predicate {@link Stream#allMatch(java.util.function.Predicate)}.
152     * @return Like {@link Stream#allMatch(java.util.function.Predicate)}.
153     * @throws IOException if an I/O error occurs.
154     */
155    @SuppressWarnings("unused") // thrown by Erase.
156    default boolean allMatch(final IOPredicate<? super T> predicate) throws IOException {
157        return unwrap().allMatch(t -> Erase.test(predicate, t));
158    }
159
160    /**
161     * Like {@link Stream#anyMatch(java.util.function.Predicate)} but throws {@link IOException}.
162     *
163     * @param predicate {@link Stream#anyMatch(java.util.function.Predicate)}.
164     * @return Like {@link Stream#anyMatch(java.util.function.Predicate)}.
165     * @throws IOException if an I/O error occurs.
166     */
167    @SuppressWarnings("unused") // thrown by Erase.
168    default boolean anyMatch(final IOPredicate<? super T> predicate) throws IOException {
169        return unwrap().anyMatch(t -> Erase.test(predicate, t));
170    }
171
172    /**
173     * TODO Package-private for now, needs IOCollector?
174     *
175     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
176     * would be ideal to have only one.
177     *
178     * Like {@link Stream#collect(Collector)}.
179     *
180     * Package private for now.
181     *
182     * @param <R> Like {@link Stream#collect(Collector)}.
183     * @param <A> Like {@link Stream#collect(Collector)}.
184     * @param collector Like {@link Stream#collect(Collector)}.
185     * @return Like {@link Stream#collect(Collector)}.
186     */
187    default <R, A> R collect(final Collector<? super T, A, R> collector) {
188        return unwrap().collect(collector);
189    }
190
191    /**
192     * Like
193     * {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
194     *
195     * @param <R> Like
196     *        {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
197     * @param supplier Like
198     *        {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
199     * @param accumulator Like
200     *        {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
201     * @param combiner Like
202     *        {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
203     * @return Like
204     *         {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
205     * @throws IOException if an I/O error occurs.
206     */
207    @SuppressWarnings("unused") // thrown by Erase.
208    default <R> R collect(final IOSupplier<R> supplier, final IOBiConsumer<R, ? super T> accumulator, final IOBiConsumer<R, R> combiner) throws IOException {
209        return unwrap().collect(() -> Erase.get(supplier), (t, u) -> Erase.accept(accumulator, t, u), (t, u) -> Erase.accept(combiner, t, u));
210    }
211
212    /**
213     * Like {@link Stream#count()}.
214     *
215     * @return Like {@link Stream#count()}.
216     */
217    default long count() {
218        return unwrap().count();
219    }
220
221    /**
222     * Like {@link Stream#distinct()}.
223     *
224     * @return Like {@link Stream#distinct()}.
225     */
226    default IOStream<T> distinct() {
227        return adapt(unwrap().distinct());
228    }
229
230    /**
231     * Like {@link Stream#filter(java.util.function.Predicate)}.
232     *
233     * @param predicate Like {@link Stream#filter(java.util.function.Predicate)}.
234     * @return Like {@link Stream#filter(java.util.function.Predicate)}.
235     * @throws IOException if an I/O error occurs.
236     */
237    @SuppressWarnings("unused") // thrown by Erase.
238    default IOStream<T> filter(final IOPredicate<? super T> predicate) throws IOException {
239        return adapt(unwrap().filter(t -> Erase.test(predicate, t)));
240    }
241
242    /**
243     * Like {@link Stream#findAny()}.
244     *
245     * @return Like {@link Stream#findAny()}.
246     */
247    default Optional<T> findAny() {
248        return unwrap().findAny();
249    }
250
251    /**
252     * Like {@link Stream#findFirst()}.
253     *
254     * @return Like {@link Stream#findFirst()}.
255     */
256    default Optional<T> findFirst() {
257        return unwrap().findFirst();
258    }
259
260    /**
261     * Like {@link Stream#flatMap(java.util.function.Function)}.
262     *
263     * @param <R> Like {@link Stream#flatMap(java.util.function.Function)}.
264     * @param mapper Like {@link Stream#flatMap(java.util.function.Function)}.
265     * @return Like {@link Stream#flatMap(java.util.function.Function)}.
266     * @throws IOException if an I/O error occurs.
267     */
268    @SuppressWarnings("unused") // thrown by Erase.
269    default <R> IOStream<R> flatMap(final IOFunction<? super T, ? extends IOStream<? extends R>> mapper) throws IOException {
270        return adapt(unwrap().flatMap(t -> Erase.apply(mapper, t).unwrap()));
271    }
272
273    /**
274     * TODO Package-private for now, needs IODoubleStream?
275     *
276     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
277     * would be ideal to have only one.
278     *
279     * Like {@link Stream#flatMapToDouble(java.util.function.Function)}.
280     *
281     * @param mapper Like {@link Stream#flatMapToDouble(java.util.function.Function)}.
282     * @return Like {@link Stream#flatMapToDouble(java.util.function.Function)}.
283     * @throws IOException if an I/O error occurs.
284     */
285    @SuppressWarnings("unused") // thrown by Erase.
286    default DoubleStream flatMapToDouble(final IOFunction<? super T, ? extends DoubleStream> mapper) throws IOException {
287        return unwrap().flatMapToDouble(t -> Erase.apply(mapper, t));
288    }
289
290    /**
291     * TODO Package-private for now, needs IOIntStream?
292     *
293     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
294     * would be ideal to have only one.
295     *
296     * Like {@link Stream#flatMapToInt(java.util.function.Function)}.
297     *
298     * @param mapper Like {@link Stream#flatMapToInt(java.util.function.Function)}.
299     * @return Like {@link Stream#flatMapToInt(java.util.function.Function)}.
300     * @throws IOException if an I/O error occurs.
301     */
302    @SuppressWarnings("unused") // thrown by Erase.
303    default IntStream flatMapToInt(final IOFunction<? super T, ? extends IntStream> mapper) throws IOException {
304        return unwrap().flatMapToInt(t -> Erase.apply(mapper, t));
305    }
306
307    /**
308     * TODO Package-private for now, needs IOLongStream?
309     *
310     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
311     * would be ideal to have only one.
312     *
313     * Like {@link Stream#flatMapToLong(java.util.function.Function)}.
314     *
315     * @param mapper Like {@link Stream#flatMapToLong(java.util.function.Function)}.
316     * @return Like {@link Stream#flatMapToLong(java.util.function.Function)}.
317     * @throws IOException if an I/O error occurs.
318     */
319    @SuppressWarnings("unused") // thrown by Erase.
320    default LongStream flatMapToLong(final IOFunction<? super T, ? extends LongStream> mapper) throws IOException {
321        return unwrap().flatMapToLong(t -> Erase.apply(mapper, t));
322    }
323
324    /**
325     * Performs an action for each element gathering any exceptions.
326     *
327     * @param action The action to apply to each element.
328     * @throws IOExceptionList if any I/O errors occur.
329     */
330    default void forAll(final IOConsumer<T> action) throws IOExceptionList {
331        forAll(action, (i, e) -> e);
332    }
333
334    /**
335     * Performs an action for each element gathering any exceptions.
336     *
337     * @param action The action to apply to each element.
338     * @param exSupplier The exception supplier.
339     * @throws IOExceptionList if any I/O errors occur.
340     */
341    default void forAll(final IOConsumer<T> action, final BiFunction<Integer, IOException, IOException> exSupplier) throws IOExceptionList {
342        final AtomicReference<List<IOException>> causeList = new AtomicReference<>();
343        final AtomicInteger index = new AtomicInteger();
344        final IOConsumer<T> safeAction = IOStreams.toIOConsumer(action);
345        unwrap().forEach(e -> {
346            try {
347                safeAction.accept(e);
348            } catch (final IOException innerEx) {
349                if (causeList.get() == null) {
350                    // Only allocate if required
351                    causeList.set(new ArrayList<>());
352                }
353                if (exSupplier != null) {
354                    causeList.get().add(exSupplier.apply(index.get(), innerEx));
355                }
356            }
357            index.incrementAndGet();
358        });
359        IOExceptionList.checkEmpty(causeList.get(), null);
360    }
361
362    /**
363     * Like {@link Stream#forEach(java.util.function.Consumer)} but throws {@link IOException}.
364     *
365     * @param action Like {@link Stream#forEach(java.util.function.Consumer)}.
366     * @throws IOException if an I/O error occurs.
367     */
368    @SuppressWarnings("unused") // thrown by Erase.
369    default void forEach(final IOConsumer<? super T> action) throws IOException {
370        unwrap().forEach(e -> Erase.accept(action, e));
371    }
372
373    /**
374     * Like {@link Stream#forEachOrdered(java.util.function.Consumer)}.
375     *
376     * @param action Like {@link Stream#forEachOrdered(java.util.function.Consumer)}.
377     * @throws IOException if an I/O error occurs.
378     */
379    @SuppressWarnings("unused") // thrown by Erase.
380    default void forEachOrdered(final IOConsumer<? super T> action) throws IOException {
381        unwrap().forEachOrdered(e -> Erase.accept(action, e));
382    }
383
384    /**
385     * Like {@link Stream#limit(long)}.
386     *
387     * @param maxSize Like {@link Stream#limit(long)}.
388     * @return Like {@link Stream#limit(long)}.
389     */
390    default IOStream<T> limit(final long maxSize) {
391        return adapt(unwrap().limit(maxSize));
392    }
393
394    /**
395     * Like {@link Stream#map(java.util.function.Function)}.
396     *
397     * @param <R> Like {@link Stream#map(java.util.function.Function)}.
398     * @param mapper Like {@link Stream#map(java.util.function.Function)}.
399     * @return Like {@link Stream#map(java.util.function.Function)}.
400     * @throws IOException if an I/O error occurs.
401     */
402    @SuppressWarnings("unused") // thrown by Erase.
403    default <R> IOStream<R> map(final IOFunction<? super T, ? extends R> mapper) throws IOException {
404        return adapt(unwrap().map(t -> Erase.apply(mapper, t)));
405    }
406
407    /**
408     * TODO Package-private for now, needs IOToDoubleFunction?
409     *
410     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
411     * would be ideal to have only one.
412     *
413     * Like {@link Stream#mapToDouble(ToDoubleFunction)}.
414     *
415     * Package private for now.
416     *
417     * @param mapper Like {@link Stream#mapToDouble(ToDoubleFunction)}.
418     * @return Like {@link Stream#mapToDouble(ToDoubleFunction)}.
419     */
420    default DoubleStream mapToDouble(final ToDoubleFunction<? super T> mapper) {
421        return unwrap().mapToDouble(mapper);
422    }
423
424    /**
425     * TODO Package-private for now, needs IOToIntFunction?
426     *
427     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
428     * would be ideal to have only one.
429     *
430     * Like {@link Stream#mapToInt(ToIntFunction)}.
431     *
432     * Package private for now.
433     *
434     * @param mapper Like {@link Stream#mapToInt(ToIntFunction)}.
435     * @return Like {@link Stream#mapToInt(ToIntFunction)}.
436     */
437    default IntStream mapToInt(final ToIntFunction<? super T> mapper) {
438        return unwrap().mapToInt(mapper);
439    }
440
441    /**
442     * TODO Package-private for now, needs IOToLongFunction?
443     *
444     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
445     * would be ideal to have only one.
446     *
447     * Like {@link Stream#mapToLong(ToLongFunction)}.
448     *
449     * Package private for now.
450     *
451     * @param mapper Like {@link Stream#mapToLong(ToLongFunction)}.
452     * @return Like {@link Stream#mapToLong(ToLongFunction)}.
453     */
454    default LongStream mapToLong(final ToLongFunction<? super T> mapper) {
455        return unwrap().mapToLong(mapper);
456    }
457
458    /**
459     * Like {@link Stream#max(java.util.Comparator)}.
460     *
461     * @param comparator Like {@link Stream#max(java.util.Comparator)}.
462     * @return Like {@link Stream#max(java.util.Comparator)}.
463     * @throws IOException if an I/O error occurs.
464     */
465    @SuppressWarnings("unused") // thrown by Erase.
466    default Optional<T> max(final IOComparator<? super T> comparator) throws IOException {
467        return unwrap().max((t, u) -> Erase.compare(comparator, t, u));
468    }
469
470    /**
471     * Like {@link Stream#min(java.util.Comparator)}.
472     *
473     * @param comparator Like {@link Stream#min(java.util.Comparator)}.
474     * @return Like {@link Stream#min(java.util.Comparator)}.
475     * @throws IOException if an I/O error occurs.
476     */
477    @SuppressWarnings("unused") // thrown by Erase.
478    default Optional<T> min(final IOComparator<? super T> comparator) throws IOException {
479        return unwrap().min((t, u) -> Erase.compare(comparator, t, u));
480    }
481
482    /**
483     * Like {@link Stream#noneMatch(java.util.function.Predicate)}.
484     *
485     * @param predicate Like {@link Stream#noneMatch(java.util.function.Predicate)}.
486     * @return Like {@link Stream#noneMatch(java.util.function.Predicate)}.
487     * @throws IOException if an I/O error occurs.
488     */
489    @SuppressWarnings("unused") // thrown by Erase.
490    default boolean noneMatch(final IOPredicate<? super T> predicate) throws IOException {
491        return unwrap().noneMatch(t -> Erase.test(predicate, t));
492    }
493
494    /**
495     * Like {@link Stream#peek(java.util.function.Consumer)}.
496     *
497     * @param action Like {@link Stream#peek(java.util.function.Consumer)}.
498     * @return Like {@link Stream#peek(java.util.function.Consumer)}.
499     * @throws IOException if an I/O error occurs.
500     */
501    @SuppressWarnings("unused") // thrown by Erase.
502    default IOStream<T> peek(final IOConsumer<? super T> action) throws IOException {
503        return adapt(unwrap().peek(t -> Erase.accept(action, t)));
504    }
505
506    /**
507     * Like {@link Stream#reduce(java.util.function.BinaryOperator)}.
508     *
509     * @param accumulator Like {@link Stream#reduce(java.util.function.BinaryOperator)}.
510     * @return Like {@link Stream#reduce(java.util.function.BinaryOperator)}.
511     * @throws IOException if an I/O error occurs.
512     */
513    @SuppressWarnings("unused") // thrown by Erase.
514    default Optional<T> reduce(final IOBinaryOperator<T> accumulator) throws IOException {
515        return unwrap().reduce((t, u) -> Erase.apply(accumulator, t, u));
516    }
517
518    /**
519     * Like {@link Stream#reduce(Object, java.util.function.BinaryOperator)}.
520     *
521     * @param identity Like {@link Stream#reduce(Object, java.util.function.BinaryOperator)}.
522     * @param accumulator Like {@link Stream#reduce(Object, java.util.function.BinaryOperator)}.
523     * @return Like {@link Stream#reduce(Object, java.util.function.BinaryOperator)}.
524     * @throws IOException if an I/O error occurs.
525     */
526    @SuppressWarnings("unused") // thrown by Erase.
527    default T reduce(final T identity, final IOBinaryOperator<T> accumulator) throws IOException {
528        return unwrap().reduce(identity, (t, u) -> Erase.apply(accumulator, t, u));
529    }
530
531    /**
532     * Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
533     *
534     * @param <U> Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
535     * @param identity Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
536     * @param accumulator Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
537     * @param combiner Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
538     * @return Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
539     * @throws IOException if an I/O error occurs.
540     */
541    @SuppressWarnings("unused") // thrown by Erase.
542    default <U> U reduce(final U identity, final IOBiFunction<U, ? super T, U> accumulator, final IOBinaryOperator<U> combiner) throws IOException {
543        return unwrap().reduce(identity, (t, u) -> Erase.apply(accumulator, t, u), (t, u) -> Erase.apply(combiner, t, u));
544    }
545
546    /**
547     * Like {@link Stream#skip(long)}.
548     *
549     * @param n Like {@link Stream#skip(long)}.
550     * @return Like {@link Stream#skip(long)}.
551     */
552    default IOStream<T> skip(final long n) {
553        return adapt(unwrap().skip(n));
554    }
555
556    /**
557     * Like {@link Stream#sorted()}.
558     *
559     * @return Like {@link Stream#sorted()}.
560     */
561    default IOStream<T> sorted() {
562        return adapt(unwrap().sorted());
563    }
564
565    /**
566     * Like {@link Stream#sorted(java.util.Comparator)}.
567     *
568     * @param comparator Like {@link Stream#sorted(java.util.Comparator)}.
569     * @return Like {@link Stream#sorted(java.util.Comparator)}.
570     * @throws IOException if an I/O error occurs.
571     */
572    @SuppressWarnings("unused") // thrown by Erase.
573    default IOStream<T> sorted(final IOComparator<? super T> comparator) throws IOException {
574        return adapt(unwrap().sorted((t, u) -> Erase.compare(comparator, t, u)));
575    }
576
577    /**
578     * Like {@link Stream#toArray()}.
579     *
580     * @return {@link Stream#toArray()}.
581     */
582    default Object[] toArray() {
583        return unwrap().toArray();
584    }
585
586    /**
587     * TODO Package-private for now, needs IOIntFunction?
588     *
589     * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
590     * would be ideal to have only one.
591     *
592     * Like {@link Stream#toArray(IntFunction)}.
593     *
594     * Package private for now.
595     *
596     * @param <A> Like {@link Stream#toArray(IntFunction)}.
597     * @param generator Like {@link Stream#toArray(IntFunction)}.
598     * @return Like {@link Stream#toArray(IntFunction)}.
599     */
600    default <A> A[] toArray(final IntFunction<A[]> generator) {
601        return unwrap().toArray(generator);
602    }
603
604}