1 /*
2 * Copyright (c) 2002-2012, the original author or authors.
3 *
4 * This software is distributable under the BSD license. See the terms of the
5 * BSD license in the documentation provided with this software.
6 *
7 * http://www.opensource.org/licenses/bsd-license.php
8 */
9 package jline.internal;
10
11 import java.io.IOException;
12 import java.io.InputStream;
13
14 /**
15 * This class wraps a regular input stream and allows it to appear as if it
16 * is non-blocking; that is, reads can be performed against it that timeout
17 * if no data is seen for a period of time. This effect is achieved by having
18 * a separate thread perform all non-blocking read requests and then
19 * waiting on the thread to complete.
20 *
21 * <p>VERY IMPORTANT NOTES
22 * <ul>
23 * <li> This class is not thread safe. It expects at most one reader.
24 * <li> The {@link #shutdown()} method must be called in order to shut down
25 * the thread that handles blocking I/O.
26 * </ul>
27 * @since 2.7
28 * @author Scott C. Gray <scottgray1@gmail.com>
29 */
30 public class NonBlockingInputStream
31 extends InputStream
32 implements Runnable
33 {
34 private InputStream in; // The actual input stream
35 private int ch = -2; // Recently read character
36
37 private boolean threadIsReading = false;
38 private boolean isShutdown = false;
39 private IOException exception = null;
40 private boolean nonBlockingEnabled;
41
42 /**
43 * Creates a <code>NonBlockingInputStream</code> out of a normal blocking
44 * stream. Note that this call also spawn a separate thread to perform the
45 * blocking I/O on behalf of the thread that is using this class. The
46 * {@link #shutdown()} method must be called in order to shut this thread down.
47 * @param in The input stream to wrap
48 * @param isNonBlockingEnabled If true, then the non-blocking methods
49 * {@link #read(long)} and {@link #peek(long)} will be available and,
50 * more importantly, the thread will be started to provide support for the
51 * feature. If false, then this class acts as a clean-passthru for the
52 * underlying I/O stream and provides very little overhead.
53 */
54 public NonBlockingInputStream (InputStream in, boolean isNonBlockingEnabled) {
55 this.in = in;
56 this.nonBlockingEnabled = isNonBlockingEnabled;
57
58 if (isNonBlockingEnabled) {
59 Thread t = new Thread(this);
60 t.setName("NonBlockingInputStreamThread");
61 t.setDaemon(true);
62 t.start();
63 }
64 }
65
66 /**
67 * Shuts down the thread that is handling blocking I/O. Note that if the
68 * thread is currently blocked waiting for I/O it will not actually
69 * shut down until the I/O is received. Shutting down the I/O thread
70 * does not prevent this class from being used, but causes the
71 * non-blocking methods to fail if called and causes {@link #isNonBlockingEnabled()}
72 * to return false.
73 */
74 public synchronized void shutdown() {
75 if (!isShutdown && nonBlockingEnabled) {
76 isShutdown = true;
77 notify();
78 }
79 }
80
81 /**
82 * Non-blocking is considered enabled if the feature is enabled and the
83 * I/O thread has not been shut down.
84 * @return true if non-blocking mode is enabled.
85 */
86 public boolean isNonBlockingEnabled() {
87 return nonBlockingEnabled && !isShutdown;
88 }
89
90 @Override
91 public void close() throws IOException {
92 /*
93 * The underlying input stream is closed first. This means that if the
94 * I/O thread was blocked waiting on input, it will be woken for us.
95 */
96 in.close();
97 shutdown();
98 }
99
100 @Override
101 public int read() throws IOException {
102 if (nonBlockingEnabled)
103 return read(0L, false);
104 return in.read ();
105 }
106
107 /**
108 * Peeks to see if there is a byte waiting in the input stream without
109 * actually consuming the byte.
110 *
111 * @param timeout The amount of time to wait, 0 == forever
112 * @return -1 on eof, -2 if the timeout expired with no available input
113 * or the character that was read (without consuming it).
114 * @throws IOException
115 */
116 public int peek(long timeout) throws IOException {
117 if (!nonBlockingEnabled || isShutdown) {
118 throw new UnsupportedOperationException ("peek() "
119 + "cannot be called as non-blocking operation is disabled");
120 }
121 return read(timeout, true);
122 }
123
124 /**
125 * Attempts to read a character from the input stream for a specific
126 * period of time.
127 * @param timeout The amount of time to wait for the character
128 * @return The character read, -1 if EOF is reached, or -2 if the
129 * read timed out.
130 * @throws IOException
131 */
132 public int read(long timeout) throws IOException {
133 if (!nonBlockingEnabled || isShutdown) {
134 throw new UnsupportedOperationException ("read() with timeout "
135 + "cannot be called as non-blocking operation is disabled");
136 }
137 return read(timeout, false);
138 }
139
140 /**
141 * Attempts to read a character from the input stream for a specific
142 * period of time.
143 * @param timeout The amount of time to wait for the character
144 * @return The character read, -1 if EOF is reached, or -2 if the
145 * read timed out.
146 * @throws IOException
147 */
148 private synchronized int read(long timeout, boolean isPeek) throws IOException {
149 /*
150 * If the thread hit an IOException, we report it.
151 */
152 if (exception != null) {
153 assert ch == -2;
154 IOException toBeThrown = exception;
155 if (!isPeek)
156 exception = null;
157 throw toBeThrown;
158 }
159
160 /*
161 * If there was a pending character from the thread, then
162 * we send it. If the timeout is 0L or the thread was shut down
163 * then do a local read.
164 */
165 if (ch >= -1) {
166 assert exception == null;
167 }
168 else if ((timeout == 0L || isShutdown) && !threadIsReading) {
169 ch = in.read();
170 }
171 else {
172 /*
173 * If the thread isn't reading already, then ask it to do so.
174 */
175 if (!threadIsReading) {
176 threadIsReading = true;
177 notify();
178 }
179
180 boolean isInfinite = (timeout <= 0L);
181
182 /*
183 * So the thread is currently doing the reading for us. So
184 * now we play the waiting game.
185 */
186 while (isInfinite || timeout > 0L) {
187 long start = System.currentTimeMillis ();
188
189 try {
190 wait(timeout);
191 }
192 catch (InterruptedException e) {
193 /* IGNORED */
194 }
195
196 if (exception != null) {
197 assert ch == -2;
198
199 IOException toBeThrown = exception;
200 if (!isPeek)
201 exception = null;
202 throw toBeThrown;
203 }
204
205 if (ch >= -1) {
206 assert exception == null;
207 break;
208 }
209
210 if (!isInfinite) {
211 timeout -= System.currentTimeMillis() - start;
212 }
213 }
214 }
215
216 /*
217 * ch is the character that was just read. Either we set it because
218 * a local read was performed or the read thread set it (or failed to
219 * change it). We will return it's value, but if this was a peek
220 * operation, then we leave it in place.
221 */
222 int ret = ch;
223 if (!isPeek) {
224 ch = -2;
225 }
226 return ret;
227 }
228
229 /**
230 * This version of read() is very specific to jline's purposes, it
231 * will always always return a single byte at a time, rather than filling
232 * the entire buffer.
233 */
234 @Override
235 public int read (byte[] b, int off, int len) throws IOException {
236 if (b == null) {
237 throw new NullPointerException();
238 } else if (off < 0 || len < 0 || len > b.length - off) {
239 throw new IndexOutOfBoundsException();
240 } else if (len == 0) {
241 return 0;
242 }
243
244 int c;
245 if (nonBlockingEnabled)
246 c = this.read(0L);
247 else
248 c = in.read();
249
250 if (c == -1) {
251 return -1;
252 }
253 b[off] = (byte)c;
254 return 1;
255 }
256
257 //@Override
258 public void run () {
259 Log.debug("NonBlockingInputStream start");
260 boolean needToShutdown = false;
261 boolean needToRead = false;
262
263 while (!needToShutdown) {
264
265 /*
266 * Synchronize to grab variables accessed by both this thread
267 * and the accessing thread.
268 */
269 synchronized (this) {
270 needToShutdown = this.isShutdown;
271 needToRead = this.threadIsReading;
272
273 try {
274 /*
275 * Nothing to do? Then wait.
276 */
277 if (!needToShutdown && !needToRead) {
278 wait(0);
279 }
280 }
281 catch (InterruptedException e) {
282 /* IGNORED */
283 }
284 }
285
286 /*
287 * We're not shutting down, but we need to read. This cannot
288 * happen while we are holding the lock (which we aren't now).
289 */
290 if (!needToShutdown && needToRead) {
291 int charRead = -2;
292 IOException failure = null;
293 try {
294 charRead = in.read();
295 }
296 catch (IOException e) {
297 failure = e;
298 }
299
300 /*
301 * Re-grab the lock to update the state.
302 */
303 synchronized (this) {
304 exception = failure;
305 ch = charRead;
306 threadIsReading = false;
307 notify();
308 }
309 }
310 }
311
312 Log.debug("NonBlockingInputStream shutdown");
313 }
314 }