Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DelayedTransport shutdown race #7743

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 7 additions & 8 deletions core/src/main/java/io/grpc/internal/DelayedClientTransport.java
Expand Up @@ -294,12 +294,10 @@ final void reprocess(@Nullable SubchannelPicker picker) {
if (callOptions.getExecutor() != null) {
executor = callOptions.getExecutor();
}
executor.execute(new Runnable() {
@Override
public void run() {
stream.createRealStream(transport);
}
});
Runnable runnable = stream.createRealStream(transport);
if (runnable != null) {
executor.execute(runnable);
}
toRemove.add(stream);
} // else: stay pending
}
Expand Down Expand Up @@ -346,7 +344,8 @@ private PendingStream(PickSubchannelArgs args) {
this.args = args;
}

private void createRealStream(ClientTransport transport) {
/** Runnable may be null. */
private Runnable createRealStream(ClientTransport transport) {
ClientStream realStream;
Context origContext = context.attach();
try {
Expand All @@ -355,7 +354,7 @@ private void createRealStream(ClientTransport transport) {
} finally {
context.detach(origContext);
}
setStream(realStream);
return setStream(realStream);
}

@Override
Expand Down
121 changes: 78 additions & 43 deletions core/src/main/java/io/grpc/internal/DelayedStream.java
Expand Up @@ -29,6 +29,7 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.CheckReturnValue;
import javax.annotation.concurrent.GuardedBy;

/**
Expand Down Expand Up @@ -59,38 +60,35 @@ class DelayedStream implements ClientStream {
private long startTimeNanos;
@GuardedBy("this")
private long streamSetTimeNanos;
// No need to synchronize; start() synchronization provides a happens-before
private List<Runnable> preStartPendingCalls = new ArrayList<>();

@Override
public void setMaxInboundMessageSize(final int maxSize) {
if (passThrough) {
realStream.setMaxInboundMessageSize(maxSize);
} else {
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.setMaxInboundMessageSize(maxSize);
}
});
}
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setMaxInboundMessageSize(maxSize);
}
});
}

@Override
public void setMaxOutboundMessageSize(final int maxSize) {
if (passThrough) {
realStream.setMaxOutboundMessageSize(maxSize);
} else {
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.setMaxOutboundMessageSize(maxSize);
}
});
}
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setMaxOutboundMessageSize(maxSize);
}
});
}

@Override
public void setDeadline(final Deadline deadline) {
delayOrExecute(new Runnable() {
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setDeadline(deadline);
Expand All @@ -115,21 +113,41 @@ public void appendTimeoutInsight(InsightBuilder insight) {
}

/**
* Transfers all pending and future requests and mutations to the given stream.
* Transfers all pending and future requests and mutations to the given stream. Method will return
* quickly, but if the returned Runnable is non-null it must be called to complete the process.
* The Runnable may take a while to execute.
*
* <p>No-op if either this method or {@link #cancel} have already been called.
*/
// When this method returns, passThrough is guaranteed to be true
final void setStream(ClientStream stream) {
// When this method returns, start() has been called on realStream or passThrough is guaranteed to
// be true
@CheckReturnValue
final Runnable setStream(ClientStream stream) {
ClientStreamListener savedListener;
synchronized (this) {
// If realStream != null, then either setStream() or cancel() has been called.
if (realStream != null) {
return;
return null;
}
setRealStream(checkNotNull(stream, "stream"));
savedListener = listener;
if (savedListener == null) {
assert pendingCalls.isEmpty();
pendingCalls = null;
passThrough = true;
}
}
if (savedListener == null) {
return null;
} else {
internalStart(savedListener);
return new Runnable() {
@Override
public void run() {
drainPendingCalls();
}
};
}

drainPendingCalls();
}

/**
Expand Down Expand Up @@ -177,6 +195,7 @@ private void drainPendingCalls() {
* only if {@code runnable} is thread-safe.
*/
private void delayOrExecute(Runnable runnable) {
checkState(listener != null, "May only be called after start");
synchronized (this) {
if (!passThrough) {
pendingCalls.add(runnable);
Expand All @@ -190,7 +209,7 @@ private void delayOrExecute(Runnable runnable) {
public void setAuthority(final String authority) {
checkState(listener == null, "May only be called before start");
checkNotNull(authority, "authority");
delayOrExecute(new Runnable() {
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setAuthority(authority);
Expand All @@ -200,18 +219,19 @@ public void run() {

@Override
public void start(ClientStreamListener listener) {
checkNotNull(listener, "listener");
checkState(this.listener == null, "already started");

Status savedError;
boolean savedPassThrough;
synchronized (this) {
this.listener = checkNotNull(listener, "listener");
// If error != null, then cancel() has been called and was unable to close the listener
savedError = error;
savedPassThrough = passThrough;
if (!savedPassThrough) {
listener = delayedListener = new DelayedStreamListener(listener);
}
this.listener = listener;
startTimeNanos = System.nanoTime();
}
if (savedError != null) {
Expand All @@ -220,16 +240,20 @@ public void start(ClientStreamListener listener) {
}

if (savedPassThrough) {
realStream.start(listener);
} else {
final ClientStreamListener finalListener = listener;
delayOrExecute(new Runnable() {
@Override
public void run() {
realStream.start(finalListener);
}
});
internalStart(listener);
} // else internalStart() will be called by setStream
}

/**
* Starts stream without synchronization. {@code listener} should be same instance as {@link
* #listener}.
*/
private void internalStart(ClientStreamListener listener) {
for (Runnable runnable : preStartPendingCalls) {
runnable.run();
}
preStartPendingCalls = null;
realStream.start(listener);
}

@Override
Expand All @@ -247,6 +271,7 @@ public Attributes getAttributes() {

@Override
public void writeMessage(final InputStream message) {
checkState(listener != null, "May only be called after start");
checkNotNull(message, "message");
if (passThrough) {
realStream.writeMessage(message);
Expand All @@ -262,6 +287,7 @@ public void run() {

@Override
public void flush() {
checkState(listener != null, "May only be called after start");
if (passThrough) {
realStream.flush();
} else {
Expand All @@ -277,6 +303,7 @@ public void run() {
// When this method returns, passThrough is guaranteed to be true
@Override
public void cancel(final Status reason) {
checkState(listener != null, "May only be called after start");
checkNotNull(reason, "reason");
boolean delegateToRealStream = true;
ClientStreamListener listenerToClose = null;
Expand All @@ -298,10 +325,11 @@ public void run() {
}
});
} else {
drainPendingCalls();
if (listenerToClose != null) {
// Note that listenerToClose is a DelayedStreamListener
listenerToClose.closed(reason, new Metadata());
}
drainPendingCalls();
}
}

Expand All @@ -314,6 +342,7 @@ private void setRealStream(ClientStream realStream) {

@Override
public void halfClose() {
checkState(listener != null, "May only be called after start");
delayOrExecute(new Runnable() {
@Override
public void run() {
Expand All @@ -324,6 +353,7 @@ public void run() {

@Override
public void request(final int numMessages) {
checkState(listener != null, "May only be called after start");
if (passThrough) {
realStream.request(numMessages);
} else {
Expand All @@ -338,7 +368,8 @@ public void run() {

@Override
public void optimizeForDirectExecutor() {
delayOrExecute(new Runnable() {
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.optimizeForDirectExecutor();
Expand All @@ -348,8 +379,9 @@ public void run() {

@Override
public void setCompressor(final Compressor compressor) {
checkState(listener == null, "May only be called before start");
checkNotNull(compressor, "compressor");
delayOrExecute(new Runnable() {
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setCompressor(compressor);
Expand All @@ -359,7 +391,8 @@ public void run() {

@Override
public void setFullStreamDecompression(final boolean fullStreamDecompression) {
delayOrExecute(
checkState(listener == null, "May only be called before start");
preStartPendingCalls.add(
new Runnable() {
@Override
public void run() {
Expand All @@ -370,8 +403,9 @@ public void run() {

@Override
public void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
checkState(listener == null, "May only be called before start");
checkNotNull(decompressorRegistry, "decompressorRegistry");
delayOrExecute(new Runnable() {
preStartPendingCalls.add(new Runnable() {
@Override
public void run() {
realStream.setDecompressorRegistry(decompressorRegistry);
Expand All @@ -390,6 +424,7 @@ public boolean isReady() {

@Override
public void setMessageCompression(final boolean enable) {
checkState(listener != null, "May only be called after start");
if (passThrough) {
realStream.setMessageCompression(enable);
} else {
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/io/grpc/internal/MetadataApplierImpl.java
Expand Up @@ -95,7 +95,11 @@ private void finalizeWith(ClientStream stream) {
// returnStream() has been called before me, thus delayedStream must have been
// created.
checkState(delayedStream != null, "delayedStream is null");
delayedStream.setStream(stream);
Runnable slow = delayedStream.setStream(stream);
if (slow != null) {
// TODO(ejona): run this on a separate thread
slow.run();
}
}

/**
Expand Down