1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.commons.net.telnet;
19
20 import java.io.BufferedInputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InterruptedIOException;
24
25 final class TelnetInputStream extends BufferedInputStream implements Runnable {
26
27 /** End of file has been reached */
28 private static final int EOF = -1;
29
30 /** Read would block */
31 private static final int WOULD_BLOCK = -2;
32
33 // TODO should these be private enums?
34 static final int STATE_DATA = 0;
35 static final int STATE_IAC = 1;
36 static final int STATE_WILL = 2;
37 static final int STATE_WONT = 3;
38 static final int STATE_DO = 4;
39 static final int STATE_DONT = 5;
40 static final int STATE_SB = 6;
41 static final int STATE_SE = 7;
42 static final int STATE_CR = 8;
43 static final int STATE_IAC_SB = 9;
44 private boolean hasReachedEOF; // @GuardedBy("queue")
45 private volatile boolean isClosed;
46 private boolean readIsWaiting;
47 private int receiveState;
48 private int queueHead;
49 private int queueTail;
50 private int bytesAvailable;
51 private final int[] queue;
52 private final TelnetClient client;
53 private final Thread thread;
54 private IOException ioException;
55
56 /* TERMINAL-TYPE option (start) */
57 private final int suboption[];
58 private int suboptionCount;
59 /* TERMINAL-TYPE option (end) */
60
61 private volatile boolean threaded;
62
63 TelnetInputStream(final InputStream input, final TelnetClient client) {
64 this(input, client, true);
65 }
66
67 TelnetInputStream(final InputStream input, final TelnetClient client, final boolean readerThread) {
68 super(input);
69 this.client = client;
70 this.receiveState = STATE_DATA;
71 this.isClosed = true;
72 this.hasReachedEOF = false;
73 // Make it 2049, because when full, one slot will go unused, and we
74 // want a 2048 byte buffer just to have a round number (base 2 that is)
75 this.queue = new int[2049];
76 this.queueHead = 0;
77 this.queueTail = 0;
78 this.suboption = new int[client.maxSubnegotiationLength];
79 this.bytesAvailable = 0;
80 this.ioException = null;
81 this.readIsWaiting = false;
82 this.threaded = false;
83 if (readerThread) {
84 this.thread = new Thread(this);
85 } else {
86 this.thread = null;
87 }
88 }
89
90 @Override
91 public int available() throws IOException {
92 // Critical section because run() may change bytesAvailable
93 synchronized (queue) {
94 if (threaded) { // Must not call super.available when running threaded: NET-466
95 return bytesAvailable;
96 }
97 return bytesAvailable + super.available();
98 }
99 }
100
101 // Cannot be synchronized. Will cause deadlock if run() is blocked
102 // in read because BufferedInputStream read() is synchronized.
103 @Override
104 public void close() throws IOException {
105 // Completely disregard the fact thread may still be running.
106 // We can't afford to block on this close by waiting for
107 // thread to terminate because few if any JVM's will actually
108 // interrupt a system read() from the interrupt() method.
109 super.close();
110
111 synchronized (queue) {
112 hasReachedEOF = true;
113 isClosed = true;
114
115 if (thread != null && thread.isAlive()) {
116 thread.interrupt();
117 }
118
119 queue.notifyAll();
120 }
121
122 }
123
124 /** Returns false. Mark is not supported. */
125 @Override
126 public boolean markSupported() {
127 return false;
128 }
129
130 // synchronized(client) critical sections are to protect against
131 // TelnetOutputStream writing through the Telnet client at same time
132 // as a processDo/Will/etc. command invoked from TelnetInputStream
133 // tries to write. Returns true if buffer was previously empty.
134 private boolean processChar(final int ch) throws InterruptedException {
135 // Critical section because we're altering bytesAvailable,
136 // queueTail, and the contents of _queue.
137 final boolean bufferWasEmpty;
138 synchronized (queue) {
139 bufferWasEmpty = bytesAvailable == 0;
140 while (bytesAvailable >= queue.length - 1) {
141 // The queue is full. We need to wait before adding any more data to it. Hopefully the stream owner
142 // will consume some data soon!
143 if (!threaded) {
144 // We've been asked to add another character to the queue, but it is already full and there's
145 // no other thread to drain it. This should not have happened!
146 throw new IllegalStateException("Queue is full! Cannot process another character.");
147 }
148 queue.notify();
149 try {
150 queue.wait();
151 } catch (final InterruptedException e) {
152 Thread.currentThread().interrupt();
153 throw e;
154 }
155 }
156
157 // Need to do this in case we're not full, but block on a read
158 if (readIsWaiting && threaded) {
159 queue.notify();
160 }
161
162 queue[queueTail] = ch;
163 ++bytesAvailable;
164
165 if (++queueTail >= queue.length) {
166 queueTail = 0;
167 }
168 }
169 return bufferWasEmpty;
170 }
171
172 @Override
173 public int read() throws IOException {
174 // Critical section because we're altering bytesAvailable,
175 // queueHead, and the contents of _queue in addition to
176 // testing value of hasReachedEOF.
177 synchronized (queue) {
178
179 while (true) {
180 if (ioException != null) {
181 final IOException e;
182 e = ioException;
183 ioException = null;
184 throw e;
185 }
186
187 if (bytesAvailable == 0) {
188 // Return EOF if at end of file
189 if (hasReachedEOF) {
190 return EOF;
191 }
192
193 // Otherwise, we have to wait for queue to get something
194 if (threaded) {
195 queue.notify();
196 try {
197 readIsWaiting = true;
198 queue.wait();
199 readIsWaiting = false;
200 } catch (final InterruptedException e) {
201 Thread.currentThread().interrupt();
202 final InterruptedIOException interruptedIoException = new InterruptedIOException("Fatal thread interruption during read.");
203 interruptedIoException.initCause(e);
204 throw interruptedIoException;
205 }
206 } else {
207 // alreadyread = false;
208 readIsWaiting = true;
209 int ch;
210 boolean mayBlock = true; // block on the first read only
211
212 do {
213 try {
214 if ((ch = read(mayBlock)) < 0 && ch != WOULD_BLOCK) {
215 // must be EOF
216 return ch;
217 }
218 } catch (final InterruptedIOException e) {
219 synchronized (queue) {
220 ioException = e;
221 queue.notifyAll();
222 try {
223 queue.wait(100);
224 } catch (final InterruptedException interrupted) {
225 Thread.currentThread().interrupt();
226 }
227 }
228 return EOF;
229 }
230
231 try {
232 if (ch != WOULD_BLOCK) {
233 processChar(ch);
234 }
235 } catch (final InterruptedException e) {
236 Thread.currentThread().interrupt();
237 if (isClosed) {
238 return EOF;
239 }
240 }
241
242 // Reads should not block on subsequent iterations. Potentially, this could happen if the
243 // remaining buffered socket data consists entirely of Telnet command sequence and no "user" data.
244 mayBlock = false;
245
246 }
247 // Continue reading as long as there is data available and the queue is not full.
248 while (super.available() > 0 && bytesAvailable < queue.length - 1);
249
250 readIsWaiting = false;
251 }
252 continue;
253 }
254 final int ch;
255
256 ch = queue[queueHead];
257
258 if (++queueHead >= queue.length) {
259 queueHead = 0;
260 }
261
262 --bytesAvailable;
263
264 // Need to explicitly notify() so available() works properly
265 if (bytesAvailable == 0 && threaded) {
266 queue.notify();
267 }
268
269 return ch; // NOPMD TODO?
270 }
271 }
272 }
273
274 // synchronized(client) critical sections are to protect against
275 // TelnetOutputStream writing through the Telnet client at same time
276 // as a processDo/Will/etc. command invoked from TelnetInputStream
277 // tries to write.
278 /**
279 * Gets the next byte of data. IAC commands are processed internally and do not return data.
280 *
281 * @param mayBlock true if method is allowed to block
282 * @return the next byte of data, or -1 (EOF) if end of stread reached, or -2 (WOULD_BLOCK) if mayBlock is false and there is no data available
283 */
284 private int read(final boolean mayBlock) throws IOException {
285 int ch;
286
287 while (true) {
288
289 // If there is no more data AND we were told not to block,
290 // just return WOULD_BLOCK (-2). (More efficient than exception.)
291 if (!mayBlock && super.available() == 0) {
292 return WOULD_BLOCK;
293 }
294
295 // Otherwise, exit only when we reach end of stream.
296 if ((ch = super.read()) < 0) {
297 return EOF;
298 }
299
300 ch &= 0xff;
301
302 /* Code Section added for supporting AYT (start) */
303 synchronized (client) {
304 client.processAYTResponse();
305 }
306 /* Code Section added for supporting AYT (end) */
307
308 /* Code Section added for supporting spystreams (start) */
309 client.spyRead(ch);
310 /* Code Section added for supporting spystreams (end) */
311
312 switch (receiveState) {
313
314 case STATE_CR:
315 if (ch == Telnet.NUL) {
316 // Strip null
317 continue;
318 }
319 // How do we handle newline after cr?
320 // else if (ch == '\n' && _requestedDont(TelnetOption.ECHO) &&
321
322 // Handle as normal data by falling through to _STATE_DATA case
323
324 // falls through$
325 case STATE_DATA:
326 if (ch == TelnetCommand.IAC) {
327 receiveState = STATE_IAC;
328 continue;
329 }
330
331 if (ch == '\r') {
332 synchronized (client) {
333 if (client.requestedDont(TelnetOption.BINARY)) {
334 receiveState = STATE_CR;
335 } else {
336 receiveState = STATE_DATA;
337 }
338 }
339 } else {
340 receiveState = STATE_DATA;
341 }
342 break;
343
344 case STATE_IAC:
345 switch (ch) {
346 case TelnetCommand.WILL:
347 receiveState = STATE_WILL;
348 continue;
349 case TelnetCommand.WONT:
350 receiveState = STATE_WONT;
351 continue;
352 case TelnetCommand.DO:
353 receiveState = STATE_DO;
354 continue;
355 case TelnetCommand.DONT:
356 receiveState = STATE_DONT;
357 continue;
358 /* TERMINAL-TYPE option (start) */
359 case TelnetCommand.SB:
360 suboptionCount = 0;
361 receiveState = STATE_SB;
362 continue;
363 /* TERMINAL-TYPE option (end) */
364 case TelnetCommand.IAC:
365 receiveState = STATE_DATA;
366 break; // exit to enclosing switch to return IAC from read
367 case TelnetCommand.SE: // unexpected byte! ignore it (don't send it as a command)
368 receiveState = STATE_DATA;
369 continue;
370 default:
371 receiveState = STATE_DATA;
372 client.processCommand(ch); // Notify the user
373 continue; // move on the next char
374 }
375 break; // exit and return from read
376 case STATE_WILL:
377 synchronized (client) {
378 client.processWill(ch);
379 client.flushOutputStream();
380 }
381 receiveState = STATE_DATA;
382 continue;
383 case STATE_WONT:
384 synchronized (client) {
385 client.processWont(ch);
386 client.flushOutputStream();
387 }
388 receiveState = STATE_DATA;
389 continue;
390 case STATE_DO:
391 synchronized (client) {
392 client.processDo(ch);
393 client.flushOutputStream();
394 }
395 receiveState = STATE_DATA;
396 continue;
397 case STATE_DONT:
398 synchronized (client) {
399 client.processDont(ch);
400 client.flushOutputStream();
401 }
402 receiveState = STATE_DATA;
403 continue;
404 /* TERMINAL-TYPE option (start) */
405 case STATE_SB:
406 switch (ch) {
407 case TelnetCommand.IAC:
408 receiveState = STATE_IAC_SB;
409 continue;
410 default:
411 // store suboption char
412 if (suboptionCount < suboption.length) {
413 suboption[suboptionCount++] = ch;
414 }
415 break;
416 }
417 receiveState = STATE_SB;
418 continue;
419 case STATE_IAC_SB: // IAC received during SB phase
420 switch (ch) {
421 case TelnetCommand.SE:
422 synchronized (client) {
423 client.processSuboption(suboption, suboptionCount);
424 client.flushOutputStream();
425 }
426 receiveState = STATE_DATA;
427 continue;
428 case TelnetCommand.IAC: // De-dup the duplicated IAC
429 if (suboptionCount < suboption.length) {
430 suboption[suboptionCount++] = ch;
431 }
432 break;
433 default: // unexpected byte! ignore it
434 break;
435 }
436 receiveState = STATE_SB;
437 continue;
438 /* TERMINAL-TYPE option (end) */
439 }
440
441 break; // NOPMD TODO?
442 }
443
444 return ch;
445 }
446
447 /**
448 * Reads the next number of bytes from the stream into an array and returns the number of bytes read. Returns -1 if the end of the stream has been reached.
449 *
450 * @param buffer The byte array in which to store the data.
451 * @return The number of bytes read. Returns -1 if the end of the message has been reached.
452 * @throws IOException If an error occurs in reading the underlying stream.
453 */
454 @Override
455 public int read(final byte buffer[]) throws IOException {
456 return read(buffer, 0, buffer.length);
457 }
458
459 /**
460 * Reads the next number of bytes from the stream into an array and returns the number of bytes read. Returns -1 if the end of the message has been reached.
461 * The characters are stored in the array starting from the given offset and up to the length specified.
462 *
463 * @param buffer The byte array in which to store the data.
464 * @param offset The offset into the array at which to start storing data.
465 * @param length The number of bytes to read.
466 * @return The number of bytes read. Returns -1 if the end of the stream has been reached.
467 * @throws IOException If an error occurs while reading the underlying stream.
468 */
469 @Override
470 public int read(final byte buffer[], int offset, int length) throws IOException {
471 int ch;
472 final int off;
473
474 if (length < 1) {
475 return 0;
476 }
477
478 // Critical section because run() may change bytesAvailable
479 synchronized (queue) {
480 if (length > bytesAvailable) {
481 length = bytesAvailable;
482 }
483 }
484
485 if ((ch = read()) == EOF) {
486 return EOF;
487 }
488
489 off = offset;
490
491 do {
492 buffer[offset++] = (byte) ch;
493 } while (--length > 0 && (ch = read()) != EOF);
494
495 // client._spyRead(buffer, off, offset - off);
496 return offset - off;
497 }
498
499 @Override
500 public void run() {
501 int ch;
502
503 try {
504 _outerLoop: while (!isClosed) {
505 try {
506 if ((ch = read(true)) < 0) {
507 break;
508 }
509 } catch (final InterruptedIOException e) {
510 synchronized (queue) {
511 ioException = e;
512 queue.notifyAll();
513 try {
514 queue.wait(100);
515 } catch (final InterruptedException interrupted) {
516 Thread.currentThread().interrupt();
517 if (isClosed) {
518 break _outerLoop;
519 }
520 }
521 continue;
522 }
523 } catch (final RuntimeException re) {
524 // We treat any runtime exceptions as though the
525 // stream has been closed. We close the
526 // underlying stream just to be sure.
527 super.close();
528 // Breaking the loop has the effect of setting
529 // the state to closed at the end of the method.
530 break _outerLoop;
531 }
532
533 // Process new character
534 boolean notify = false;
535 try {
536 notify = processChar(ch);
537 } catch (final InterruptedException e) {
538 Thread.currentThread().interrupt();
539 if (isClosed) {
540 break _outerLoop;
541 }
542 }
543
544 // Notify input listener if buffer was previously empty
545 if (notify) {
546 client.notifyInputListener();
547 }
548 }
549 } catch (final IOException ioe) {
550 synchronized (queue) {
551 ioException = ioe;
552 }
553 client.notifyInputListener();
554 }
555
556 synchronized (queue) {
557 isClosed = true; // Possibly redundant
558 hasReachedEOF = true;
559 queue.notify();
560 }
561
562 threaded = false;
563 }
564
565 void start() {
566 if (thread == null) {
567 return;
568 }
569
570 int priority;
571 isClosed = false;
572 // TODO remove this
573 // Need to set a higher priority in case JVM does not use pre-emptive
574 // threads. This should prevent scheduler induced deadlock (rather than
575 // deadlock caused by a bug in this code).
576 priority = Thread.currentThread().getPriority() + 1;
577 if (priority > Thread.MAX_PRIORITY) {
578 priority = Thread.MAX_PRIORITY;
579 }
580 thread.setPriority(priority);
581 thread.setDaemon(true);
582 thread.start();
583 threaded = true; // tell _processChar that we are running threaded
584 }
585 }