View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.commons.net.telnet;
19  
20  import java.io.BufferedInputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.InterruptedIOException;
24  
25  /***
26   *
27   * <p>
28   *
29   * <p>
30   * <p>
31   * @author Bruno D'Avanzo
32   ***/
33  
34  
35  final class TelnetInputStream extends BufferedInputStream implements Runnable
36  {
37      /** End of file has been reached */
38      private static final int EOF = -1;
39  
40      /** Read would block */
41      private static final int WOULD_BLOCK = -2;
42  
43      // TODO should these be private enums?
44      static final int _STATE_DATA = 0, _STATE_IAC = 1, _STATE_WILL = 2,
45                       _STATE_WONT = 3, _STATE_DO = 4, _STATE_DONT = 5,
46                       _STATE_SB = 6, _STATE_SE = 7, _STATE_CR = 8, _STATE_IAC_SB = 9;
47  
48      private boolean __hasReachedEOF; // @GuardedBy("__queue")
49      private volatile boolean __isClosed;
50      private boolean __readIsWaiting;
51      private int __receiveState, __queueHead, __queueTail, __bytesAvailable;
52      private final int[] __queue;
53      private final TelnetClient __client;
54      private final Thread __thread;
55      private IOException __ioException;
56  
57      /* TERMINAL-TYPE option (start)*/
58      private final int __suboption[] = new int[512];
59      private int __suboption_count = 0;
60      /* TERMINAL-TYPE option (end)*/
61  
62      private volatile boolean __threaded;
63  
64      TelnetInputStream(InputStream input, TelnetClient client,
65                        boolean readerThread)
66      {
67          super(input);
68          __client = client;
69          __receiveState = _STATE_DATA;
70          __isClosed = true;
71          __hasReachedEOF = false;
72          // Make it 2049, because when full, one slot will go unused, and we
73          // want a 2048 byte buffer just to have a round number (base 2 that is)
74          __queue = new int[2049];
75          __queueHead = 0;
76          __queueTail = 0;
77          __bytesAvailable = 0;
78          __ioException = null;
79          __readIsWaiting = false;
80          __threaded = false;
81          if(readerThread) {
82              __thread = new Thread(this);
83          } else {
84              __thread = null;
85          }
86      }
87  
88      TelnetInputStream(InputStream input, TelnetClient client) {
89          this(input, client, true);
90      }
91  
92      void _start()
93      {
94          if(__thread == null) {
95              return;
96          }
97  
98          int priority;
99          __isClosed = false;
100         // TODO remove this
101         // Need to set a higher priority in case JVM does not use pre-emptive
102         // threads.  This should prevent scheduler induced deadlock (rather than
103         // deadlock caused by a bug in this code).
104         priority = Thread.currentThread().getPriority() + 1;
105         if (priority > Thread.MAX_PRIORITY) {
106             priority = Thread.MAX_PRIORITY;
107         }
108         __thread.setPriority(priority);
109         __thread.setDaemon(true);
110         __thread.start();
111         __threaded = true; // tell _processChar that we are running threaded
112     }
113 
114 
115     // synchronized(__client) critical sections are to protect against
116     // TelnetOutputStream writing through the telnet client at same time
117     // as a processDo/Will/etc. command invoked from TelnetInputStream
118     // tries to write.
119     /**
120      * Get the next byte of data.
121      * IAC commands are processed internally and do not return data.
122      *
123      * @param mayBlock true if method is allowed to block
124      * @return the next byte of data,
125      * or -1 (EOF) if end of stread reached,
126      * or -2 (WOULD_BLOCK) if mayBlock is false and there is no data available
127      */
128     private int __read(boolean mayBlock) throws IOException
129     {
130         int ch;
131 
132         while (true)
133         {
134 
135             // If there is no more data AND we were told not to block, just return WOULD_BLOCK (-2). (More efficient than exception.)
136             if(!mayBlock && super.available() == 0) {
137                 return WOULD_BLOCK;
138             }
139 
140             // Otherwise, exit only when we reach end of stream.
141             if ((ch = super.read()) < 0) {
142                 return EOF;
143             }
144 
145             ch = (ch & 0xff);
146 
147             /* Code Section added for supporting AYT (start)*/
148             synchronized (__client)
149             {
150                 __client._processAYTResponse();
151             }
152             /* Code Section added for supporting AYT (end)*/
153 
154             /* Code Section added for supporting spystreams (start)*/
155             __client._spyRead(ch);
156             /* Code Section added for supporting spystreams (end)*/
157 
158             switch (__receiveState)
159             {
160 
161             case _STATE_CR:
162                 if (ch == '\0')
163                 {
164                     // Strip null
165                     continue;
166                 }
167                 // How do we handle newline after cr?
168                 //  else if (ch == '\n' && _requestedDont(TelnetOption.ECHO) &&
169 
170                 // Handle as normal data by falling through to _STATE_DATA case
171 
172                 //$FALL-THROUGH$
173             case _STATE_DATA:
174                 if (ch == TelnetCommand.IAC)
175                 {
176                     __receiveState = _STATE_IAC;
177                     continue;
178                 }
179 
180 
181                 if (ch == '\r')
182                 {
183                     synchronized (__client)
184                     {
185                         if (__client._requestedDont(TelnetOption.BINARY)) {
186                             __receiveState = _STATE_CR;
187                         } else {
188                             __receiveState = _STATE_DATA;
189                         }
190                     }
191                 } else {
192                     __receiveState = _STATE_DATA;
193                 }
194                 break;
195 
196             case _STATE_IAC:
197                 switch (ch)
198                 {
199                 case TelnetCommand.WILL:
200                     __receiveState = _STATE_WILL;
201                     continue;
202                 case TelnetCommand.WONT:
203                     __receiveState = _STATE_WONT;
204                     continue;
205                 case TelnetCommand.DO:
206                     __receiveState = _STATE_DO;
207                     continue;
208                 case TelnetCommand.DONT:
209                     __receiveState = _STATE_DONT;
210                     continue;
211                 /* TERMINAL-TYPE option (start)*/
212                 case TelnetCommand.SB:
213                     __suboption_count = 0;
214                     __receiveState = _STATE_SB;
215                     continue;
216                 /* TERMINAL-TYPE option (end)*/
217                 case TelnetCommand.IAC:
218                     __receiveState = _STATE_DATA;
219                     break; // exit to enclosing switch to return IAC from read
220                 case TelnetCommand.SE: // unexpected byte! ignore it (don't send it as a command)
221                     __receiveState = _STATE_DATA;
222                     continue;
223                 default:
224                     __receiveState = _STATE_DATA;
225                     __client._processCommand(ch); // Notify the user
226                     continue; // move on the next char
227                 }
228                 break; // exit and return from read
229             case _STATE_WILL:
230                 synchronized (__client)
231                 {
232                     __client._processWill(ch);
233                     __client._flushOutputStream();
234                 }
235                 __receiveState = _STATE_DATA;
236                 continue;
237             case _STATE_WONT:
238                 synchronized (__client)
239                 {
240                     __client._processWont(ch);
241                     __client._flushOutputStream();
242                 }
243                 __receiveState = _STATE_DATA;
244                 continue;
245             case _STATE_DO:
246                 synchronized (__client)
247                 {
248                     __client._processDo(ch);
249                     __client._flushOutputStream();
250                 }
251                 __receiveState = _STATE_DATA;
252                 continue;
253             case _STATE_DONT:
254                 synchronized (__client)
255                 {
256                     __client._processDont(ch);
257                     __client._flushOutputStream();
258                 }
259                 __receiveState = _STATE_DATA;
260                 continue;
261             /* TERMINAL-TYPE option (start)*/
262             case _STATE_SB:
263                 switch (ch)
264                 {
265                 case TelnetCommand.IAC:
266                     __receiveState = _STATE_IAC_SB;
267                     continue;
268                 default:
269                     // store suboption char
270                     if (__suboption_count < __suboption.length) {
271                         __suboption[__suboption_count++] = ch;
272                     }
273                     break;
274                 }
275                 __receiveState = _STATE_SB;
276                 continue;
277             case _STATE_IAC_SB: // IAC received during SB phase
278                 switch (ch)
279                 {
280                 case TelnetCommand.SE:
281                     synchronized (__client)
282                     {
283                         __client._processSuboption(__suboption, __suboption_count);
284                         __client._flushOutputStream();
285                     }
286                     __receiveState = _STATE_DATA;
287                     continue;
288                 case TelnetCommand.IAC: // De-dup the duplicated IAC
289                     if (__suboption_count < __suboption.length) {
290                         __suboption[__suboption_count++] = ch;
291                     }
292                     break;
293                 default:            // unexpected byte! ignore it
294                     break;
295                 }
296                 __receiveState = _STATE_SB;
297                 continue;
298             /* TERMINAL-TYPE option (end)*/
299             }
300 
301             break;
302         }
303 
304         return ch;
305     }
306 
307     // synchronized(__client) critical sections are to protect against
308     // TelnetOutputStream writing through the telnet client at same time
309     // as a processDo/Will/etc. command invoked from TelnetInputStream
310     // tries to write. Returns true if buffer was previously empty.
311     private boolean __processChar(int ch) throws InterruptedException
312     {
313         // Critical section because we're altering __bytesAvailable,
314         // __queueTail, and the contents of _queue.
315         boolean bufferWasEmpty;
316         synchronized (__queue)
317         {
318             bufferWasEmpty = (__bytesAvailable == 0);
319             while (__bytesAvailable >= __queue.length - 1)
320             {
321                 // The queue is full. We need to wait before adding any more data to it. Hopefully the stream owner
322                 // will consume some data soon!
323                 if(__threaded)
324                 {
325                     __queue.notify();
326                     try
327                     {
328                         __queue.wait();
329                     }
330                     catch (InterruptedException e)
331                     {
332                         throw e;
333                     }
334                 }
335                 else
336                 {
337                     // We've been asked to add another character to the queue, but it is already full and there's
338                     // no other thread to drain it. This should not have happened!
339                     throw new IllegalStateException("Queue is full! Cannot process another character.");
340                 }
341             }
342 
343             // Need to do this in case we're not full, but block on a read
344             if (__readIsWaiting && __threaded)
345             {
346                 __queue.notify();
347             }
348 
349             __queue[__queueTail] = ch;
350             ++__bytesAvailable;
351 
352             if (++__queueTail >= __queue.length) {
353                 __queueTail = 0;
354             }
355         }
356         return bufferWasEmpty;
357     }
358 
359     @Override
360     public int read() throws IOException
361     {
362         // Critical section because we're altering __bytesAvailable,
363         // __queueHead, and the contents of _queue in addition to
364         // testing value of __hasReachedEOF.
365         synchronized (__queue)
366         {
367 
368             while (true)
369             {
370                 if (__ioException != null)
371                 {
372                     IOException e;
373                     e = __ioException;
374                     __ioException = null;
375                     throw e;
376                 }
377 
378                 if (__bytesAvailable == 0)
379                 {
380                     // Return EOF if at end of file
381                     if (__hasReachedEOF) {
382                         return EOF;
383                     }
384 
385                     // Otherwise, we have to wait for queue to get something
386                     if(__threaded)
387                     {
388                         __queue.notify();
389                         try
390                         {
391                             __readIsWaiting = true;
392                             __queue.wait();
393                             __readIsWaiting = false;
394                         }
395                         catch (InterruptedException e)
396                         {
397                             throw new InterruptedIOException("Fatal thread interruption during read.");
398                         }
399                     }
400                     else
401                     {
402                         //__alreadyread = false;
403                         __readIsWaiting = true;
404                         int ch;
405                         boolean mayBlock = true;    // block on the first read only
406 
407                         do
408                         {
409                             try
410                             {
411                                 if ((ch = __read(mayBlock)) < 0) { // must be EOF
412                                     if(ch != WOULD_BLOCK) {
413                                         return (ch);
414                                     }
415                                 }
416                             }
417                             catch (InterruptedIOException e)
418                             {
419                                 synchronized (__queue)
420                                 {
421                                     __ioException = e;
422                                     __queue.notifyAll();
423                                     try
424                                     {
425                                         __queue.wait(100);
426                                     }
427                                     catch (InterruptedException interrupted)
428                                     {
429                                     }
430                                 }
431                                 return EOF;
432                             }
433 
434 
435                             try
436                             {
437                                 if(ch != WOULD_BLOCK)
438                                 {
439                                     __processChar(ch);
440                                 }
441                             }
442                             catch (InterruptedException e)
443                             {
444                                 if (__isClosed) {
445                                     return EOF;
446                                 }
447                             }
448 
449                             // Reads should not block on subsequent iterations. Potentially, this could happen if the
450                             // remaining buffered socket data consists entirely of Telnet command sequence and no "user" data.
451                             mayBlock = false;
452 
453                         }
454                         // Continue reading as long as there is data available and the queue is not full.
455                         while (super.available() > 0 && __bytesAvailable < __queue.length - 1);
456 
457                         __readIsWaiting = false;
458                     }
459                     continue;
460                 }
461                 else
462                 {
463                     int ch;
464 
465                     ch = __queue[__queueHead];
466 
467                     if (++__queueHead >= __queue.length) {
468                         __queueHead = 0;
469                     }
470 
471                     --__bytesAvailable;
472 
473             // Need to explicitly notify() so available() works properly
474             if(__bytesAvailable == 0 && __threaded) {
475                 __queue.notify();
476             }
477 
478                     return ch;
479                 }
480             }
481         }
482     }
483 
484 
485     /***
486      * Reads the next number of bytes from the stream into an array and
487      * returns the number of bytes read.  Returns -1 if the end of the
488      * stream has been reached.
489      * <p>
490      * @param buffer  The byte array in which to store the data.
491      * @return The number of bytes read. Returns -1 if the
492      *          end of the message has been reached.
493      * @exception IOException If an error occurs in reading the underlying
494      *            stream.
495      ***/
496     @Override
497     public int read(byte buffer[]) throws IOException
498     {
499         return read(buffer, 0, buffer.length);
500     }
501 
502 
503     /***
504      * Reads the next number of bytes from the stream into an array and returns
505      * the number of bytes read.  Returns -1 if the end of the
506      * message has been reached.  The characters are stored in the array
507      * starting from the given offset and up to the length specified.
508      * <p>
509      * @param buffer The byte array in which to store the data.
510      * @param offset  The offset into the array at which to start storing data.
511      * @param length   The number of bytes to read.
512      * @return The number of bytes read. Returns -1 if the
513      *          end of the stream has been reached.
514      * @exception IOException If an error occurs while reading the underlying
515      *            stream.
516      ***/
517     @Override
518     public int read(byte buffer[], int offset, int length) throws IOException
519     {
520         int ch, off;
521 
522         if (length < 1) {
523             return 0;
524         }
525 
526         // Critical section because run() may change __bytesAvailable
527         synchronized (__queue)
528         {
529             if (length > __bytesAvailable) {
530                 length = __bytesAvailable;
531             }
532         }
533 
534         if ((ch = read()) == EOF) {
535             return EOF;
536         }
537 
538         off = offset;
539 
540         do
541         {
542             buffer[offset++] = (byte)ch;
543         }
544         while (--length > 0 && (ch = read()) != EOF);
545 
546         //__client._spyRead(buffer, off, offset - off);
547         return (offset - off);
548     }
549 
550 
551     /*** Returns false.  Mark is not supported. ***/
552     @Override
553     public boolean markSupported()
554     {
555         return false;
556     }
557 
558     @Override
559     public int available() throws IOException
560     {
561         // Critical section because run() may change __bytesAvailable
562         synchronized (__queue)
563         {
564             if (__threaded) { // Must not call super.available when running threaded: NET-466
565                 return __bytesAvailable;
566             } else {
567                 return __bytesAvailable + super.available();
568             }
569         }
570     }
571 
572 
573     // Cannot be synchronized.  Will cause deadlock if run() is blocked
574     // in read because BufferedInputStream read() is synchronized.
575     @Override
576     public void close() throws IOException
577     {
578         // Completely disregard the fact thread may still be running.
579         // We can't afford to block on this close by waiting for
580         // thread to terminate because few if any JVM's will actually
581         // interrupt a system read() from the interrupt() method.
582         super.close();
583 
584         synchronized (__queue)
585         {
586             __hasReachedEOF = true;
587             __isClosed      = true;
588 
589             if (__thread != null && __thread.isAlive())
590             {
591                 __thread.interrupt();
592             }
593 
594             __queue.notifyAll();
595         }
596 
597     }
598 
599 //    @Override
600     public void run()
601     {
602         int ch;
603 
604         try
605         {
606 _outerLoop:
607             while (!__isClosed)
608             {
609                 try
610                 {
611                     if ((ch = __read(true)) < 0) {
612                         break;
613                     }
614                 }
615                 catch (InterruptedIOException e)
616                 {
617                     synchronized (__queue)
618                     {
619                         __ioException = e;
620                         __queue.notifyAll();
621                         try
622                         {
623                             __queue.wait(100);
624                         }
625                         catch (InterruptedException interrupted)
626                         {
627                             if (__isClosed) {
628                                 break _outerLoop;
629                             }
630                         }
631                         continue;
632                     }
633                 } catch(RuntimeException re) {
634                     // We treat any runtime exceptions as though the
635                     // stream has been closed.  We close the
636                     // underlying stream just to be sure.
637                     super.close();
638                     // Breaking the loop has the effect of setting
639                     // the state to closed at the end of the method.
640                     break _outerLoop;
641                 }
642 
643                 // Process new character
644                 boolean notify = false;
645                 try
646                 {
647                     notify = __processChar(ch);
648                 }
649                 catch (InterruptedException e)
650                 {
651                     if (__isClosed) {
652                         break _outerLoop;
653                     }
654                 }
655 
656                 // Notify input listener if buffer was previously empty
657                 if (notify) {
658                     __client.notifyInputListener();
659                 }
660             }
661         }
662         catch (IOException ioe)
663         {
664             synchronized (__queue)
665             {
666                 __ioException = ioe;
667             }
668             __client.notifyInputListener();
669         }
670 
671         synchronized (__queue)
672         {
673             __isClosed      = true; // Possibly redundant
674             __hasReachedEOF = true;
675             __queue.notify();
676         }
677 
678         __threaded = false;
679     }
680 }
681 
682 /* Emacs configuration
683  * Local variables:        **
684  * mode:             java  **
685  * c-basic-offset:   4     **
686  * indent-tabs-mode: nil   **
687  * End:                    **
688  */