Skip to content

Commit 77f1cea

Browse files
authoredJan 16, 2023
Make readBuffered blocking and add more readBuffered methods, fixes #757 (#782)
1 parent 4f57697 commit 77f1cea

11 files changed

+230
-99
lines changed
 

‎reader/src/test/java/org/jline/terminal/impl/AbstractWindowsTerminalTest.java

+10
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,16 @@ public void testBracketingPasteHuge() throws Exception {
5252
for (int i = 0; i < 100000; i++) {
5353
str.append("0123456789");
5454
}
55+
str.toString().chars().forEachOrdered(c -> process(terminal, c) );
56+
try {
57+
Thread.sleep(1000);
58+
} catch (InterruptedException e) {
59+
e.printStackTrace();
60+
}
61+
str.setLength(0);
62+
for (int i = 0; i < 100000; i++) {
63+
str.append("0123456789");
64+
}
5565
str.append(LineReaderImpl.BRACKETED_PASTE_END);
5666
str.append("\n");
5767
str.toString().chars().forEachOrdered(c -> process(terminal, c));

‎terminal/src/main/java/org/jline/terminal/impl/AbstractPty.java

-5
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,6 @@ public int read(long timeout, boolean isPeek) throws IOException {
8686
}
8787
}
8888

89-
@Override
90-
public int readBuffered(byte[] b) throws IOException {
91-
return in.read(b);
92-
}
93-
9489
private void setNonBlocking() {
9590
if (current == null
9691
|| current.getControlChar(Attributes.ControlChar.VMIN) != 0

‎terminal/src/main/java/org/jline/utils/NonBlocking.java

+33-40
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,9 @@ public void close() throws IOException {
9595

9696
@Override
9797
public int read(long timeout, boolean isPeek) throws IOException {
98-
boolean isInfinite = (timeout <= 0L);
99-
while (!bytes.hasRemaining() && (isInfinite || timeout > 0L)) {
100-
long start = 0;
101-
if (!isInfinite) {
102-
start = System.currentTimeMillis();
103-
}
104-
int c = reader.read(timeout);
98+
Timeout t = new Timeout(timeout);
99+
while (!bytes.hasRemaining() && !t.elapsed()) {
100+
int c = reader.read(t.timeout());
105101
if (c == EOF) {
106102
return EOF;
107103
}
@@ -117,9 +113,6 @@ public int read(long timeout, boolean isPeek) throws IOException {
117113
encoder.encode(chars, bytes, false);
118114
bytes.flip();
119115
}
120-
if (!isInfinite) {
121-
timeout -= System.currentTimeMillis() - start;
122-
}
123116
}
124117
if (bytes.hasRemaining()) {
125118
if (isPeek) {
@@ -151,21 +144,17 @@ public NonBlockingInputStreamReader(NonBlockingInputStream inputStream, Charset
151144
public NonBlockingInputStreamReader(NonBlockingInputStream input, CharsetDecoder decoder) {
152145
this.input = input;
153146
this.decoder = decoder;
154-
this.bytes = ByteBuffer.allocate(4);
155-
this.chars = CharBuffer.allocate(2);
147+
this.bytes = ByteBuffer.allocate(2048);
148+
this.chars = CharBuffer.allocate(1024);
156149
this.bytes.limit(0);
157150
this.chars.limit(0);
158151
}
159152

160153
@Override
161154
protected int read(long timeout, boolean isPeek) throws IOException {
162-
boolean isInfinite = (timeout <= 0L);
163-
while (!chars.hasRemaining() && (isInfinite || timeout > 0L)) {
164-
long start = 0;
165-
if (!isInfinite) {
166-
start = System.currentTimeMillis();
167-
}
168-
int b = input.read(timeout);
155+
Timeout t = new Timeout(timeout);
156+
while (!chars.hasRemaining() && !t.elapsed()) {
157+
int b = input.read(t.timeout());
169158
if (b == EOF) {
170159
return EOF;
171160
}
@@ -181,10 +170,6 @@ protected int read(long timeout, boolean isPeek) throws IOException {
181170
decoder.decode(bytes, chars, false);
182171
chars.flip();
183172
}
184-
185-
if (!isInfinite) {
186-
timeout -= System.currentTimeMillis() - start;
187-
}
188173
}
189174
if (chars.hasRemaining()) {
190175
if (isPeek) {
@@ -198,29 +183,37 @@ protected int read(long timeout, boolean isPeek) throws IOException {
198183
}
199184

200185
@Override
201-
public int readBuffered(char[] b) throws IOException {
186+
public int readBuffered(char[] b, int off, int len, long timeout) throws IOException {
202187
if (b == null) {
203188
throw new NullPointerException();
204-
} else if (b.length == 0) {
189+
} else if (off < 0 || len < 0 || off + len < b.length) {
190+
throw new IllegalArgumentException();
191+
} else if (len == 0) {
205192
return 0;
193+
} else if (chars.hasRemaining()) {
194+
int r = Math.min(len, chars.remaining());
195+
chars.get(b, off, r);
196+
return r;
206197
} else {
207-
if (chars.hasRemaining()) {
208-
int r = Math.min(b.length, chars.remaining());
209-
chars.get(b);
210-
return r;
211-
} else {
212-
byte[] buf = new byte[b.length];
213-
int l = input.readBuffered(buf);
214-
if (l < 0) {
215-
return l;
216-
} else {
217-
ByteBuffer bytes = ByteBuffer.wrap(buf, 0, l);
218-
CharBuffer chars = CharBuffer.wrap(b);
219-
decoder.decode(bytes, chars, false);
220-
chars.flip();
221-
return chars.remaining();
198+
Timeout t = new Timeout(timeout);
199+
while (!chars.hasRemaining() && !t.elapsed()) {
200+
if (!bytes.hasRemaining()) {
201+
bytes.position(0);
202+
bytes.limit(0);
203+
}
204+
int nb = input.readBuffered(bytes.array(), bytes.limit(),
205+
bytes.capacity() - bytes.limit(), t.timeout());
206+
if (nb < 0) {
207+
return nb;
222208
}
209+
bytes.limit(bytes.limit() + nb);
210+
chars.clear();
211+
decoder.decode(bytes, chars, false);
212+
chars.flip();
223213
}
214+
int nb = Math.min(len, chars.remaining());
215+
chars.get(b, off, nb);
216+
return nb;
224217
}
225218
}
226219

‎terminal/src/main/java/org/jline/utils/NonBlockingInputStream.java

+24-2
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,34 @@ public int read(byte b[], int off, int len) throws IOException {
7979
}
8080

8181
public int readBuffered(byte[] b) throws IOException {
82+
return readBuffered(b, 0L);
83+
}
84+
85+
public int readBuffered(byte[] b, long timeout) throws IOException {
86+
return readBuffered(b, 0, b.length, timeout);
87+
}
88+
89+
public int readBuffered(byte[] b, int off, int len, long timeout) throws IOException {
8290
if (b == null) {
8391
throw new NullPointerException();
84-
} else if (b.length == 0) {
92+
} else if (off < 0 || len < 0 || off + len < b.length) {
93+
throw new IllegalArgumentException();
94+
} else if (len == 0) {
8595
return 0;
8696
} else {
87-
return super.read(b, 0, b.length);
97+
Timeout t = new Timeout(timeout);
98+
int nb = 0;
99+
while (!t.elapsed()) {
100+
int r = read(nb > 0 ? 1 : t.timeout());
101+
if (r < 0) {
102+
return nb > 0 ? nb : r;
103+
}
104+
b[off + nb++] = (byte) r;
105+
if (nb >= len || t.isInfinite()) {
106+
break;
107+
}
108+
}
109+
return nb;
88110
}
89111
}
90112

‎terminal/src/main/java/org/jline/utils/NonBlockingInputStreamImpl.java

+3-10
Original file line numberDiff line numberDiff line change
@@ -123,20 +123,17 @@ else if (!isPeek && timeout <= 0L && !threadIsReading) {
123123
notifyAll();
124124
}
125125

126-
boolean isInfinite = (timeout <= 0L);
127-
128126
/*
129127
* So the thread is currently doing the reading for us. So
130128
* now we play the waiting game.
131129
*/
132-
while (isInfinite || timeout > 0L) {
133-
long start = System.currentTimeMillis ();
134-
130+
Timeout t = new Timeout(timeout);
131+
while (!t.elapsed()) {
135132
try {
136133
if (Thread.interrupted()) {
137134
throw new InterruptedException();
138135
}
139-
wait(timeout);
136+
wait(t.timeout());
140137
}
141138
catch (InterruptedException e) {
142139
exception = (IOException) new InterruptedIOException().initCause(e);
@@ -155,10 +152,6 @@ else if (!isPeek && timeout <= 0L && !threadIsReading) {
155152
assert exception == null;
156153
break;
157154
}
158-
159-
if (!isInfinite) {
160-
timeout -= System.currentTimeMillis() - start;
161-
}
162155
}
163156
}
164157

‎terminal/src/main/java/org/jline/utils/NonBlockingPumpInputStream.java

+20-19
Original file line numberDiff line numberDiff line change
@@ -45,24 +45,17 @@ public OutputStream getOutputStream() {
4545
}
4646

4747
private int wait(ByteBuffer buffer, long timeout) throws IOException {
48-
boolean isInfinite = (timeout <= 0L);
49-
long end = 0;
50-
if (!isInfinite) {
51-
end = System.currentTimeMillis() + timeout;
52-
}
53-
while (!closed && !buffer.hasRemaining() && (isInfinite || timeout > 0L)) {
48+
Timeout t = new Timeout(timeout);
49+
while (!closed && !buffer.hasRemaining() && !t.elapsed()) {
5450
// Wake up waiting readers/writers
5551
notifyAll();
5652
try {
57-
wait(timeout);
53+
wait(t.timeout());
5854
checkIoException();
5955
} catch (InterruptedException e) {
6056
checkIoException();
6157
throw new InterruptedIOException();
6258
}
63-
if (!isInfinite) {
64-
timeout = end - System.currentTimeMillis();
65-
}
6659
}
6760
return buffer.hasRemaining()
6861
? 0
@@ -107,17 +100,25 @@ public synchronized int read(long timeout, boolean isPeek) throws IOException {
107100
}
108101

109102
@Override
110-
public synchronized int readBuffered(byte[] b) throws IOException {
111-
checkIoException();
112-
int res = wait(readBuffer, 0L);
113-
if (res >= 0) {
114-
res = 0;
115-
while (res < b.length && readBuffer.hasRemaining()) {
116-
b[res++] = (byte) (readBuffer.get() & 0x00FF);
103+
public synchronized int readBuffered(byte[] b, int off, int len, long timeout) throws IOException {
104+
if (b == null) {
105+
throw new NullPointerException();
106+
} else if (off < 0 || len < 0 || off + len < b.length) {
107+
throw new IllegalArgumentException();
108+
} else if (len == 0) {
109+
return 0;
110+
} else {
111+
checkIoException();
112+
int res = wait(readBuffer, timeout);
113+
if (res >= 0) {
114+
res = 0;
115+
while (res < len && readBuffer.hasRemaining()) {
116+
b[off + res++] = (byte) (readBuffer.get() & 0x00FF);
117+
}
117118
}
119+
rewind(readBuffer, writeBuffer);
120+
return res;
118121
}
119-
rewind(readBuffer, writeBuffer);
120-
return res;
121122
}
122123

123124
public synchronized void setIoException(IOException exception) {

‎terminal/src/main/java/org/jline/utils/NonBlockingPumpReader.java

+13-5
Original file line numberDiff line numberDiff line change
@@ -106,18 +106,26 @@ protected int read(long timeout, boolean isPeek) throws IOException {
106106
}
107107

108108
@Override
109-
public int readBuffered(char[] b) throws IOException {
109+
public int readBuffered(char[] b, int off, int len, long timeout) throws IOException {
110110
if (b == null) {
111111
throw new NullPointerException();
112-
} else if (b.length == 0) {
112+
} else if (off < 0 || len < 0 || off + len < b.length) {
113+
throw new IllegalArgumentException();
114+
} else if (len == 0) {
113115
return 0;
114116
} else {
115117
final ReentrantLock lock = this.lock;
116118
lock.lock();
117119
try {
118120
if (!closed && count == 0) {
119121
try {
120-
notEmpty.await();
122+
if (timeout > 0) {
123+
if (!notEmpty.await(timeout, TimeUnit.MILLISECONDS)) {
124+
throw new IOException( "Timeout reading" );
125+
}
126+
} else {
127+
notEmpty.await();
128+
}
121129
} catch (InterruptedException e) {
122130
throw (IOException) new InterruptedIOException().initCause(e);
123131
}
@@ -127,9 +135,9 @@ public int readBuffered(char[] b) throws IOException {
127135
} else if (count == 0) {
128136
return READ_EXPIRED;
129137
} else {
130-
int r = Math.min(b.length, count);
138+
int r = Math.min(len, count);
131139
for (int i = 0; i < r; i++) {
132-
b[i] = buffer[read++];
140+
b[off + i] = buffer[read++];
133141
if (read == buffer.length) {
134142
read = 0;
135143
}

‎terminal/src/main/java/org/jline/utils/NonBlockingReader.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,15 @@ public int read(char[] b, int off, int len) throws IOException {
8585
return 1;
8686
}
8787

88-
public abstract int readBuffered(char[] b) throws IOException;
88+
public int readBuffered(char[] b) throws IOException {
89+
return readBuffered(b, 0L);
90+
}
91+
92+
public int readBuffered(char[] b, long timeout) throws IOException {
93+
return readBuffered(b, 0, b.length, timeout);
94+
}
95+
96+
public abstract int readBuffered(char[] b, int off, int len, long timeout) throws IOException;
8997

9098
public int available() {
9199
return 0;

‎terminal/src/main/java/org/jline/utils/NonBlockingReaderImpl.java

+13-17
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,12 @@ public synchronized boolean ready() throws IOException {
9191
}
9292

9393
@Override
94-
public int readBuffered(char[] b) throws IOException {
94+
public int readBuffered(char[] b, int off, int len, long timeout) throws IOException {
9595
if (b == null) {
9696
throw new NullPointerException();
97-
} else if (b.length == 0) {
97+
} else if (off < 0 || len < 0 || off + len < b.length) {
98+
throw new IllegalArgumentException();
99+
} else if (len == 0) {
98100
return 0;
99101
} else if (exception != null) {
100102
assert ch == READ_EXPIRED;
@@ -105,15 +107,16 @@ public int readBuffered(char[] b) throws IOException {
105107
b[0] = (char) ch;
106108
ch = READ_EXPIRED;
107109
return 1;
108-
} else if (!threadIsReading) {
109-
return in.read(b);
110+
} else if (!threadIsReading && timeout <= 0) {
111+
return in.read(b, off, len);
110112
} else {
111-
int c = read(-1, false);
113+
// TODO: rework implementation to read as much as possible
114+
int c = read(timeout, false);
112115
if (c >= 0) {
113-
b[0] = (char) c;
116+
b[off] = (char) c;
114117
return 1;
115118
} else {
116-
return -1;
119+
return c;
117120
}
118121
}
119122
}
@@ -158,20 +161,17 @@ else if (!isPeek && timeout <= 0L && !threadIsReading) {
158161
notifyAll();
159162
}
160163

161-
boolean isInfinite = (timeout <= 0L);
162-
163164
/*
164165
* So the thread is currently doing the reading for us. So
165166
* now we play the waiting game.
166167
*/
167-
while (isInfinite || timeout > 0L) {
168-
long start = System.currentTimeMillis ();
169-
168+
Timeout t = new Timeout(timeout);
169+
while (!t.elapsed()) {
170170
try {
171171
if (Thread.interrupted()) {
172172
throw new InterruptedException();
173173
}
174-
wait(timeout);
174+
wait(t.timeout());
175175
}
176176
catch (InterruptedException e) {
177177
exception = (IOException) new InterruptedIOException().initCause(e);
@@ -190,10 +190,6 @@ else if (!isPeek && timeout <= 0L && !threadIsReading) {
190190
assert exception == null;
191191
break;
192192
}
193-
194-
if (!isInfinite) {
195-
timeout -= System.currentTimeMillis() - start;
196-
}
197193
}
198194
}
199195

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) 2002-2018, the original author or authors.
3+
*
4+
* This software is distributable under the BSD license. See the terms of the
5+
* BSD license in the documentation provided with this software.
6+
*
7+
* https://opensource.org/licenses/BSD-3-Clause
8+
*/
9+
package org.jline.utils;
10+
11+
/**
12+
* Helper class ti use during I/O operations with an eventual timeout.
13+
*/
14+
public class Timeout {
15+
16+
private final long timeout;
17+
private long cur = 0;
18+
private long end = Long.MAX_VALUE;
19+
20+
public Timeout(long timeout) {
21+
this.timeout = timeout;
22+
}
23+
24+
public boolean isInfinite() {
25+
return timeout <= 0;
26+
}
27+
28+
public boolean isFinite() {
29+
return timeout > 0;
30+
}
31+
32+
public boolean elapsed() {
33+
if (timeout > 0) {
34+
cur = System.currentTimeMillis();
35+
if (end == Long.MAX_VALUE) {
36+
end = cur + timeout;
37+
}
38+
return cur >= end;
39+
} else {
40+
return false;
41+
}
42+
}
43+
44+
public long timeout() {
45+
return timeout > 0 ? Math.max(1, end - cur) : timeout;
46+
}
47+
48+
}

‎terminal/src/test/java/org/jline/utils/NonBlockingTest.java

+57
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,63 @@ public int read(long timeout, boolean isPeek) throws IOException {
5050
assertEquals(-1, nbr.read(100));
5151
}
5252

53+
@Test
54+
public void testNonBlockingReaderBufferedWithNonBufferedInput() throws IOException {
55+
NonBlockingInputStream nbis = new NonBlockingInputStream() {
56+
int idx = 0;
57+
byte[] input = "中英字典".getBytes(StandardCharsets.UTF_8);
58+
@Override
59+
public int read(long timeout, boolean isPeek) throws IOException {
60+
if (idx < input.length) {
61+
return input[idx++] & 0x00FF;
62+
} else {
63+
return -1;
64+
}
65+
}
66+
};
67+
NonBlockingReader nbr = NonBlocking.nonBlocking("name", nbis, StandardCharsets.UTF_8);
68+
char[] buf = new char[4];
69+
assertEquals( 1, nbr.readBuffered(buf, 0));
70+
assertEquals('中', buf[0]);
71+
assertEquals( 1, nbr.readBuffered(buf, 0));
72+
assertEquals('英', buf[0]);
73+
assertEquals( 1, nbr.readBuffered(buf, 0));
74+
assertEquals('字', buf[0]);
75+
assertEquals( 1, nbr.readBuffered(buf, 0));
76+
assertEquals('典', buf[0]);
77+
}
78+
79+
@Test
80+
public void testNonBlockingReaderBufferedWithBufferedInput() throws IOException {
81+
NonBlockingInputStream nbis = new NonBlockingInputStream() {
82+
int idx = 0;
83+
byte[] input = "中英字典".getBytes(StandardCharsets.UTF_8);
84+
@Override
85+
public int read(long timeout, boolean isPeek) throws IOException {
86+
if (idx < input.length) {
87+
return input[idx++] & 0x00FF;
88+
} else {
89+
return -1;
90+
}
91+
}
92+
@Override
93+
public int readBuffered(byte[] b, int off, int len, long timeout) throws IOException {
94+
int i = 0;
95+
while (i < len && idx < input.length) {
96+
b[off + i++] = input[idx++];
97+
}
98+
return i > 0 ? i : -1;
99+
}
100+
};
101+
NonBlockingReader nbr = NonBlocking.nonBlocking("name", nbis, StandardCharsets.UTF_8);
102+
char[] buf = new char[4];
103+
assertEquals( 4, nbr.readBuffered(buf, 0));
104+
assertEquals('中', buf[0]);
105+
assertEquals('英', buf[1]);
106+
assertEquals('字', buf[2]);
107+
assertEquals('典', buf[3]);
108+
}
109+
53110
@Test
54111
public void testNonBlockingPumpReader() throws IOException {
55112
NonBlockingPumpReader nbr = NonBlocking.nonBlockingPumpReader();

0 commit comments

Comments
 (0)
Please sign in to comment.