From bb608b60debcb422472eed164d7cd9646e0383ef Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Thu, 16 Jan 2020 13:43:48 -0800 Subject: [PATCH 1/2] core: fix a race in ServerImpl for multi-transport-server --- .../java/io/grpc/internal/ServerImpl.java | 38 +++++++++++++++---- .../java/io/grpc/internal/ServerImplTest.java | 1 - 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 85d85793230..dceb94d00bf 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -65,6 +65,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; @@ -112,6 +113,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume /** Service encapsulating something similar to an accept() socket. */ private final List transportServers; private final Object lock = new Object(); + private final CountDownLatch startComplete = new CountDownLatch(1); @GuardedBy("lock") private boolean transportServersTerminated; /** {@code transportServer} and services encapsulating something similar to a TCP connection. */ @GuardedBy("lock") private final Set transports = new HashSet<>(); @@ -144,7 +146,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry"); Preconditions.checkNotNull(transportServers, "transportServers"); Preconditions.checkArgument(!transportServers.isEmpty(), "no servers provided"); - this.transportServers = new ArrayList<>(transportServers); + this.transportServers = Collections.unmodifiableList(new ArrayList<>(transportServers)); this.logId = InternalLogId.allocate("Server", String.valueOf(getListenSocketsIgnoringLifecycle())); // Fork from the passed in context so that it does not propagate cancellation, it only @@ -177,19 +179,25 @@ public ServerImpl start() throws IOException { synchronized (lock) { checkState(!started, "Already started"); checkState(!shutdown, "Shutting down"); - // Start and wait for any ports to actually be bound. + executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); + started = true; + } - ServerListenerImpl listener = new ServerListenerImpl(); + // Start and wait for any ports to actually be bound. + ServerListenerImpl listener = new ServerListenerImpl(); + try { for (InternalServer ts : transportServers) { ts.start(listener); - activeTransportServers++; + synchronized (lock) { + activeTransportServers++; + } } - executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); - started = true; - return this; + } finally { + startComplete.countDown(); } - } + return this; + } @Override public int getPort() { @@ -269,6 +277,7 @@ public ServerImpl shutdown() { } } if (shutdownTransportServers) { + awaitStartComplete(); for (InternalServer ts : transportServers) { ts.shutdown(); } @@ -276,6 +285,18 @@ public ServerImpl shutdown() { return this; } + private void awaitStartComplete() { + boolean startCompleted = false; + try { + startCompleted = startComplete.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (!startCompleted) { + log.log(Level.WARNING, "Server was not starting properly, resources may leak"); + } + } + @Override public ServerImpl shutdownNow() { shutdown(); @@ -388,6 +409,7 @@ public ServerTransportListener transportCreated(ServerTransport transport) { public void serverShutdown() { ArrayList copiedTransports; Status shutdownNowStatusCopy; + awaitStartComplete(); synchronized (lock) { activeTransportServers--; if (activeTransportServers != 0) { diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index d2011f9541b..08d5a7a1ba1 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -420,7 +420,6 @@ public void start(ServerListener listener) throws IOException { } catch (IOException e) { assertSame(ex, e); } - verifyNoMoreInteractions(executorPool); } @Test From d27d7932d9f6206f30acde1e98ed66cb558c0089 Mon Sep 17 00:00:00 2001 From: "Penn (Dapeng) Zhang" Date: Thu, 16 Jan 2020 17:31:17 -0800 Subject: [PATCH 2/2] not to run awaintStartComplete in listener.serverShutdown --- .../java/io/grpc/internal/ServerImpl.java | 60 +++++++++++++------ 1 file changed, 43 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index dceb94d00bf..ebcba7ac9a9 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -179,21 +179,50 @@ public ServerImpl start() throws IOException { synchronized (lock) { checkState(!started, "Already started"); checkState(!shutdown, "Shutting down"); + activeTransportServers = transportServers.size(); executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); started = true; } // Start and wait for any ports to actually be bound. ServerListenerImpl listener = new ServerListenerImpl(); + int unstarted = transportServers.size(); try { for (InternalServer ts : transportServers) { ts.start(listener); - synchronized (lock) { - activeTransportServers++; - } + unstarted--; } } finally { startComplete.countDown(); + + // Adjust number of active transport servers if there is any unstarted servers. + List transportsToCleanUp = null; + Status shutdownNowStatusCopy = null; + synchronized (lock) { + if (unstarted != 0) { + activeTransportServers -= unstarted; + if (activeTransportServers == 0) { + // transports collection can be modified during shutdown(), even if we hold the lock, + // due to reentrancy. + transportsToCleanUp = new ArrayList<>(transports); + shutdownNowStatusCopy = shutdownNowStatus; + serverShutdownCallbackInvoked = true; + } + } + } + if (transportsToCleanUp != null) { + for (ServerTransport transport : transportsToCleanUp) { + if (shutdownNowStatusCopy == null) { + transport.shutdown(); + } else { + transport.shutdownNow(shutdownNowStatusCopy); + } + } + synchronized (lock) { + transportServersTerminated = true; + checkForTermination(); + } + } } return this; @@ -277,7 +306,17 @@ public ServerImpl shutdown() { } } if (shutdownTransportServers) { - awaitStartComplete(); + boolean startCompleted = false; + try { + startCompleted = startComplete.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + if (!startCompleted) { + log.log( + Level.WARNING, + "Server is shutdown while it was not starting properly, resources may leak"); + } for (InternalServer ts : transportServers) { ts.shutdown(); } @@ -285,18 +324,6 @@ public ServerImpl shutdown() { return this; } - private void awaitStartComplete() { - boolean startCompleted = false; - try { - startCompleted = startComplete.await(30, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - if (!startCompleted) { - log.log(Level.WARNING, "Server was not starting properly, resources may leak"); - } - } - @Override public ServerImpl shutdownNow() { shutdown(); @@ -409,7 +436,6 @@ public ServerTransportListener transportCreated(ServerTransport transport) { public void serverShutdown() { ArrayList copiedTransports; Status shutdownNowStatusCopy; - awaitStartComplete(); synchronized (lock) { activeTransportServers--; if (activeTransportServers != 0) {