1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    * 
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.io;
18  
19  import java.io.ByteArrayInputStream;
20  import java.io.IOException;
21  import java.io.InputStream;
22  import java.io.OutputStream;
23  import java.util.HashMap;
24  import java.util.Iterator;
25  import java.util.Random;
26  
27  import junit.framework.TestCase;
28  
29  import org.apache.commons.io.input.DemuxInputStream;
30  import org.apache.commons.io.output.ByteArrayOutputStream;
31  import org.apache.commons.io.output.DemuxOutputStream;
32  
33  /**
34   * Basic unit tests for the multiplexing streams.
35   *
36   * @author <a href="mailto:peter@apache.org">Peter Donald</a>
37   */
38  public class DemuxTestCase
39      extends TestCase
40  {
41      private static final String T1 = "Thread1";
42      private static final String T2 = "Thread2";
43      private static final String T3 = "Thread3";
44      private static final String T4 = "Thread4";
45  
46      private static final String DATA1 = "Data for thread1";
47      private static final String DATA2 = "Data for thread2";
48      private static final String DATA3 = "Data for thread3";
49      private static final String DATA4 = "Data for thread4";
50  
51      private static Random c_random = new Random();
52      private HashMap m_outputMap = new HashMap();
53      private HashMap m_threadMap = new HashMap();
54  
55      public DemuxTestCase( String name )
56      {
57          super( name );
58      }
59  
60      private String getOutput( String threadName )
61          throws IOException
62      {
63          ByteArrayOutputStream output =
64              (ByteArrayOutputStream)m_outputMap.get( threadName );
65          assertNotNull( "getOutput()", output );
66  
67          return output.toString();
68      }
69  
70      private String getInput( String threadName )
71          throws IOException
72      {
73          ReaderThread thread = (ReaderThread)m_threadMap.get( threadName );
74          assertNotNull( "getInput()", thread );
75  
76          return thread.getData();
77      }
78  
79      private void doStart()
80          throws Exception
81      {
82          Iterator iterator = m_threadMap.keySet().iterator();
83          while( iterator.hasNext() )
84          {
85              String name = (String)iterator.next();
86              Thread thread = (Thread)m_threadMap.get( name );
87              thread.start();
88          }
89      }
90  
91      private void doJoin()
92          throws Exception
93      {
94          Iterator iterator = m_threadMap.keySet().iterator();
95          while( iterator.hasNext() )
96          {
97              String name = (String)iterator.next();
98              Thread thread = (Thread)m_threadMap.get( name );
99              thread.join();
100         }
101     }
102 
103     private void startWriter( String name,
104                               String data,
105                               DemuxOutputStream demux )
106         throws Exception
107     {
108         ByteArrayOutputStream output = new ByteArrayOutputStream();
109         m_outputMap.put( name, output );
110         WriterThread thread =
111             new WriterThread( name, data, output, demux );
112         m_threadMap.put( name, thread );
113     }
114 
115     private void startReader( String name,
116                               String data,
117                               DemuxInputStream demux )
118         throws Exception
119     {
120         ByteArrayInputStream input = new ByteArrayInputStream( data.getBytes() );
121         ReaderThread thread = new ReaderThread( name, input, demux );
122         m_threadMap.put( name, thread );
123     }
124 
125     public void testOutputStream()
126         throws Exception
127     {
128         DemuxOutputStream output = new DemuxOutputStream();
129         startWriter( T1, DATA1, output );
130         startWriter( T2, DATA2, output );
131         startWriter( T3, DATA3, output );
132         startWriter( T4, DATA4, output );
133 
134         doStart();
135         doJoin();
136 
137         assertEquals( "Data1", DATA1, getOutput( T1 ) );
138         assertEquals( "Data2", DATA2, getOutput( T2 ) );
139         assertEquals( "Data3", DATA3, getOutput( T3 ) );
140         assertEquals( "Data4", DATA4, getOutput( T4 ) );
141     }
142 
143     public void testInputStream()
144         throws Exception
145     {
146         DemuxInputStream input = new DemuxInputStream();
147         startReader( T1, DATA1, input );
148         startReader( T2, DATA2, input );
149         startReader( T3, DATA3, input );
150         startReader( T4, DATA4, input );
151 
152         doStart();
153         doJoin();
154 
155         assertEquals( "Data1", DATA1, getInput( T1 ) );
156         assertEquals( "Data2", DATA2, getInput( T2 ) );
157         assertEquals( "Data3", DATA3, getInput( T3 ) );
158         assertEquals( "Data4", DATA4, getInput( T4 ) );
159     }
160 
161     private static class ReaderThread
162         extends Thread
163     {
164         private StringBuffer m_buffer = new StringBuffer();
165         private InputStream m_input;
166         private DemuxInputStream m_demux;
167 
168         ReaderThread( String name,
169                       InputStream input,
170                       DemuxInputStream demux )
171         {
172             super( name );
173             m_input = input;
174             m_demux = demux;
175         }
176 
177         public String getData()
178         {
179             return m_buffer.toString();
180         }
181 
182         public void run()
183         {
184             m_demux.bindStream( m_input );
185 
186             try
187             {
188                 int ch = m_demux.read();
189                 while( -1 != ch )
190                 {
191                     //System.out.println( "Reading: " + (char)ch );
192                     m_buffer.append( (char)ch );
193 
194                     int sleepTime = Math.abs( c_random.nextInt() % 10 );
195                     Thread.sleep( sleepTime );
196                     ch = m_demux.read();
197                 }
198             }
199             catch( Exception e )
200             {
201                 e.printStackTrace();
202             }
203         }
204     }
205 
206     private static class WriterThread
207         extends Thread
208     {
209         private byte[] m_data;
210         private OutputStream m_output;
211         private DemuxOutputStream m_demux;
212 
213         WriterThread( String name,
214                       String data,
215                       OutputStream output,
216                       DemuxOutputStream demux )
217         {
218             super( name );
219             m_output = output;
220             m_demux = demux;
221             m_data = data.getBytes();
222         }
223 
224         public void run()
225         {
226             m_demux.bindStream( m_output );
227             for( int i = 0; i < m_data.length; i++ )
228             {
229                 try
230                 {
231                     //System.out.println( "Writing: " + (char)m_data[ i ] );
232                     m_demux.write( m_data[ i ] );
233                     int sleepTime = Math.abs( c_random.nextInt() % 10 );
234                     Thread.sleep( sleepTime );
235                 }
236                 catch( Exception e )
237                 {
238                     e.printStackTrace();
239                 }
240             }
241         }
242     }
243 }
244