1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.commons.io.function;
19
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.Arrays;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.NoSuchElementException;
26 import java.util.Objects;
27 import java.util.Optional;
28 import java.util.Spliterator;
29 import java.util.Spliterators;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.concurrent.atomic.AtomicReference;
32 import java.util.function.BiFunction;
33 import java.util.function.IntFunction;
34 import java.util.function.ToDoubleFunction;
35 import java.util.function.ToIntFunction;
36 import java.util.function.ToLongFunction;
37 import java.util.function.UnaryOperator;
38 import java.util.stream.Collector;
39 import java.util.stream.DoubleStream;
40 import java.util.stream.IntStream;
41 import java.util.stream.LongStream;
42 import java.util.stream.Stream;
43 import java.util.stream.StreamSupport;
44
45 import org.apache.commons.io.IOExceptionList;
46
47 /**
48 * Like {@link Stream} but throws {@link IOException}.
49 *
50 * @param <T> the type of the stream elements.
51 * @since 2.12.0
52 */
53 public interface IOStream<T> extends IOBaseStream<T, IOStream<T>, Stream<T>> {
54
55 /**
56 * Constructs a new IOStream for the given Stream.
57 *
58 * @param <T> the type of the stream elements.
59 * @param stream The stream to delegate.
60 * @return a new IOStream.
61 */
62 static <T> IOStream<T> adapt(final Stream<T> stream) {
63 return IOStreamAdapter.adapt(stream);
64 }
65
66 /**
67 * This class' version of {@link Stream#empty()}.
68 *
69 * @param <T> the type of the stream elements
70 * @return an empty sequential {@code IOStreamImpl}.
71 * @see Stream#empty()
72 */
73 static <T> IOStream<T> empty() {
74 return IOStreamAdapter.adapt(Stream.empty());
75 }
76
77 /**
78 * Like {@link Stream#iterate(Object, UnaryOperator)} but for IO.
79 *
80 * @param <T> the type of stream elements.
81 * @param seed the initial element.
82 * @param f a function to be applied to the previous element to produce a new element.
83 * @return a new sequential {@code IOStream}.
84 */
85 static <T> IOStream<T> iterate(final T seed, final IOUnaryOperator<T> f) {
86 Objects.requireNonNull(f);
87 final Iterator<T> iterator = new Iterator<T>() {
88 @SuppressWarnings("unchecked")
89 T t = (T) IOStreams.NONE;
90
91 @Override
92 public boolean hasNext() {
93 return true;
94 }
95
96 @Override
97 public T next() throws NoSuchElementException {
98 try {
99 return t = t == IOStreams.NONE ? seed : f.apply(t);
100 } catch (final IOException e) {
101 final NoSuchElementException nsee = new NoSuchElementException();
102 nsee.initCause(e);
103 throw nsee;
104 }
105 }
106 };
107 return adapt(StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED | Spliterator.IMMUTABLE), false));
108 }
109
110 /**
111 * Null-safe version of {@link StreamSupport#stream(java.util.Spliterator, boolean)}.
112 *
113 * Copied from Apache Commons Lang.
114 *
115 * @param <T> the type of stream elements.
116 * @param values the elements of the new stream, may be {@code null}.
117 * @return the new stream on {@code values} or {@link Stream#empty()}.
118 */
119 static <T> IOStream<T> of(final Iterable<T> values) {
120 return values == null ? empty() : adapt(StreamSupport.stream(values.spliterator(), false));
121 }
122
123 /**
124 * Null-safe version of {@link Stream#of(Object[])} for an IO stream.
125 *
126 * @param <T> the type of stream elements.
127 * @param values the elements of the new stream, may be {@code null}.
128 * @return the new stream on {@code values} or {@link Stream#empty()}.
129 */
130 @SafeVarargs // Creating a stream from an array is safe
131 static <T> IOStream<T> of(final T... values) {
132 return values == null || values.length == 0 ? empty() : adapt(Arrays.stream(values));
133 }
134
135 /**
136 * Returns a sequential {@code IOStreamImpl} containing a single element.
137 *
138 * @param t the single element
139 * @param <T> the type of stream elements
140 * @return a singleton sequential stream
141 */
142 static <T> IOStream<T> of(final T t) {
143 return adapt(Stream.of(t));
144 }
145
146 /**
147 * Like {@link Stream#allMatch(java.util.function.Predicate)} but throws {@link IOException}.
148 *
149 * @param predicate {@link Stream#allMatch(java.util.function.Predicate)}.
150 * @return Like {@link Stream#allMatch(java.util.function.Predicate)}.
151 * @throws IOException if an I/O error occurs.
152 */
153 @SuppressWarnings("unused") // thrown by Erase.
154 default boolean allMatch(final IOPredicate<? super T> predicate) throws IOException {
155 return unwrap().allMatch(t -> Erase.test(predicate, t));
156 }
157
158 /**
159 * Like {@link Stream#anyMatch(java.util.function.Predicate)} but throws {@link IOException}.
160 *
161 * @param predicate {@link Stream#anyMatch(java.util.function.Predicate)}.
162 * @return Like {@link Stream#anyMatch(java.util.function.Predicate)}.
163 * @throws IOException if an I/O error occurs.
164 */
165 @SuppressWarnings("unused") // thrown by Erase.
166 default boolean anyMatch(final IOPredicate<? super T> predicate) throws IOException {
167 return unwrap().anyMatch(t -> Erase.test(predicate, t));
168 }
169
170 /**
171 * TODO Package-private for now, needs IOCollector?
172 *
173 * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
174 * would be ideal to have only one.
175 *
176 * Like {@link Stream#collect(Collector)}.
177 *
178 * Package private for now.
179 *
180 * @param <R> Like {@link Stream#collect(Collector)}.
181 * @param <A> Like {@link Stream#collect(Collector)}.
182 * @param collector Like {@link Stream#collect(Collector)}.
183 * @return Like {@link Stream#collect(Collector)}.
184 */
185 default <R, A> R collect(final Collector<? super T, A, R> collector) {
186 return unwrap().collect(collector);
187 }
188
189 /**
190 * Like
191 * {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
192 *
193 * @param <R> Like
194 * {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
195 * @param supplier Like
196 * {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
197 * @param accumulator Like
198 * {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
199 * @param combiner Like
200 * {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
201 * @return Like
202 * {@link Stream#collect(java.util.function.Supplier, java.util.function.BiConsumer, java.util.function.BiConsumer)}.
203 * @throws IOException if an I/O error occurs.
204 */
205 @SuppressWarnings("unused") // thrown by Erase.
206 default <R> R collect(final IOSupplier<R> supplier, final IOBiConsumer<R, ? super T> accumulator, final IOBiConsumer<R, R> combiner) throws IOException {
207 return unwrap().collect(() -> Erase.get(supplier), (t, u) -> Erase.accept(accumulator, t, u), (t, u) -> Erase.accept(combiner, t, u));
208 }
209
210 /**
211 * Like {@link Stream#count()}.
212 *
213 * @return Like {@link Stream#count()}.
214 */
215 default long count() {
216 return unwrap().count();
217 }
218
219 /**
220 * Like {@link Stream#distinct()}.
221 *
222 * @return Like {@link Stream#distinct()}.
223 */
224 default IOStream<T> distinct() {
225 return adapt(unwrap().distinct());
226 }
227
228 /**
229 * Like {@link Stream#filter(java.util.function.Predicate)}.
230 *
231 * @param predicate Like {@link Stream#filter(java.util.function.Predicate)}.
232 * @return Like {@link Stream#filter(java.util.function.Predicate)}.
233 * @throws IOException if an I/O error occurs.
234 */
235 @SuppressWarnings("unused") // thrown by Erase.
236 default IOStream<T> filter(final IOPredicate<? super T> predicate) throws IOException {
237 return adapt(unwrap().filter(t -> Erase.test(predicate, t)));
238 }
239
240 /**
241 * Like {@link Stream#findAny()}.
242 *
243 * @return Like {@link Stream#findAny()}.
244 */
245 default Optional<T> findAny() {
246 return unwrap().findAny();
247 }
248
249 /**
250 * Like {@link Stream#findFirst()}.
251 *
252 * @return Like {@link Stream#findFirst()}.
253 */
254 default Optional<T> findFirst() {
255 return unwrap().findFirst();
256 }
257
258 /**
259 * Like {@link Stream#flatMap(java.util.function.Function)}.
260 *
261 * @param <R> Like {@link Stream#flatMap(java.util.function.Function)}.
262 * @param mapper Like {@link Stream#flatMap(java.util.function.Function)}.
263 * @return Like {@link Stream#flatMap(java.util.function.Function)}.
264 * @throws IOException if an I/O error occurs.
265 */
266 @SuppressWarnings({ "unused", "resource" }) // thrown by Erase; resource closed by caller.
267 default <R> IOStream<R> flatMap(final IOFunction<? super T, ? extends IOStream<? extends R>> mapper) throws IOException {
268 return adapt(unwrap().flatMap(t -> Erase.apply(mapper, t).unwrap()));
269 }
270
271 /**
272 * TODO Package-private for now, needs IODoubleStream?
273 *
274 * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
275 * would be ideal to have only one.
276 *
277 * Like {@link Stream#flatMapToDouble(java.util.function.Function)}.
278 *
279 * @param mapper Like {@link Stream#flatMapToDouble(java.util.function.Function)}.
280 * @return Like {@link Stream#flatMapToDouble(java.util.function.Function)}.
281 * @throws IOException if an I/O error occurs.
282 */
283 @SuppressWarnings("unused") // thrown by Erase.
284 default DoubleStream flatMapToDouble(final IOFunction<? super T, ? extends DoubleStream> mapper) throws IOException {
285 return unwrap().flatMapToDouble(t -> Erase.apply(mapper, t));
286 }
287
288 /**
289 * TODO Package-private for now, needs IOIntStream?
290 *
291 * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
292 * would be ideal to have only one.
293 *
294 * Like {@link Stream#flatMapToInt(java.util.function.Function)}.
295 *
296 * @param mapper Like {@link Stream#flatMapToInt(java.util.function.Function)}.
297 * @return Like {@link Stream#flatMapToInt(java.util.function.Function)}.
298 * @throws IOException if an I/O error occurs.
299 */
300 @SuppressWarnings("unused") // thrown by Erase.
301 default IntStream flatMapToInt(final IOFunction<? super T, ? extends IntStream> mapper) throws IOException {
302 return unwrap().flatMapToInt(t -> Erase.apply(mapper, t));
303 }
304
305 /**
306 * TODO Package-private for now, needs IOLongStream?
307 *
308 * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
309 * would be ideal to have only one.
310 *
311 * Like {@link Stream#flatMapToLong(java.util.function.Function)}.
312 *
313 * @param mapper Like {@link Stream#flatMapToLong(java.util.function.Function)}.
314 * @return Like {@link Stream#flatMapToLong(java.util.function.Function)}.
315 * @throws IOException if an I/O error occurs.
316 */
317 @SuppressWarnings("unused") // thrown by Erase.
318 default LongStream flatMapToLong(final IOFunction<? super T, ? extends LongStream> mapper) throws IOException {
319 return unwrap().flatMapToLong(t -> Erase.apply(mapper, t));
320 }
321
322 /**
323 * Performs an action for each element gathering any exceptions.
324 *
325 * @param action The action to apply to each element.
326 * @throws IOExceptionList if any I/O errors occur.
327 */
328 default void forAll(final IOConsumer<T> action) throws IOExceptionList {
329 forAll(action, (i, e) -> e);
330 }
331
332 /**
333 * Performs an action for each element gathering any exceptions.
334 *
335 * @param action The action to apply to each element.
336 * @param exSupplier The exception supplier.
337 * @throws IOExceptionList if any I/O errors occur.
338 */
339 default void forAll(final IOConsumer<T> action, final BiFunction<Integer, IOException, IOException> exSupplier) throws IOExceptionList {
340 final AtomicReference<List<IOException>> causeList = new AtomicReference<>();
341 final AtomicInteger index = new AtomicInteger();
342 final IOConsumer<T> safeAction = IOStreams.toIOConsumer(action);
343 unwrap().forEach(e -> {
344 try {
345 safeAction.accept(e);
346 } catch (final IOException innerEx) {
347 if (causeList.get() == null) {
348 // Only allocate if required
349 causeList.set(new ArrayList<>());
350 }
351 if (exSupplier != null) {
352 causeList.get().add(exSupplier.apply(index.get(), innerEx));
353 }
354 }
355 index.incrementAndGet();
356 });
357 IOExceptionList.checkEmpty(causeList.get(), null);
358 }
359
360 /**
361 * Like {@link Stream#forEach(java.util.function.Consumer)} but throws {@link IOException}.
362 *
363 * @param action Like {@link Stream#forEach(java.util.function.Consumer)}.
364 * @throws IOException if an I/O error occurs.
365 */
366 @SuppressWarnings("unused") // thrown by Erase.
367 default void forEach(final IOConsumer<? super T> action) throws IOException {
368 unwrap().forEach(e -> Erase.accept(action, e));
369 }
370
371 /**
372 * Like {@link Stream#forEachOrdered(java.util.function.Consumer)}.
373 *
374 * @param action Like {@link Stream#forEachOrdered(java.util.function.Consumer)}.
375 * @throws IOException if an I/O error occurs.
376 */
377 @SuppressWarnings("unused") // thrown by Erase.
378 default void forEachOrdered(final IOConsumer<? super T> action) throws IOException {
379 unwrap().forEachOrdered(e -> Erase.accept(action, e));
380 }
381
382 /**
383 * Like {@link Stream#limit(long)}.
384 *
385 * @param maxSize Like {@link Stream#limit(long)}.
386 * @return Like {@link Stream#limit(long)}.
387 */
388 default IOStream<T> limit(final long maxSize) {
389 return adapt(unwrap().limit(maxSize));
390 }
391
392 /**
393 * Like {@link Stream#map(java.util.function.Function)}.
394 *
395 * @param <R> Like {@link Stream#map(java.util.function.Function)}.
396 * @param mapper Like {@link Stream#map(java.util.function.Function)}.
397 * @return Like {@link Stream#map(java.util.function.Function)}.
398 * @throws IOException if an I/O error occurs.
399 */
400 @SuppressWarnings("unused") // thrown by Erase.
401 default <R> IOStream<R> map(final IOFunction<? super T, ? extends R> mapper) throws IOException {
402 return adapt(unwrap().map(t -> Erase.apply(mapper, t)));
403 }
404
405 /**
406 * TODO Package-private for now, needs IOToDoubleFunction?
407 *
408 * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
409 * would be ideal to have only one.
410 *
411 * Like {@link Stream#mapToDouble(ToDoubleFunction)}.
412 *
413 * Package private for now.
414 *
415 * @param mapper Like {@link Stream#mapToDouble(ToDoubleFunction)}.
416 * @return Like {@link Stream#mapToDouble(ToDoubleFunction)}.
417 */
418 default DoubleStream mapToDouble(final ToDoubleFunction<? super T> mapper) {
419 return unwrap().mapToDouble(mapper);
420 }
421
422 /**
423 * TODO Package-private for now, needs IOToIntFunction?
424 *
425 * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
426 * would be ideal to have only one.
427 *
428 * Like {@link Stream#mapToInt(ToIntFunction)}.
429 *
430 * Package private for now.
431 *
432 * @param mapper Like {@link Stream#mapToInt(ToIntFunction)}.
433 * @return Like {@link Stream#mapToInt(ToIntFunction)}.
434 */
435 default IntStream mapToInt(final ToIntFunction<? super T> mapper) {
436 return unwrap().mapToInt(mapper);
437 }
438
439 /**
440 * TODO Package-private for now, needs IOToLongFunction?
441 *
442 * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
443 * would be ideal to have only one.
444 *
445 * Like {@link Stream#mapToLong(ToLongFunction)}.
446 *
447 * Package private for now.
448 *
449 * @param mapper Like {@link Stream#mapToLong(ToLongFunction)}.
450 * @return Like {@link Stream#mapToLong(ToLongFunction)}.
451 */
452 default LongStream mapToLong(final ToLongFunction<? super T> mapper) {
453 return unwrap().mapToLong(mapper);
454 }
455
456 /**
457 * Like {@link Stream#max(java.util.Comparator)}.
458 *
459 * @param comparator Like {@link Stream#max(java.util.Comparator)}.
460 * @return Like {@link Stream#max(java.util.Comparator)}.
461 * @throws IOException if an I/O error occurs.
462 */
463 @SuppressWarnings("unused") // thrown by Erase.
464 default Optional<T> max(final IOComparator<? super T> comparator) throws IOException {
465 return unwrap().max((t, u) -> Erase.compare(comparator, t, u));
466 }
467
468 /**
469 * Like {@link Stream#min(java.util.Comparator)}.
470 *
471 * @param comparator Like {@link Stream#min(java.util.Comparator)}.
472 * @return Like {@link Stream#min(java.util.Comparator)}.
473 * @throws IOException if an I/O error occurs.
474 */
475 @SuppressWarnings("unused") // thrown by Erase.
476 default Optional<T> min(final IOComparator<? super T> comparator) throws IOException {
477 return unwrap().min((t, u) -> Erase.compare(comparator, t, u));
478 }
479
480 /**
481 * Like {@link Stream#noneMatch(java.util.function.Predicate)}.
482 *
483 * @param predicate Like {@link Stream#noneMatch(java.util.function.Predicate)}.
484 * @return Like {@link Stream#noneMatch(java.util.function.Predicate)}.
485 * @throws IOException if an I/O error occurs.
486 */
487 @SuppressWarnings("unused") // thrown by Erase.
488 default boolean noneMatch(final IOPredicate<? super T> predicate) throws IOException {
489 return unwrap().noneMatch(t -> Erase.test(predicate, t));
490 }
491
492 /**
493 * Like {@link Stream#peek(java.util.function.Consumer)}.
494 *
495 * @param action Like {@link Stream#peek(java.util.function.Consumer)}.
496 * @return Like {@link Stream#peek(java.util.function.Consumer)}.
497 * @throws IOException if an I/O error occurs.
498 */
499 @SuppressWarnings("unused") // thrown by Erase.
500 default IOStream<T> peek(final IOConsumer<? super T> action) throws IOException {
501 return adapt(unwrap().peek(t -> Erase.accept(action, t)));
502 }
503
504 /**
505 * Like {@link Stream#reduce(java.util.function.BinaryOperator)}.
506 *
507 * @param accumulator Like {@link Stream#reduce(java.util.function.BinaryOperator)}.
508 * @return Like {@link Stream#reduce(java.util.function.BinaryOperator)}.
509 * @throws IOException if an I/O error occurs.
510 */
511 @SuppressWarnings("unused") // thrown by Erase.
512 default Optional<T> reduce(final IOBinaryOperator<T> accumulator) throws IOException {
513 return unwrap().reduce((t, u) -> Erase.apply(accumulator, t, u));
514 }
515
516 /**
517 * Like {@link Stream#reduce(Object, java.util.function.BinaryOperator)}.
518 *
519 * @param identity Like {@link Stream#reduce(Object, java.util.function.BinaryOperator)}.
520 * @param accumulator Like {@link Stream#reduce(Object, java.util.function.BinaryOperator)}.
521 * @return Like {@link Stream#reduce(Object, java.util.function.BinaryOperator)}.
522 * @throws IOException if an I/O error occurs.
523 */
524 @SuppressWarnings("unused") // thrown by Erase.
525 default T reduce(final T identity, final IOBinaryOperator<T> accumulator) throws IOException {
526 return unwrap().reduce(identity, (t, u) -> Erase.apply(accumulator, t, u));
527 }
528
529 /**
530 * Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
531 *
532 * @param <U> Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
533 * @param identity Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
534 * @param accumulator Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
535 * @param combiner Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
536 * @return Like {@link Stream#reduce(Object, BiFunction, java.util.function.BinaryOperator)}.
537 * @throws IOException if an I/O error occurs.
538 */
539 @SuppressWarnings("unused") // thrown by Erase.
540 default <U> U reduce(final U identity, final IOBiFunction<U, ? super T, U> accumulator, final IOBinaryOperator<U> combiner) throws IOException {
541 return unwrap().reduce(identity, (t, u) -> Erase.apply(accumulator, t, u), (t, u) -> Erase.apply(combiner, t, u));
542 }
543
544 /**
545 * Like {@link Stream#skip(long)}.
546 *
547 * @param n Like {@link Stream#skip(long)}.
548 * @return Like {@link Stream#skip(long)}.
549 */
550 default IOStream<T> skip(final long n) {
551 return adapt(unwrap().skip(n));
552 }
553
554 /**
555 * Like {@link Stream#sorted()}.
556 *
557 * @return Like {@link Stream#sorted()}.
558 */
559 default IOStream<T> sorted() {
560 return adapt(unwrap().sorted());
561 }
562
563 /**
564 * Like {@link Stream#sorted(java.util.Comparator)}.
565 *
566 * @param comparator Like {@link Stream#sorted(java.util.Comparator)}.
567 * @return Like {@link Stream#sorted(java.util.Comparator)}.
568 * @throws IOException if an I/O error occurs.
569 */
570 @SuppressWarnings("unused") // thrown by Erase.
571 default IOStream<T> sorted(final IOComparator<? super T> comparator) throws IOException {
572 return adapt(unwrap().sorted((t, u) -> Erase.compare(comparator, t, u)));
573 }
574
575 /**
576 * Like {@link Stream#toArray()}.
577 *
578 * @return {@link Stream#toArray()}.
579 */
580 default Object[] toArray() {
581 return unwrap().toArray();
582 }
583
584 /**
585 * TODO Package-private for now, needs IOIntFunction?
586 *
587 * Adding this method now and an IO version later is an issue because call sites would have to type-cast to pick one. It
588 * would be ideal to have only one.
589 *
590 * Like {@link Stream#toArray(IntFunction)}.
591 *
592 * Package private for now.
593 *
594 * @param <A> Like {@link Stream#toArray(IntFunction)}.
595 * @param generator Like {@link Stream#toArray(IntFunction)}.
596 * @return Like {@link Stream#toArray(IntFunction)}.
597 */
598 default <A> A[] toArray(final IntFunction<A[]> generator) {
599 return unwrap().toArray(generator);
600 }
601
602 }