View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.commons.net.telnet;
19  
20  import java.io.BufferedInputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.InterruptedIOException;
24  
25  final class TelnetInputStream extends BufferedInputStream implements Runnable
26  {
27      /** End of file has been reached */
28      private static final int EOF = -1;
29  
30      /** Read would block */
31      private static final int WOULD_BLOCK = -2;
32  
33      // TODO should these be private enums?
34      static final int _STATE_DATA = 0, _STATE_IAC = 1, _STATE_WILL = 2,
35                       _STATE_WONT = 3, _STATE_DO = 4, _STATE_DONT = 5,
36                       _STATE_SB = 6, _STATE_SE = 7, _STATE_CR = 8, _STATE_IAC_SB = 9;
37  
38      private boolean __hasReachedEOF; // @GuardedBy("__queue")
39      private volatile boolean __isClosed;
40      private boolean __readIsWaiting;
41      private int __receiveState, __queueHead, __queueTail, __bytesAvailable;
42      private final int[] __queue;
43      private final TelnetClient __client;
44      private final Thread __thread;
45      private IOException __ioException;
46  
47      /* TERMINAL-TYPE option (start)*/
48      private final int __suboption[] = new int[512];
49      private int __suboption_count = 0;
50      /* TERMINAL-TYPE option (end)*/
51  
52      private volatile boolean __threaded;
53  
54      TelnetInputStream(InputStream input, TelnetClient client,
55                        boolean readerThread)
56      {
57          super(input);
58          __client = client;
59          __receiveState = _STATE_DATA;
60          __isClosed = true;
61          __hasReachedEOF = false;
62          // Make it 2049, because when full, one slot will go unused, and we
63          // want a 2048 byte buffer just to have a round number (base 2 that is)
64          __queue = new int[2049];
65          __queueHead = 0;
66          __queueTail = 0;
67          __bytesAvailable = 0;
68          __ioException = null;
69          __readIsWaiting = false;
70          __threaded = false;
71          if(readerThread) {
72              __thread = new Thread(this);
73          } else {
74              __thread = null;
75          }
76      }
77  
78      TelnetInputStream(InputStream input, TelnetClient client) {
79          this(input, client, true);
80      }
81  
82      void _start()
83      {
84          if(__thread == null) {
85              return;
86          }
87  
88          int priority;
89          __isClosed = false;
90          // TODO remove this
91          // Need to set a higher priority in case JVM does not use pre-emptive
92          // threads.  This should prevent scheduler induced deadlock (rather than
93          // deadlock caused by a bug in this code).
94          priority = Thread.currentThread().getPriority() + 1;
95          if (priority > Thread.MAX_PRIORITY) {
96              priority = Thread.MAX_PRIORITY;
97          }
98          __thread.setPriority(priority);
99          __thread.setDaemon(true);
100         __thread.start();
101         __threaded = true; // tell _processChar that we are running threaded
102     }
103 
104 
105     // synchronized(__client) critical sections are to protect against
106     // TelnetOutputStream writing through the telnet client at same time
107     // as a processDo/Will/etc. command invoked from TelnetInputStream
108     // tries to write.
109     /**
110      * Get the next byte of data.
111      * IAC commands are processed internally and do not return data.
112      *
113      * @param mayBlock true if method is allowed to block
114      * @return the next byte of data,
115      * or -1 (EOF) if end of stread reached,
116      * or -2 (WOULD_BLOCK) if mayBlock is false and there is no data available
117      */
118     private int __read(boolean mayBlock) throws IOException
119     {
120         int ch;
121 
122         while (true)
123         {
124 
125             // If there is no more data AND we were told not to block,
126             // just return WOULD_BLOCK (-2). (More efficient than exception.)
127             if(!mayBlock && super.available() == 0) {
128                 return WOULD_BLOCK;
129             }
130 
131             // Otherwise, exit only when we reach end of stream.
132             if ((ch = super.read()) < 0) {
133                 return EOF;
134             }
135 
136             ch = (ch & 0xff);
137 
138             /* Code Section added for supporting AYT (start)*/
139             synchronized (__client)
140             {
141                 __client._processAYTResponse();
142             }
143             /* Code Section added for supporting AYT (end)*/
144 
145             /* Code Section added for supporting spystreams (start)*/
146             __client._spyRead(ch);
147             /* Code Section added for supporting spystreams (end)*/
148 
149             switch (__receiveState)
150             {
151 
152             case _STATE_CR:
153                 if (ch == '\0')
154                 {
155                     // Strip null
156                     continue;
157                 }
158                 // How do we handle newline after cr?
159                 //  else if (ch == '\n' && _requestedDont(TelnetOption.ECHO) &&
160 
161                 // Handle as normal data by falling through to _STATE_DATA case
162 
163                 //$FALL-THROUGH$
164             case _STATE_DATA:
165                 if (ch == TelnetCommand.IAC)
166                 {
167                     __receiveState = _STATE_IAC;
168                     continue;
169                 }
170 
171 
172                 if (ch == '\r')
173                 {
174                     synchronized (__client)
175                     {
176                         if (__client._requestedDont(TelnetOption.BINARY)) {
177                             __receiveState = _STATE_CR;
178                         } else {
179                             __receiveState = _STATE_DATA;
180                         }
181                     }
182                 } else {
183                     __receiveState = _STATE_DATA;
184                 }
185                 break;
186 
187             case _STATE_IAC:
188                 switch (ch)
189                 {
190                 case TelnetCommand.WILL:
191                     __receiveState = _STATE_WILL;
192                     continue;
193                 case TelnetCommand.WONT:
194                     __receiveState = _STATE_WONT;
195                     continue;
196                 case TelnetCommand.DO:
197                     __receiveState = _STATE_DO;
198                     continue;
199                 case TelnetCommand.DONT:
200                     __receiveState = _STATE_DONT;
201                     continue;
202                 /* TERMINAL-TYPE option (start)*/
203                 case TelnetCommand.SB:
204                     __suboption_count = 0;
205                     __receiveState = _STATE_SB;
206                     continue;
207                 /* TERMINAL-TYPE option (end)*/
208                 case TelnetCommand.IAC:
209                     __receiveState = _STATE_DATA;
210                     break; // exit to enclosing switch to return IAC from read
211                 case TelnetCommand.SE: // unexpected byte! ignore it (don't send it as a command)
212                     __receiveState = _STATE_DATA;
213                     continue;
214                 default:
215                     __receiveState = _STATE_DATA;
216                     __client._processCommand(ch); // Notify the user
217                     continue; // move on the next char
218                 }
219                 break; // exit and return from read
220             case _STATE_WILL:
221                 synchronized (__client)
222                 {
223                     __client._processWill(ch);
224                     __client._flushOutputStream();
225                 }
226                 __receiveState = _STATE_DATA;
227                 continue;
228             case _STATE_WONT:
229                 synchronized (__client)
230                 {
231                     __client._processWont(ch);
232                     __client._flushOutputStream();
233                 }
234                 __receiveState = _STATE_DATA;
235                 continue;
236             case _STATE_DO:
237                 synchronized (__client)
238                 {
239                     __client._processDo(ch);
240                     __client._flushOutputStream();
241                 }
242                 __receiveState = _STATE_DATA;
243                 continue;
244             case _STATE_DONT:
245                 synchronized (__client)
246                 {
247                     __client._processDont(ch);
248                     __client._flushOutputStream();
249                 }
250                 __receiveState = _STATE_DATA;
251                 continue;
252             /* TERMINAL-TYPE option (start)*/
253             case _STATE_SB:
254                 switch (ch)
255                 {
256                 case TelnetCommand.IAC:
257                     __receiveState = _STATE_IAC_SB;
258                     continue;
259                 default:
260                     // store suboption char
261                     if (__suboption_count < __suboption.length) {
262                         __suboption[__suboption_count++] = ch;
263                     }
264                     break;
265                 }
266                 __receiveState = _STATE_SB;
267                 continue;
268             case _STATE_IAC_SB: // IAC received during SB phase
269                 switch (ch)
270                 {
271                 case TelnetCommand.SE:
272                     synchronized (__client)
273                     {
274                         __client._processSuboption(__suboption, __suboption_count);
275                         __client._flushOutputStream();
276                     }
277                     __receiveState = _STATE_DATA;
278                     continue;
279                 case TelnetCommand.IAC: // De-dup the duplicated IAC
280                     if (__suboption_count < __suboption.length) {
281                         __suboption[__suboption_count++] = ch;
282                     }
283                     break;
284                 default:            // unexpected byte! ignore it
285                     break;
286                 }
287                 __receiveState = _STATE_SB;
288                 continue;
289             /* TERMINAL-TYPE option (end)*/
290             }
291 
292             break;
293         }
294 
295         return ch;
296     }
297 
298     // synchronized(__client) critical sections are to protect against
299     // TelnetOutputStream writing through the telnet client at same time
300     // as a processDo/Will/etc. command invoked from TelnetInputStream
301     // tries to write. Returns true if buffer was previously empty.
302     private boolean __processChar(int ch) throws InterruptedException
303     {
304         // Critical section because we're altering __bytesAvailable,
305         // __queueTail, and the contents of _queue.
306         boolean bufferWasEmpty;
307         synchronized (__queue)
308         {
309             bufferWasEmpty = (__bytesAvailable == 0);
310             while (__bytesAvailable >= __queue.length - 1)
311             {
312                 // The queue is full. We need to wait before adding any more data to it. Hopefully the stream owner
313                 // will consume some data soon!
314                 if(__threaded)
315                 {
316                     __queue.notify();
317                     try
318                     {
319                         __queue.wait();
320                     }
321                     catch (InterruptedException e)
322                     {
323                         throw e;
324                     }
325                 }
326                 else
327                 {
328                     // We've been asked to add another character to the queue, but it is already full and there's
329                     // no other thread to drain it. This should not have happened!
330                     throw new IllegalStateException("Queue is full! Cannot process another character.");
331                 }
332             }
333 
334             // Need to do this in case we're not full, but block on a read
335             if (__readIsWaiting && __threaded)
336             {
337                 __queue.notify();
338             }
339 
340             __queue[__queueTail] = ch;
341             ++__bytesAvailable;
342 
343             if (++__queueTail >= __queue.length) {
344                 __queueTail = 0;
345             }
346         }
347         return bufferWasEmpty;
348     }
349 
350     @Override
351     public int read() throws IOException
352     {
353         // Critical section because we're altering __bytesAvailable,
354         // __queueHead, and the contents of _queue in addition to
355         // testing value of __hasReachedEOF.
356         synchronized (__queue)
357         {
358 
359             while (true)
360             {
361                 if (__ioException != null)
362                 {
363                     IOException e;
364                     e = __ioException;
365                     __ioException = null;
366                     throw e;
367                 }
368 
369                 if (__bytesAvailable == 0)
370                 {
371                     // Return EOF if at end of file
372                     if (__hasReachedEOF) {
373                         return EOF;
374                     }
375 
376                     // Otherwise, we have to wait for queue to get something
377                     if(__threaded)
378                     {
379                         __queue.notify();
380                         try
381                         {
382                             __readIsWaiting = true;
383                             __queue.wait();
384                             __readIsWaiting = false;
385                         }
386                         catch (InterruptedException e)
387                         {
388                             throw new InterruptedIOException("Fatal thread interruption during read.");
389                         }
390                     }
391                     else
392                     {
393                         //__alreadyread = false;
394                         __readIsWaiting = true;
395                         int ch;
396                         boolean mayBlock = true;    // block on the first read only
397 
398                         do
399                         {
400                             try
401                             {
402                                 if ((ch = __read(mayBlock)) < 0) { // must be EOF
403                                     if(ch != WOULD_BLOCK) {
404                                         return (ch);
405                                     }
406                                 }
407                             }
408                             catch (InterruptedIOException e)
409                             {
410                                 synchronized (__queue)
411                                 {
412                                     __ioException = e;
413                                     __queue.notifyAll();
414                                     try
415                                     {
416                                         __queue.wait(100);
417                                     }
418                                     catch (InterruptedException interrupted)
419                                     {
420                                         // Ignored
421                                     }
422                                 }
423                                 return EOF;
424                             }
425 
426 
427                             try
428                             {
429                                 if(ch != WOULD_BLOCK)
430                                 {
431                                     __processChar(ch);
432                                 }
433                             }
434                             catch (InterruptedException e)
435                             {
436                                 if (__isClosed) {
437                                     return EOF;
438                                 }
439                             }
440 
441                             // Reads should not block on subsequent iterations. Potentially, this could happen if the
442                             // remaining buffered socket data consists entirely of Telnet command sequence and no "user" data.
443                             mayBlock = false;
444 
445                         }
446                         // Continue reading as long as there is data available and the queue is not full.
447                         while (super.available() > 0 && __bytesAvailable < __queue.length - 1);
448 
449                         __readIsWaiting = false;
450                     }
451                     continue;
452                 }
453                 else
454                 {
455                     int ch;
456 
457                     ch = __queue[__queueHead];
458 
459                     if (++__queueHead >= __queue.length) {
460                         __queueHead = 0;
461                     }
462 
463                     --__bytesAvailable;
464 
465             // Need to explicitly notify() so available() works properly
466             if(__bytesAvailable == 0 && __threaded) {
467                 __queue.notify();
468             }
469 
470                     return ch;
471                 }
472             }
473         }
474     }
475 
476 
477     /***
478      * Reads the next number of bytes from the stream into an array and
479      * returns the number of bytes read.  Returns -1 if the end of the
480      * stream has been reached.
481      * <p>
482      * @param buffer  The byte array in which to store the data.
483      * @return The number of bytes read. Returns -1 if the
484      *          end of the message has been reached.
485      * @exception IOException If an error occurs in reading the underlying
486      *            stream.
487      ***/
488     @Override
489     public int read(byte buffer[]) throws IOException
490     {
491         return read(buffer, 0, buffer.length);
492     }
493 
494 
495     /***
496      * Reads the next number of bytes from the stream into an array and returns
497      * the number of bytes read.  Returns -1 if the end of the
498      * message has been reached.  The characters are stored in the array
499      * starting from the given offset and up to the length specified.
500      * <p>
501      * @param buffer The byte array in which to store the data.
502      * @param offset  The offset into the array at which to start storing data.
503      * @param length   The number of bytes to read.
504      * @return The number of bytes read. Returns -1 if the
505      *          end of the stream has been reached.
506      * @exception IOException If an error occurs while reading the underlying
507      *            stream.
508      ***/
509     @Override
510     public int read(byte buffer[], int offset, int length) throws IOException
511     {
512         int ch, off;
513 
514         if (length < 1) {
515             return 0;
516         }
517 
518         // Critical section because run() may change __bytesAvailable
519         synchronized (__queue)
520         {
521             if (length > __bytesAvailable) {
522                 length = __bytesAvailable;
523             }
524         }
525 
526         if ((ch = read()) == EOF) {
527             return EOF;
528         }
529 
530         off = offset;
531 
532         do
533         {
534             buffer[offset++] = (byte)ch;
535         }
536         while (--length > 0 && (ch = read()) != EOF);
537 
538         //__client._spyRead(buffer, off, offset - off);
539         return (offset - off);
540     }
541 
542 
543     /*** Returns false.  Mark is not supported. ***/
544     @Override
545     public boolean markSupported()
546     {
547         return false;
548     }
549 
550     @Override
551     public int available() throws IOException
552     {
553         // Critical section because run() may change __bytesAvailable
554         synchronized (__queue)
555         {
556             if (__threaded) { // Must not call super.available when running threaded: NET-466
557                 return __bytesAvailable;
558             } else {
559                 return __bytesAvailable + super.available();
560             }
561         }
562     }
563 
564 
565     // Cannot be synchronized.  Will cause deadlock if run() is blocked
566     // in read because BufferedInputStream read() is synchronized.
567     @Override
568     public void close() throws IOException
569     {
570         // Completely disregard the fact thread may still be running.
571         // We can't afford to block on this close by waiting for
572         // thread to terminate because few if any JVM's will actually
573         // interrupt a system read() from the interrupt() method.
574         super.close();
575 
576         synchronized (__queue)
577         {
578             __hasReachedEOF = true;
579             __isClosed      = true;
580 
581             if (__thread != null && __thread.isAlive())
582             {
583                 __thread.interrupt();
584             }
585 
586             __queue.notifyAll();
587         }
588 
589     }
590 
591     @Override
592     public void run()
593     {
594         int ch;
595 
596         try
597         {
598 _outerLoop:
599             while (!__isClosed)
600             {
601                 try
602                 {
603                     if ((ch = __read(true)) < 0) {
604                         break;
605                     }
606                 }
607                 catch (InterruptedIOException e)
608                 {
609                     synchronized (__queue)
610                     {
611                         __ioException = e;
612                         __queue.notifyAll();
613                         try
614                         {
615                             __queue.wait(100);
616                         }
617                         catch (InterruptedException interrupted)
618                         {
619                             if (__isClosed) {
620                                 break _outerLoop;
621                             }
622                         }
623                         continue;
624                     }
625                 } catch(RuntimeException re) {
626                     // We treat any runtime exceptions as though the
627                     // stream has been closed.  We close the
628                     // underlying stream just to be sure.
629                     super.close();
630                     // Breaking the loop has the effect of setting
631                     // the state to closed at the end of the method.
632                     break _outerLoop;
633                 }
634 
635                 // Process new character
636                 boolean notify = false;
637                 try
638                 {
639                     notify = __processChar(ch);
640                 }
641                 catch (InterruptedException e)
642                 {
643                     if (__isClosed) {
644                         break _outerLoop;
645                     }
646                 }
647 
648                 // Notify input listener if buffer was previously empty
649                 if (notify) {
650                     __client.notifyInputListener();
651                 }
652             }
653         }
654         catch (IOException ioe)
655         {
656             synchronized (__queue)
657             {
658                 __ioException = ioe;
659             }
660             __client.notifyInputListener();
661         }
662 
663         synchronized (__queue)
664         {
665             __isClosed      = true; // Possibly redundant
666             __hasReachedEOF = true;
667             __queue.notify();
668         }
669 
670         __threaded = false;
671     }
672 }
673 
674 /* Emacs configuration
675  * Local variables:        **
676  * mode:             java  **
677  * c-basic-offset:   4     **
678  * indent-tabs-mode: nil   **
679  * End:                    **
680  */