1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.commons.net.telnet;
19
20 import java.io.BufferedInputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InterruptedIOException;
24
25 final class TelnetInputStream extends BufferedInputStream implements Runnable {
26
27 private static final int EOF = -1;
28
29
30 private static final int WOULD_BLOCK = -2;
31
32
33 static final int STATE_DATA = 0;
34 static final int STATE_IAC = 1;
35 static final int STATE_WILL = 2;
36 static final int STATE_WONT = 3;
37 static final int STATE_DO = 4;
38 static final int STATE_DONT = 5;
39 static final int STATE_SB = 6;
40 static final int STATE_SE = 7;
41 static final int STATE_CR = 8;
42 static final int STATE_IAC_SB = 9;
43 private boolean hasReachedEOF;
44 private volatile boolean isClosed;
45 private boolean readIsWaiting;
46 private int receiveState;
47 private int queueHead;
48 private int queueTail;
49 private int bytesAvailable;
50 private final int[] queue;
51 private final TelnetClient client;
52 private final Thread thread;
53 private IOException ioException;
54
55
56 private final int suboption[];
57 private int suboptionCount;
58
59
60 private volatile boolean threaded;
61
62 TelnetInputStream(final InputStream input, final TelnetClient client) {
63 this(input, client, true);
64 }
65
66 TelnetInputStream(final InputStream input, final TelnetClient client, final boolean readerThread) {
67 super(input);
68 this.client = client;
69 this.receiveState = STATE_DATA;
70 this.isClosed = true;
71 this.hasReachedEOF = false;
72
73
74 this.queue = new int[2049];
75 this.queueHead = 0;
76 this.queueTail = 0;
77 this.suboption = new int[client.maxSubnegotiationLength];
78 this.bytesAvailable = 0;
79 this.ioException = null;
80 this.readIsWaiting = false;
81 this.threaded = false;
82 if (readerThread) {
83 this.thread = new Thread(this);
84 } else {
85 this.thread = null;
86 }
87 }
88
89 @Override
90 public int available() throws IOException {
91
92 synchronized (queue) {
93 if (threaded) {
94 return bytesAvailable;
95 }
96 return bytesAvailable + super.available();
97 }
98 }
99
100
101
102 @Override
103 public void close() throws IOException {
104
105
106
107
108 super.close();
109
110 synchronized (queue) {
111 hasReachedEOF = true;
112 isClosed = true;
113
114 if (thread != null && thread.isAlive()) {
115 thread.interrupt();
116 }
117
118 queue.notifyAll();
119 }
120
121 }
122
123
124 @Override
125 public boolean markSupported() {
126 return false;
127 }
128
129
130
131
132
133 private boolean processChar(final int ch) throws InterruptedException {
134
135
136 final boolean bufferWasEmpty;
137 synchronized (queue) {
138 bufferWasEmpty = bytesAvailable == 0;
139 while (bytesAvailable >= queue.length - 1) {
140
141
142 if (!threaded) {
143
144
145 throw new IllegalStateException("Queue is full! Cannot process another character.");
146 }
147 queue.notify();
148 try {
149 queue.wait();
150 } catch (final InterruptedException e) {
151 throw e;
152 }
153 }
154
155
156 if (readIsWaiting && threaded) {
157 queue.notify();
158 }
159
160 queue[queueTail] = ch;
161 ++bytesAvailable;
162
163 if (++queueTail >= queue.length) {
164 queueTail = 0;
165 }
166 }
167 return bufferWasEmpty;
168 }
169
170 @Override
171 public int read() throws IOException {
172
173
174
175 synchronized (queue) {
176
177 while (true) {
178 if (ioException != null) {
179 final IOException e;
180 e = ioException;
181 ioException = null;
182 throw e;
183 }
184
185 if (bytesAvailable == 0) {
186
187 if (hasReachedEOF) {
188 return EOF;
189 }
190
191
192 if (threaded) {
193 queue.notify();
194 try {
195 readIsWaiting = true;
196 queue.wait();
197 readIsWaiting = false;
198 } catch (final InterruptedException e) {
199 throw new InterruptedIOException("Fatal thread interruption during read.");
200 }
201 } else {
202
203 readIsWaiting = true;
204 int ch;
205 boolean mayBlock = true;
206
207 do {
208 try {
209 if ((ch = read(mayBlock)) < 0 && ch != WOULD_BLOCK) {
210
211 return ch;
212 }
213 } catch (final InterruptedIOException e) {
214 synchronized (queue) {
215 ioException = e;
216 queue.notifyAll();
217 try {
218 queue.wait(100);
219 } catch (final InterruptedException interrupted) {
220
221 }
222 }
223 return EOF;
224 }
225
226 try {
227 if (ch != WOULD_BLOCK) {
228 processChar(ch);
229 }
230 } catch (final InterruptedException e) {
231 if (isClosed) {
232 return EOF;
233 }
234 }
235
236
237
238 mayBlock = false;
239
240 }
241
242 while (super.available() > 0 && bytesAvailable < queue.length - 1);
243
244 readIsWaiting = false;
245 }
246 continue;
247 }
248 final int ch;
249
250 ch = queue[queueHead];
251
252 if (++queueHead >= queue.length) {
253 queueHead = 0;
254 }
255
256 --bytesAvailable;
257
258
259 if (bytesAvailable == 0 && threaded) {
260 queue.notify();
261 }
262
263 return ch;
264 }
265 }
266 }
267
268
269
270
271
272
273
274
275
276
277
278 private int read(final boolean mayBlock) throws IOException {
279 int ch;
280
281 while (true) {
282
283
284
285 if (!mayBlock && super.available() == 0) {
286 return WOULD_BLOCK;
287 }
288
289
290 if ((ch = super.read()) < 0) {
291 return EOF;
292 }
293
294 ch &= 0xff;
295
296
297 synchronized (client) {
298 client.processAYTResponse();
299 }
300
301
302
303 client.spyRead(ch);
304
305
306 switch (receiveState) {
307
308 case STATE_CR:
309 if (ch == Telnet.NUL) {
310
311 continue;
312 }
313
314
315
316
317
318
319 case STATE_DATA:
320 if (ch == TelnetCommand.IAC) {
321 receiveState = STATE_IAC;
322 continue;
323 }
324
325 if (ch == '\r') {
326 synchronized (client) {
327 if (client.requestedDont(TelnetOption.BINARY)) {
328 receiveState = STATE_CR;
329 } else {
330 receiveState = STATE_DATA;
331 }
332 }
333 } else {
334 receiveState = STATE_DATA;
335 }
336 break;
337
338 case STATE_IAC:
339 switch (ch) {
340 case TelnetCommand.WILL:
341 receiveState = STATE_WILL;
342 continue;
343 case TelnetCommand.WONT:
344 receiveState = STATE_WONT;
345 continue;
346 case TelnetCommand.DO:
347 receiveState = STATE_DO;
348 continue;
349 case TelnetCommand.DONT:
350 receiveState = STATE_DONT;
351 continue;
352
353 case TelnetCommand.SB:
354 suboptionCount = 0;
355 receiveState = STATE_SB;
356 continue;
357
358 case TelnetCommand.IAC:
359 receiveState = STATE_DATA;
360 break;
361 case TelnetCommand.SE:
362 receiveState = STATE_DATA;
363 continue;
364 default:
365 receiveState = STATE_DATA;
366 client.processCommand(ch);
367 continue;
368 }
369 break;
370 case STATE_WILL:
371 synchronized (client) {
372 client.processWill(ch);
373 client.flushOutputStream();
374 }
375 receiveState = STATE_DATA;
376 continue;
377 case STATE_WONT:
378 synchronized (client) {
379 client.processWont(ch);
380 client.flushOutputStream();
381 }
382 receiveState = STATE_DATA;
383 continue;
384 case STATE_DO:
385 synchronized (client) {
386 client.processDo(ch);
387 client.flushOutputStream();
388 }
389 receiveState = STATE_DATA;
390 continue;
391 case STATE_DONT:
392 synchronized (client) {
393 client.processDont(ch);
394 client.flushOutputStream();
395 }
396 receiveState = STATE_DATA;
397 continue;
398
399 case STATE_SB:
400 switch (ch) {
401 case TelnetCommand.IAC:
402 receiveState = STATE_IAC_SB;
403 continue;
404 default:
405
406 if (suboptionCount < suboption.length) {
407 suboption[suboptionCount++] = ch;
408 }
409 break;
410 }
411 receiveState = STATE_SB;
412 continue;
413 case STATE_IAC_SB:
414 switch (ch) {
415 case TelnetCommand.SE:
416 synchronized (client) {
417 client.processSuboption(suboption, suboptionCount);
418 client.flushOutputStream();
419 }
420 receiveState = STATE_DATA;
421 continue;
422 case TelnetCommand.IAC:
423 if (suboptionCount < suboption.length) {
424 suboption[suboptionCount++] = ch;
425 }
426 break;
427 default:
428 break;
429 }
430 receiveState = STATE_SB;
431 continue;
432
433 }
434
435 break;
436 }
437
438 return ch;
439 }
440
441
442
443
444
445
446
447
448 @Override
449 public int read(final byte buffer[]) throws IOException {
450 return read(buffer, 0, buffer.length);
451 }
452
453
454
455
456
457
458
459
460
461
462
463 @Override
464 public int read(final byte buffer[], int offset, int length) throws IOException {
465 int ch;
466 final int off;
467
468 if (length < 1) {
469 return 0;
470 }
471
472
473 synchronized (queue) {
474 if (length > bytesAvailable) {
475 length = bytesAvailable;
476 }
477 }
478
479 if ((ch = read()) == EOF) {
480 return EOF;
481 }
482
483 off = offset;
484
485 do {
486 buffer[offset++] = (byte) ch;
487 } while (--length > 0 && (ch = read()) != EOF);
488
489
490 return offset - off;
491 }
492
493 @Override
494 public void run() {
495 int ch;
496
497 try {
498 _outerLoop: while (!isClosed) {
499 try {
500 if ((ch = read(true)) < 0) {
501 break;
502 }
503 } catch (final InterruptedIOException e) {
504 synchronized (queue) {
505 ioException = e;
506 queue.notifyAll();
507 try {
508 queue.wait(100);
509 } catch (final InterruptedException interrupted) {
510 if (isClosed) {
511 break _outerLoop;
512 }
513 }
514 continue;
515 }
516 } catch (final RuntimeException re) {
517
518
519
520 super.close();
521
522
523 break _outerLoop;
524 }
525
526
527 boolean notify = false;
528 try {
529 notify = processChar(ch);
530 } catch (final InterruptedException e) {
531 if (isClosed) {
532 break _outerLoop;
533 }
534 }
535
536
537 if (notify) {
538 client.notifyInputListener();
539 }
540 }
541 } catch (final IOException ioe) {
542 synchronized (queue) {
543 ioException = ioe;
544 }
545 client.notifyInputListener();
546 }
547
548 synchronized (queue) {
549 isClosed = true;
550 hasReachedEOF = true;
551 queue.notify();
552 }
553
554 threaded = false;
555 }
556
557 void start() {
558 if (thread == null) {
559 return;
560 }
561
562 int priority;
563 isClosed = false;
564
565
566
567
568 priority = Thread.currentThread().getPriority() + 1;
569 if (priority > Thread.MAX_PRIORITY) {
570 priority = Thread.MAX_PRIORITY;
571 }
572 thread.setPriority(priority);
573 thread.setDaemon(true);
574 thread.start();
575 threaded = true;
576 }
577 }