TelnetInputStream.java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one or more
  3.  * contributor license agreements.  See the NOTICE file distributed with
  4.  * this work for additional information regarding copyright ownership.
  5.  * The ASF licenses this file to You under the Apache License, Version 2.0
  6.  * (the "License"); you may not use this file except in compliance with
  7.  * the License.  You may obtain a copy of the License at
  8.  *
  9.  *      https://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */

  17. package org.apache.commons.net.telnet;

  18. import java.io.BufferedInputStream;
  19. import java.io.IOException;
  20. import java.io.InputStream;
  21. import java.io.InterruptedIOException;

  22. final class TelnetInputStream extends BufferedInputStream implements Runnable {
  23.     /** End of file has been reached */
  24.     private static final int EOF = -1;

  25.     /** Read would block */
  26.     private static final int WOULD_BLOCK = -2;

  27.     // TODO should these be private enums?
  28.     static final int STATE_DATA = 0;
  29.     static final int STATE_IAC = 1;
  30.     static final int STATE_WILL = 2;
  31.     static final int STATE_WONT = 3;
  32.     static final int STATE_DO = 4;
  33.     static final int STATE_DONT = 5;
  34.     static final int STATE_SB = 6;
  35.     static final int STATE_SE = 7;
  36.     static final int STATE_CR = 8;
  37.     static final int STATE_IAC_SB = 9;
  38.     private boolean hasReachedEOF; // @GuardedBy("queue")
  39.     private volatile boolean isClosed;
  40.     private boolean readIsWaiting;
  41.     private int receiveState;
  42.     private int queueHead;
  43.     private int queueTail;
  44.     private int bytesAvailable;
  45.     private final int[] queue;
  46.     private final TelnetClient client;
  47.     private final Thread thread;
  48.     private IOException ioException;

  49.     /* TERMINAL-TYPE option (start) */
  50.     private final int suboption[];
  51.     private int suboptionCount;
  52.     /* TERMINAL-TYPE option (end) */

  53.     private volatile boolean threaded;

  54.     TelnetInputStream(final InputStream input, final TelnetClient client) {
  55.         this(input, client, true);
  56.     }

  57.     TelnetInputStream(final InputStream input, final TelnetClient client, final boolean readerThread) {
  58.         super(input);
  59.         this.client = client;
  60.         this.receiveState = STATE_DATA;
  61.         this.isClosed = true;
  62.         this.hasReachedEOF = false;
  63.         // Make it 2049, because when full, one slot will go unused, and we
  64.         // want a 2048 byte buffer just to have a round number (base 2 that is)
  65.         this.queue = new int[2049];
  66.         this.queueHead = 0;
  67.         this.queueTail = 0;
  68.         this.suboption = new int[client.maxSubnegotiationLength];
  69.         this.bytesAvailable = 0;
  70.         this.ioException = null;
  71.         this.readIsWaiting = false;
  72.         this.threaded = false;
  73.         if (readerThread) {
  74.             this.thread = new Thread(this);
  75.         } else {
  76.             this.thread = null;
  77.         }
  78.     }

  79.     @Override
  80.     public int available() throws IOException {
  81.         // Critical section because run() may change bytesAvailable
  82.         synchronized (queue) {
  83.             if (threaded) { // Must not call super.available when running threaded: NET-466
  84.                 return bytesAvailable;
  85.             }
  86.             return bytesAvailable + super.available();
  87.         }
  88.     }

  89.     // Cannot be synchronized. Will cause deadlock if run() is blocked
  90.     // in read because BufferedInputStream read() is synchronized.
  91.     @Override
  92.     public void close() throws IOException {
  93.         // Completely disregard the fact thread may still be running.
  94.         // We can't afford to block on this close by waiting for
  95.         // thread to terminate because few if any JVM's will actually
  96.         // interrupt a system read() from the interrupt() method.
  97.         super.close();

  98.         synchronized (queue) {
  99.             hasReachedEOF = true;
  100.             isClosed = true;

  101.             if (thread != null && thread.isAlive()) {
  102.                 thread.interrupt();
  103.             }

  104.             queue.notifyAll();
  105.         }

  106.     }

  107.     /** Returns false. Mark is not supported. */
  108.     @Override
  109.     public boolean markSupported() {
  110.         return false;
  111.     }

  112.     // synchronized(client) critical sections are to protect against
  113.     // TelnetOutputStream writing through the Telnet client at same time
  114.     // as a processDo/Will/etc. command invoked from TelnetInputStream
  115.     // tries to write. Returns true if buffer was previously empty.
  116.     private boolean processChar(final int ch) throws InterruptedException {
  117.         // Critical section because we're altering bytesAvailable,
  118.         // queueTail, and the contents of _queue.
  119.         final boolean bufferWasEmpty;
  120.         synchronized (queue) {
  121.             bufferWasEmpty = bytesAvailable == 0;
  122.             while (bytesAvailable >= queue.length - 1) {
  123.                 // The queue is full. We need to wait before adding any more data to it. Hopefully the stream owner
  124.                 // will consume some data soon!
  125.                 if (!threaded) {
  126.                     // We've been asked to add another character to the queue, but it is already full and there's
  127.                     // no other thread to drain it. This should not have happened!
  128.                     throw new IllegalStateException("Queue is full! Cannot process another character.");
  129.                 }
  130.                 queue.notify();
  131.                 try {
  132.                     queue.wait();
  133.                 } catch (final InterruptedException e) {
  134.                     throw e;
  135.                 }
  136.             }

  137.             // Need to do this in case we're not full, but block on a read
  138.             if (readIsWaiting && threaded) {
  139.                 queue.notify();
  140.             }

  141.             queue[queueTail] = ch;
  142.             ++bytesAvailable;

  143.             if (++queueTail >= queue.length) {
  144.                 queueTail = 0;
  145.             }
  146.         }
  147.         return bufferWasEmpty;
  148.     }

  149.     @Override
  150.     public int read() throws IOException {
  151.         // Critical section because we're altering bytesAvailable,
  152.         // queueHead, and the contents of _queue in addition to
  153.         // testing value of hasReachedEOF.
  154.         synchronized (queue) {

  155.             while (true) {
  156.                 if (ioException != null) {
  157.                     final IOException e;
  158.                     e = ioException;
  159.                     ioException = null;
  160.                     throw e;
  161.                 }

  162.                 if (bytesAvailable == 0) {
  163.                     // Return EOF if at end of file
  164.                     if (hasReachedEOF) {
  165.                         return EOF;
  166.                     }

  167.                     // Otherwise, we have to wait for queue to get something
  168.                     if (threaded) {
  169.                         queue.notify();
  170.                         try {
  171.                             readIsWaiting = true;
  172.                             queue.wait();
  173.                             readIsWaiting = false;
  174.                         } catch (final InterruptedException e) {
  175.                             throw new InterruptedIOException("Fatal thread interruption during read.");
  176.                         }
  177.                     } else {
  178.                         // alreadyread = false;
  179.                         readIsWaiting = true;
  180.                         int ch;
  181.                         boolean mayBlock = true; // block on the first read only

  182.                         do {
  183.                             try {
  184.                                 if ((ch = read(mayBlock)) < 0 && ch != WOULD_BLOCK) {
  185.                                     // must be EOF
  186.                                     return ch;
  187.                                 }
  188.                             } catch (final InterruptedIOException e) {
  189.                                 synchronized (queue) {
  190.                                     ioException = e;
  191.                                     queue.notifyAll();
  192.                                     try {
  193.                                         queue.wait(100);
  194.                                     } catch (final InterruptedException interrupted) {
  195.                                         // Ignored
  196.                                     }
  197.                                 }
  198.                                 return EOF;
  199.                             }

  200.                             try {
  201.                                 if (ch != WOULD_BLOCK) {
  202.                                     processChar(ch);
  203.                                 }
  204.                             } catch (final InterruptedException e) {
  205.                                 if (isClosed) {
  206.                                     return EOF;
  207.                                 }
  208.                             }

  209.                             // Reads should not block on subsequent iterations. Potentially, this could happen if the
  210.                             // remaining buffered socket data consists entirely of Telnet command sequence and no "user" data.
  211.                             mayBlock = false;

  212.                         }
  213.                         // Continue reading as long as there is data available and the queue is not full.
  214.                         while (super.available() > 0 && bytesAvailable < queue.length - 1);

  215.                         readIsWaiting = false;
  216.                     }
  217.                     continue;
  218.                 }
  219.                 final int ch;

  220.                 ch = queue[queueHead];

  221.                 if (++queueHead >= queue.length) {
  222.                     queueHead = 0;
  223.                 }

  224.                 --bytesAvailable;

  225.                 // Need to explicitly notify() so available() works properly
  226.                 if (bytesAvailable == 0 && threaded) {
  227.                     queue.notify();
  228.                 }

  229.                 return ch; // NOPMD TODO?
  230.             }
  231.         }
  232.     }

  233.     // synchronized(client) critical sections are to protect against
  234.     // TelnetOutputStream writing through the Telnet client at same time
  235.     // as a processDo/Will/etc. command invoked from TelnetInputStream
  236.     // tries to write.
  237.     /**
  238.      * Gets the next byte of data. IAC commands are processed internally and do not return data.
  239.      *
  240.      * @param mayBlock true if method is allowed to block
  241.      * @return the next byte of data, or -1 (EOF) if end of stread reached, or -2 (WOULD_BLOCK) if mayBlock is false and there is no data available
  242.      */
  243.     private int read(final boolean mayBlock) throws IOException {
  244.         int ch;

  245.         while (true) {

  246.             // If there is no more data AND we were told not to block,
  247.             // just return WOULD_BLOCK (-2). (More efficient than exception.)
  248.             if (!mayBlock && super.available() == 0) {
  249.                 return WOULD_BLOCK;
  250.             }

  251.             // Otherwise, exit only when we reach end of stream.
  252.             if ((ch = super.read()) < 0) {
  253.                 return EOF;
  254.             }

  255.             ch &= 0xff;

  256.             /* Code Section added for supporting AYT (start) */
  257.             synchronized (client) {
  258.                 client.processAYTResponse();
  259.             }
  260.             /* Code Section added for supporting AYT (end) */

  261.             /* Code Section added for supporting spystreams (start) */
  262.             client.spyRead(ch);
  263.             /* Code Section added for supporting spystreams (end) */

  264.             switch (receiveState) {

  265.             case STATE_CR:
  266.                 if (ch == Telnet.NUL) {
  267.                     // Strip null
  268.                     continue;
  269.                 }
  270.                 // How do we handle newline after cr?
  271.                 // else if (ch == '\n' && _requestedDont(TelnetOption.ECHO) &&

  272.                 // Handle as normal data by falling through to _STATE_DATA case

  273.                 // falls through$
  274.             case STATE_DATA:
  275.                 if (ch == TelnetCommand.IAC) {
  276.                     receiveState = STATE_IAC;
  277.                     continue;
  278.                 }

  279.                 if (ch == '\r') {
  280.                     synchronized (client) {
  281.                         if (client.requestedDont(TelnetOption.BINARY)) {
  282.                             receiveState = STATE_CR;
  283.                         } else {
  284.                             receiveState = STATE_DATA;
  285.                         }
  286.                     }
  287.                 } else {
  288.                     receiveState = STATE_DATA;
  289.                 }
  290.                 break;

  291.             case STATE_IAC:
  292.                 switch (ch) {
  293.                 case TelnetCommand.WILL:
  294.                     receiveState = STATE_WILL;
  295.                     continue;
  296.                 case TelnetCommand.WONT:
  297.                     receiveState = STATE_WONT;
  298.                     continue;
  299.                 case TelnetCommand.DO:
  300.                     receiveState = STATE_DO;
  301.                     continue;
  302.                 case TelnetCommand.DONT:
  303.                     receiveState = STATE_DONT;
  304.                     continue;
  305.                 /* TERMINAL-TYPE option (start) */
  306.                 case TelnetCommand.SB:
  307.                     suboptionCount = 0;
  308.                     receiveState = STATE_SB;
  309.                     continue;
  310.                 /* TERMINAL-TYPE option (end) */
  311.                 case TelnetCommand.IAC:
  312.                     receiveState = STATE_DATA;
  313.                     break; // exit to enclosing switch to return IAC from read
  314.                 case TelnetCommand.SE: // unexpected byte! ignore it (don't send it as a command)
  315.                     receiveState = STATE_DATA;
  316.                     continue;
  317.                 default:
  318.                     receiveState = STATE_DATA;
  319.                     client.processCommand(ch); // Notify the user
  320.                     continue; // move on the next char
  321.                 }
  322.                 break; // exit and return from read
  323.             case STATE_WILL:
  324.                 synchronized (client) {
  325.                     client.processWill(ch);
  326.                     client.flushOutputStream();
  327.                 }
  328.                 receiveState = STATE_DATA;
  329.                 continue;
  330.             case STATE_WONT:
  331.                 synchronized (client) {
  332.                     client.processWont(ch);
  333.                     client.flushOutputStream();
  334.                 }
  335.                 receiveState = STATE_DATA;
  336.                 continue;
  337.             case STATE_DO:
  338.                 synchronized (client) {
  339.                     client.processDo(ch);
  340.                     client.flushOutputStream();
  341.                 }
  342.                 receiveState = STATE_DATA;
  343.                 continue;
  344.             case STATE_DONT:
  345.                 synchronized (client) {
  346.                     client.processDont(ch);
  347.                     client.flushOutputStream();
  348.                 }
  349.                 receiveState = STATE_DATA;
  350.                 continue;
  351.             /* TERMINAL-TYPE option (start) */
  352.             case STATE_SB:
  353.                 switch (ch) {
  354.                 case TelnetCommand.IAC:
  355.                     receiveState = STATE_IAC_SB;
  356.                     continue;
  357.                 default:
  358.                     // store suboption char
  359.                     if (suboptionCount < suboption.length) {
  360.                         suboption[suboptionCount++] = ch;
  361.                     }
  362.                     break;
  363.                 }
  364.                 receiveState = STATE_SB;
  365.                 continue;
  366.             case STATE_IAC_SB: // IAC received during SB phase
  367.                 switch (ch) {
  368.                 case TelnetCommand.SE:
  369.                     synchronized (client) {
  370.                         client.processSuboption(suboption, suboptionCount);
  371.                         client.flushOutputStream();
  372.                     }
  373.                     receiveState = STATE_DATA;
  374.                     continue;
  375.                 case TelnetCommand.IAC: // De-dup the duplicated IAC
  376.                     if (suboptionCount < suboption.length) {
  377.                         suboption[suboptionCount++] = ch;
  378.                     }
  379.                     break;
  380.                 default: // unexpected byte! ignore it
  381.                     break;
  382.                 }
  383.                 receiveState = STATE_SB;
  384.                 continue;
  385.             /* TERMINAL-TYPE option (end) */
  386.             }

  387.             break; // NOPMD TODO?
  388.         }

  389.         return ch;
  390.     }

  391.     /**
  392.      * Reads the next number of bytes from the stream into an array and returns the number of bytes read. Returns -1 if the end of the stream has been reached.
  393.      *
  394.      * @param buffer The byte array in which to store the data.
  395.      * @return The number of bytes read. Returns -1 if the end of the message has been reached.
  396.      * @throws IOException If an error occurs in reading the underlying stream.
  397.      */
  398.     @Override
  399.     public int read(final byte buffer[]) throws IOException {
  400.         return read(buffer, 0, buffer.length);
  401.     }

  402.     /**
  403.      * Reads the next number of bytes from the stream into an array and returns the number of bytes read. Returns -1 if the end of the message has been reached.
  404.      * The characters are stored in the array starting from the given offset and up to the length specified.
  405.      *
  406.      * @param buffer The byte array in which to store the data.
  407.      * @param offset The offset into the array at which to start storing data.
  408.      * @param length The number of bytes to read.
  409.      * @return The number of bytes read. Returns -1 if the end of the stream has been reached.
  410.      * @throws IOException If an error occurs while reading the underlying stream.
  411.      */
  412.     @Override
  413.     public int read(final byte buffer[], int offset, int length) throws IOException {
  414.         int ch;
  415.         final int off;

  416.         if (length < 1) {
  417.             return 0;
  418.         }

  419.         // Critical section because run() may change bytesAvailable
  420.         synchronized (queue) {
  421.             if (length > bytesAvailable) {
  422.                 length = bytesAvailable;
  423.             }
  424.         }

  425.         if ((ch = read()) == EOF) {
  426.             return EOF;
  427.         }

  428.         off = offset;

  429.         do {
  430.             buffer[offset++] = (byte) ch;
  431.         } while (--length > 0 && (ch = read()) != EOF);

  432.         // client._spyRead(buffer, off, offset - off);
  433.         return offset - off;
  434.     }

  435.     @Override
  436.     public void run() {
  437.         int ch;

  438.         try {
  439.             _outerLoop: while (!isClosed) {
  440.                 try {
  441.                     if ((ch = read(true)) < 0) {
  442.                         break;
  443.                     }
  444.                 } catch (final InterruptedIOException e) {
  445.                     synchronized (queue) {
  446.                         ioException = e;
  447.                         queue.notifyAll();
  448.                         try {
  449.                             queue.wait(100);
  450.                         } catch (final InterruptedException interrupted) {
  451.                             if (isClosed) {
  452.                                 break _outerLoop;
  453.                             }
  454.                         }
  455.                         continue;
  456.                     }
  457.                 } catch (final RuntimeException re) {
  458.                     // We treat any runtime exceptions as though the
  459.                     // stream has been closed. We close the
  460.                     // underlying stream just to be sure.
  461.                     super.close();
  462.                     // Breaking the loop has the effect of setting
  463.                     // the state to closed at the end of the method.
  464.                     break _outerLoop;
  465.                 }

  466.                 // Process new character
  467.                 boolean notify = false;
  468.                 try {
  469.                     notify = processChar(ch);
  470.                 } catch (final InterruptedException e) {
  471.                     if (isClosed) {
  472.                         break _outerLoop;
  473.                     }
  474.                 }

  475.                 // Notify input listener if buffer was previously empty
  476.                 if (notify) {
  477.                     client.notifyInputListener();
  478.                 }
  479.             }
  480.         } catch (final IOException ioe) {
  481.             synchronized (queue) {
  482.                 ioException = ioe;
  483.             }
  484.             client.notifyInputListener();
  485.         }

  486.         synchronized (queue) {
  487.             isClosed = true; // Possibly redundant
  488.             hasReachedEOF = true;
  489.             queue.notify();
  490.         }

  491.         threaded = false;
  492.     }

  493.     void start() {
  494.         if (thread == null) {
  495.             return;
  496.         }

  497.         int priority;
  498.         isClosed = false;
  499.         // TODO remove this
  500.         // Need to set a higher priority in case JVM does not use pre-emptive
  501.         // threads. This should prevent scheduler induced deadlock (rather than
  502.         // deadlock caused by a bug in this code).
  503.         priority = Thread.currentThread().getPriority() + 1;
  504.         if (priority > Thread.MAX_PRIORITY) {
  505.             priority = Thread.MAX_PRIORITY;
  506.         }
  507.         thread.setPriority(priority);
  508.         thread.setDaemon(true);
  509.         thread.start();
  510.         threaded = true; // tell _processChar that we are running threaded
  511.     }
  512. }