Skip to content

Commit

Permalink
graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
YifeiZhuang committed Mar 22, 2023
1 parent f6af33f commit e3c5690
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 24 deletions.
22 changes: 11 additions & 11 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import okio.Buffer;
import okio.BufferedSource;
Expand Down Expand Up @@ -136,6 +135,8 @@ final class OkHttpServerTransport implements ServerTransport,
/** Non-{@code null} when waiting for forceful close GOAWAY to be sent. */
@GuardedBy("lock")
private ScheduledFuture<?> forcefulCloseTimer;
@GuardedBy("lock")
private Long gracefulShutdownPeriod = null;

public OkHttpServerTransport(Config config, Socket bareSocket) {
this.config = Preconditions.checkNotNull(config, "config");
Expand Down Expand Up @@ -233,8 +234,11 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount
if (config.maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
long maxConnectionAgeInNanos =
(long) ((.9D + Math.random() * .2D) * config.maxConnectionAgeInNanos);
synchronized (lock) {
gracefulShutdownPeriod = config.maxConnectionAgeGraceInNanos;
}
maxConnectionAgeMonitor = scheduledExecutorService.schedule(
new LogExceptionRunnable(() -> shutdown(config.maxConnectionAgeGraceInNanos)),
new LogExceptionRunnable(this::shutdown),
maxConnectionAgeInNanos,
TimeUnit.NANOSECONDS);
}
Expand All @@ -254,10 +258,6 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount

@Override
public void shutdown() {
shutdown(null);
}

private void shutdown(@Nullable Long graceTimeInNanos) {
synchronized (lock) {
if (gracefulShutdown || abruptShutdown) {
return;
Expand All @@ -271,7 +271,7 @@ private void shutdown(@Nullable Long graceTimeInNanos) {
// we also set a timer to limit the upper bound in case the PING is excessively stalled or
// the client is malicious.
secondGoawayTimer = scheduledExecutorService.schedule(
() -> triggerGracefulSecondGoaway(graceTimeInNanos),
this::triggerGracefulSecondGoaway,
GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS, TimeUnit.NANOSECONDS);
frameWriter.goAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR, new byte[0]);
frameWriter.ping(false, 0, GRACEFUL_SHUTDOWN_PING);
Expand All @@ -280,7 +280,7 @@ private void shutdown(@Nullable Long graceTimeInNanos) {
}
}

private void triggerGracefulSecondGoaway(@Nullable Long gracePeriodNanos) {
private void triggerGracefulSecondGoaway() {
synchronized (lock) {
if (secondGoawayTimer == null) {
return;
Expand All @@ -294,9 +294,9 @@ private void triggerGracefulSecondGoaway(@Nullable Long gracePeriodNanos) {
} else {
frameWriter.flush();
}
if (gracePeriodNanos != null) {
if (gracefulShutdownPeriod != null) {
forcefulCloseTimer = scheduledExecutorService.schedule(
this::triggerForcefulClose, gracePeriodNanos, TimeUnit.NANOSECONDS);
this::triggerForcefulClose, gracefulShutdownPeriod, TimeUnit.NANOSECONDS);
}
}
}
Expand Down Expand Up @@ -935,7 +935,7 @@ public void ping(boolean ack, int payload1, int payload2) {
return;
}
if (GRACEFUL_SHUTDOWN_PING == payload) {
triggerGracefulSecondGoaway(null);
triggerGracefulSecondGoaway();
return;
}
log.log(Level.INFO, "Received unexpected ping ack: " + payload);
Expand Down
23 changes: 10 additions & 13 deletions okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import okio.Buffer;
import okio.BufferedSource;
import okio.ByteString;
Expand Down Expand Up @@ -156,7 +155,7 @@ public void startThenShutdown() throws Exception {
@Test
public void maxConnectionAge() throws Exception {
serverBuilder.maxConnectionAge(5, TimeUnit.SECONDS)
.maxConnectionAgeGrace(1, TimeUnit.SECONDS);
.maxConnectionAgeGrace(3, TimeUnit.SECONDS);
initTransport();
handshake();
clientFrameWriter.headers(1, Arrays.asList(
Expand All @@ -170,8 +169,10 @@ public void maxConnectionAge() throws Exception {
new Header("some-client-sent-trailer", "trailer-value")));
pingPong();
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(6)); // > 1.1 * 5
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1));
verifyGracefulShutdown(1, TimeUnit.SECONDS.toNanos(1));
verifyGracefulShutdown(1);
pingPong();
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(3));
assertThat(socket.isClosed()).isTrue();
}

@Test
Expand Down Expand Up @@ -205,7 +206,7 @@ public void maxConnectionIdleTimer() throws Exception {

fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
verifyGracefulShutdown(1, null);
verifyGracefulShutdown(1);
}

@Test
Expand All @@ -229,7 +230,7 @@ public void maxConnectionIdleTimer_respondWithError() throws Exception {
pingPong();
fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
verifyGracefulShutdown(1, null);
verifyGracefulShutdown(1);
}

@Test
Expand Down Expand Up @@ -403,7 +404,7 @@ public void activeRpc_delaysShutdownTermination() throws Exception {
pingPong();

serverTransport.shutdown();
verifyGracefulShutdown(1, null);
verifyGracefulShutdown(1);
verify(transportListener, never()).transportTerminated();

MockStreamListener streamListener = mockTransportListener.newStreams.pop();
Expand Down Expand Up @@ -1220,7 +1221,7 @@ private Metadata metadata(String... keysAndValues) {
return metadata;
}

private void verifyGracefulShutdown(int lastStreamId, @Nullable Long gracePeriodNanos)
private void verifyGracefulShutdown(int lastStreamId)
throws IOException {
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
verify(clientFramesRead).goAway(2147483647, ErrorCode.NO_ERROR, ByteString.EMPTY);
Expand All @@ -1230,16 +1231,12 @@ private void verifyGracefulShutdown(int lastStreamId, @Nullable Long gracePeriod
clientFrameWriter.flush();
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
verify(clientFramesRead).goAway(lastStreamId, ErrorCode.NO_ERROR, ByteString.EMPTY);
if (gracePeriodNanos != null) {
assertThat(fakeClock.forwardNanos(gracePeriodNanos)).isEqualTo(1);
assertThat(socket.isClosed()).isTrue();
}
}

private void shutdownAndTerminate(int lastStreamId) throws IOException {
assertThat(serverTransport.getActiveStreams().length).isEqualTo(0);
serverTransport.shutdown();
verifyGracefulShutdown(lastStreamId, null);
verifyGracefulShutdown(lastStreamId);
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isFalse();
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
}
Expand Down

0 comments on commit e3c5690

Please sign in to comment.