diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 85d85793230..ebcba7ac9a9 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -65,6 +65,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; @@ -112,6 +113,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume /** Service encapsulating something similar to an accept() socket. */ private final List transportServers; private final Object lock = new Object(); + private final CountDownLatch startComplete = new CountDownLatch(1); @GuardedBy("lock") private boolean transportServersTerminated; /** {@code transportServer} and services encapsulating something similar to a TCP connection. */ @GuardedBy("lock") private final Set transports = new HashSet<>(); @@ -144,7 +146,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry"); Preconditions.checkNotNull(transportServers, "transportServers"); Preconditions.checkArgument(!transportServers.isEmpty(), "no servers provided"); - this.transportServers = new ArrayList<>(transportServers); + this.transportServers = Collections.unmodifiableList(new ArrayList<>(transportServers)); this.logId = InternalLogId.allocate("Server", String.valueOf(getListenSocketsIgnoringLifecycle())); // Fork from the passed in context so that it does not propagate cancellation, it only @@ -177,19 +179,54 @@ public ServerImpl start() throws IOException { synchronized (lock) { checkState(!started, "Already started"); checkState(!shutdown, "Shutting down"); - // Start and wait for any ports to actually be bound. + activeTransportServers = transportServers.size(); + executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); + started = true; + } - ServerListenerImpl listener = new ServerListenerImpl(); + // Start and wait for any ports to actually be bound. + ServerListenerImpl listener = new ServerListenerImpl(); + int unstarted = transportServers.size(); + try { for (InternalServer ts : transportServers) { ts.start(listener); - activeTransportServers++; + unstarted--; + } + } finally { + startComplete.countDown(); + + // Adjust number of active transport servers if there is any unstarted servers. + List transportsToCleanUp = null; + Status shutdownNowStatusCopy = null; + synchronized (lock) { + if (unstarted != 0) { + activeTransportServers -= unstarted; + if (activeTransportServers == 0) { + // transports collection can be modified during shutdown(), even if we hold the lock, + // due to reentrancy. + transportsToCleanUp = new ArrayList<>(transports); + shutdownNowStatusCopy = shutdownNowStatus; + serverShutdownCallbackInvoked = true; + } + } + } + if (transportsToCleanUp != null) { + for (ServerTransport transport : transportsToCleanUp) { + if (shutdownNowStatusCopy == null) { + transport.shutdown(); + } else { + transport.shutdownNow(shutdownNowStatusCopy); + } + } + synchronized (lock) { + transportServersTerminated = true; + checkForTermination(); + } } - executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); - started = true; - return this; } - } + return this; + } @Override public int getPort() { @@ -269,6 +306,17 @@ public ServerImpl shutdown() { } } if (shutdownTransportServers) { + boolean startCompleted = false; + try { + startCompleted = startComplete.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (!startCompleted) { + log.log( + Level.WARNING, + "Server is shutdown while it was not starting properly, resources may leak"); + } for (InternalServer ts : transportServers) { ts.shutdown(); } diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index d2011f9541b..08d5a7a1ba1 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -420,7 +420,6 @@ public void start(ServerListener listener) throws IOException { } catch (IOException e) { assertSame(ex, e); } - verifyNoMoreInteractions(executorPool); } @Test