Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Revert "stub: Wait for onClose when blocking stub is interrupted"" #6255

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
79 changes: 49 additions & 30 deletions stub/src/main/java/io/grpc/stub/ClientCalls.java
Expand Up @@ -124,25 +124,30 @@ public static <ReqT, RespT> RespT blockingUnaryCall(ClientCall<ReqT, RespT> call
public static <ReqT, RespT> RespT blockingUnaryCall(
Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
ThreadlessExecutor executor = new ThreadlessExecutor();
boolean interrupt = false;
ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor));
try {
ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req);
while (!responseFuture.isDone()) {
try {
executor.waitAndDrain();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Status.CANCELLED
.withDescription("Call was interrupted")
.withCause(e)
.asRuntimeException();
interrupt = true;
call.cancel("Thread interrupted", e);
// Now wait for onClose() to be called, so interceptors can clean up
}
}
return getUnchecked(responseFuture);
} catch (RuntimeException e) {
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} catch (Error e) {
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} finally {
if (interrupt) {
Thread.currentThread().interrupt();
}
}
}

Expand Down Expand Up @@ -209,7 +214,7 @@ private static <V> V getUnchecked(Future<V> future) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Status.CANCELLED
.withDescription("Call was interrupted")
.withDescription("Thread interrupted")
.withCause(e)
.asRuntimeException();
} catch (ExecutionException e) {
Expand Down Expand Up @@ -553,30 +558,45 @@ ClientCall.Listener<T> listener() {
return listener;
}

private Object waitForNext() throws InterruptedException {
if (threadless == null) {
return buffer.take();
} else {
Object next = buffer.poll();
while (next == null) {
threadless.waitAndDrain();
next = buffer.poll();
private Object waitForNext() {
boolean interrupt = false;
try {
if (threadless == null) {
while (true) {
try {
return buffer.take();
} catch (InterruptedException ie) {
interrupt = true;
call.cancel("Thread interrupted", ie);
// Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill
}
}
} else {
Object next;
while ((next = buffer.poll()) == null) {
try {
threadless.waitAndDrain();
} catch (InterruptedException ie) {
interrupt = true;
call.cancel("Thread interrupted", ie);
// Now wait for onClose() to be called, so interceptors can clean up
}
}
return next;
}
} finally {
if (interrupt) {
Thread.currentThread().interrupt();
}
return next;
}
}

@Override
public boolean hasNext() {
if (last == null) {
try {
// Will block here indefinitely waiting for content. RPC timeouts defend against permanent
// hangs here as the call will become closed.
last = waitForNext();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw Status.CANCELLED.withDescription("interrupted").withCause(ie).asRuntimeException();
}
while (last == null) {
// Will block here indefinitely waiting for content. RPC timeouts defend against permanent
// hangs here as the call will become closed.
last = waitForNext();
}
if (last instanceof StatusRuntimeException) {
// Rethrow the exception with a new stacktrace.
Expand Down Expand Up @@ -650,15 +670,14 @@ private static final class ThreadlessExecutor extends ConcurrentLinkedQueue<Runn
* Must only be called by one thread at a time.
*/
public void waitAndDrain() throws InterruptedException {
final Thread currentThread = Thread.currentThread();
throwIfInterrupted(currentThread);
throwIfInterrupted();
Runnable runnable = poll();
if (runnable == null) {
waiter = currentThread;
waiter = Thread.currentThread();
try {
while ((runnable = poll()) == null) {
LockSupport.park(this);
throwIfInterrupted(currentThread);
throwIfInterrupted();
}
} finally {
waiter = null;
Expand All @@ -673,8 +692,8 @@ public void waitAndDrain() throws InterruptedException {
} while ((runnable = poll()) != null);
}

private static void throwIfInterrupted(Thread currentThread) throws InterruptedException {
if (currentThread.isInterrupted()) {
private static void throwIfInterrupted() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
Expand Down