1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.commons.net.telnet;
19
20 import java.io.BufferedInputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InterruptedIOException;
24
25 final class TelnetInputStream extends BufferedInputStream implements Runnable {
26 /** End of file has been reached */
27 private static final int EOF = -1;
28
29 /** Read would block */
30 private static final int WOULD_BLOCK = -2;
31
32 // TODO should these be private enums?
33 static final int STATE_DATA = 0;
34 static final int STATE_IAC = 1;
35 static final int STATE_WILL = 2;
36 static final int STATE_WONT = 3;
37 static final int STATE_DO = 4;
38 static final int STATE_DONT = 5;
39 static final int STATE_SB = 6;
40 static final int STATE_SE = 7;
41 static final int STATE_CR = 8;
42 static final int STATE_IAC_SB = 9;
43 private boolean hasReachedEOF; // @GuardedBy("queue")
44 private volatile boolean isClosed;
45 private boolean readIsWaiting;
46 private int receiveState;
47 private int queueHead;
48 private int queueTail;
49 private int bytesAvailable;
50 private final int[] queue;
51 private final TelnetClient client;
52 private final Thread thread;
53 private IOException ioException;
54
55 /* TERMINAL-TYPE option (start) */
56 private final int suboption[];
57 private int suboptionCount;
58 /* TERMINAL-TYPE option (end) */
59
60 private volatile boolean threaded;
61
62 TelnetInputStream(final InputStream input, final TelnetClient client) {
63 this(input, client, true);
64 }
65
66 TelnetInputStream(final InputStream input, final TelnetClient client, final boolean readerThread) {
67 super(input);
68 this.client = client;
69 this.receiveState = STATE_DATA;
70 this.isClosed = true;
71 this.hasReachedEOF = false;
72 // Make it 2049, because when full, one slot will go unused, and we
73 // want a 2048 byte buffer just to have a round number (base 2 that is)
74 this.queue = new int[2049];
75 this.queueHead = 0;
76 this.queueTail = 0;
77 this.suboption = new int[client.maxSubnegotiationLength];
78 this.bytesAvailable = 0;
79 this.ioException = null;
80 this.readIsWaiting = false;
81 this.threaded = false;
82 if (readerThread) {
83 this.thread = new Thread(this);
84 } else {
85 this.thread = null;
86 }
87 }
88
89 @Override
90 public int available() throws IOException {
91 // Critical section because run() may change bytesAvailable
92 synchronized (queue) {
93 if (threaded) { // Must not call super.available when running threaded: NET-466
94 return bytesAvailable;
95 }
96 return bytesAvailable + super.available();
97 }
98 }
99
100 // Cannot be synchronized. Will cause deadlock if run() is blocked
101 // in read because BufferedInputStream read() is synchronized.
102 @Override
103 public void close() throws IOException {
104 // Completely disregard the fact thread may still be running.
105 // We can't afford to block on this close by waiting for
106 // thread to terminate because few if any JVM's will actually
107 // interrupt a system read() from the interrupt() method.
108 super.close();
109
110 synchronized (queue) {
111 hasReachedEOF = true;
112 isClosed = true;
113
114 if (thread != null && thread.isAlive()) {
115 thread.interrupt();
116 }
117
118 queue.notifyAll();
119 }
120
121 }
122
123 /** Returns false. Mark is not supported. */
124 @Override
125 public boolean markSupported() {
126 return false;
127 }
128
129 // synchronized(client) critical sections are to protect against
130 // TelnetOutputStream writing through the Telnet client at same time
131 // as a processDo/Will/etc. command invoked from TelnetInputStream
132 // tries to write. Returns true if buffer was previously empty.
133 private boolean processChar(final int ch) throws InterruptedException {
134 // Critical section because we're altering bytesAvailable,
135 // queueTail, and the contents of _queue.
136 final boolean bufferWasEmpty;
137 synchronized (queue) {
138 bufferWasEmpty = bytesAvailable == 0;
139 while (bytesAvailable >= queue.length - 1) {
140 // The queue is full. We need to wait before adding any more data to it. Hopefully the stream owner
141 // will consume some data soon!
142 if (!threaded) {
143 // We've been asked to add another character to the queue, but it is already full and there's
144 // no other thread to drain it. This should not have happened!
145 throw new IllegalStateException("Queue is full! Cannot process another character.");
146 }
147 queue.notify();
148 try {
149 queue.wait();
150 } catch (final InterruptedException e) {
151 throw e;
152 }
153 }
154
155 // Need to do this in case we're not full, but block on a read
156 if (readIsWaiting && threaded) {
157 queue.notify();
158 }
159
160 queue[queueTail] = ch;
161 ++bytesAvailable;
162
163 if (++queueTail >= queue.length) {
164 queueTail = 0;
165 }
166 }
167 return bufferWasEmpty;
168 }
169
170 @Override
171 public int read() throws IOException {
172 // Critical section because we're altering bytesAvailable,
173 // queueHead, and the contents of _queue in addition to
174 // testing value of hasReachedEOF.
175 synchronized (queue) {
176
177 while (true) {
178 if (ioException != null) {
179 final IOException e;
180 e = ioException;
181 ioException = null;
182 throw e;
183 }
184
185 if (bytesAvailable == 0) {
186 // Return EOF if at end of file
187 if (hasReachedEOF) {
188 return EOF;
189 }
190
191 // Otherwise, we have to wait for queue to get something
192 if (threaded) {
193 queue.notify();
194 try {
195 readIsWaiting = true;
196 queue.wait();
197 readIsWaiting = false;
198 } catch (final InterruptedException e) {
199 throw new InterruptedIOException("Fatal thread interruption during read.");
200 }
201 } else {
202 // alreadyread = false;
203 readIsWaiting = true;
204 int ch;
205 boolean mayBlock = true; // block on the first read only
206
207 do {
208 try {
209 if ((ch = read(mayBlock)) < 0 && ch != WOULD_BLOCK) {
210 // must be EOF
211 return ch;
212 }
213 } catch (final InterruptedIOException e) {
214 synchronized (queue) {
215 ioException = e;
216 queue.notifyAll();
217 try {
218 queue.wait(100);
219 } catch (final InterruptedException interrupted) {
220 // Ignored
221 }
222 }
223 return EOF;
224 }
225
226 try {
227 if (ch != WOULD_BLOCK) {
228 processChar(ch);
229 }
230 } catch (final InterruptedException e) {
231 if (isClosed) {
232 return EOF;
233 }
234 }
235
236 // Reads should not block on subsequent iterations. Potentially, this could happen if the
237 // remaining buffered socket data consists entirely of Telnet command sequence and no "user" data.
238 mayBlock = false;
239
240 }
241 // Continue reading as long as there is data available and the queue is not full.
242 while (super.available() > 0 && bytesAvailable < queue.length - 1);
243
244 readIsWaiting = false;
245 }
246 continue;
247 }
248 final int ch;
249
250 ch = queue[queueHead];
251
252 if (++queueHead >= queue.length) {
253 queueHead = 0;
254 }
255
256 --bytesAvailable;
257
258 // Need to explicitly notify() so available() works properly
259 if (bytesAvailable == 0 && threaded) {
260 queue.notify();
261 }
262
263 return ch; // NOPMD TODO?
264 }
265 }
266 }
267
268 // synchronized(client) critical sections are to protect against
269 // TelnetOutputStream writing through the Telnet client at same time
270 // as a processDo/Will/etc. command invoked from TelnetInputStream
271 // tries to write.
272 /**
273 * Gets the next byte of data. IAC commands are processed internally and do not return data.
274 *
275 * @param mayBlock true if method is allowed to block
276 * @return the next byte of data, or -1 (EOF) if end of stread reached, or -2 (WOULD_BLOCK) if mayBlock is false and there is no data available
277 */
278 private int read(final boolean mayBlock) throws IOException {
279 int ch;
280
281 while (true) {
282
283 // If there is no more data AND we were told not to block,
284 // just return WOULD_BLOCK (-2). (More efficient than exception.)
285 if (!mayBlock && super.available() == 0) {
286 return WOULD_BLOCK;
287 }
288
289 // Otherwise, exit only when we reach end of stream.
290 if ((ch = super.read()) < 0) {
291 return EOF;
292 }
293
294 ch &= 0xff;
295
296 /* Code Section added for supporting AYT (start) */
297 synchronized (client) {
298 client.processAYTResponse();
299 }
300 /* Code Section added for supporting AYT (end) */
301
302 /* Code Section added for supporting spystreams (start) */
303 client.spyRead(ch);
304 /* Code Section added for supporting spystreams (end) */
305
306 switch (receiveState) {
307
308 case STATE_CR:
309 if (ch == Telnet.NUL) {
310 // Strip null
311 continue;
312 }
313 // How do we handle newline after cr?
314 // else if (ch == '\n' && _requestedDont(TelnetOption.ECHO) &&
315
316 // Handle as normal data by falling through to _STATE_DATA case
317
318 // falls through$
319 case STATE_DATA:
320 if (ch == TelnetCommand.IAC) {
321 receiveState = STATE_IAC;
322 continue;
323 }
324
325 if (ch == '\r') {
326 synchronized (client) {
327 if (client.requestedDont(TelnetOption.BINARY)) {
328 receiveState = STATE_CR;
329 } else {
330 receiveState = STATE_DATA;
331 }
332 }
333 } else {
334 receiveState = STATE_DATA;
335 }
336 break;
337
338 case STATE_IAC:
339 switch (ch) {
340 case TelnetCommand.WILL:
341 receiveState = STATE_WILL;
342 continue;
343 case TelnetCommand.WONT:
344 receiveState = STATE_WONT;
345 continue;
346 case TelnetCommand.DO:
347 receiveState = STATE_DO;
348 continue;
349 case TelnetCommand.DONT:
350 receiveState = STATE_DONT;
351 continue;
352 /* TERMINAL-TYPE option (start) */
353 case TelnetCommand.SB:
354 suboptionCount = 0;
355 receiveState = STATE_SB;
356 continue;
357 /* TERMINAL-TYPE option (end) */
358 case TelnetCommand.IAC:
359 receiveState = STATE_DATA;
360 break; // exit to enclosing switch to return IAC from read
361 case TelnetCommand.SE: // unexpected byte! ignore it (don't send it as a command)
362 receiveState = STATE_DATA;
363 continue;
364 default:
365 receiveState = STATE_DATA;
366 client.processCommand(ch); // Notify the user
367 continue; // move on the next char
368 }
369 break; // exit and return from read
370 case STATE_WILL:
371 synchronized (client) {
372 client.processWill(ch);
373 client.flushOutputStream();
374 }
375 receiveState = STATE_DATA;
376 continue;
377 case STATE_WONT:
378 synchronized (client) {
379 client.processWont(ch);
380 client.flushOutputStream();
381 }
382 receiveState = STATE_DATA;
383 continue;
384 case STATE_DO:
385 synchronized (client) {
386 client.processDo(ch);
387 client.flushOutputStream();
388 }
389 receiveState = STATE_DATA;
390 continue;
391 case STATE_DONT:
392 synchronized (client) {
393 client.processDont(ch);
394 client.flushOutputStream();
395 }
396 receiveState = STATE_DATA;
397 continue;
398 /* TERMINAL-TYPE option (start) */
399 case STATE_SB:
400 switch (ch) {
401 case TelnetCommand.IAC:
402 receiveState = STATE_IAC_SB;
403 continue;
404 default:
405 // store suboption char
406 if (suboptionCount < suboption.length) {
407 suboption[suboptionCount++] = ch;
408 }
409 break;
410 }
411 receiveState = STATE_SB;
412 continue;
413 case STATE_IAC_SB: // IAC received during SB phase
414 switch (ch) {
415 case TelnetCommand.SE:
416 synchronized (client) {
417 client.processSuboption(suboption, suboptionCount);
418 client.flushOutputStream();
419 }
420 receiveState = STATE_DATA;
421 continue;
422 case TelnetCommand.IAC: // De-dup the duplicated IAC
423 if (suboptionCount < suboption.length) {
424 suboption[suboptionCount++] = ch;
425 }
426 break;
427 default: // unexpected byte! ignore it
428 break;
429 }
430 receiveState = STATE_SB;
431 continue;
432 /* TERMINAL-TYPE option (end) */
433 }
434
435 break; // NOPMD TODO?
436 }
437
438 return ch;
439 }
440
441 /**
442 * Reads the next number of bytes from the stream into an array and returns the number of bytes read. Returns -1 if the end of the stream has been reached.
443 *
444 * @param buffer The byte array in which to store the data.
445 * @return The number of bytes read. Returns -1 if the end of the message has been reached.
446 * @throws IOException If an error occurs in reading the underlying stream.
447 */
448 @Override
449 public int read(final byte buffer[]) throws IOException {
450 return read(buffer, 0, buffer.length);
451 }
452
453 /**
454 * Reads the next number of bytes from the stream into an array and returns the number of bytes read. Returns -1 if the end of the message has been reached.
455 * The characters are stored in the array starting from the given offset and up to the length specified.
456 *
457 * @param buffer The byte array in which to store the data.
458 * @param offset The offset into the array at which to start storing data.
459 * @param length The number of bytes to read.
460 * @return The number of bytes read. Returns -1 if the end of the stream has been reached.
461 * @throws IOException If an error occurs while reading the underlying stream.
462 */
463 @Override
464 public int read(final byte buffer[], int offset, int length) throws IOException {
465 int ch;
466 final int off;
467
468 if (length < 1) {
469 return 0;
470 }
471
472 // Critical section because run() may change bytesAvailable
473 synchronized (queue) {
474 if (length > bytesAvailable) {
475 length = bytesAvailable;
476 }
477 }
478
479 if ((ch = read()) == EOF) {
480 return EOF;
481 }
482
483 off = offset;
484
485 do {
486 buffer[offset++] = (byte) ch;
487 } while (--length > 0 && (ch = read()) != EOF);
488
489 // client._spyRead(buffer, off, offset - off);
490 return offset - off;
491 }
492
493 @Override
494 public void run() {
495 int ch;
496
497 try {
498 _outerLoop: while (!isClosed) {
499 try {
500 if ((ch = read(true)) < 0) {
501 break;
502 }
503 } catch (final InterruptedIOException e) {
504 synchronized (queue) {
505 ioException = e;
506 queue.notifyAll();
507 try {
508 queue.wait(100);
509 } catch (final InterruptedException interrupted) {
510 if (isClosed) {
511 break _outerLoop;
512 }
513 }
514 continue;
515 }
516 } catch (final RuntimeException re) {
517 // We treat any runtime exceptions as though the
518 // stream has been closed. We close the
519 // underlying stream just to be sure.
520 super.close();
521 // Breaking the loop has the effect of setting
522 // the state to closed at the end of the method.
523 break _outerLoop;
524 }
525
526 // Process new character
527 boolean notify = false;
528 try {
529 notify = processChar(ch);
530 } catch (final InterruptedException e) {
531 if (isClosed) {
532 break _outerLoop;
533 }
534 }
535
536 // Notify input listener if buffer was previously empty
537 if (notify) {
538 client.notifyInputListener();
539 }
540 }
541 } catch (final IOException ioe) {
542 synchronized (queue) {
543 ioException = ioe;
544 }
545 client.notifyInputListener();
546 }
547
548 synchronized (queue) {
549 isClosed = true; // Possibly redundant
550 hasReachedEOF = true;
551 queue.notify();
552 }
553
554 threaded = false;
555 }
556
557 void start() {
558 if (thread == null) {
559 return;
560 }
561
562 int priority;
563 isClosed = false;
564 // TODO remove this
565 // Need to set a higher priority in case JVM does not use pre-emptive
566 // threads. This should prevent scheduler induced deadlock (rather than
567 // deadlock caused by a bug in this code).
568 priority = Thread.currentThread().getPriority() + 1;
569 if (priority > Thread.MAX_PRIORITY) {
570 priority = Thread.MAX_PRIORITY;
571 }
572 thread.setPriority(priority);
573 thread.setDaemon(true);
574 thread.start();
575 threaded = true; // tell _processChar that we are running threaded
576 }
577 }