View Javadoc

1   /*
2    * Copyright (c) 2002-2012, the original author or authors.
3    *
4    * This software is distributable under the BSD license. See the terms of the
5    * BSD license in the documentation provided with this software.
6    *
7    * http://www.opensource.org/licenses/bsd-license.php
8    */
9   package jline.internal;
10  
11  import java.io.IOException;
12  import java.io.InputStream;
13  
14  /**
15   * This class wraps a regular input stream and allows it to appear as if it
16   * is non-blocking; that is, reads can be performed against it that timeout
17   * if no data is seen for a period of time.  This effect is achieved by having
18   * a separate thread perform all non-blocking read requests and then
19   * waiting on the thread to complete.  
20   * 
21   * <p>VERY IMPORTANT NOTES
22   * <ul>
23   *   <li> This class is not thread safe. It expects at most one reader.
24   *   <li> The {@link #shutdown()} method must be called in order to shut down
25   *          the thread that handles blocking I/O.
26   * </ul>
27   * @since 2.7
28   * @author Scott C. Gray <scottgray1@gmail.com>
29   */
30  public class NonBlockingInputStream
31      extends InputStream
32      implements Runnable
33  {
34      private InputStream in;               // The actual input stream
35      private int    ch   = -2;             // Recently read character
36      
37      private boolean     threadIsReading      = false;
38      private boolean     isShutdown           = false;
39      private IOException exception            = null;
40      private boolean     nonBlockingEnabled;
41      
42      /**
43       * Creates a <code>NonBlockingInputStream</code> out of a normal blocking
44       * stream. Note that this call also spawn a separate thread to perform the
45       * blocking I/O on behalf of the thread that is using this class. The
46       * {@link #shutdown()} method must be called in order to shut this thread down.
47       * @param in The input stream to wrap
48       * @param isNonBlockingEnabled If true, then the non-blocking methods 
49       *   {@link #read(long)} and {@link #peek(long)} will be available and,
50       *   more importantly, the thread will be started to provide support for the
51       *   feature.  If false, then this class acts as a clean-passthru for the
52       *   underlying I/O stream and provides very little overhead.
53       */
54      public NonBlockingInputStream (InputStream in, boolean isNonBlockingEnabled) {
55          this.in                 = in;
56          this.nonBlockingEnabled = isNonBlockingEnabled;
57          
58          if (isNonBlockingEnabled) {
59              Thread t = new Thread(this);
60              t.setName("NonBlockingInputStreamThread");
61              t.setDaemon(true);
62              t.start();
63          }
64      }
65      
66      /**
67       * Shuts down the thread that is handling blocking I/O. Note that if the
68       * thread is currently blocked waiting for I/O it will not actually
69       * shut down until the I/O is received.  Shutting down the I/O thread
70       * does not prevent this class from being used, but causes the 
71       * non-blocking methods to fail if called and causes {@link #isNonBlockingEnabled()}
72       * to return false.
73       */
74      public synchronized void shutdown() {
75          if (!isShutdown && nonBlockingEnabled) {
76              isShutdown = true;
77              notify();
78          }
79      }
80      
81      /**
82       * Non-blocking is considered enabled if the feature is enabled and the
83       * I/O thread has not been shut down. 
84       * @return true if non-blocking mode is enabled.
85       */
86      public boolean isNonBlockingEnabled() {
87          return nonBlockingEnabled && !isShutdown;
88      }
89      
90      @Override
91      public void close() throws IOException {
92          /*
93           * The underlying input stream is closed first. This means that if the
94           * I/O thread was blocked waiting on input, it will be woken for us.
95           */
96          in.close();
97          shutdown();
98      }
99  
100     @Override
101     public int read() throws IOException {
102         if (nonBlockingEnabled)
103             return read(0L, false);
104         return in.read ();
105     }
106     
107     /**
108      * Peeks to see if there is a byte waiting in the input stream without
109      * actually consuming the byte.
110      * 
111      * @param timeout The amount of time to wait, 0 == forever
112      * @return -1 on eof, -2 if the timeout expired with no available input
113      *   or the character that was read (without consuming it).
114      * @throws IOException
115      */
116     public int peek(long timeout) throws IOException {
117         if (!nonBlockingEnabled || isShutdown) {
118             throw new UnsupportedOperationException ("peek() "
119                 + "cannot be called as non-blocking operation is disabled");
120         }
121         return read(timeout, true);
122     }
123     
124     /**
125      * Attempts to read a character from the input stream for a specific
126      * period of time.
127      * @param timeout The amount of time to wait for the character
128      * @return The character read, -1 if EOF is reached, or -2 if the
129      *   read timed out.
130      * @throws IOException
131      */
132     public int read(long timeout) throws IOException {
133         if (!nonBlockingEnabled || isShutdown) {
134             throw new UnsupportedOperationException ("read() with timeout "
135                 + "cannot be called as non-blocking operation is disabled");
136         }
137         return read(timeout, false);
138     }
139     
140     /**
141      * Attempts to read a character from the input stream for a specific
142      * period of time.
143      * @param timeout The amount of time to wait for the character
144      * @return The character read, -1 if EOF is reached, or -2 if the
145      *   read timed out.
146      * @throws IOException
147      */
148     private synchronized int read(long timeout, boolean isPeek) throws IOException {
149         /*
150          * If the thread hit an IOException, we report it.
151          */
152         if (exception != null) {
153             assert ch == -2;
154             IOException toBeThrown = exception;
155             if (!isPeek)
156                 exception = null;
157             throw toBeThrown;
158         }
159             
160         /*
161          * If there was a pending character from the thread, then
162          * we send it. If the timeout is 0L or the thread was shut down
163          * then do a local read.
164          */
165         if (ch >= -1) {
166             assert exception == null;
167         }
168         else if ((timeout == 0L || isShutdown) && !threadIsReading) {
169             ch = in.read();
170         }
171         else {
172             /*
173              * If the thread isn't reading already, then ask it to do so.
174              */
175             if (!threadIsReading) {
176                 threadIsReading = true;
177                 notify();
178             }
179             
180             boolean isInfinite = (timeout <= 0L);
181             
182             /*
183              * So the thread is currently doing the reading for us. So
184              * now we play the waiting game.
185              */
186             while (isInfinite || timeout > 0L)  {
187                 long start = System.currentTimeMillis ();
188                 
189                 try {
190                     wait(timeout);
191                 }
192                 catch (InterruptedException e) {
193                     /* IGNORED */
194                 }
195                     
196                 if (exception != null) {
197                     assert ch == -2;
198                         
199                     IOException toBeThrown = exception;
200                     if (!isPeek)
201                         exception = null;
202                     throw toBeThrown;
203                 }
204                 
205                 if (ch >= -1) {
206                     assert exception == null;
207                     break;
208                 }
209                     
210                 if (!isInfinite) {
211                     timeout -= System.currentTimeMillis() - start;
212                 }
213             }
214         }
215         
216         /*
217          * ch is the character that was just read. Either we set it because
218          * a local read was performed or the read thread set it (or failed to
219          * change it).  We will return it's value, but if this was a peek
220          * operation, then we leave it in place.
221          */
222         int ret = ch;
223         if (!isPeek) {
224             ch = -2;
225         }
226         return ret;
227     }
228     
229     /**
230      * This version of read() is very specific to jline's purposes, it 
231      * will always always return a single byte at a time, rather than filling
232      * the entire buffer. 
233      */
234     @Override
235     public int read (byte[] b, int off, int len) throws IOException {
236         if (b == null) {
237             throw new NullPointerException();
238         } else if (off < 0 || len < 0 || len > b.length - off) {
239             throw new IndexOutOfBoundsException();
240         } else if (len == 0) {
241             return 0;
242         }
243 
244         int c;
245         if (nonBlockingEnabled)
246             c = this.read(0L);
247         else
248             c = in.read();
249         
250         if (c == -1) {
251             return -1;
252         }
253         b[off] = (byte)c;
254         return 1;
255     }
256 
257     //@Override
258     public void run () {
259         Log.debug("NonBlockingInputStream start");
260         boolean needToShutdown = false;
261         boolean needToRead = false;
262         
263         while (!needToShutdown) {
264             
265             /*
266              * Synchronize to grab variables accessed by both this thread
267              * and the accessing thread.
268              */
269             synchronized (this) {
270                 needToShutdown = this.isShutdown;
271                 needToRead     = this.threadIsReading;
272                 
273                 try {
274                     /*
275                      * Nothing to do? Then wait.
276                      */
277                     if (!needToShutdown && !needToRead) {
278                         wait(0);
279                     }
280                 }
281                 catch (InterruptedException e) {
282                     /* IGNORED */
283                 }
284             }
285             
286             /*
287              * We're not shutting down, but we need to read. This cannot
288              * happen while we are holding the lock (which we aren't now).
289              */
290             if (!needToShutdown && needToRead) {
291                 int          charRead = -2;
292                 IOException  failure = null;
293                 try {
294                     charRead = in.read();
295                 }
296                 catch (IOException e) {
297                     failure = e;
298                 }
299                 
300                 /*
301                  * Re-grab the lock to update the state.
302                  */
303                 synchronized (this) {
304                     exception       = failure;
305                     ch              = charRead;
306                     threadIsReading = false;
307                     notify();
308                 }
309             }
310         }
311         
312         Log.debug("NonBlockingInputStream shutdown");
313     }
314 }