View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      https://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.commons.net.telnet;
19  
20  import java.io.BufferedInputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.InterruptedIOException;
24  
25  final class TelnetInputStream extends BufferedInputStream implements Runnable {
26  
27      /** End of file has been reached */
28      private static final int EOF = -1;
29  
30      /** Read would block */
31      private static final int WOULD_BLOCK = -2;
32  
33      // TODO should these be private enums?
34      static final int STATE_DATA = 0;
35      static final int STATE_IAC = 1;
36      static final int STATE_WILL = 2;
37      static final int STATE_WONT = 3;
38      static final int STATE_DO = 4;
39      static final int STATE_DONT = 5;
40      static final int STATE_SB = 6;
41      static final int STATE_SE = 7;
42      static final int STATE_CR = 8;
43      static final int STATE_IAC_SB = 9;
44      private boolean hasReachedEOF; // @GuardedBy("queue")
45      private volatile boolean isClosed;
46      private boolean readIsWaiting;
47      private int receiveState;
48      private int queueHead;
49      private int queueTail;
50      private int bytesAvailable;
51      private final int[] queue;
52      private final TelnetClient client;
53      private final Thread thread;
54      private IOException ioException;
55  
56      /* TERMINAL-TYPE option (start) */
57      private final int suboption[];
58      private int suboptionCount;
59      /* TERMINAL-TYPE option (end) */
60  
61      private volatile boolean threaded;
62  
63      TelnetInputStream(final InputStream input, final TelnetClient client) {
64          this(input, client, true);
65      }
66  
67      TelnetInputStream(final InputStream input, final TelnetClient client, final boolean readerThread) {
68          super(input);
69          this.client = client;
70          this.receiveState = STATE_DATA;
71          this.isClosed = true;
72          this.hasReachedEOF = false;
73          // Make it 2049, because when full, one slot will go unused, and we
74          // want a 2048 byte buffer just to have a round number (base 2 that is)
75          this.queue = new int[2049];
76          this.queueHead = 0;
77          this.queueTail = 0;
78          this.suboption = new int[client.maxSubnegotiationLength];
79          this.bytesAvailable = 0;
80          this.ioException = null;
81          this.readIsWaiting = false;
82          this.threaded = false;
83          if (readerThread) {
84              this.thread = new Thread(this);
85          } else {
86              this.thread = null;
87          }
88      }
89  
90      @Override
91      public int available() throws IOException {
92          // Critical section because run() may change bytesAvailable
93          synchronized (queue) {
94              if (threaded) { // Must not call super.available when running threaded: NET-466
95                  return bytesAvailable;
96              }
97              return bytesAvailable + super.available();
98          }
99      }
100 
101     // Cannot be synchronized. Will cause deadlock if run() is blocked
102     // in read because BufferedInputStream read() is synchronized.
103     @Override
104     public void close() throws IOException {
105         // Completely disregard the fact thread may still be running.
106         // We can't afford to block on this close by waiting for
107         // thread to terminate because few if any JVM's will actually
108         // interrupt a system read() from the interrupt() method.
109         super.close();
110 
111         synchronized (queue) {
112             hasReachedEOF = true;
113             isClosed = true;
114 
115             if (thread != null && thread.isAlive()) {
116                 thread.interrupt();
117             }
118 
119             queue.notifyAll();
120         }
121 
122     }
123 
124     /** Returns false. Mark is not supported. */
125     @Override
126     public boolean markSupported() {
127         return false;
128     }
129 
130     // synchronized(client) critical sections are to protect against
131     // TelnetOutputStream writing through the Telnet client at same time
132     // as a processDo/Will/etc. command invoked from TelnetInputStream
133     // tries to write. Returns true if buffer was previously empty.
134     private boolean processChar(final int ch) throws InterruptedException {
135         // Critical section because we're altering bytesAvailable,
136         // queueTail, and the contents of _queue.
137         final boolean bufferWasEmpty;
138         synchronized (queue) {
139             bufferWasEmpty = bytesAvailable == 0;
140             while (bytesAvailable >= queue.length - 1) {
141                 // The queue is full. We need to wait before adding any more data to it. Hopefully the stream owner
142                 // will consume some data soon!
143                 if (!threaded) {
144                     // We've been asked to add another character to the queue, but it is already full and there's
145                     // no other thread to drain it. This should not have happened!
146                     throw new IllegalStateException("Queue is full! Cannot process another character.");
147                 }
148                 queue.notify();
149                 try {
150                     queue.wait();
151                 } catch (final InterruptedException e) {
152                     Thread.currentThread().interrupt();
153                     throw e;
154                 }
155             }
156 
157             // Need to do this in case we're not full, but block on a read
158             if (readIsWaiting && threaded) {
159                 queue.notify();
160             }
161 
162             queue[queueTail] = ch;
163             ++bytesAvailable;
164 
165             if (++queueTail >= queue.length) {
166                 queueTail = 0;
167             }
168         }
169         return bufferWasEmpty;
170     }
171 
172     @Override
173     public int read() throws IOException {
174         // Critical section because we're altering bytesAvailable,
175         // queueHead, and the contents of _queue in addition to
176         // testing value of hasReachedEOF.
177         synchronized (queue) {
178 
179             while (true) {
180                 if (ioException != null) {
181                     final IOException e;
182                     e = ioException;
183                     ioException = null;
184                     throw e;
185                 }
186 
187                 if (bytesAvailable == 0) {
188                     // Return EOF if at end of file
189                     if (hasReachedEOF) {
190                         return EOF;
191                     }
192 
193                     // Otherwise, we have to wait for queue to get something
194                     if (threaded) {
195                         queue.notify();
196                         try {
197                             readIsWaiting = true;
198                             queue.wait();
199                             readIsWaiting = false;
200                         } catch (final InterruptedException e) {
201                             Thread.currentThread().interrupt();
202                             final InterruptedIOException interruptedIoException = new InterruptedIOException("Fatal thread interruption during read.");
203                             interruptedIoException.initCause(e);
204                             throw interruptedIoException;
205                         }
206                     } else {
207                         // alreadyread = false;
208                         readIsWaiting = true;
209                         int ch;
210                         boolean mayBlock = true; // block on the first read only
211 
212                         do {
213                             try {
214                                 if ((ch = read(mayBlock)) < 0 && ch != WOULD_BLOCK) {
215                                     // must be EOF
216                                     return ch;
217                                 }
218                             } catch (final InterruptedIOException e) {
219                                 synchronized (queue) {
220                                     ioException = e;
221                                     queue.notifyAll();
222                                     try {
223                                         queue.wait(100);
224                                     } catch (final InterruptedException interrupted) {
225                                         Thread.currentThread().interrupt();
226                                     }
227                                 }
228                                 return EOF;
229                             }
230 
231                             try {
232                                 if (ch != WOULD_BLOCK) {
233                                     processChar(ch);
234                                 }
235                             } catch (final InterruptedException e) {
236                                 Thread.currentThread().interrupt();
237                                 if (isClosed) {
238                                     return EOF;
239                                 }
240                             }
241 
242                             // Reads should not block on subsequent iterations. Potentially, this could happen if the
243                             // remaining buffered socket data consists entirely of Telnet command sequence and no "user" data.
244                             mayBlock = false;
245 
246                         }
247                         // Continue reading as long as there is data available and the queue is not full.
248                         while (super.available() > 0 && bytesAvailable < queue.length - 1);
249 
250                         readIsWaiting = false;
251                     }
252                     continue;
253                 }
254                 final int ch;
255 
256                 ch = queue[queueHead];
257 
258                 if (++queueHead >= queue.length) {
259                     queueHead = 0;
260                 }
261 
262                 --bytesAvailable;
263 
264                 // Need to explicitly notify() so available() works properly
265                 if (bytesAvailable == 0 && threaded) {
266                     queue.notify();
267                 }
268 
269                 return ch; // NOPMD TODO?
270             }
271         }
272     }
273 
274     // synchronized(client) critical sections are to protect against
275     // TelnetOutputStream writing through the Telnet client at same time
276     // as a processDo/Will/etc. command invoked from TelnetInputStream
277     // tries to write.
278     /**
279      * Gets the next byte of data. IAC commands are processed internally and do not return data.
280      *
281      * @param mayBlock true if method is allowed to block
282      * @return the next byte of data, or -1 (EOF) if end of stread reached, or -2 (WOULD_BLOCK) if mayBlock is false and there is no data available
283      */
284     private int read(final boolean mayBlock) throws IOException {
285         int ch;
286 
287         while (true) {
288 
289             // If there is no more data AND we were told not to block,
290             // just return WOULD_BLOCK (-2). (More efficient than exception.)
291             if (!mayBlock && super.available() == 0) {
292                 return WOULD_BLOCK;
293             }
294 
295             // Otherwise, exit only when we reach end of stream.
296             if ((ch = super.read()) < 0) {
297                 return EOF;
298             }
299 
300             ch &= 0xff;
301 
302             /* Code Section added for supporting AYT (start) */
303             synchronized (client) {
304                 client.processAYTResponse();
305             }
306             /* Code Section added for supporting AYT (end) */
307 
308             /* Code Section added for supporting spystreams (start) */
309             client.spyRead(ch);
310             /* Code Section added for supporting spystreams (end) */
311 
312             switch (receiveState) {
313 
314             case STATE_CR:
315                 if (ch == Telnet.NUL) {
316                     // Strip null
317                     continue;
318                 }
319                 // How do we handle newline after cr?
320                 // else if (ch == '\n' && _requestedDont(TelnetOption.ECHO) &&
321 
322                 // Handle as normal data by falling through to _STATE_DATA case
323 
324                 // falls through$
325             case STATE_DATA:
326                 if (ch == TelnetCommand.IAC) {
327                     receiveState = STATE_IAC;
328                     continue;
329                 }
330 
331                 if (ch == '\r') {
332                     synchronized (client) {
333                         if (client.requestedDont(TelnetOption.BINARY)) {
334                             receiveState = STATE_CR;
335                         } else {
336                             receiveState = STATE_DATA;
337                         }
338                     }
339                 } else {
340                     receiveState = STATE_DATA;
341                 }
342                 break;
343 
344             case STATE_IAC:
345                 switch (ch) {
346                 case TelnetCommand.WILL:
347                     receiveState = STATE_WILL;
348                     continue;
349                 case TelnetCommand.WONT:
350                     receiveState = STATE_WONT;
351                     continue;
352                 case TelnetCommand.DO:
353                     receiveState = STATE_DO;
354                     continue;
355                 case TelnetCommand.DONT:
356                     receiveState = STATE_DONT;
357                     continue;
358                 /* TERMINAL-TYPE option (start) */
359                 case TelnetCommand.SB:
360                     suboptionCount = 0;
361                     receiveState = STATE_SB;
362                     continue;
363                 /* TERMINAL-TYPE option (end) */
364                 case TelnetCommand.IAC:
365                     receiveState = STATE_DATA;
366                     break; // exit to enclosing switch to return IAC from read
367                 case TelnetCommand.SE: // unexpected byte! ignore it (don't send it as a command)
368                     receiveState = STATE_DATA;
369                     continue;
370                 default:
371                     receiveState = STATE_DATA;
372                     client.processCommand(ch); // Notify the user
373                     continue; // move on the next char
374                 }
375                 break; // exit and return from read
376             case STATE_WILL:
377                 synchronized (client) {
378                     client.processWill(ch);
379                     client.flushOutputStream();
380                 }
381                 receiveState = STATE_DATA;
382                 continue;
383             case STATE_WONT:
384                 synchronized (client) {
385                     client.processWont(ch);
386                     client.flushOutputStream();
387                 }
388                 receiveState = STATE_DATA;
389                 continue;
390             case STATE_DO:
391                 synchronized (client) {
392                     client.processDo(ch);
393                     client.flushOutputStream();
394                 }
395                 receiveState = STATE_DATA;
396                 continue;
397             case STATE_DONT:
398                 synchronized (client) {
399                     client.processDont(ch);
400                     client.flushOutputStream();
401                 }
402                 receiveState = STATE_DATA;
403                 continue;
404             /* TERMINAL-TYPE option (start) */
405             case STATE_SB:
406                 switch (ch) {
407                 case TelnetCommand.IAC:
408                     receiveState = STATE_IAC_SB;
409                     continue;
410                 default:
411                     // store suboption char
412                     if (suboptionCount < suboption.length) {
413                         suboption[suboptionCount++] = ch;
414                     }
415                     break;
416                 }
417                 receiveState = STATE_SB;
418                 continue;
419             case STATE_IAC_SB: // IAC received during SB phase
420                 switch (ch) {
421                 case TelnetCommand.SE:
422                     synchronized (client) {
423                         client.processSuboption(suboption, suboptionCount);
424                         client.flushOutputStream();
425                     }
426                     receiveState = STATE_DATA;
427                     continue;
428                 case TelnetCommand.IAC: // De-dup the duplicated IAC
429                     if (suboptionCount < suboption.length) {
430                         suboption[suboptionCount++] = ch;
431                     }
432                     break;
433                 default: // unexpected byte! ignore it
434                     break;
435                 }
436                 receiveState = STATE_SB;
437                 continue;
438             /* TERMINAL-TYPE option (end) */
439             }
440 
441             break; // NOPMD TODO?
442         }
443 
444         return ch;
445     }
446 
447     /**
448      * Reads the next number of bytes from the stream into an array and returns the number of bytes read. Returns -1 if the end of the stream has been reached.
449      *
450      * @param buffer The byte array in which to store the data.
451      * @return The number of bytes read. Returns -1 if the end of the message has been reached.
452      * @throws IOException If an error occurs in reading the underlying stream.
453      */
454     @Override
455     public int read(final byte buffer[]) throws IOException {
456         return read(buffer, 0, buffer.length);
457     }
458 
459     /**
460      * Reads the next number of bytes from the stream into an array and returns the number of bytes read. Returns -1 if the end of the message has been reached.
461      * The characters are stored in the array starting from the given offset and up to the length specified.
462      *
463      * @param buffer The byte array in which to store the data.
464      * @param offset The offset into the array at which to start storing data.
465      * @param length The number of bytes to read.
466      * @return The number of bytes read. Returns -1 if the end of the stream has been reached.
467      * @throws IOException If an error occurs while reading the underlying stream.
468      */
469     @Override
470     public int read(final byte buffer[], int offset, int length) throws IOException {
471         int ch;
472         final int off;
473 
474         if (length < 1) {
475             return 0;
476         }
477 
478         // Critical section because run() may change bytesAvailable
479         synchronized (queue) {
480             if (length > bytesAvailable) {
481                 length = bytesAvailable;
482             }
483         }
484 
485         if ((ch = read()) == EOF) {
486             return EOF;
487         }
488 
489         off = offset;
490 
491         do {
492             buffer[offset++] = (byte) ch;
493         } while (--length > 0 && (ch = read()) != EOF);
494 
495         // client._spyRead(buffer, off, offset - off);
496         return offset - off;
497     }
498 
499     @Override
500     public void run() {
501         int ch;
502 
503         try {
504             _outerLoop: while (!isClosed) {
505                 try {
506                     if ((ch = read(true)) < 0) {
507                         break;
508                     }
509                 } catch (final InterruptedIOException e) {
510                     synchronized (queue) {
511                         ioException = e;
512                         queue.notifyAll();
513                         try {
514                             queue.wait(100);
515                         } catch (final InterruptedException interrupted) {
516                             Thread.currentThread().interrupt();
517                             if (isClosed) {
518                                 break _outerLoop;
519                             }
520                         }
521                         continue;
522                     }
523                 } catch (final RuntimeException re) {
524                     // We treat any runtime exceptions as though the
525                     // stream has been closed. We close the
526                     // underlying stream just to be sure.
527                     super.close();
528                     // Breaking the loop has the effect of setting
529                     // the state to closed at the end of the method.
530                     break _outerLoop;
531                 }
532 
533                 // Process new character
534                 boolean notify = false;
535                 try {
536                     notify = processChar(ch);
537                 } catch (final InterruptedException e) {
538                     Thread.currentThread().interrupt();
539                     if (isClosed) {
540                         break _outerLoop;
541                     }
542                 }
543 
544                 // Notify input listener if buffer was previously empty
545                 if (notify) {
546                     client.notifyInputListener();
547                 }
548             }
549         } catch (final IOException ioe) {
550             synchronized (queue) {
551                 ioException = ioe;
552             }
553             client.notifyInputListener();
554         }
555 
556         synchronized (queue) {
557             isClosed = true; // Possibly redundant
558             hasReachedEOF = true;
559             queue.notify();
560         }
561 
562         threaded = false;
563     }
564 
565     void start() {
566         if (thread == null) {
567             return;
568         }
569 
570         int priority;
571         isClosed = false;
572         // TODO remove this
573         // Need to set a higher priority in case JVM does not use pre-emptive
574         // threads. This should prevent scheduler induced deadlock (rather than
575         // deadlock caused by a bug in this code).
576         priority = Thread.currentThread().getPriority() + 1;
577         if (priority > Thread.MAX_PRIORITY) {
578             priority = Thread.MAX_PRIORITY;
579         }
580         thread.setPriority(priority);
581         thread.setDaemon(true);
582         thread.start();
583         threaded = true; // tell _processChar that we are running threaded
584     }
585 }