View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.commons.io;
19  
20  import static org.junit.jupiter.api.Assertions.assertEquals;
21  import static org.junit.jupiter.api.Assertions.fail;
22  
23  import java.io.ByteArrayInputStream;
24  import java.io.ByteArrayOutputStream;
25  import java.io.EOFException;
26  import java.io.IOException;
27  import java.io.InputStream;
28  import java.util.Random;
29  import java.util.concurrent.ExecutorCompletionService;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.Future;
33  import java.util.function.Supplier;
34  import java.util.zip.Inflater;
35  import java.util.zip.InflaterInputStream;
36  
37  import org.junit.jupiter.api.BeforeEach;
38  import org.junit.jupiter.api.Test;
39  
40  /**
41   * See Jira ticket IO-802.
42   */
43  public class IOUtilsMultithreadedSkipTest {
44  
45      private static final String FIXTURE = "TIKA-4065.bin";
46      long seed = 1;
47      private final ThreadLocal<byte[]> threadLocal = ThreadLocal.withInitial(() -> new byte[4096]);
48  
49      private int[] generateExpected(final InputStream is, final int[] skips) throws IOException {
50          final int[] testBytes = new int[skips.length];
51          for (int i = 0; i < skips.length; i++) {
52              try {
53                  IOUtils.skipFully(is, skips[i]);
54                  testBytes[i] = is.read();
55              } catch (final EOFException e) {
56                  testBytes[i] = -1;
57              }
58          }
59          return testBytes;
60      }
61  
62      private int[] generateSkips(final byte[] bytes, final int numSkips, final Random random) {
63          final int[] skips = new int[numSkips];
64          for (int i = 0; i < skips.length; i++) {
65              skips[i] = random.nextInt(bytes.length / numSkips) + bytes.length / 10;
66          }
67          return skips;
68      }
69  
70      private InputStream inflate(final byte[] deflated) throws IOException {
71          final ByteArrayOutputStream bos = new ByteArrayOutputStream();
72          IOUtils.copy(new InflaterInputStream(new ByteArrayInputStream(deflated), new Inflater(true)), bos);
73          return new ByteArrayInputStream(bos.toByteArray());
74      }
75  
76      @BeforeEach
77      public void setUp() {
78          // Not the best random we can use but good enough here.
79          seed = new Random().nextLong();
80      }
81  
82      private void testSkipFullyOnInflaterInputStream(final Supplier<byte[]> baSupplier) throws Exception {
83          final long thisSeed = seed;
84          // thisSeed = -727624427837034313l;
85          final Random random = new Random(thisSeed);
86          final byte[] bytes;
87          try (final InputStream inputStream = getClass().getResourceAsStream(FIXTURE)) {
88              bytes = IOUtils.toByteArray(inputStream);
89          }
90          final int numSkips = random.nextInt(bytes.length) / 100 + 1;
91  
92          final int[] skips = generateSkips(bytes, numSkips, random);
93          final int[] expected;
94          try (final InputStream inflate = inflate(bytes)) {
95              expected = generateExpected(inflate, skips);
96          }
97  
98          final int numThreads = 2;
99          final int iterations = 100;
100         final ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
101         final ExecutorCompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService);
102 
103         for (int i = 0; i < numThreads; i++) {
104             executorCompletionService.submit(() -> {
105                 for (int iteration = 0; iteration < iterations; iteration++) {
106                     try (InputStream is = new InflaterInputStream(new ByteArrayInputStream(bytes), new Inflater(true))) {
107                         for (int skipIndex = 0; skipIndex < skips.length; skipIndex++) {
108                             try {
109                                 IOUtils.skipFully(is, skips[skipIndex], baSupplier);
110                                 final int c = is.read();
111                                 assertEquals(expected[skipIndex], c, "failed on seed=" + seed + " iteration=" + iteration);
112                             } catch (final EOFException e) {
113                                 assertEquals(expected[skipIndex], is.read(), "failed on " + "seed=" + seed + " iteration=" + iteration);
114                             }
115                         }
116                     }
117                 }
118                 return 1;
119             });
120         }
121 
122         int finished = 0;
123         while (finished < numThreads) {
124             // blocking
125             final Future<Integer> future = executorCompletionService.take();
126             try {
127                 future.get();
128             } catch (final Exception e) {
129                 // printStackTrace() for simpler debugging
130                 e.printStackTrace();
131                 fail("failed on seed=" + seed);
132             }
133             finished++;
134         }
135     }
136 
137     @Test
138     public void testSkipFullyOnInflaterInputStream_New_bytes() throws Exception {
139         testSkipFullyOnInflaterInputStream(() -> new byte[4096]);
140     }
141 
142     @Test
143     public void testSkipFullyOnInflaterInputStream_ThreadLocal() throws Exception {
144         testSkipFullyOnInflaterInputStream(threadLocal::get);
145     }
146 
147 }