Skip to content

Commit

Permalink
core: fix a race in ServerImpl for multi-transport-server
Browse files Browse the repository at this point in the history
  • Loading branch information
dapengzhang0 committed Jan 16, 2020
1 parent b8474d6 commit bb608b6
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 9 deletions.
38 changes: 30 additions & 8 deletions core/src/main/java/io/grpc/internal/ServerImpl.java
Expand Up @@ -65,6 +65,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
Expand Down Expand Up @@ -112,6 +113,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
/** Service encapsulating something similar to an accept() socket. */
private final List<? extends InternalServer> transportServers;
private final Object lock = new Object();
private final CountDownLatch startComplete = new CountDownLatch(1);
@GuardedBy("lock") private boolean transportServersTerminated;
/** {@code transportServer} and services encapsulating something similar to a TCP connection. */
@GuardedBy("lock") private final Set<ServerTransport> transports = new HashSet<>();
Expand Down Expand Up @@ -144,7 +146,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry");
Preconditions.checkNotNull(transportServers, "transportServers");
Preconditions.checkArgument(!transportServers.isEmpty(), "no servers provided");
this.transportServers = new ArrayList<>(transportServers);
this.transportServers = Collections.unmodifiableList(new ArrayList<>(transportServers));
this.logId =
InternalLogId.allocate("Server", String.valueOf(getListenSocketsIgnoringLifecycle()));
// Fork from the passed in context so that it does not propagate cancellation, it only
Expand Down Expand Up @@ -177,19 +179,25 @@ public ServerImpl start() throws IOException {
synchronized (lock) {
checkState(!started, "Already started");
checkState(!shutdown, "Shutting down");
// Start and wait for any ports to actually be bound.
executor = Preconditions.checkNotNull(executorPool.getObject(), "executor");
started = true;
}

ServerListenerImpl listener = new ServerListenerImpl();
// Start and wait for any ports to actually be bound.
ServerListenerImpl listener = new ServerListenerImpl();
try {
for (InternalServer ts : transportServers) {
ts.start(listener);
activeTransportServers++;
synchronized (lock) {
activeTransportServers++;
}
}
executor = Preconditions.checkNotNull(executorPool.getObject(), "executor");
started = true;
return this;
} finally {
startComplete.countDown();
}
}

return this;
}

@Override
public int getPort() {
Expand Down Expand Up @@ -269,13 +277,26 @@ public ServerImpl shutdown() {
}
}
if (shutdownTransportServers) {
awaitStartComplete();
for (InternalServer ts : transportServers) {
ts.shutdown();
}
}
return this;
}

private void awaitStartComplete() {
boolean startCompleted = false;
try {
startCompleted = startComplete.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (!startCompleted) {
log.log(Level.WARNING, "Server was not starting properly, resources may leak");
}
}

@Override
public ServerImpl shutdownNow() {
shutdown();
Expand Down Expand Up @@ -388,6 +409,7 @@ public ServerTransportListener transportCreated(ServerTransport transport) {
public void serverShutdown() {
ArrayList<ServerTransport> copiedTransports;
Status shutdownNowStatusCopy;
awaitStartComplete();
synchronized (lock) {
activeTransportServers--;
if (activeTransportServers != 0) {
Expand Down
1 change: 0 additions & 1 deletion core/src/test/java/io/grpc/internal/ServerImplTest.java
Expand Up @@ -420,7 +420,6 @@ public void start(ServerListener listener) throws IOException {
} catch (IOException e) {
assertSame(ex, e);
}
verifyNoMoreInteractions(executorPool);
}

@Test
Expand Down

0 comments on commit bb608b6

Please sign in to comment.