View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    * 
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.io;
18  
19  import static org.junit.Assert.assertEquals;
20  import static org.junit.Assert.assertNotNull;
21  
22  import java.io.ByteArrayInputStream;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  import java.util.HashMap;
26  import java.util.Random;
27  
28  import org.apache.commons.io.input.DemuxInputStream;
29  import org.apache.commons.io.output.ByteArrayOutputStream;
30  import org.apache.commons.io.output.DemuxOutputStream;
31  import org.apache.commons.io.testtools.TestUtils;
32  import org.junit.Test;
33  
34  /**
35   * Basic unit tests for the multiplexing streams.
36   */
37  public class DemuxTestCase {
38      private static final String T1 = "Thread1";
39      private static final String T2 = "Thread2";
40      private static final String T3 = "Thread3";
41      private static final String T4 = "Thread4";
42  
43      private static final String DATA1 = "Data for thread1";
44      private static final String DATA2 = "Data for thread2";
45      private static final String DATA3 = "Data for thread3";
46      private static final String DATA4 = "Data for thread4";
47  
48      private static final Random c_random = new Random();
49      private final HashMap<String, ByteArrayOutputStream> m_outputMap = new HashMap<String, ByteArrayOutputStream>();
50      private final HashMap<String, Thread> m_threadMap = new HashMap<String, Thread>();
51  
52      @SuppressWarnings("deprecation") // unavoidable until Java 7
53      private String getOutput(final String threadName) {
54          final ByteArrayOutputStream output =
55                  m_outputMap.get(threadName);
56          assertNotNull("getOutput()", output);
57  
58          return output.toString(Charsets.UTF_8);
59      }
60  
61      private String getInput(final String threadName) {
62          final ReaderThread thread = (ReaderThread) m_threadMap.get(threadName);
63          assertNotNull("getInput()", thread);
64  
65          return thread.getData();
66      }
67  
68      private void doStart()
69              throws Exception {
70          for (String name : m_threadMap.keySet()) {
71              final Thread thread = m_threadMap.get(name);
72              thread.start();
73          }
74      }
75  
76      private void doJoin()
77              throws Exception {
78          for (String name : m_threadMap.keySet()) {
79              final Thread thread = m_threadMap.get(name);
80              thread.join();
81          }
82      }
83  
84      private void startWriter(final String name,
85                               final String data,
86                               final DemuxOutputStream demux)
87              throws Exception {
88          final ByteArrayOutputStream output = new ByteArrayOutputStream();
89          m_outputMap.put(name, output);
90          final WriterThread thread =
91                  new WriterThread(name, data, output, demux);
92          m_threadMap.put(name, thread);
93      }
94  
95      private void startReader(final String name,
96                               final String data,
97                               final DemuxInputStream demux)
98              throws Exception {
99          final ByteArrayInputStream input = new ByteArrayInputStream(data.getBytes());
100         final ReaderThread thread = new ReaderThread(name, input, demux);
101         m_threadMap.put(name, thread);
102     }
103 
104     @Test
105     public void testOutputStream()
106             throws Exception {
107         final DemuxOutputStream output = new DemuxOutputStream();
108         startWriter(T1, DATA1, output);
109         startWriter(T2, DATA2, output);
110         startWriter(T3, DATA3, output);
111         startWriter(T4, DATA4, output);
112 
113         doStart();
114         doJoin();
115 
116         assertEquals("Data1", DATA1, getOutput(T1));
117         assertEquals("Data2", DATA2, getOutput(T2));
118         assertEquals("Data3", DATA3, getOutput(T3));
119         assertEquals("Data4", DATA4, getOutput(T4));
120     }
121 
122     @Test
123     public void testInputStream()
124             throws Exception {
125         final DemuxInputStream input = new DemuxInputStream();
126         startReader(T1, DATA1, input);
127         startReader(T2, DATA2, input);
128         startReader(T3, DATA3, input);
129         startReader(T4, DATA4, input);
130 
131         doStart();
132         doJoin();
133 
134         assertEquals("Data1", DATA1, getInput(T1));
135         assertEquals("Data2", DATA2, getInput(T2));
136         assertEquals("Data3", DATA3, getInput(T3));
137         assertEquals("Data4", DATA4, getInput(T4));
138     }
139 
140     private static class ReaderThread
141             extends Thread {
142         private final StringBuffer m_buffer = new StringBuffer();
143         private final InputStream m_input;
144         private final DemuxInputStream m_demux;
145 
146         ReaderThread(final String name,
147                      final InputStream input,
148                      final DemuxInputStream demux) {
149             super(name);
150             m_input = input;
151             m_demux = demux;
152         }
153 
154         public String getData() {
155             return m_buffer.toString();
156         }
157 
158         @Override
159         public void run() {
160             m_demux.bindStream(m_input);
161 
162             try {
163                 int ch = m_demux.read();
164                 while (-1 != ch) {
165                     //System.out.println( "Reading: " + (char)ch );
166                     m_buffer.append((char) ch);
167 
168                     final int sleepTime = Math.abs(c_random.nextInt() % 10);
169                     TestUtils.sleep(sleepTime);
170                     ch = m_demux.read();
171                 }
172             } catch (final Exception e) {
173                 e.printStackTrace();
174             }
175         }
176     }
177 
178     private static class WriterThread
179             extends Thread {
180         private final byte[] m_data;
181         private final OutputStream m_output;
182         private final DemuxOutputStream m_demux;
183 
184         WriterThread(final String name,
185                      final String data,
186                      final OutputStream output,
187                      final DemuxOutputStream demux) {
188             super(name);
189             m_output = output;
190             m_demux = demux;
191             m_data = data.getBytes();
192         }
193 
194         @Override
195         public void run() {
196             m_demux.bindStream(m_output);
197             for (final byte element : m_data) {
198                 try {
199                     //System.out.println( "Writing: " + (char)m_data[ i ] );
200                     m_demux.write(element);
201                     final int sleepTime = Math.abs(c_random.nextInt() % 10);
202                     TestUtils.sleep(sleepTime);
203                 } catch (final Exception e) {
204                     e.printStackTrace();
205                 }
206             }
207         }
208     }
209 }
210