1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.commons.io;
18
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertNotNull;
21
22 import java.io.ByteArrayInputStream;
23 import java.io.InputStream;
24 import java.io.OutputStream;
25 import java.util.HashMap;
26 import java.util.Iterator;
27 import java.util.Random;
28
29 import org.apache.commons.io.input.DemuxInputStream;
30 import org.apache.commons.io.output.ByteArrayOutputStream;
31 import org.apache.commons.io.output.DemuxOutputStream;
32 import org.junit.Test;
33
34
35
36
37 public class DemuxTestCase {
38 private static final String T1 = "Thread1";
39 private static final String T2 = "Thread2";
40 private static final String T3 = "Thread3";
41 private static final String T4 = "Thread4";
42
43 private static final String DATA1 = "Data for thread1";
44 private static final String DATA2 = "Data for thread2";
45 private static final String DATA3 = "Data for thread3";
46 private static final String DATA4 = "Data for thread4";
47
48 private static final Random c_random = new Random();
49 private final HashMap<String, ByteArrayOutputStream> m_outputMap = new HashMap<String, ByteArrayOutputStream>();
50 private final HashMap<String, Thread> m_threadMap = new HashMap<String, Thread>();
51
52 private String getOutput( final String threadName )
53 {
54 final ByteArrayOutputStream output =
55 m_outputMap.get( threadName );
56 assertNotNull( "getOutput()", output );
57
58 return output.toString(Charsets.UTF_8);
59 }
60
61 private String getInput( final String threadName )
62 {
63 final ReaderThread thread = (ReaderThread)m_threadMap.get( threadName );
64 assertNotNull( "getInput()", thread );
65
66 return thread.getData();
67 }
68
69 private void doStart()
70 throws Exception
71 {
72 final Iterator<String> iterator = m_threadMap.keySet().iterator();
73 while( iterator.hasNext() )
74 {
75 final String name = iterator.next();
76 final Thread thread = m_threadMap.get( name );
77 thread.start();
78 }
79 }
80
81 private void doJoin()
82 throws Exception
83 {
84 final Iterator<String> iterator = m_threadMap.keySet().iterator();
85 while( iterator.hasNext() )
86 {
87 final String name = iterator.next();
88 final Thread thread = m_threadMap.get( name );
89 thread.join();
90 }
91 }
92
93 private void startWriter( final String name,
94 final String data,
95 final DemuxOutputStream demux )
96 throws Exception
97 {
98 final ByteArrayOutputStream output = new ByteArrayOutputStream();
99 m_outputMap.put( name, output );
100 final WriterThread thread =
101 new WriterThread( name, data, output, demux );
102 m_threadMap.put( name, thread );
103 }
104
105 private void startReader( final String name,
106 final String data,
107 final DemuxInputStream demux )
108 throws Exception
109 {
110 final ByteArrayInputStream input = new ByteArrayInputStream( data.getBytes() );
111 final ReaderThread thread = new ReaderThread( name, input, demux );
112 m_threadMap.put( name, thread );
113 }
114
115 @Test
116 public void testOutputStream()
117 throws Exception
118 {
119 final DemuxOutputStream output = new DemuxOutputStream();
120 startWriter( T1, DATA1, output );
121 startWriter( T2, DATA2, output );
122 startWriter( T3, DATA3, output );
123 startWriter( T4, DATA4, output );
124
125 doStart();
126 doJoin();
127
128 assertEquals( "Data1", DATA1, getOutput( T1 ) );
129 assertEquals( "Data2", DATA2, getOutput( T2 ) );
130 assertEquals( "Data3", DATA3, getOutput( T3 ) );
131 assertEquals( "Data4", DATA4, getOutput( T4 ) );
132 }
133
134 @Test
135 public void testInputStream()
136 throws Exception
137 {
138 final DemuxInputStream input = new DemuxInputStream();
139 startReader( T1, DATA1, input );
140 startReader( T2, DATA2, input );
141 startReader( T3, DATA3, input );
142 startReader( T4, DATA4, input );
143
144 doStart();
145 doJoin();
146
147 assertEquals( "Data1", DATA1, getInput( T1 ) );
148 assertEquals( "Data2", DATA2, getInput( T2 ) );
149 assertEquals( "Data3", DATA3, getInput( T3 ) );
150 assertEquals( "Data4", DATA4, getInput( T4 ) );
151 }
152
153 private static class ReaderThread
154 extends Thread
155 {
156 private final StringBuffer m_buffer = new StringBuffer();
157 private final InputStream m_input;
158 private final DemuxInputStream m_demux;
159
160 ReaderThread( final String name,
161 final InputStream input,
162 final DemuxInputStream demux )
163 {
164 super( name );
165 m_input = input;
166 m_demux = demux;
167 }
168
169 public String getData()
170 {
171 return m_buffer.toString();
172 }
173
174 @Override
175 public void run()
176 {
177 m_demux.bindStream( m_input );
178
179 try
180 {
181 int ch = m_demux.read();
182 while( -1 != ch )
183 {
184
185 m_buffer.append( (char)ch );
186
187 final int sleepTime = Math.abs( c_random.nextInt() % 10 );
188 Thread.sleep( sleepTime );
189 ch = m_demux.read();
190 }
191 }
192 catch( final Exception e )
193 {
194 e.printStackTrace();
195 }
196 }
197 }
198
199 private static class WriterThread
200 extends Thread
201 {
202 private final byte[] m_data;
203 private final OutputStream m_output;
204 private final DemuxOutputStream m_demux;
205
206 WriterThread( final String name,
207 final String data,
208 final OutputStream output,
209 final DemuxOutputStream demux )
210 {
211 super( name );
212 m_output = output;
213 m_demux = demux;
214 m_data = data.getBytes();
215 }
216
217 @Override
218 public void run()
219 {
220 m_demux.bindStream( m_output );
221 for (final byte element : m_data) {
222 try
223 {
224
225 m_demux.write( element );
226 final int sleepTime = Math.abs( c_random.nextInt() % 10 );
227 Thread.sleep( sleepTime );
228 }
229 catch( final Exception e )
230 {
231 e.printStackTrace();
232 }
233 }
234 }
235 }
236 }
237