/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17  package org.apache.commons.io;
18  
19  import static org.junit.Assert.assertEquals;
20  import static org.junit.Assert.assertNotNull;
21  
22  import java.io.ByteArrayInputStream;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  import java.util.HashMap;
26  import java.util.Iterator;
27  import java.util.Random;
28  
29  import org.apache.commons.io.input.DemuxInputStream;
30  import org.apache.commons.io.output.ByteArrayOutputStream;
31  import org.apache.commons.io.output.DemuxOutputStream;
32  import org.junit.Test;
33  
34  /**
35   * Basic unit tests for the multiplexing streams.
36   */
37  public class DemuxTestCase {
38      private static final String T1 = "Thread1";
39      private static final String T2 = "Thread2";
40      private static final String T3 = "Thread3";
41      private static final String T4 = "Thread4";
42  
43      private static final String DATA1 = "Data for thread1";
44      private static final String DATA2 = "Data for thread2";
45      private static final String DATA3 = "Data for thread3";
46      private static final String DATA4 = "Data for thread4";
47  
48      private static final Random c_random = new Random();
49      private final HashMap<String, ByteArrayOutputStream> m_outputMap = new HashMap<String, ByteArrayOutputStream>();
50      private final HashMap<String, Thread> m_threadMap = new HashMap<String, Thread>();
51  
52      private String getOutput( final String threadName )
53      {
54          final ByteArrayOutputStream output =
55              m_outputMap.get( threadName );
56          assertNotNull( "getOutput()", output );
57  
58          return output.toString(Charsets.UTF_8);
59      }
60  
61      private String getInput( final String threadName )
62      {
63          final ReaderThread thread = (ReaderThread)m_threadMap.get( threadName );
64          assertNotNull( "getInput()", thread );
65  
66          return thread.getData();
67      }
68  
69      private void doStart()
70          throws Exception
71      {
72          final Iterator<String> iterator = m_threadMap.keySet().iterator();
73          while( iterator.hasNext() )
74          {
75              final String name = iterator.next();
76              final Thread thread = m_threadMap.get( name );
77              thread.start();
78          }
79      }
80  
81      private void doJoin()
82          throws Exception
83      {
84          final Iterator<String> iterator = m_threadMap.keySet().iterator();
85          while( iterator.hasNext() )
86          {
87              final String name = iterator.next();
88              final Thread thread = m_threadMap.get( name );
89              thread.join();
90          }
91      }
92  
93      private void startWriter( final String name,
94                                final String data,
95                                final DemuxOutputStream demux )
96          throws Exception
97      {
98          final ByteArrayOutputStream output = new ByteArrayOutputStream();
99          m_outputMap.put( name, output );
100         final WriterThread thread =
101             new WriterThread( name, data, output, demux );
102         m_threadMap.put( name, thread );
103     }
104 
105     private void startReader( final String name,
106                               final String data,
107                               final DemuxInputStream demux )
108         throws Exception
109     {
110         final ByteArrayInputStream input = new ByteArrayInputStream( data.getBytes() );
111         final ReaderThread thread = new ReaderThread( name, input, demux );
112         m_threadMap.put( name, thread );
113     }
114 
115     @Test
116     public void testOutputStream()
117         throws Exception
118     {
119         final DemuxOutputStream output = new DemuxOutputStream();
120         startWriter( T1, DATA1, output );
121         startWriter( T2, DATA2, output );
122         startWriter( T3, DATA3, output );
123         startWriter( T4, DATA4, output );
124 
125         doStart();
126         doJoin();
127 
128         assertEquals( "Data1", DATA1, getOutput( T1 ) );
129         assertEquals( "Data2", DATA2, getOutput( T2 ) );
130         assertEquals( "Data3", DATA3, getOutput( T3 ) );
131         assertEquals( "Data4", DATA4, getOutput( T4 ) );
132     }
133 
134     @Test
135     public void testInputStream()
136         throws Exception
137     {
138         final DemuxInputStream input = new DemuxInputStream();
139         startReader( T1, DATA1, input );
140         startReader( T2, DATA2, input );
141         startReader( T3, DATA3, input );
142         startReader( T4, DATA4, input );
143 
144         doStart();
145         doJoin();
146 
147         assertEquals( "Data1", DATA1, getInput( T1 ) );
148         assertEquals( "Data2", DATA2, getInput( T2 ) );
149         assertEquals( "Data3", DATA3, getInput( T3 ) );
150         assertEquals( "Data4", DATA4, getInput( T4 ) );
151     }
152 
153     private static class ReaderThread
154         extends Thread
155     {
156         private final StringBuffer m_buffer = new StringBuffer();
157         private final InputStream m_input;
158         private final DemuxInputStream m_demux;
159 
160         ReaderThread( final String name,
161                       final InputStream input,
162                       final DemuxInputStream demux )
163         {
164             super( name );
165             m_input = input;
166             m_demux = demux;
167         }
168 
169         public String getData()
170         {
171             return m_buffer.toString();
172         }
173 
174         @Override
175         public void run()
176         {
177             m_demux.bindStream( m_input );
178 
179             try
180             {
181                 int ch = m_demux.read();
182                 while( -1 != ch )
183                 {
184                     //System.out.println( "Reading: " + (char)ch );
185                     m_buffer.append( (char)ch );
186 
187                     final int sleepTime = Math.abs( c_random.nextInt() % 10 );
188                     Thread.sleep( sleepTime );
189                     ch = m_demux.read();
190                 }
191             }
192             catch( final Exception e )
193             {
194                 e.printStackTrace();
195             }
196         }
197     }
198 
199     private static class WriterThread
200         extends Thread
201     {
202         private final byte[] m_data;
203         private final OutputStream m_output;
204         private final DemuxOutputStream m_demux;
205 
206         WriterThread( final String name,
207                       final String data,
208                       final OutputStream output,
209                       final DemuxOutputStream demux )
210         {
211             super( name );
212             m_output = output;
213             m_demux = demux;
214             m_data = data.getBytes();
215         }
216 
217         @Override
218         public void run()
219         {
220             m_demux.bindStream( m_output );
221             for (final byte element : m_data) {
222                 try
223                 {
224                     //System.out.println( "Writing: " + (char)m_data[ i ] );
225                     m_demux.write( element );
226                     final int sleepTime = Math.abs( c_random.nextInt() % 10 );
227                     Thread.sleep( sleepTime );
228                 }
229                 catch( final Exception e )
230                 {
231                     e.printStackTrace();
232                 }
233             }
234         }
235     }
236 }
237