Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove ThreadlessExecutor from BlockingServerStream #10496

Merged
merged 2 commits into from
Aug 18, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
63 changes: 14 additions & 49 deletions stub/src/main/java/io/grpc/stub/ClientCalls.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,7 @@
public static <ReqT, RespT> RespT blockingUnaryCall(ClientCall<ReqT, RespT> call, ReqT req) {
try {
return getUnchecked(futureUnaryCall(call, req));
} catch (RuntimeException e) {
throw cancelThrow(call, e);
} catch (Error e) {
} catch (RuntimeException | Error e) {
throw cancelThrow(call, e);
}
}
Expand Down Expand Up @@ -167,10 +165,7 @@
}
executor.shutdown();
return getUnchecked(responseFuture);
} catch (RuntimeException e) {
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} catch (Error e) {
} catch (RuntimeException | Error e) {
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} finally {
Expand Down Expand Up @@ -206,14 +201,12 @@
*
* @return an iterator over the response stream.
*/
// TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs.
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
ThreadlessExecutor executor = new ThreadlessExecutor();
ClientCall<ReqT, RespT> call = channel.newCall(method,
callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)
.withExecutor(executor));
BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call, executor);
callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING));

BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call);
asyncUnaryRequestCall(call, req, result.listener());
return result;
}
Expand Down Expand Up @@ -288,8 +281,7 @@
private static RuntimeException cancelThrow(ClientCall<?, ?> call, Throwable t) {
try {
call.cancel(null, t);
} catch (Throwable e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assertion can be disabled. So this change may skip logging some error messages, its safer to not change it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assertion is checking for checked exceptions, which should't happen. Catching Throwable was because this was written before we could write RuntimeException | Error. I'm not worried about this change.

assert e instanceof RuntimeException || e instanceof Error;
} catch (RuntimeException | Error e) {

Check warning on line 284 in stub/src/main/java/io/grpc/stub/ClientCalls.java

View check run for this annotation

Codecov / codecov/patch

stub/src/main/java/io/grpc/stub/ClientCalls.java#L284

Added line #L284 was not covered by tests
logger.log(Level.SEVERE, "RuntimeException encountered while closing call", e);
}
if (t instanceof RuntimeException) {
Expand Down Expand Up @@ -320,9 +312,7 @@
try {
call.sendMessage(req);
call.halfClose();
} catch (RuntimeException e) {
throw cancelThrow(call, e);
} catch (Error e) {
} catch (RuntimeException | Error e) {

Check warning on line 315 in stub/src/main/java/io/grpc/stub/ClientCalls.java

View check run for this annotation

Codecov / codecov/patch

stub/src/main/java/io/grpc/stub/ClientCalls.java#L315

Added line #L315 was not covered by tests
throw cancelThrow(call, e);
}
}
Expand Down Expand Up @@ -597,20 +587,12 @@
private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<>(3);
private final StartableListener<T> listener = new QueuingListener();
private final ClientCall<?, T> call;
/** May be null. */
private final ThreadlessExecutor threadless;
// Only accessed when iterating.
private Object last;

// Non private to avoid synthetic class
BlockingResponseStream(ClientCall<?, T> call) {
this(call, null);
}

// Non private to avoid synthetic class
BlockingResponseStream(ClientCall<?, T> call, ThreadlessExecutor threadless) {
this.call = call;
this.threadless = threadless;
}

StartableListener<T> listener() {
Expand All @@ -620,31 +602,14 @@
private Object waitForNext() {
boolean interrupt = false;
try {
if (threadless == null) {
while (true) {
try {
return buffer.take();
} catch (InterruptedException ie) {
interrupt = true;
call.cancel("Thread interrupted", ie);
// Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill
}
}
} else {
Object next;
while ((next = buffer.poll()) == null) {
try {
threadless.waitAndDrain();
} catch (InterruptedException ie) {
interrupt = true;
call.cancel("Thread interrupted", ie);
// Now wait for onClose() to be called, so interceptors can clean up
}
}
if (next == this || next instanceof StatusRuntimeException) {
threadless.shutdown();
while (true) {
try {
return buffer.take();
} catch (InterruptedException ie) {
interrupt = true;
call.cancel("Thread interrupted", ie);
// Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill
}
return next;
}
} finally {
if (interrupt) {
Expand Down