TelnetInputStream.java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one or more
  3.  * contributor license agreements.  See the NOTICE file distributed with
  4.  * this work for additional information regarding copyright ownership.
  5.  * The ASF licenses this file to You under the Apache License, Version 2.0
  6.  * (the "License"); you may not use this file except in compliance with
  7.  * the License.  You may obtain a copy of the License at
  8.  *
  9.  *      http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */

  17. package org.apache.commons.net.telnet;

  18. import java.io.BufferedInputStream;
  19. import java.io.IOException;
  20. import java.io.InputStream;
  21. import java.io.InterruptedIOException;

  22. final class TelnetInputStream extends BufferedInputStream implements Runnable {
    /** End of file has been reached */
    private static final int EOF = -1;

    /** Read would block */
    private static final int WOULD_BLOCK = -2;

    // States of the receive state machine driven by read(boolean); receiveState always holds one of these.
    // TODO should these be private enums?
    static final int STATE_DATA = 0, STATE_IAC = 1, STATE_WILL = 2, STATE_WONT = 3, STATE_DO = 4, STATE_DONT = 5, STATE_SB = 6, STATE_SE = 7, STATE_CR = 8,
            STATE_IAC_SB = 9;

    // Set once no more data will be queued (by close() and at the end of run()); read() returns EOF when this is
    // set and the queue is empty.
    private boolean hasReachedEOF; // @GuardedBy("queue")
    // Starts out true; cleared by start(), set again by close() and at the end of run().
    private volatile boolean isClosed;
    // Set by read() while it is waiting for data; processChar() notifies the queue monitor when it sees this set in threaded mode.
    private boolean readIsWaiting;
    // receiveState: current STATE_* value of the receive state machine.
    // queueHead: index of the next slot read() takes from; queueTail: index of the next slot processChar() fills.
    // bytesAvailable: number of values currently held in the queue.
    private int receiveState, queueHead, queueTail, bytesAvailable;
    // Circular buffer of decoded data bytes; also the monitor guarding the queue state above.
    private final int[] queue;
    // Client that receives option negotiation, command, spy and listener callbacks.
    private final TelnetClient client;
    // Reader thread that executes run(); null when the stream was created without a reader thread.
    private final Thread thread;
    // Exception recorded by the reading side; thrown (and cleared) by the next call to read().
    private IOException ioException;

    /* TERMINAL-TYPE option (start) */
    // Bytes collected between IAC SB and IAC SE; sized from client.maxSubnegotiationLength.
    private final int suboption[];
    // Number of valid entries in suboption.
    private int suboptionCount;
    /* TERMINAL-TYPE option (end) */

    // True while the reader thread is expected to fill the queue: set by start(), cleared at the end of run().
    private volatile boolean threaded;

    /**
     * Creates a stream that uses a reader thread (equivalent to passing {@code true} for {@code readerThread}).
     *
     * @param input  the underlying stream to read from
     * @param client the client that handles negotiation and notifications
     */
    TelnetInputStream(final InputStream input, final TelnetClient client) {
        this(input, client, true);
    }

  46.     TelnetInputStream(final InputStream input, final TelnetClient client, final boolean readerThread) {
  47.         super(input);
  48.         this.client = client;
  49.         this.receiveState = STATE_DATA;
  50.         this.isClosed = true;
  51.         this.hasReachedEOF = false;
  52.         // Make it 2049, because when full, one slot will go unused, and we
  53.         // want a 2048 byte buffer just to have a round number (base 2 that is)
  54.         this.queue = new int[2049];
  55.         this.queueHead = 0;
  56.         this.queueTail = 0;
  57.         this.suboption = new int[client.maxSubnegotiationLength];
  58.         this.bytesAvailable = 0;
  59.         this.ioException = null;
  60.         this.readIsWaiting = false;
  61.         this.threaded = false;
  62.         if (readerThread) {
  63.             this.thread = new Thread(this);
  64.         } else {
  65.             this.thread = null;
  66.         }
  67.     }

  68.     @Override
  69.     public int available() throws IOException {
  70.         // Critical section because run() may change bytesAvailable
  71.         synchronized (queue) {
  72.             if (threaded) { // Must not call super.available when running threaded: NET-466
  73.                 return bytesAvailable;
  74.             }
  75.             return bytesAvailable + super.available();
  76.         }
  77.     }

  78.     // Cannot be synchronized. Will cause deadlock if run() is blocked
  79.     // in read because BufferedInputStream read() is synchronized.
  80.     @Override
  81.     public void close() throws IOException {
  82.         // Completely disregard the fact thread may still be running.
  83.         // We can't afford to block on this close by waiting for
  84.         // thread to terminate because few if any JVM's will actually
  85.         // interrupt a system read() from the interrupt() method.
  86.         super.close();

  87.         synchronized (queue) {
  88.             hasReachedEOF = true;
  89.             isClosed = true;

  90.             if (thread != null && thread.isAlive()) {
  91.                 thread.interrupt();
  92.             }

  93.             queue.notifyAll();
  94.         }

  95.     }

    /**
     * Returns false. Mark is not supported.
     *
     * @return always {@code false}
     */
    @Override
    public boolean markSupported() {
        return false;
    }

  101.     // synchronized(client) critical sections are to protect against
  102.     // TelnetOutputStream writing through the telnet client at same time
  103.     // as a processDo/Will/etc. command invoked from TelnetInputStream
  104.     // tries to write. Returns true if buffer was previously empty.
  105.     private boolean processChar(final int ch) throws InterruptedException {
  106.         // Critical section because we're altering bytesAvailable,
  107.         // queueTail, and the contents of _queue.
  108.         final boolean bufferWasEmpty;
  109.         synchronized (queue) {
  110.             bufferWasEmpty = bytesAvailable == 0;
  111.             while (bytesAvailable >= queue.length - 1) {
  112.                 // The queue is full. We need to wait before adding any more data to it. Hopefully the stream owner
  113.                 // will consume some data soon!
  114.                 if (!threaded) {
  115.                     // We've been asked to add another character to the queue, but it is already full and there's
  116.                     // no other thread to drain it. This should not have happened!
  117.                     throw new IllegalStateException("Queue is full! Cannot process another character.");
  118.                 }
  119.                 queue.notify();
  120.                 try {
  121.                     queue.wait();
  122.                 } catch (final InterruptedException e) {
  123.                     throw e;
  124.                 }
  125.             }

  126.             // Need to do this in case we're not full, but block on a read
  127.             if (readIsWaiting && threaded) {
  128.                 queue.notify();
  129.             }

  130.             queue[queueTail] = ch;
  131.             ++bytesAvailable;

  132.             if (++queueTail >= queue.length) {
  133.                 queueTail = 0;
  134.             }
  135.         }
  136.         return bufferWasEmpty;
  137.     }

    /**
     * Returns the next value from the internal queue, waiting for data if the queue is empty.
     * <p>
     * When a reader thread is running ({@code threaded}), this method waits on the queue monitor until that thread
     * queues data. Otherwise it pulls bytes from the underlying stream itself through {@code read(boolean)},
     * blocking on the first pull only, and queues them with {@code processChar} before taking one out.
     * </p>
     *
     * @return the next queued value, or -1 (EOF) once the end of the stream has been reached
     * @throws IOException a previously recorded {@code ioException}, an {@link InterruptedIOException} if the wait
     *                     for the reader thread is interrupted, or an error raised by the underlying stream
     */
    @Override
    public int read() throws IOException {
        // Critical section because we're altering bytesAvailable,
        // queueHead, and the contents of _queue in addition to
        // testing value of hasReachedEOF.
        synchronized (queue) {

            while (true) {
                // A pending exception is delivered exactly once: clear it before throwing.
                if (ioException != null) {
                    final IOException e;
                    e = ioException;
                    ioException = null;
                    throw e;
                }

                if (bytesAvailable == 0) {
                    // Return EOF if at end of file
                    if (hasReachedEOF) {
                        return EOF;
                    }

                    // Otherwise, we have to wait for queue to get something
                    if (threaded) {
                        // Wake the reader thread in case it is waiting on this monitor, then wait for it to add data.
                        queue.notify();
                        try {
                            readIsWaiting = true;
                            queue.wait();
                            readIsWaiting = false;
                        } catch (final InterruptedException e) {
                            throw new InterruptedIOException("Fatal thread interruption during read.");
                        }
                    } else {
                        // No reader thread: fill the queue from the underlying stream on the caller's thread.
                        readIsWaiting = true;
                        int ch;
                        boolean mayBlock = true; // block on the first read only

                        do {
                            try {
                                if ((ch = read(mayBlock)) < 0) { // must be EOF
                                    if (ch != WOULD_BLOCK) {
                                        return ch;
                                    }
                                }
                            } catch (final InterruptedIOException e) {
                                // Record the exception so the next call to read() throws it; this call reports EOF.
                                synchronized (queue) {
                                    ioException = e;
                                    queue.notifyAll();
                                    try {
                                        queue.wait(100);
                                    } catch (final InterruptedException interrupted) {
                                        // Ignored
                                    }
                                }
                                return EOF;
                            }

                            try {
                                // WOULD_BLOCK is a status code, not data, so it is never queued.
                                if (ch != WOULD_BLOCK) {
                                    processChar(ch);
                                }
                            } catch (final InterruptedException e) {
                                if (isClosed) {
                                    return EOF;
                                }
                            }

                            // Reads should not block on subsequent iterations. Potentially, this could happen if the
                            // remaining buffered socket data consists entirely of Telnet command sequence and no "user" data.
                            mayBlock = false;

                        }
                        // Continue reading as long as there is data available and the queue is not full.
                        while (super.available() > 0 && bytesAvailable < queue.length - 1);

                        readIsWaiting = false;
                    }
                    // Re-evaluate the pending exception, EOF and queue state from the top of the loop.
                    continue;
                }
                final int ch;

                // Take the value at the head of the circular queue, wrapping the index at the end of the array.
                ch = queue[queueHead];

                if (++queueHead >= queue.length) {
                    queueHead = 0;
                }

                --bytesAvailable;

                // Need to explicitly notify() so available() works properly
                if (bytesAvailable == 0 && threaded) {
                    queue.notify();
                }

                return ch;
            }
        }
    }

    // synchronized(client) critical sections are to protect against
    // TelnetOutputStream writing through the telnet client at same time
    // as a processDo/Will/etc. command invoked from TelnetInputStream
    // tries to write.
    /**
     * Gets the next byte of data. IAC commands are processed internally and do not return data.
     * <p>
     * Each byte taken from the underlying stream is passed to {@code client.spyRead} and then run through the
     * receive state machine ({@code receiveState}). Option negotiation, subnegotiation and other commands are
     * handed to the client and consumed; only data bytes (including an escaped IAC) are returned.
     * </p>
     *
     * @param mayBlock true if method is allowed to block
     * @return the next byte of data (0-255), or -1 (EOF) if end of stream reached, or -2 (WOULD_BLOCK) if mayBlock is false and there is no data available
     * @throws IOException if reading the underlying stream or flushing a negotiation response fails
     */
    private int read(final boolean mayBlock) throws IOException {
        int ch;

        while (true) {

            // If there is no more data AND we were told not to block,
            // just return WOULD_BLOCK (-2). (More efficient than exception.)
            if (!mayBlock && super.available() == 0) {
                return WOULD_BLOCK;
            }

            // Otherwise, exit only when we reach end of stream.
            if ((ch = super.read()) < 0) {
                return EOF;
            }

            ch &= 0xff;

            /* Code Section added for supporting AYT (start) */
            synchronized (client) {
                client.processAYTResponse();
            }
            /* Code Section added for supporting AYT (end) */

            /* Code Section added for supporting spystreams (start) */
            client.spyRead(ch);
            /* Code Section added for supporting spystreams (end) */

            // In this switch, 'continue' consumes the byte and fetches the next one;
            // 'break' leaves the loop and returns ch to the caller as data.
            switch (receiveState) {

            case STATE_CR:
                if (ch == '\0') {
                    // Strip null
                    // NOTE(review): receiveState stays STATE_CR here, so any further consecutive NULs are
                    // stripped as well - confirm this is intended.
                    continue;
                }
                // How do we handle newline after cr?
                // else if (ch == '\n' && _requestedDont(TelnetOption.ECHO) &&

                // Handle as normal data by falling through to _STATE_DATA case

                //$FALL-THROUGH$
            case STATE_DATA:
                if (ch == TelnetCommand.IAC) {
                    receiveState = STATE_IAC;
                    continue;
                }

                if (ch == '\r') {
                    synchronized (client) {
                        // Only track the CR (to strip a following NUL) when the client reports requestedDont(BINARY).
                        if (client.requestedDont(TelnetOption.BINARY)) {
                            receiveState = STATE_CR;
                        } else {
                            receiveState = STATE_DATA;
                        }
                    }
                } else {
                    receiveState = STATE_DATA;
                }
                break;

            case STATE_IAC:
                switch (ch) {
                case TelnetCommand.WILL:
                    receiveState = STATE_WILL;
                    continue;
                case TelnetCommand.WONT:
                    receiveState = STATE_WONT;
                    continue;
                case TelnetCommand.DO:
                    receiveState = STATE_DO;
                    continue;
                case TelnetCommand.DONT:
                    receiveState = STATE_DONT;
                    continue;
                /* TERMINAL-TYPE option (start) */
                case TelnetCommand.SB:
                    // Start collecting a new subnegotiation from an empty buffer.
                    suboptionCount = 0;
                    receiveState = STATE_SB;
                    continue;
                /* TERMINAL-TYPE option (end) */
                case TelnetCommand.IAC:
                    // IAC IAC is an escaped IAC: deliver a single IAC byte as data.
                    receiveState = STATE_DATA;
                    break; // exit to enclosing switch to return IAC from read
                case TelnetCommand.SE: // unexpected byte! ignore it (don't send it as a command)
                    receiveState = STATE_DATA;
                    continue;
                default:
                    receiveState = STATE_DATA;
                    client.processCommand(ch); // Notify the user
                    continue; // move on the next char
                }
                break; // exit and return from read
            case STATE_WILL:
                // ch is the option code that follows IAC WILL; the client handles it and its output is flushed.
                synchronized (client) {
                    client.processWill(ch);
                    client.flushOutputStream();
                }
                receiveState = STATE_DATA;
                continue;
            case STATE_WONT:
                synchronized (client) {
                    client.processWont(ch);
                    client.flushOutputStream();
                }
                receiveState = STATE_DATA;
                continue;
            case STATE_DO:
                synchronized (client) {
                    client.processDo(ch);
                    client.flushOutputStream();
                }
                receiveState = STATE_DATA;
                continue;
            case STATE_DONT:
                synchronized (client) {
                    client.processDont(ch);
                    client.flushOutputStream();
                }
                receiveState = STATE_DATA;
                continue;
            /* TERMINAL-TYPE option (start) */
            case STATE_SB:
                switch (ch) {
                case TelnetCommand.IAC:
                    receiveState = STATE_IAC_SB;
                    continue;
                default:
                    // store suboption char
                    // Bytes beyond the capacity of the suboption buffer are silently dropped.
                    if (suboptionCount < suboption.length) {
                        suboption[suboptionCount++] = ch;
                    }
                    break;
                }
                receiveState = STATE_SB;
                continue;
            case STATE_IAC_SB: // IAC received during SB phase
                switch (ch) {
                case TelnetCommand.SE:
                    // IAC SE ends the subnegotiation: hand the collected bytes to the client.
                    synchronized (client) {
                        client.processSuboption(suboption, suboptionCount);
                        client.flushOutputStream();
                    }
                    receiveState = STATE_DATA;
                    continue;
                case TelnetCommand.IAC: // De-dup the duplicated IAC
                    if (suboptionCount < suboption.length) {
                        suboption[suboptionCount++] = ch;
                    }
                    break;
                default: // unexpected byte! ignore it
                    break;
                }
                // Anything other than SE resumes collecting subnegotiation bytes.
                receiveState = STATE_SB;
                continue;
            /* TERMINAL-TYPE option (end) */
            }

            break;
        }

        return ch;
    }

    /**
     * Reads bytes from the stream into the whole of {@code buffer}; equivalent to
     * {@code read(buffer, 0, buffer.length)}.
     *
     * @param buffer The byte array in which to store the data.
     * @return The number of bytes read. Returns -1 if the end of the stream has been reached.
     * @throws IOException If an error occurs in reading the underlying stream.
     */
    @Override
    public int read(final byte buffer[]) throws IOException {
        return read(buffer, 0, buffer.length);
    }

  392.     /**
  393.      * Reads the next number of bytes from the stream into an array and returns the number of bytes read. Returns -1 if the end of the message has been reached.
  394.      * The characters are stored in the array starting from the given offset and up to the length specified.
  395.      *
  396.      * @param buffer The byte array in which to store the data.
  397.      * @param offset The offset into the array at which to start storing data.
  398.      * @param length The number of bytes to read.
  399.      * @return The number of bytes read. Returns -1 if the end of the stream has been reached.
  400.      * @throws IOException If an error occurs while reading the underlying stream.
  401.      */
  402.     @Override
  403.     public int read(final byte buffer[], int offset, int length) throws IOException {
  404.         int ch;
  405.         final int off;

  406.         if (length < 1) {
  407.             return 0;
  408.         }

  409.         // Critical section because run() may change bytesAvailable
  410.         synchronized (queue) {
  411.             if (length > bytesAvailable) {
  412.                 length = bytesAvailable;
  413.             }
  414.         }

  415.         if ((ch = read()) == EOF) {
  416.             return EOF;
  417.         }

  418.         off = offset;

  419.         do {
  420.             buffer[offset++] = (byte) ch;
  421.         } while (--length > 0 && (ch = read()) != EOF);

  422.         // client._spyRead(buffer, off, offset - off);
  423.         return offset - off;
  424.     }

  425.     @Override
  426.     public void run() {
  427.         int ch;

  428.         try {
  429.             _outerLoop: while (!isClosed) {
  430.                 try {
  431.                     if ((ch = read(true)) < 0) {
  432.                         break;
  433.                     }
  434.                 } catch (final InterruptedIOException e) {
  435.                     synchronized (queue) {
  436.                         ioException = e;
  437.                         queue.notifyAll();
  438.                         try {
  439.                             queue.wait(100);
  440.                         } catch (final InterruptedException interrupted) {
  441.                             if (isClosed) {
  442.                                 break _outerLoop;
  443.                             }
  444.                         }
  445.                         continue;
  446.                     }
  447.                 } catch (final RuntimeException re) {
  448.                     // We treat any runtime exceptions as though the
  449.                     // stream has been closed. We close the
  450.                     // underlying stream just to be sure.
  451.                     super.close();
  452.                     // Breaking the loop has the effect of setting
  453.                     // the state to closed at the end of the method.
  454.                     break _outerLoop;
  455.                 }

  456.                 // Process new character
  457.                 boolean notify = false;
  458.                 try {
  459.                     notify = processChar(ch);
  460.                 } catch (final InterruptedException e) {
  461.                     if (isClosed) {
  462.                         break _outerLoop;
  463.                     }
  464.                 }

  465.                 // Notify input listener if buffer was previously empty
  466.                 if (notify) {
  467.                     client.notifyInputListener();
  468.                 }
  469.             }
  470.         } catch (final IOException ioe) {
  471.             synchronized (queue) {
  472.                 ioException = ioe;
  473.             }
  474.             client.notifyInputListener();
  475.         }

  476.         synchronized (queue) {
  477.             isClosed = true; // Possibly redundant
  478.             hasReachedEOF = true;
  479.             queue.notify();
  480.         }

  481.         threaded = false;
  482.     }

  483.     void start() {
  484.         if (thread == null) {
  485.             return;
  486.         }

  487.         int priority;
  488.         isClosed = false;
  489.         // TODO remove this
  490.         // Need to set a higher priority in case JVM does not use pre-emptive
  491.         // threads. This should prevent scheduler induced deadlock (rather than
  492.         // deadlock caused by a bug in this code).
  493.         priority = Thread.currentThread().getPriority() + 1;
  494.         if (priority > Thread.MAX_PRIORITY) {
  495.             priority = Thread.MAX_PRIORITY;
  496.         }
  497.         thread.setPriority(priority);
  498.         thread.setDaemon(true);
  499.         thread.start();
  500.         threaded = true; // tell _processChar that we are running threaded
  501.     }
  502. }