View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      https://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.commons.net.telnet;
19  
20  import java.io.BufferedInputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.InterruptedIOException;
24  
25  final class TelnetInputStream extends BufferedInputStream implements Runnable {
26      /** End of file has been reached */
27      private static final int EOF = -1;
28  
29      /** Read would block */
30      private static final int WOULD_BLOCK = -2;
31  
32      // TODO should these be private enums?
33      static final int STATE_DATA = 0;
34      static final int STATE_IAC = 1;
35      static final int STATE_WILL = 2;
36      static final int STATE_WONT = 3;
37      static final int STATE_DO = 4;
38      static final int STATE_DONT = 5;
39      static final int STATE_SB = 6;
40      static final int STATE_SE = 7;
41      static final int STATE_CR = 8;
42      static final int STATE_IAC_SB = 9;
43      private boolean hasReachedEOF; // @GuardedBy("queue")
44      private volatile boolean isClosed;
45      private boolean readIsWaiting;
46      private int receiveState;
47      private int queueHead;
48      private int queueTail;
49      private int bytesAvailable;
50      private final int[] queue;
51      private final TelnetClient client;
52      private final Thread thread;
53      private IOException ioException;
54  
55      /* TERMINAL-TYPE option (start) */
56      private final int suboption[];
57      private int suboptionCount;
58      /* TERMINAL-TYPE option (end) */
59  
60      private volatile boolean threaded;
61  
62      TelnetInputStream(final InputStream input, final TelnetClient client) {
63          this(input, client, true);
64      }
65  
66      TelnetInputStream(final InputStream input, final TelnetClient client, final boolean readerThread) {
67          super(input);
68          this.client = client;
69          this.receiveState = STATE_DATA;
70          this.isClosed = true;
71          this.hasReachedEOF = false;
72          // Make it 2049, because when full, one slot will go unused, and we
73          // want a 2048 byte buffer just to have a round number (base 2 that is)
74          this.queue = new int[2049];
75          this.queueHead = 0;
76          this.queueTail = 0;
77          this.suboption = new int[client.maxSubnegotiationLength];
78          this.bytesAvailable = 0;
79          this.ioException = null;
80          this.readIsWaiting = false;
81          this.threaded = false;
82          if (readerThread) {
83              this.thread = new Thread(this);
84          } else {
85              this.thread = null;
86          }
87      }
88  
89      @Override
90      public int available() throws IOException {
91          // Critical section because run() may change bytesAvailable
92          synchronized (queue) {
93              if (threaded) { // Must not call super.available when running threaded: NET-466
94                  return bytesAvailable;
95              }
96              return bytesAvailable + super.available();
97          }
98      }
99  
100     // Cannot be synchronized. Will cause deadlock if run() is blocked
101     // in read because BufferedInputStream read() is synchronized.
102     @Override
103     public void close() throws IOException {
104         // Completely disregard the fact thread may still be running.
105         // We can't afford to block on this close by waiting for
106         // thread to terminate because few if any JVM's will actually
107         // interrupt a system read() from the interrupt() method.
108         super.close();
109 
110         synchronized (queue) {
111             hasReachedEOF = true;
112             isClosed = true;
113 
114             if (thread != null && thread.isAlive()) {
115                 thread.interrupt();
116             }
117 
118             queue.notifyAll();
119         }
120 
121     }
122 
123     /** Returns false. Mark is not supported. */
124     @Override
125     public boolean markSupported() {
126         return false;
127     }
128 
129     // synchronized(client) critical sections are to protect against
130     // TelnetOutputStream writing through the Telnet client at same time
131     // as a processDo/Will/etc. command invoked from TelnetInputStream
132     // tries to write. Returns true if buffer was previously empty.
133     private boolean processChar(final int ch) throws InterruptedException {
134         // Critical section because we're altering bytesAvailable,
135         // queueTail, and the contents of _queue.
136         final boolean bufferWasEmpty;
137         synchronized (queue) {
138             bufferWasEmpty = bytesAvailable == 0;
139             while (bytesAvailable >= queue.length - 1) {
140                 // The queue is full. We need to wait before adding any more data to it. Hopefully the stream owner
141                 // will consume some data soon!
142                 if (!threaded) {
143                     // We've been asked to add another character to the queue, but it is already full and there's
144                     // no other thread to drain it. This should not have happened!
145                     throw new IllegalStateException("Queue is full! Cannot process another character.");
146                 }
147                 queue.notify();
148                 try {
149                     queue.wait();
150                 } catch (final InterruptedException e) {
151                     throw e;
152                 }
153             }
154 
155             // Need to do this in case we're not full, but block on a read
156             if (readIsWaiting && threaded) {
157                 queue.notify();
158             }
159 
160             queue[queueTail] = ch;
161             ++bytesAvailable;
162 
163             if (++queueTail >= queue.length) {
164                 queueTail = 0;
165             }
166         }
167         return bufferWasEmpty;
168     }
169 
170     @Override
171     public int read() throws IOException {
172         // Critical section because we're altering bytesAvailable,
173         // queueHead, and the contents of _queue in addition to
174         // testing value of hasReachedEOF.
175         synchronized (queue) {
176 
177             while (true) {
178                 if (ioException != null) {
179                     final IOException e;
180                     e = ioException;
181                     ioException = null;
182                     throw e;
183                 }
184 
185                 if (bytesAvailable == 0) {
186                     // Return EOF if at end of file
187                     if (hasReachedEOF) {
188                         return EOF;
189                     }
190 
191                     // Otherwise, we have to wait for queue to get something
192                     if (threaded) {
193                         queue.notify();
194                         try {
195                             readIsWaiting = true;
196                             queue.wait();
197                             readIsWaiting = false;
198                         } catch (final InterruptedException e) {
199                             throw new InterruptedIOException("Fatal thread interruption during read.");
200                         }
201                     } else {
202                         // alreadyread = false;
203                         readIsWaiting = true;
204                         int ch;
205                         boolean mayBlock = true; // block on the first read only
206 
207                         do {
208                             try {
209                                 if ((ch = read(mayBlock)) < 0 && ch != WOULD_BLOCK) {
210                                     // must be EOF
211                                     return ch;
212                                 }
213                             } catch (final InterruptedIOException e) {
214                                 synchronized (queue) {
215                                     ioException = e;
216                                     queue.notifyAll();
217                                     try {
218                                         queue.wait(100);
219                                     } catch (final InterruptedException interrupted) {
220                                         // Ignored
221                                     }
222                                 }
223                                 return EOF;
224                             }
225 
226                             try {
227                                 if (ch != WOULD_BLOCK) {
228                                     processChar(ch);
229                                 }
230                             } catch (final InterruptedException e) {
231                                 if (isClosed) {
232                                     return EOF;
233                                 }
234                             }
235 
236                             // Reads should not block on subsequent iterations. Potentially, this could happen if the
237                             // remaining buffered socket data consists entirely of Telnet command sequence and no "user" data.
238                             mayBlock = false;
239 
240                         }
241                         // Continue reading as long as there is data available and the queue is not full.
242                         while (super.available() > 0 && bytesAvailable < queue.length - 1);
243 
244                         readIsWaiting = false;
245                     }
246                     continue;
247                 }
248                 final int ch;
249 
250                 ch = queue[queueHead];
251 
252                 if (++queueHead >= queue.length) {
253                     queueHead = 0;
254                 }
255 
256                 --bytesAvailable;
257 
258                 // Need to explicitly notify() so available() works properly
259                 if (bytesAvailable == 0 && threaded) {
260                     queue.notify();
261                 }
262 
263                 return ch; // NOPMD TODO?
264             }
265         }
266     }
267 
268     // synchronized(client) critical sections are to protect against
269     // TelnetOutputStream writing through the Telnet client at same time
270     // as a processDo/Will/etc. command invoked from TelnetInputStream
271     // tries to write.
272     /**
273      * Gets the next byte of data. IAC commands are processed internally and do not return data.
274      *
275      * @param mayBlock true if method is allowed to block
276      * @return the next byte of data, or -1 (EOF) if end of stread reached, or -2 (WOULD_BLOCK) if mayBlock is false and there is no data available
277      */
278     private int read(final boolean mayBlock) throws IOException {
279         int ch;
280 
281         while (true) {
282 
283             // If there is no more data AND we were told not to block,
284             // just return WOULD_BLOCK (-2). (More efficient than exception.)
285             if (!mayBlock && super.available() == 0) {
286                 return WOULD_BLOCK;
287             }
288 
289             // Otherwise, exit only when we reach end of stream.
290             if ((ch = super.read()) < 0) {
291                 return EOF;
292             }
293 
294             ch &= 0xff;
295 
296             /* Code Section added for supporting AYT (start) */
297             synchronized (client) {
298                 client.processAYTResponse();
299             }
300             /* Code Section added for supporting AYT (end) */
301 
302             /* Code Section added for supporting spystreams (start) */
303             client.spyRead(ch);
304             /* Code Section added for supporting spystreams (end) */
305 
306             switch (receiveState) {
307 
308             case STATE_CR:
309                 if (ch == Telnet.NUL) {
310                     // Strip null
311                     continue;
312                 }
313                 // How do we handle newline after cr?
314                 // else if (ch == '\n' && _requestedDont(TelnetOption.ECHO) &&
315 
316                 // Handle as normal data by falling through to _STATE_DATA case
317 
318                 // falls through$
319             case STATE_DATA:
320                 if (ch == TelnetCommand.IAC) {
321                     receiveState = STATE_IAC;
322                     continue;
323                 }
324 
325                 if (ch == '\r') {
326                     synchronized (client) {
327                         if (client.requestedDont(TelnetOption.BINARY)) {
328                             receiveState = STATE_CR;
329                         } else {
330                             receiveState = STATE_DATA;
331                         }
332                     }
333                 } else {
334                     receiveState = STATE_DATA;
335                 }
336                 break;
337 
338             case STATE_IAC:
339                 switch (ch) {
340                 case TelnetCommand.WILL:
341                     receiveState = STATE_WILL;
342                     continue;
343                 case TelnetCommand.WONT:
344                     receiveState = STATE_WONT;
345                     continue;
346                 case TelnetCommand.DO:
347                     receiveState = STATE_DO;
348                     continue;
349                 case TelnetCommand.DONT:
350                     receiveState = STATE_DONT;
351                     continue;
352                 /* TERMINAL-TYPE option (start) */
353                 case TelnetCommand.SB:
354                     suboptionCount = 0;
355                     receiveState = STATE_SB;
356                     continue;
357                 /* TERMINAL-TYPE option (end) */
358                 case TelnetCommand.IAC:
359                     receiveState = STATE_DATA;
360                     break; // exit to enclosing switch to return IAC from read
361                 case TelnetCommand.SE: // unexpected byte! ignore it (don't send it as a command)
362                     receiveState = STATE_DATA;
363                     continue;
364                 default:
365                     receiveState = STATE_DATA;
366                     client.processCommand(ch); // Notify the user
367                     continue; // move on the next char
368                 }
369                 break; // exit and return from read
370             case STATE_WILL:
371                 synchronized (client) {
372                     client.processWill(ch);
373                     client.flushOutputStream();
374                 }
375                 receiveState = STATE_DATA;
376                 continue;
377             case STATE_WONT:
378                 synchronized (client) {
379                     client.processWont(ch);
380                     client.flushOutputStream();
381                 }
382                 receiveState = STATE_DATA;
383                 continue;
384             case STATE_DO:
385                 synchronized (client) {
386                     client.processDo(ch);
387                     client.flushOutputStream();
388                 }
389                 receiveState = STATE_DATA;
390                 continue;
391             case STATE_DONT:
392                 synchronized (client) {
393                     client.processDont(ch);
394                     client.flushOutputStream();
395                 }
396                 receiveState = STATE_DATA;
397                 continue;
398             /* TERMINAL-TYPE option (start) */
399             case STATE_SB:
400                 switch (ch) {
401                 case TelnetCommand.IAC:
402                     receiveState = STATE_IAC_SB;
403                     continue;
404                 default:
405                     // store suboption char
406                     if (suboptionCount < suboption.length) {
407                         suboption[suboptionCount++] = ch;
408                     }
409                     break;
410                 }
411                 receiveState = STATE_SB;
412                 continue;
413             case STATE_IAC_SB: // IAC received during SB phase
414                 switch (ch) {
415                 case TelnetCommand.SE:
416                     synchronized (client) {
417                         client.processSuboption(suboption, suboptionCount);
418                         client.flushOutputStream();
419                     }
420                     receiveState = STATE_DATA;
421                     continue;
422                 case TelnetCommand.IAC: // De-dup the duplicated IAC
423                     if (suboptionCount < suboption.length) {
424                         suboption[suboptionCount++] = ch;
425                     }
426                     break;
427                 default: // unexpected byte! ignore it
428                     break;
429                 }
430                 receiveState = STATE_SB;
431                 continue;
432             /* TERMINAL-TYPE option (end) */
433             }
434 
435             break; // NOPMD TODO?
436         }
437 
438         return ch;
439     }
440 
441     /**
442      * Reads the next number of bytes from the stream into an array and returns the number of bytes read. Returns -1 if the end of the stream has been reached.
443      *
444      * @param buffer The byte array in which to store the data.
445      * @return The number of bytes read. Returns -1 if the end of the message has been reached.
446      * @throws IOException If an error occurs in reading the underlying stream.
447      */
448     @Override
449     public int read(final byte buffer[]) throws IOException {
450         return read(buffer, 0, buffer.length);
451     }
452 
453     /**
454      * Reads the next number of bytes from the stream into an array and returns the number of bytes read. Returns -1 if the end of the message has been reached.
455      * The characters are stored in the array starting from the given offset and up to the length specified.
456      *
457      * @param buffer The byte array in which to store the data.
458      * @param offset The offset into the array at which to start storing data.
459      * @param length The number of bytes to read.
460      * @return The number of bytes read. Returns -1 if the end of the stream has been reached.
461      * @throws IOException If an error occurs while reading the underlying stream.
462      */
463     @Override
464     public int read(final byte buffer[], int offset, int length) throws IOException {
465         int ch;
466         final int off;
467 
468         if (length < 1) {
469             return 0;
470         }
471 
472         // Critical section because run() may change bytesAvailable
473         synchronized (queue) {
474             if (length > bytesAvailable) {
475                 length = bytesAvailable;
476             }
477         }
478 
479         if ((ch = read()) == EOF) {
480             return EOF;
481         }
482 
483         off = offset;
484 
485         do {
486             buffer[offset++] = (byte) ch;
487         } while (--length > 0 && (ch = read()) != EOF);
488 
489         // client._spyRead(buffer, off, offset - off);
490         return offset - off;
491     }
492 
493     @Override
494     public void run() {
495         int ch;
496 
497         try {
498             _outerLoop: while (!isClosed) {
499                 try {
500                     if ((ch = read(true)) < 0) {
501                         break;
502                     }
503                 } catch (final InterruptedIOException e) {
504                     synchronized (queue) {
505                         ioException = e;
506                         queue.notifyAll();
507                         try {
508                             queue.wait(100);
509                         } catch (final InterruptedException interrupted) {
510                             if (isClosed) {
511                                 break _outerLoop;
512                             }
513                         }
514                         continue;
515                     }
516                 } catch (final RuntimeException re) {
517                     // We treat any runtime exceptions as though the
518                     // stream has been closed. We close the
519                     // underlying stream just to be sure.
520                     super.close();
521                     // Breaking the loop has the effect of setting
522                     // the state to closed at the end of the method.
523                     break _outerLoop;
524                 }
525 
526                 // Process new character
527                 boolean notify = false;
528                 try {
529                     notify = processChar(ch);
530                 } catch (final InterruptedException e) {
531                     if (isClosed) {
532                         break _outerLoop;
533                     }
534                 }
535 
536                 // Notify input listener if buffer was previously empty
537                 if (notify) {
538                     client.notifyInputListener();
539                 }
540             }
541         } catch (final IOException ioe) {
542             synchronized (queue) {
543                 ioException = ioe;
544             }
545             client.notifyInputListener();
546         }
547 
548         synchronized (queue) {
549             isClosed = true; // Possibly redundant
550             hasReachedEOF = true;
551             queue.notify();
552         }
553 
554         threaded = false;
555     }
556 
557     void start() {
558         if (thread == null) {
559             return;
560         }
561 
562         int priority;
563         isClosed = false;
564         // TODO remove this
565         // Need to set a higher priority in case JVM does not use pre-emptive
566         // threads. This should prevent scheduler induced deadlock (rather than
567         // deadlock caused by a bug in this code).
568         priority = Thread.currentThread().getPriority() + 1;
569         if (priority > Thread.MAX_PRIORITY) {
570             priority = Thread.MAX_PRIORITY;
571         }
572         thread.setPriority(priority);
573         thread.setDaemon(true);
574         thread.start();
575         threaded = true; // tell _processChar that we are running threaded
576     }
577 }