View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.commons.net.telnet;
19  
20  import java.io.BufferedInputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.InterruptedIOException;
24  
25  final class TelnetInputStream extends BufferedInputStream implements Runnable {
26      /** End of file has been reached */
27      private static final int EOF = -1;
28  
29      /** Read would block */
30      private static final int WOULD_BLOCK = -2;
31  
32      // TODO should these be private enums?
33      static final int STATE_DATA = 0, STATE_IAC = 1, STATE_WILL = 2, STATE_WONT = 3, STATE_DO = 4, STATE_DONT = 5, STATE_SB = 6, STATE_SE = 7, STATE_CR = 8,
34              STATE_IAC_SB = 9;
35  
36      private boolean hasReachedEOF; // @GuardedBy("queue")
37      private volatile boolean isClosed;
38      private boolean readIsWaiting;
39      private int receiveState, queueHead, queueTail, bytesAvailable;
40      private final int[] queue;
41      private final TelnetClient client;
42      private final Thread thread;
43      private IOException ioException;
44  
45      /* TERMINAL-TYPE option (start) */
46      private final int suboption[];
47      private int suboptionCount;
48      /* TERMINAL-TYPE option (end) */
49  
50      private volatile boolean threaded;
51  
52      TelnetInputStream(final InputStream input, final TelnetClient client) {
53          this(input, client, true);
54      }
55  
56      TelnetInputStream(final InputStream input, final TelnetClient client, final boolean readerThread) {
57          super(input);
58          this.client = client;
59          this.receiveState = STATE_DATA;
60          this.isClosed = true;
61          this.hasReachedEOF = false;
62          // Make it 2049, because when full, one slot will go unused, and we
63          // want a 2048 byte buffer just to have a round number (base 2 that is)
64          this.queue = new int[2049];
65          this.queueHead = 0;
66          this.queueTail = 0;
67          this.suboption = new int[client.maxSubnegotiationLength];
68          this.bytesAvailable = 0;
69          this.ioException = null;
70          this.readIsWaiting = false;
71          this.threaded = false;
72          if (readerThread) {
73              this.thread = new Thread(this);
74          } else {
75              this.thread = null;
76          }
77      }
78  
79      @Override
80      public int available() throws IOException {
81          // Critical section because run() may change bytesAvailable
82          synchronized (queue) {
83              if (threaded) { // Must not call super.available when running threaded: NET-466
84                  return bytesAvailable;
85              }
86              return bytesAvailable + super.available();
87          }
88      }
89  
90      // Cannot be synchronized. Will cause deadlock if run() is blocked
91      // in read because BufferedInputStream read() is synchronized.
92      @Override
93      public void close() throws IOException {
94          // Completely disregard the fact thread may still be running.
95          // We can't afford to block on this close by waiting for
96          // thread to terminate because few if any JVM's will actually
97          // interrupt a system read() from the interrupt() method.
98          super.close();
99  
100         synchronized (queue) {
101             hasReachedEOF = true;
102             isClosed = true;
103 
104             if (thread != null && thread.isAlive()) {
105                 thread.interrupt();
106             }
107 
108             queue.notifyAll();
109         }
110 
111     }
112 
113     /** Returns false. Mark is not supported. */
114     @Override
115     public boolean markSupported() {
116         return false;
117     }
118 
119     // synchronized(client) critical sections are to protect against
120     // TelnetOutputStream writing through the telnet client at same time
121     // as a processDo/Will/etc. command invoked from TelnetInputStream
122     // tries to write. Returns true if buffer was previously empty.
123     private boolean processChar(final int ch) throws InterruptedException {
124         // Critical section because we're altering bytesAvailable,
125         // queueTail, and the contents of _queue.
126         final boolean bufferWasEmpty;
127         synchronized (queue) {
128             bufferWasEmpty = bytesAvailable == 0;
129             while (bytesAvailable >= queue.length - 1) {
130                 // The queue is full. We need to wait before adding any more data to it. Hopefully the stream owner
131                 // will consume some data soon!
132                 if (!threaded) {
133                     // We've been asked to add another character to the queue, but it is already full and there's
134                     // no other thread to drain it. This should not have happened!
135                     throw new IllegalStateException("Queue is full! Cannot process another character.");
136                 }
137                 queue.notify();
138                 try {
139                     queue.wait();
140                 } catch (final InterruptedException e) {
141                     throw e;
142                 }
143             }
144 
145             // Need to do this in case we're not full, but block on a read
146             if (readIsWaiting && threaded) {
147                 queue.notify();
148             }
149 
150             queue[queueTail] = ch;
151             ++bytesAvailable;
152 
153             if (++queueTail >= queue.length) {
154                 queueTail = 0;
155             }
156         }
157         return bufferWasEmpty;
158     }
159 
160     @Override
161     public int read() throws IOException {
162         // Critical section because we're altering bytesAvailable,
163         // queueHead, and the contents of _queue in addition to
164         // testing value of hasReachedEOF.
165         synchronized (queue) {
166 
167             while (true) {
168                 if (ioException != null) {
169                     final IOException e;
170                     e = ioException;
171                     ioException = null;
172                     throw e;
173                 }
174 
175                 if (bytesAvailable == 0) {
176                     // Return EOF if at end of file
177                     if (hasReachedEOF) {
178                         return EOF;
179                     }
180 
181                     // Otherwise, we have to wait for queue to get something
182                     if (threaded) {
183                         queue.notify();
184                         try {
185                             readIsWaiting = true;
186                             queue.wait();
187                             readIsWaiting = false;
188                         } catch (final InterruptedException e) {
189                             throw new InterruptedIOException("Fatal thread interruption during read.");
190                         }
191                     } else {
192                         // alreadyread = false;
193                         readIsWaiting = true;
194                         int ch;
195                         boolean mayBlock = true; // block on the first read only
196 
197                         do {
198                             try {
199                                 if ((ch = read(mayBlock)) < 0) { // must be EOF
200                                     if (ch != WOULD_BLOCK) {
201                                         return ch;
202                                     }
203                                 }
204                             } catch (final InterruptedIOException e) {
205                                 synchronized (queue) {
206                                     ioException = e;
207                                     queue.notifyAll();
208                                     try {
209                                         queue.wait(100);
210                                     } catch (final InterruptedException interrupted) {
211                                         // Ignored
212                                     }
213                                 }
214                                 return EOF;
215                             }
216 
217                             try {
218                                 if (ch != WOULD_BLOCK) {
219                                     processChar(ch);
220                                 }
221                             } catch (final InterruptedException e) {
222                                 if (isClosed) {
223                                     return EOF;
224                                 }
225                             }
226 
227                             // Reads should not block on subsequent iterations. Potentially, this could happen if the
228                             // remaining buffered socket data consists entirely of Telnet command sequence and no "user" data.
229                             mayBlock = false;
230 
231                         }
232                         // Continue reading as long as there is data available and the queue is not full.
233                         while (super.available() > 0 && bytesAvailable < queue.length - 1);
234 
235                         readIsWaiting = false;
236                     }
237                     continue;
238                 }
239                 final int ch;
240 
241                 ch = queue[queueHead];
242 
243                 if (++queueHead >= queue.length) {
244                     queueHead = 0;
245                 }
246 
247                 --bytesAvailable;
248 
249                 // Need to explicitly notify() so available() works properly
250                 if (bytesAvailable == 0 && threaded) {
251                     queue.notify();
252                 }
253 
254                 return ch;
255             }
256         }
257     }
258 
259     // synchronized(client) critical sections are to protect against
260     // TelnetOutputStream writing through the telnet client at same time
261     // as a processDo/Will/etc. command invoked from TelnetInputStream
262     // tries to write.
263     /**
264      * Get the next byte of data. IAC commands are processed internally and do not return data.
265      *
266      * @param mayBlock true if method is allowed to block
267      * @return the next byte of data, or -1 (EOF) if end of stread reached, or -2 (WOULD_BLOCK) if mayBlock is false and there is no data available
268      */
269     private int read(final boolean mayBlock) throws IOException {
270         int ch;
271 
272         while (true) {
273 
274             // If there is no more data AND we were told not to block,
275             // just return WOULD_BLOCK (-2). (More efficient than exception.)
276             if (!mayBlock && super.available() == 0) {
277                 return WOULD_BLOCK;
278             }
279 
280             // Otherwise, exit only when we reach end of stream.
281             if ((ch = super.read()) < 0) {
282                 return EOF;
283             }
284 
285             ch = ch & 0xff;
286 
287             /* Code Section added for supporting AYT (start) */
288             synchronized (client) {
289                 client.processAYTResponse();
290             }
291             /* Code Section added for supporting AYT (end) */
292 
293             /* Code Section added for supporting spystreams (start) */
294             client.spyRead(ch);
295             /* Code Section added for supporting spystreams (end) */
296 
297             switch (receiveState) {
298 
299             case STATE_CR:
300                 if (ch == '\0') {
301                     // Strip null
302                     continue;
303                 }
304                 // How do we handle newline after cr?
305                 // else if (ch == '\n' && _requestedDont(TelnetOption.ECHO) &&
306 
307                 // Handle as normal data by falling through to _STATE_DATA case
308 
309                 //$FALL-THROUGH$
310             case STATE_DATA:
311                 if (ch == TelnetCommand.IAC) {
312                     receiveState = STATE_IAC;
313                     continue;
314                 }
315 
316                 if (ch == '\r') {
317                     synchronized (client) {
318                         if (client.requestedDont(TelnetOption.BINARY)) {
319                             receiveState = STATE_CR;
320                         } else {
321                             receiveState = STATE_DATA;
322                         }
323                     }
324                 } else {
325                     receiveState = STATE_DATA;
326                 }
327                 break;
328 
329             case STATE_IAC:
330                 switch (ch) {
331                 case TelnetCommand.WILL:
332                     receiveState = STATE_WILL;
333                     continue;
334                 case TelnetCommand.WONT:
335                     receiveState = STATE_WONT;
336                     continue;
337                 case TelnetCommand.DO:
338                     receiveState = STATE_DO;
339                     continue;
340                 case TelnetCommand.DONT:
341                     receiveState = STATE_DONT;
342                     continue;
343                 /* TERMINAL-TYPE option (start) */
344                 case TelnetCommand.SB:
345                     suboptionCount = 0;
346                     receiveState = STATE_SB;
347                     continue;
348                 /* TERMINAL-TYPE option (end) */
349                 case TelnetCommand.IAC:
350                     receiveState = STATE_DATA;
351                     break; // exit to enclosing switch to return IAC from read
352                 case TelnetCommand.SE: // unexpected byte! ignore it (don't send it as a command)
353                     receiveState = STATE_DATA;
354                     continue;
355                 default:
356                     receiveState = STATE_DATA;
357                     client.processCommand(ch); // Notify the user
358                     continue; // move on the next char
359                 }
360                 break; // exit and return from read
361             case STATE_WILL:
362                 synchronized (client) {
363                     client.processWill(ch);
364                     client.flushOutputStream();
365                 }
366                 receiveState = STATE_DATA;
367                 continue;
368             case STATE_WONT:
369                 synchronized (client) {
370                     client.processWont(ch);
371                     client.flushOutputStream();
372                 }
373                 receiveState = STATE_DATA;
374                 continue;
375             case STATE_DO:
376                 synchronized (client) {
377                     client.processDo(ch);
378                     client.flushOutputStream();
379                 }
380                 receiveState = STATE_DATA;
381                 continue;
382             case STATE_DONT:
383                 synchronized (client) {
384                     client.processDont(ch);
385                     client.flushOutputStream();
386                 }
387                 receiveState = STATE_DATA;
388                 continue;
389             /* TERMINAL-TYPE option (start) */
390             case STATE_SB:
391                 switch (ch) {
392                 case TelnetCommand.IAC:
393                     receiveState = STATE_IAC_SB;
394                     continue;
395                 default:
396                     // store suboption char
397                     if (suboptionCount < suboption.length) {
398                         suboption[suboptionCount++] = ch;
399                     }
400                     break;
401                 }
402                 receiveState = STATE_SB;
403                 continue;
404             case STATE_IAC_SB: // IAC received during SB phase
405                 switch (ch) {
406                 case TelnetCommand.SE:
407                     synchronized (client) {
408                         client.processSuboption(suboption, suboptionCount);
409                         client.flushOutputStream();
410                     }
411                     receiveState = STATE_DATA;
412                     continue;
413                 case TelnetCommand.IAC: // De-dup the duplicated IAC
414                     if (suboptionCount < suboption.length) {
415                         suboption[suboptionCount++] = ch;
416                     }
417                     break;
418                 default: // unexpected byte! ignore it
419                     break;
420                 }
421                 receiveState = STATE_SB;
422                 continue;
423             /* TERMINAL-TYPE option (end) */
424             }
425 
426             break;
427         }
428 
429         return ch;
430     }
431 
432     /**
433      * Reads the next number of bytes from the stream into an array and returns the number of bytes read. Returns -1 if the end of the stream has been reached.
434      *
435      * @param buffer The byte array in which to store the data.
436      * @return The number of bytes read. Returns -1 if the end of the message has been reached.
437      * @throws IOException If an error occurs in reading the underlying stream.
438      */
439     @Override
440     public int read(final byte buffer[]) throws IOException {
441         return read(buffer, 0, buffer.length);
442     }
443 
444     /**
445      * Reads the next number of bytes from the stream into an array and returns the number of bytes read. Returns -1 if the end of the message has been reached.
446      * The characters are stored in the array starting from the given offset and up to the length specified.
447      *
448      * @param buffer The byte array in which to store the data.
449      * @param offset The offset into the array at which to start storing data.
450      * @param length The number of bytes to read.
451      * @return The number of bytes read. Returns -1 if the end of the stream has been reached.
452      * @throws IOException If an error occurs while reading the underlying stream.
453      */
454     @Override
455     public int read(final byte buffer[], int offset, int length) throws IOException {
456         int ch;
457         final int off;
458 
459         if (length < 1) {
460             return 0;
461         }
462 
463         // Critical section because run() may change bytesAvailable
464         synchronized (queue) {
465             if (length > bytesAvailable) {
466                 length = bytesAvailable;
467             }
468         }
469 
470         if ((ch = read()) == EOF) {
471             return EOF;
472         }
473 
474         off = offset;
475 
476         do {
477             buffer[offset++] = (byte) ch;
478         } while (--length > 0 && (ch = read()) != EOF);
479 
480         // client._spyRead(buffer, off, offset - off);
481         return offset - off;
482     }
483 
484     @Override
485     public void run() {
486         int ch;
487 
488         try {
489             _outerLoop: while (!isClosed) {
490                 try {
491                     if ((ch = read(true)) < 0) {
492                         break;
493                     }
494                 } catch (final InterruptedIOException e) {
495                     synchronized (queue) {
496                         ioException = e;
497                         queue.notifyAll();
498                         try {
499                             queue.wait(100);
500                         } catch (final InterruptedException interrupted) {
501                             if (isClosed) {
502                                 break _outerLoop;
503                             }
504                         }
505                         continue;
506                     }
507                 } catch (final RuntimeException re) {
508                     // We treat any runtime exceptions as though the
509                     // stream has been closed. We close the
510                     // underlying stream just to be sure.
511                     super.close();
512                     // Breaking the loop has the effect of setting
513                     // the state to closed at the end of the method.
514                     break _outerLoop;
515                 }
516 
517                 // Process new character
518                 boolean notify = false;
519                 try {
520                     notify = processChar(ch);
521                 } catch (final InterruptedException e) {
522                     if (isClosed) {
523                         break _outerLoop;
524                     }
525                 }
526 
527                 // Notify input listener if buffer was previously empty
528                 if (notify) {
529                     client.notifyInputListener();
530                 }
531             }
532         } catch (final IOException ioe) {
533             synchronized (queue) {
534                 ioException = ioe;
535             }
536             client.notifyInputListener();
537         }
538 
539         synchronized (queue) {
540             isClosed = true; // Possibly redundant
541             hasReachedEOF = true;
542             queue.notify();
543         }
544 
545         threaded = false;
546     }
547 
548     void start() {
549         if (thread == null) {
550             return;
551         }
552 
553         int priority;
554         isClosed = false;
555         // TODO remove this
556         // Need to set a higher priority in case JVM does not use pre-emptive
557         // threads. This should prevent scheduler induced deadlock (rather than
558         // deadlock caused by a bug in this code).
559         priority = Thread.currentThread().getPriority() + 1;
560         if (priority > Thread.MAX_PRIORITY) {
561             priority = Thread.MAX_PRIORITY;
562         }
563         thread.setPriority(priority);
564         thread.setDaemon(true);
565         thread.start();
566         threaded = true; // tell _processChar that we are running threaded
567     }
568 }