1
2
3
4
5
6
7
8
9 package jline.internal;
10
11 import java.io.IOException;
12 import java.io.InputStream;
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 public class NonBlockingInputStream
31 extends InputStream
32 implements Runnable
33 {
34 private InputStream in;
35 private int ch = -2;
36
37 private boolean threadIsReading = false;
38 private boolean isShutdown = false;
39 private IOException exception = null;
40 private boolean nonBlockingEnabled;
41
42
43
44
45
46
47
48
49
50
51
52
53
54 public NonBlockingInputStream (InputStream in, boolean isNonBlockingEnabled) {
55 this.in = in;
56 this.nonBlockingEnabled = isNonBlockingEnabled;
57
58 if (isNonBlockingEnabled) {
59 Thread t = new Thread(this);
60 t.setName("NonBlockingInputStreamThread");
61 t.setDaemon(true);
62 t.start();
63 }
64 }
65
66
67
68
69
70
71
72
73
74 public synchronized void shutdown() {
75 if (!isShutdown && nonBlockingEnabled) {
76 isShutdown = true;
77 notify();
78 }
79 }
80
81
82
83
84
85
86 public boolean isNonBlockingEnabled() {
87 return nonBlockingEnabled && !isShutdown;
88 }
89
90 @Override
91 public void close() throws IOException {
92
93
94
95
96 in.close();
97 shutdown();
98 }
99
100 @Override
101 public int read() throws IOException {
102 if (nonBlockingEnabled)
103 return read(0L, false);
104 return in.read ();
105 }
106
107
108
109
110
111
112
113
114
115
116 public int peek(long timeout) throws IOException {
117 if (!nonBlockingEnabled || isShutdown) {
118 throw new UnsupportedOperationException ("peek() "
119 + "cannot be called as non-blocking operation is disabled");
120 }
121 return read(timeout, true);
122 }
123
124
125
126
127
128
129
130
131
132 public int read(long timeout) throws IOException {
133 if (!nonBlockingEnabled || isShutdown) {
134 throw new UnsupportedOperationException ("read() with timeout "
135 + "cannot be called as non-blocking operation is disabled");
136 }
137 return read(timeout, false);
138 }
139
140
141
142
143
144
145
146
147
148 private synchronized int read(long timeout, boolean isPeek) throws IOException {
149
150
151
152 if (exception != null) {
153 assert ch == -2;
154 IOException toBeThrown = exception;
155 if (!isPeek)
156 exception = null;
157 throw toBeThrown;
158 }
159
160
161
162
163
164
165 if (ch >= -1) {
166 assert exception == null;
167 }
168 else if ((timeout == 0L || isShutdown) && !threadIsReading) {
169 ch = in.read();
170 }
171 else {
172
173
174
175 if (!threadIsReading) {
176 threadIsReading = true;
177 notify();
178 }
179
180 boolean isInfinite = (timeout <= 0L);
181
182
183
184
185
186 while (isInfinite || timeout > 0L) {
187 long start = System.currentTimeMillis ();
188
189 try {
190 wait(timeout);
191 }
192 catch (InterruptedException e) {
193
194 }
195
196 if (exception != null) {
197 assert ch == -2;
198
199 IOException toBeThrown = exception;
200 if (!isPeek)
201 exception = null;
202 throw toBeThrown;
203 }
204
205 if (ch >= -1) {
206 assert exception == null;
207 break;
208 }
209
210 if (!isInfinite) {
211 timeout -= System.currentTimeMillis() - start;
212 }
213 }
214 }
215
216
217
218
219
220
221
222 int ret = ch;
223 if (!isPeek) {
224 ch = -2;
225 }
226 return ret;
227 }
228
229
230
231
232
233
234 @Override
235 public int read (byte[] b, int off, int len) throws IOException {
236 if (b == null) {
237 throw new NullPointerException();
238 } else if (off < 0 || len < 0 || len > b.length - off) {
239 throw new IndexOutOfBoundsException();
240 } else if (len == 0) {
241 return 0;
242 }
243
244 int c;
245 if (nonBlockingEnabled)
246 c = this.read(0L);
247 else
248 c = in.read();
249
250 if (c == -1) {
251 return -1;
252 }
253 b[off] = (byte)c;
254 return 1;
255 }
256
257
258 public void run () {
259 Log.debug("NonBlockingInputStream start");
260 boolean needToShutdown = false;
261 boolean needToRead = false;
262
263 while (!needToShutdown) {
264
265
266
267
268
269 synchronized (this) {
270 needToShutdown = this.isShutdown;
271 needToRead = this.threadIsReading;
272
273 try {
274
275
276
277 if (!needToShutdown && !needToRead) {
278 wait(0);
279 }
280 }
281 catch (InterruptedException e) {
282
283 }
284 }
285
286
287
288
289
290 if (!needToShutdown && needToRead) {
291 int charRead = -2;
292 IOException failure = null;
293 try {
294 charRead = in.read();
295 }
296 catch (IOException e) {
297 failure = e;
298 }
299
300
301
302
303 synchronized (this) {
304 exception = failure;
305 ch = charRead;
306 threadIsReading = false;
307 notify();
308 }
309 }
310 }
311
312 Log.debug("NonBlockingInputStream shutdown");
313 }
314 }