From bb608b60debcb422472eed164d7cd9646e0383ef Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Thu, 16 Jan 2020 13:43:48 -0800 Subject: [PATCH] core: fix a race in ServerImpl for multi-transport-server --- .../java/io/grpc/internal/ServerImpl.java | 38 +++++++++++++++---- .../java/io/grpc/internal/ServerImplTest.java | 1 - 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 85d85793230..dceb94d00bf 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -65,6 +65,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; @@ -112,6 +113,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume /** Service encapsulating something similar to an accept() socket. */ private final List transportServers; private final Object lock = new Object(); + private final CountDownLatch startComplete = new CountDownLatch(1); @GuardedBy("lock") private boolean transportServersTerminated; /** {@code transportServer} and services encapsulating something similar to a TCP connection. */ @GuardedBy("lock") private final Set transports = new HashSet<>(); @@ -144,7 +146,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry"); Preconditions.checkNotNull(transportServers, "transportServers"); Preconditions.checkArgument(!transportServers.isEmpty(), "no servers provided"); - this.transportServers = new ArrayList<>(transportServers); + this.transportServers = Collections.unmodifiableList(new ArrayList<>(transportServers)); this.logId = InternalLogId.allocate("Server", String.valueOf(getListenSocketsIgnoringLifecycle())); // Fork from the passed in context so that it does not propagate cancellation, it only @@ -177,19 +179,25 @@ public ServerImpl start() throws IOException { synchronized (lock) { checkState(!started, "Already started"); checkState(!shutdown, "Shutting down"); - // Start and wait for any ports to actually be bound. + executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); + started = true; + } - ServerListenerImpl listener = new ServerListenerImpl(); + // Start and wait for any ports to actually be bound. + ServerListenerImpl listener = new ServerListenerImpl(); + try { for (InternalServer ts : transportServers) { ts.start(listener); - activeTransportServers++; + synchronized (lock) { + activeTransportServers++; + } } - executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); - started = true; - return this; + } finally { + startComplete.countDown(); } - } + return this; + } @Override public int getPort() { @@ -269,6 +277,7 @@ public ServerImpl shutdown() { } } if (shutdownTransportServers) { + awaitStartComplete(); for (InternalServer ts : transportServers) { ts.shutdown(); } @@ -276,6 +285,18 @@ public ServerImpl shutdown() { return this; } + private void awaitStartComplete() { + boolean startCompleted = false; + try { + startCompleted = startComplete.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (!startCompleted) { + log.log(Level.WARNING, "Server was not starting properly, resources may leak"); + } + } + @Override public ServerImpl shutdownNow() { shutdown(); @@ -388,6 +409,7 @@ public ServerTransportListener transportCreated(ServerTransport transport) { public void serverShutdown() { ArrayList copiedTransports; Status shutdownNowStatusCopy; + awaitStartComplete(); synchronized (lock) { activeTransportServers--; if (activeTransportServers != 0) { diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index d2011f9541b..08d5a7a1ba1 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -420,7 +420,6 @@ public void start(ServerListener listener) throws IOException { } catch (IOException e) { assertSame(ex, e); } - verifyNoMoreInteractions(executorPool); } @Test