New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
core: fix a race in ServerImpl for multi-transport-server #6614
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -65,6 +65,7 @@ | |
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.Executor; | ||
import java.util.concurrent.Future; | ||
import java.util.concurrent.FutureTask; | ||
|
@@ -112,6 +113,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume | |
/** Service encapsulating something similar to an accept() socket. */ | ||
private final List<? extends InternalServer> transportServers; | ||
private final Object lock = new Object(); | ||
private final CountDownLatch startComplete = new CountDownLatch(1); | ||
@GuardedBy("lock") private boolean transportServersTerminated; | ||
/** {@code transportServer} and services encapsulating something similar to a TCP connection. */ | ||
@GuardedBy("lock") private final Set<ServerTransport> transports = new HashSet<>(); | ||
|
@@ -144,7 +146,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume | |
Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry"); | ||
Preconditions.checkNotNull(transportServers, "transportServers"); | ||
Preconditions.checkArgument(!transportServers.isEmpty(), "no servers provided"); | ||
this.transportServers = new ArrayList<>(transportServers); | ||
this.transportServers = Collections.unmodifiableList(new ArrayList<>(transportServers)); | ||
this.logId = | ||
InternalLogId.allocate("Server", String.valueOf(getListenSocketsIgnoringLifecycle())); | ||
// Fork from the passed in context so that it does not propagate cancellation, it only | ||
|
@@ -177,19 +179,25 @@ public ServerImpl start() throws IOException { | |
synchronized (lock) { | ||
checkState(!started, "Already started"); | ||
checkState(!shutdown, "Shutting down"); | ||
// Start and wait for any ports to actually be bound. | ||
executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); | ||
started = true; | ||
} | ||
|
||
ServerListenerImpl listener = new ServerListenerImpl(); | ||
// Start and wait for any ports to actually be bound. | ||
ServerListenerImpl listener = new ServerListenerImpl(); | ||
try { | ||
for (InternalServer ts : transportServers) { | ||
ts.start(listener); | ||
activeTransportServers++; | ||
synchronized (lock) { | ||
activeTransportServers++; | ||
} | ||
} | ||
executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); | ||
started = true; | ||
return this; | ||
} finally { | ||
startComplete.countDown(); | ||
} | ||
} | ||
|
||
return this; | ||
} | ||
|
||
@Override | ||
public int getPort() { | ||
|
@@ -269,13 +277,26 @@ public ServerImpl shutdown() { | |
} | ||
} | ||
if (shutdownTransportServers) { | ||
awaitStartComplete(); | ||
for (InternalServer ts : transportServers) { | ||
ts.shutdown(); | ||
} | ||
} | ||
return this; | ||
} | ||
|
||
private void awaitStartComplete() { | ||
boolean startCompleted = false; | ||
try { | ||
startCompleted = startComplete.await(30, TimeUnit.SECONDS); | ||
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
} | ||
if (!startCompleted) { | ||
log.log(Level.WARNING, "Server was not starting properly, resources may leak"); | ||
} | ||
} | ||
|
||
@Override | ||
public ServerImpl shutdownNow() { | ||
shutdown(); | ||
|
@@ -388,6 +409,7 @@ public ServerTransportListener transportCreated(ServerTransport transport) { | |
public void serverShutdown() { | ||
ArrayList<ServerTransport> copiedTransports; | ||
Status shutdownNowStatusCopy; | ||
awaitStartComplete(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks dangerous. I'd much rather we set There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. set There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems we need redesign multiple transportServers support in future. It's just too inconvenient now. For now the fix seems working. Can you elaborate what is the danger? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now I see the danger, because |
||
synchronized (lock) { | ||
activeTransportServers--; | ||
if (activeTransportServers != 0) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -420,7 +420,6 @@ public void start(ServerListener listener) throws IOException { | |
} catch (IOException e) { | ||
assertSame(ex, e); | ||
} | ||
verifyNoMoreInteractions(executorPool); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why was this removed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because |
||
} | ||
|
||
@Test | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should only await for start to complete if
started == true
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can assert that is true.