Skip to content

Commit

Permalink
okhttp: forceful close after MAX_CONNECTION_AGE_GRACE_TIME (#9968)
Browse files Browse the repository at this point in the history
  • Loading branch information
YifeiZhuang committed Mar 27, 2023
1 parent e04c6ec commit 046e02b
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 5 deletions.
18 changes: 15 additions & 3 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import okio.Buffer;
import okio.BufferedSource;
Expand All @@ -73,6 +74,9 @@ final class OkHttpServerTransport implements ServerTransport,
ExceptionHandlingFrameWriter.TransportExceptionHandler, OutboundFlowController.Transport {
private static final Logger log = Logger.getLogger(OkHttpServerTransport.class.getName());
private static final int GRACEFUL_SHUTDOWN_PING = 0x1111;

private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(1);

private static final int KEEPALIVE_PING = 0xDEAD;
private static final ByteString HTTP_METHOD = ByteString.encodeUtf8(":method");
private static final ByteString CONNECT_METHOD = ByteString.encodeUtf8("CONNECT");
Expand Down Expand Up @@ -132,6 +136,8 @@ final class OkHttpServerTransport implements ServerTransport,
/** Non-{@code null} when waiting for forceful close GOAWAY to be sent. */
@GuardedBy("lock")
private ScheduledFuture<?> forcefulCloseTimer;
@GuardedBy("lock")
private Long gracefulShutdownPeriod = null;

public OkHttpServerTransport(Config config, Socket bareSocket) {
this.config = Preconditions.checkNotNull(config, "config");
Expand Down Expand Up @@ -250,15 +256,16 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount

@Override
public void shutdown() {
shutdown(TimeUnit.SECONDS.toNanos(1L));
shutdown(null);
}

private void shutdown(Long graceTimeInNanos) {
private void shutdown(@Nullable Long gracefulShutdownPeriod) {
synchronized (lock) {
if (gracefulShutdown || abruptShutdown) {
return;
}
gracefulShutdown = true;
this.gracefulShutdownPeriod = gracefulShutdownPeriod;
if (frameWriter == null) {
handshakeShutdown = true;
GrpcUtil.closeQuietly(bareSocket);
Expand All @@ -267,7 +274,8 @@ private void shutdown(Long graceTimeInNanos) {
// we also set a timer to limit the upper bound in case the PING is excessively stalled or
// the client is malicious.
secondGoawayTimer = scheduledExecutorService.schedule(
this::triggerGracefulSecondGoaway, graceTimeInNanos, TimeUnit.NANOSECONDS);
this::triggerGracefulSecondGoaway,
GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS, TimeUnit.NANOSECONDS);
frameWriter.goAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR, new byte[0]);
frameWriter.ping(false, 0, GRACEFUL_SHUTDOWN_PING);
frameWriter.flush();
Expand All @@ -289,6 +297,10 @@ private void triggerGracefulSecondGoaway() {
} else {
frameWriter.flush();
}
if (gracefulShutdownPeriod != null) {
forcefulCloseTimer = scheduledExecutorService.schedule(
this::triggerForcefulClose, gracefulShutdownPeriod, TimeUnit.NANOSECONDS);
}
}
}

Expand Down
17 changes: 15 additions & 2 deletions okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void startThenShutdown() throws Exception {
@Test
public void maxConnectionAge() throws Exception {
serverBuilder.maxConnectionAge(5, TimeUnit.SECONDS)
.maxConnectionAgeGrace(1, TimeUnit.SECONDS);
.maxConnectionAgeGrace(3, TimeUnit.SECONDS);
initTransport();
handshake();
clientFrameWriter.headers(1, Arrays.asList(
Expand All @@ -169,8 +169,20 @@ public void maxConnectionAge() throws Exception {
new Header("some-client-sent-trailer", "trailer-value")));
pingPong();
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(6)); // > 1.1 * 5
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1));
verifyGracefulShutdown(1);
pingPong();
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(3));
assertThat(socket.isClosed()).isTrue();
}

@Test
public void maxConnectionAge_shutdown() throws Exception {
serverBuilder.maxConnectionAge(5, TimeUnit.SECONDS)
.maxConnectionAgeGrace(3, TimeUnit.SECONDS);
initTransport();
handshake();
shutdownAndTerminate(0);
assertThat(fakeClock.numPendingTasks()).isEqualTo(0);
}

@Test
Expand Down Expand Up @@ -1369,6 +1381,7 @@ public synchronized void close() throws IOException {
// PipedInputStream can only be woken by PipedOutputStream, so PipedOutputStream.close() is
// a better imitation of Socket.close().
inputStreamSource.close();
super.close();
}
}

Expand Down

0 comments on commit 046e02b

Please sign in to comment.