View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.io;
18  
19  import static org.junit.jupiter.api.Assertions.assertEquals;
20  import static org.junit.jupiter.api.Assertions.assertNotNull;
21  
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.nio.charset.StandardCharsets;
25  import java.util.HashMap;
26  import java.util.Random;
27  
28  import org.apache.commons.io.input.CharSequenceInputStream;
29  import org.apache.commons.io.input.DemuxInputStream;
30  import org.apache.commons.io.output.ByteArrayOutputStream;
31  import org.apache.commons.io.output.DemuxOutputStream;
32  import org.apache.commons.io.test.TestUtils;
33  import org.junit.jupiter.api.Test;
34  
35  /**
36   * Tests {@link DemuxInputStream}.
37   */
38  public class DemuxInputStreamTest {
39  
40      private static final class ReaderThread extends Thread {
41          private final DemuxInputStream demuxInputStream;
42          private final InputStream inputStream;
43          private final StringBuffer stringBuffer = new StringBuffer();
44  
45          ReaderThread(final String name, final InputStream input, final DemuxInputStream demux) {
46              super(name);
47              inputStream = input;
48              demuxInputStream = demux;
49          }
50  
51          public String getData() {
52              return stringBuffer.toString();
53          }
54  
55          @Override
56          public void run() {
57              demuxInputStream.bindStream(inputStream);
58  
59              try {
60                  int ch = demuxInputStream.read();
61                  while (-1 != ch) {
62                      // System.out.println( "Reading: " + (char)ch );
63                      stringBuffer.append((char) ch);
64  
65                      final int sleepMillis = Math.abs(RANDOM.nextInt() % 10);
66                      TestUtils.sleep(sleepMillis);
67                      ch = demuxInputStream.read();
68                  }
69              } catch (final Exception e) {
70                  e.printStackTrace();
71              }
72          }
73      }
74  
75      private static final class WriterThread extends Thread {
76          private final byte[] byteArray;
77          private final DemuxOutputStream demuxOutputStream;
78          private final OutputStream outputStream;
79  
80          WriterThread(final String name, final String data, final OutputStream output, final DemuxOutputStream demux) {
81              super(name);
82              outputStream = output;
83              demuxOutputStream = demux;
84              byteArray = data.getBytes();
85          }
86  
87          @Override
88          public void run() {
89              demuxOutputStream.bindStream(outputStream);
90              for (final byte element : byteArray) {
91                  try {
92                      // System.out.println( "Writing: " + (char)byteArray[ i ] );
93                      demuxOutputStream.write(element);
94                      final int sleepMillis = Math.abs(RANDOM.nextInt() % 10);
95                      TestUtils.sleep(sleepMillis);
96                  } catch (final Exception e) {
97                      e.printStackTrace();
98                  }
99              }
100         }
101     }
102 
103     private static final Random RANDOM = new Random();
104     private static final String DATA1 = "Data for thread1";
105 
106     private static final String DATA2 = "Data for thread2";
107     private static final String DATA3 = "Data for thread3";
108     private static final String DATA4 = "Data for thread4";
109     private static final String T1 = "Thread1";
110 
111     private static final String T2 = "Thread2";
112     private static final String T3 = "Thread3";
113     private static final String T4 = "Thread4";
114 
115     private final HashMap<String, ByteArrayOutputStream> outputMap = new HashMap<>();
116 
117     private final HashMap<String, Thread> threadMap = new HashMap<>();
118 
119     private void doJoin() throws InterruptedException {
120         for (final String name : threadMap.keySet()) {
121             final Thread thread = threadMap.get(name);
122             thread.join();
123         }
124     }
125 
126     private void doStart() {
127         threadMap.keySet().forEach(name -> threadMap.get(name).start());
128     }
129 
130     private String getInput(final String threadName) {
131         final ReaderThread thread = (ReaderThread) threadMap.get(threadName);
132         assertNotNull(thread, "getInput()");
133         return thread.getData();
134     }
135 
136     private String getOutput(final String threadName) {
137         final ByteArrayOutputStream output = outputMap.get(threadName);
138         assertNotNull(output, "getOutput()");
139         return output.toString(StandardCharsets.UTF_8);
140     }
141 
142     private void startReader(final String name, final String data, final DemuxInputStream demux) {
143         final InputStream input = CharSequenceInputStream.builder().setCharSequence(data).get();
144         final ReaderThread thread = new ReaderThread(name, input, demux);
145         threadMap.put(name, thread);
146     }
147 
148     private void startWriter(final String name, final String data, final DemuxOutputStream demux) {
149         final ByteArrayOutputStream output = new ByteArrayOutputStream();
150         outputMap.put(name, output);
151         final WriterThread thread = new WriterThread(name, data, output, demux);
152         threadMap.put(name, thread);
153     }
154 
155     @Test
156     public void testInputStream() throws Exception {
157         try (final DemuxInputStream input = new DemuxInputStream()) {
158             startReader(T1, DATA1, input);
159             startReader(T2, DATA2, input);
160             startReader(T3, DATA3, input);
161             startReader(T4, DATA4, input);
162 
163             doStart();
164             doJoin();
165 
166             assertEquals(DATA1, getInput(T1), "Data1");
167             assertEquals(DATA2, getInput(T2), "Data2");
168             assertEquals(DATA3, getInput(T3), "Data3");
169             assertEquals(DATA4, getInput(T4), "Data4");
170         }
171     }
172 
173     @Test
174     public void testOutputStream() throws Exception {
175         try (final DemuxOutputStream output = new DemuxOutputStream()) {
176             startWriter(T1, DATA1, output);
177             startWriter(T2, DATA2, output);
178             startWriter(T3, DATA3, output);
179             startWriter(T4, DATA4, output);
180 
181             doStart();
182             doJoin();
183 
184             assertEquals(DATA1, getOutput(T1), "Data1");
185             assertEquals(DATA2, getOutput(T2), "Data2");
186             assertEquals(DATA3, getOutput(T3), "Data3");
187             assertEquals(DATA4, getOutput(T4), "Data4");
188         }
189     }
190 
191     @Test
192     public void testReadEOF() throws Exception {
193         try (final DemuxInputStream input = new DemuxInputStream()) {
194             assertEquals(IOUtils.EOF, input.read());
195         }
196     }
197 }