1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.commons.io;
18
19 import java.io.ByteArrayInputStream;
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.io.OutputStream;
23 import java.util.HashMap;
24 import java.util.Iterator;
25 import java.util.Random;
26
27 import junit.framework.TestCase;
28
29 import org.apache.commons.io.input.DemuxInputStream;
30 import org.apache.commons.io.output.ByteArrayOutputStream;
31 import org.apache.commons.io.output.DemuxOutputStream;
32
33
34
35
36
37
38 public class DemuxTestCase
39 extends TestCase
40 {
41 private static final String T1 = "Thread1";
42 private static final String T2 = "Thread2";
43 private static final String T3 = "Thread3";
44 private static final String T4 = "Thread4";
45
46 private static final String DATA1 = "Data for thread1";
47 private static final String DATA2 = "Data for thread2";
48 private static final String DATA3 = "Data for thread3";
49 private static final String DATA4 = "Data for thread4";
50
51 private static Random c_random = new Random();
52 private HashMap m_outputMap = new HashMap();
53 private HashMap m_threadMap = new HashMap();
54
55 public DemuxTestCase( String name )
56 {
57 super( name );
58 }
59
60 private String getOutput( String threadName )
61 throws IOException
62 {
63 ByteArrayOutputStream output =
64 (ByteArrayOutputStream)m_outputMap.get( threadName );
65 assertNotNull( "getOutput()", output );
66
67 return output.toString();
68 }
69
70 private String getInput( String threadName )
71 throws IOException
72 {
73 ReaderThread thread = (ReaderThread)m_threadMap.get( threadName );
74 assertNotNull( "getInput()", thread );
75
76 return thread.getData();
77 }
78
79 private void doStart()
80 throws Exception
81 {
82 Iterator iterator = m_threadMap.keySet().iterator();
83 while( iterator.hasNext() )
84 {
85 String name = (String)iterator.next();
86 Thread thread = (Thread)m_threadMap.get( name );
87 thread.start();
88 }
89 }
90
91 private void doJoin()
92 throws Exception
93 {
94 Iterator iterator = m_threadMap.keySet().iterator();
95 while( iterator.hasNext() )
96 {
97 String name = (String)iterator.next();
98 Thread thread = (Thread)m_threadMap.get( name );
99 thread.join();
100 }
101 }
102
103 private void startWriter( String name,
104 String data,
105 DemuxOutputStream demux )
106 throws Exception
107 {
108 ByteArrayOutputStream output = new ByteArrayOutputStream();
109 m_outputMap.put( name, output );
110 WriterThread thread =
111 new WriterThread( name, data, output, demux );
112 m_threadMap.put( name, thread );
113 }
114
115 private void startReader( String name,
116 String data,
117 DemuxInputStream demux )
118 throws Exception
119 {
120 ByteArrayInputStream input = new ByteArrayInputStream( data.getBytes() );
121 ReaderThread thread = new ReaderThread( name, input, demux );
122 m_threadMap.put( name, thread );
123 }
124
125 public void testOutputStream()
126 throws Exception
127 {
128 DemuxOutputStream output = new DemuxOutputStream();
129 startWriter( T1, DATA1, output );
130 startWriter( T2, DATA2, output );
131 startWriter( T3, DATA3, output );
132 startWriter( T4, DATA4, output );
133
134 doStart();
135 doJoin();
136
137 assertEquals( "Data1", DATA1, getOutput( T1 ) );
138 assertEquals( "Data2", DATA2, getOutput( T2 ) );
139 assertEquals( "Data3", DATA3, getOutput( T3 ) );
140 assertEquals( "Data4", DATA4, getOutput( T4 ) );
141 }
142
143 public void testInputStream()
144 throws Exception
145 {
146 DemuxInputStream input = new DemuxInputStream();
147 startReader( T1, DATA1, input );
148 startReader( T2, DATA2, input );
149 startReader( T3, DATA3, input );
150 startReader( T4, DATA4, input );
151
152 doStart();
153 doJoin();
154
155 assertEquals( "Data1", DATA1, getInput( T1 ) );
156 assertEquals( "Data2", DATA2, getInput( T2 ) );
157 assertEquals( "Data3", DATA3, getInput( T3 ) );
158 assertEquals( "Data4", DATA4, getInput( T4 ) );
159 }
160
161 private static class ReaderThread
162 extends Thread
163 {
164 private StringBuffer m_buffer = new StringBuffer();
165 private InputStream m_input;
166 private DemuxInputStream m_demux;
167
168 ReaderThread( String name,
169 InputStream input,
170 DemuxInputStream demux )
171 {
172 super( name );
173 m_input = input;
174 m_demux = demux;
175 }
176
177 public String getData()
178 {
179 return m_buffer.toString();
180 }
181
182 public void run()
183 {
184 m_demux.bindStream( m_input );
185
186 try
187 {
188 int ch = m_demux.read();
189 while( -1 != ch )
190 {
191
192 m_buffer.append( (char)ch );
193
194 int sleepTime = Math.abs( c_random.nextInt() % 10 );
195 Thread.sleep( sleepTime );
196 ch = m_demux.read();
197 }
198 }
199 catch( Exception e )
200 {
201 e.printStackTrace();
202 }
203 }
204 }
205
206 private static class WriterThread
207 extends Thread
208 {
209 private byte[] m_data;
210 private OutputStream m_output;
211 private DemuxOutputStream m_demux;
212
213 WriterThread( String name,
214 String data,
215 OutputStream output,
216 DemuxOutputStream demux )
217 {
218 super( name );
219 m_output = output;
220 m_demux = demux;
221 m_data = data.getBytes();
222 }
223
224 public void run()
225 {
226 m_demux.bindStream( m_output );
227 for( int i = 0; i < m_data.length; i++ )
228 {
229 try
230 {
231
232 m_demux.write( m_data[ i ] );
233 int sleepTime = Math.abs( c_random.nextInt() % 10 );
234 Thread.sleep( sleepTime );
235 }
236 catch( Exception e )
237 {
238 e.printStackTrace();
239 }
240 }
241 }
242 }
243 }
244