Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: fix a race in ServerImpl for multi-transport-server #6614

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 30 additions & 8 deletions core/src/main/java/io/grpc/internal/ServerImpl.java
Expand Up @@ -65,6 +65,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
Expand Down Expand Up @@ -112,6 +113,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
/** Service encapsulating something similar to an accept() socket. */
private final List<? extends InternalServer> transportServers;
private final Object lock = new Object();
private final CountDownLatch startComplete = new CountDownLatch(1);
@GuardedBy("lock") private boolean transportServersTerminated;
/** {@code transportServer} and services encapsulating something similar to a TCP connection. */
@GuardedBy("lock") private final Set<ServerTransport> transports = new HashSet<>();
Expand Down Expand Up @@ -144,7 +146,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry");
Preconditions.checkNotNull(transportServers, "transportServers");
Preconditions.checkArgument(!transportServers.isEmpty(), "no servers provided");
this.transportServers = new ArrayList<>(transportServers);
this.transportServers = Collections.unmodifiableList(new ArrayList<>(transportServers));
this.logId =
InternalLogId.allocate("Server", String.valueOf(getListenSocketsIgnoringLifecycle()));
// Fork from the passed in context so that it does not propagate cancellation, it only
Expand Down Expand Up @@ -177,19 +179,25 @@ public ServerImpl start() throws IOException {
synchronized (lock) {
checkState(!started, "Already started");
checkState(!shutdown, "Shutting down");
// Start and wait for any ports to actually be bound.
executor = Preconditions.checkNotNull(executorPool.getObject(), "executor");
started = true;
}

ServerListenerImpl listener = new ServerListenerImpl();
// Start and wait for any ports to actually be bound.
ServerListenerImpl listener = new ServerListenerImpl();
try {
for (InternalServer ts : transportServers) {
ts.start(listener);
activeTransportServers++;
synchronized (lock) {
activeTransportServers++;
}
}
executor = Preconditions.checkNotNull(executorPool.getObject(), "executor");
started = true;
return this;
} finally {
startComplete.countDown();
}
}

return this;
}

@Override
public int getPort() {
Expand Down Expand Up @@ -269,13 +277,26 @@ public ServerImpl shutdown() {
}
}
if (shutdownTransportServers) {
awaitStartComplete();
for (InternalServer ts : transportServers) {
ts.shutdown();
}
}
return this;
}

private void awaitStartComplete() {
boolean startCompleted = false;
try {
startCompleted = startComplete.await(30, TimeUnit.SECONDS);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should only await for start to complete if started == true.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can assert that is true.

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (!startCompleted) {
log.log(Level.WARNING, "Server was not starting properly, resources may leak");
}
}

@Override
public ServerImpl shutdownNow() {
shutdown();
Expand Down Expand Up @@ -388,6 +409,7 @@ public ServerTransportListener transportCreated(ServerTransport transport) {
public void serverShutdown() {
ArrayList<ServerTransport> copiedTransports;
Status shutdownNowStatusCopy;
awaitStartComplete();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks dangerous.

I'd much rather we set activeTransportServers = transportServers.size() within the initial synchronization block of start() to prevent needing to block here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set activeTransportServers = transportServers.size() is broken if one of ts.start() throws, because then there will be not enough times of listern.serverShutdown() called .

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we need redesign multiple transportServers support in future. It's just too inconvenient now. For now the fix seems working. Can you elaborate what is the danger?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I see the danger, because serverShutdown() is running in the event loop, if another ts.start() submits something into the event loop while awaitStartComplete() in serverShutdown(), then it will be deadlock again.

synchronized (lock) {
activeTransportServers--;
if (activeTransportServers != 0) {
Expand Down
1 change: 0 additions & 1 deletion core/src/test/java/io/grpc/internal/ServerImplTest.java
Expand Up @@ -420,7 +420,6 @@ public void start(ServerListener listener) throws IOException {
} catch (IOException e) {
assertSame(ex, e);
}
verifyNoMoreInteractions(executorPool);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this removed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because executorPool.getObject() is called after the change.

}

@Test
Expand Down