Skip to content

Commit

Permalink
okhttp: add max connection idle at OkHttpServer and fix test (#9533)
Browse files Browse the repository at this point in the history
* Revert "Revert "okhttp: add max connection idle at OkHttpServer (#9494)" (#9528)"

This reverts commit 95b9d6d and fixed flaky test.
  • Loading branch information
YifeiZhuang committed Sep 9, 2022
1 parent 88a035e commit eac4178
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 6 deletions.
28 changes: 28 additions & 0 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java
Expand Up @@ -16,6 +16,8 @@

package io.grpc.okhttp;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.DoNotCall;
Expand Down Expand Up @@ -62,6 +64,10 @@
public final class OkHttpServerBuilder extends ForwardingServerBuilder<OkHttpServerBuilder> {
private static final Logger log = Logger.getLogger(OkHttpServerBuilder.class.getName());
private static final int DEFAULT_FLOW_CONTROL_WINDOW = 65535;

static final long MAX_CONNECTION_IDLE_NANOS_DISABLED = Long.MAX_VALUE;
private static final long MIN_MAX_CONNECTION_IDLE_NANO = TimeUnit.SECONDS.toNanos(1L);

private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L);
private static final ObjectPool<Executor> DEFAULT_TRANSPORT_EXECUTOR_POOL =
OkHttpChannelBuilder.DEFAULT_TRANSPORT_EXECUTOR_POOL;
Expand Down Expand Up @@ -110,6 +116,7 @@ public static OkHttpServerBuilder forPort(SocketAddress address, ServerCredentia
int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW;
int maxInboundMetadataSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE;
int maxInboundMessageSize = GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
long maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED;

@VisibleForTesting
OkHttpServerBuilder(
Expand Down Expand Up @@ -178,6 +185,27 @@ public OkHttpServerBuilder keepAliveTime(long keepAliveTime, TimeUnit timeUnit)
return this;
}

/**
* Sets a custom max connection idle time, connection being idle for longer than which will be
* gracefully terminated. Idleness duration is defined since the most recent time the number of
* outstanding RPCs became zero or the connection establishment. An unreasonably small value might
* be increased. {@code Long.MAX_VALUE} nano seconds or an unreasonably large value will disable
* max connection idle.
*/
@Override
public OkHttpServerBuilder maxConnectionIdle(long maxConnectionIdle, TimeUnit timeUnit) {
checkArgument(maxConnectionIdle > 0L, "max connection idle must be positive: %s",
maxConnectionIdle);
maxConnectionIdleInNanos = timeUnit.toNanos(maxConnectionIdle);
if (maxConnectionIdleInNanos >= AS_LARGE_AS_INFINITE) {
maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED;
}
if (maxConnectionIdleInNanos < MIN_MAX_CONNECTION_IDLE_NANO) {
maxConnectionIdleInNanos = MIN_MAX_CONNECTION_IDLE_NANO;
}
return this;
}

/**
* Sets a time waiting for read activity after sending a keepalive ping. If the time expires
* without any read activity on the connection, the connection is considered dead. An unreasonably
Expand Down
23 changes: 23 additions & 0 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java
Expand Up @@ -16,6 +16,8 @@

package io.grpc.okhttp;

import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -28,6 +30,7 @@
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.MaxConnectionIdleManager;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SerializingExecutor;
import io.grpc.internal.ServerTransport;
Expand Down Expand Up @@ -91,6 +94,7 @@ final class OkHttpServerTransport implements ServerTransport,
private ScheduledExecutorService scheduledExecutorService;
private Attributes attributes;
private KeepAliveManager keepAliveManager;
private MaxConnectionIdleManager maxConnectionIdleManager;

private final Object lock = new Object();
@GuardedBy("lock")
Expand Down Expand Up @@ -189,6 +193,11 @@ private void startIo(SerializingExecutor serializingExecutor) {
keepAliveManager.onTransportStarted();
}

if (config.maxConnectionIdleNanos != MAX_CONNECTION_IDLE_NANOS_DISABLED) {
maxConnectionIdleManager = new MaxConnectionIdleManager(config.maxConnectionIdleNanos);
maxConnectionIdleManager.start(this::shutdown, scheduledExecutorService);
}

transportExecutor.execute(
new FrameHandler(variant.newReader(Okio.buffer(Okio.source(socket)), false)));
} catch (Error | IOException | RuntimeException ex) {
Expand Down Expand Up @@ -311,6 +320,9 @@ private void terminated() {
if (keepAliveManager != null) {
keepAliveManager.onTransportTermination();
}
if (maxConnectionIdleManager != null) {
maxConnectionIdleManager.onTransportTermination();
}
transportExecutor = config.transportExecutorPool.returnObject(transportExecutor);
scheduledExecutorService =
config.scheduledExecutorServicePool.returnObject(scheduledExecutorService);
Expand Down Expand Up @@ -369,6 +381,9 @@ public OutboundFlowController.StreamState[] getActiveStreams() {
void streamClosed(int streamId, boolean flush) {
synchronized (lock) {
streams.remove(streamId);
if (maxConnectionIdleManager != null && streams.isEmpty()) {
maxConnectionIdleManager.onTransportIdle();
}
if (gracefulShutdown && streams.isEmpty()) {
frameWriter.close();
} else {
Expand Down Expand Up @@ -433,6 +448,7 @@ static final class Config {
final int flowControlWindow;
final int maxInboundMessageSize;
final int maxInboundMetadataSize;
final long maxConnectionIdleNanos;

public Config(
OkHttpServerBuilder builder,
Expand All @@ -452,6 +468,7 @@ public Config(
flowControlWindow = builder.flowControlWindow;
maxInboundMessageSize = builder.maxInboundMessageSize;
maxInboundMetadataSize = builder.maxInboundMetadataSize;
maxConnectionIdleNanos = builder.maxConnectionIdleInNanos;
}
}

Expand Down Expand Up @@ -697,6 +714,9 @@ public void headers(boolean outFinished,
authority == null ? null : asciiString(authority),
statsTraceCtx,
tracer);
if (maxConnectionIdleManager != null && streams.isEmpty()) {
maxConnectionIdleManager.onTransportActive();
}
streams.put(streamId, stream);
listener.streamCreated(streamForApp, method, metadata);
stream.onStreamAllocated();
Expand Down Expand Up @@ -953,6 +973,9 @@ private void respondWithHttpError(
synchronized (lock) {
Http2ErrorStreamState stream =
new Http2ErrorStreamState(streamId, lock, outboundFlow, config.flowControlWindow);
if (maxConnectionIdleManager != null && streams.isEmpty()) {
maxConnectionIdleManager.onTransportActive();
}
streams.put(streamId, stream);
if (inFinished) {
stream.inboundDataReceived(new Buffer(), 0, true);
Expand Down
75 changes: 69 additions & 6 deletions okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java
Expand Up @@ -90,6 +90,7 @@
public class OkHttpServerTransportTest {
private static final int TIME_OUT_MS = 2000;
private static final int INITIAL_WINDOW_SIZE = 65535;
private static final long MAX_CONNECTION_IDLE = TimeUnit.SECONDS.toNanos(1);

private MockServerTransportListener mockTransportListener = new MockServerTransportListener();
private ServerTransportListener transportListener
Expand All @@ -105,10 +106,11 @@ public class OkHttpServerTransportTest {
private ExecutorService threadPool = Executors.newCachedThreadPool();
private HandshakerSocketFactory handshakerSocketFactory
= mock(HandshakerSocketFactory.class, delegatesTo(new PlaintextHandshakerSocketFactory()));
private final FakeClock fakeClock = new FakeClock();
private OkHttpServerBuilder serverBuilder
= new OkHttpServerBuilder(new InetSocketAddress(1234), handshakerSocketFactory)
.executor(new FakeClock().getScheduledExecutorService()) // Executor unused
.scheduledExecutorService(new FakeClock().getScheduledExecutorService())
.scheduledExecutorService(fakeClock.getScheduledExecutorService())
.transportExecutor(new Executor() {
@Override public void execute(Runnable runnable) {
if (runnable instanceof OkHttpServerTransport.FrameHandler) {
Expand All @@ -119,7 +121,8 @@ public class OkHttpServerTransportTest {
}
}
})
.flowControlWindow(INITIAL_WINDOW_SIZE);
.flowControlWindow(INITIAL_WINDOW_SIZE)
.maxConnectionIdle(MAX_CONNECTION_IDLE, TimeUnit.NANOSECONDS);

@Rule public final Timeout globalTimeout = Timeout.seconds(10);

Expand All @@ -146,6 +149,64 @@ public void startThenShutdown() throws Exception {
shutdownAndTerminate(/*lastStreamId=*/ 0);
}

@Test
public void maxConnectionIdleTimer() throws Exception {
initTransport();
handshake();
clientFrameWriter.headers(1, Arrays.asList(
HTTP_SCHEME_HEADER,
METHOD_HEADER,
new Header(Header.TARGET_AUTHORITY, "example.com:80"),
new Header(Header.TARGET_PATH, "/com.example/SimpleService.doit"),
CONTENT_TYPE_HEADER,
TE_HEADER));
clientFrameWriter.synStream(true, false, 1, -1, Arrays.asList(
new Header("some-client-sent-trailer", "trailer-value")));
pingPong();

MockStreamListener streamListener = mockTransportListener.newStreams.pop();
assertThat(streamListener.messages.peek()).isNull();
assertThat(streamListener.halfClosedCalled).isTrue();

streamListener.stream.close(Status.OK, new Metadata());

List<Header> responseTrailers = Arrays.asList(
new Header(":status", "200"),
CONTENT_TYPE_HEADER,
new Header("grpc-status", "0"));
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
verify(clientFramesRead)
.headers(false, true, 1, -1, responseTrailers, HeadersMode.HTTP_20_HEADERS);

fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
verifyGracefulShutdown(1);
}

@Test
public void maxConnectionIdleTimer_respondWithError() throws Exception {
initTransport();
handshake();

clientFrameWriter.headers(1, Arrays.asList(
HTTP_SCHEME_HEADER,
METHOD_HEADER,
new Header(Header.TARGET_PATH, "/com.example/SimpleService.doit"),
CONTENT_TYPE_HEADER,
TE_HEADER,
new Header("host", "example.com:80"),
new Header("host", "example.com:80")));
clientFrameWriter.flush();

verifyHttpError(
1, 400, Status.Code.INTERNAL, "Multiple host headers disallowed. RFC7230 section 5.4");

pingPong();
fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
verifyGracefulShutdown(1);
}

@Test
public void startThenShutdownTwice() throws Exception {
initTransport();
Expand Down Expand Up @@ -316,7 +377,8 @@ public void activeRpc_delaysShutdownTermination() throws Exception {
clientFrameWriter.data(true, 1, requestMessageFrame, (int) requestMessageFrame.size());
pingPong();

shutdownAndVerifyGraceful(1);
serverTransport.shutdown();
verifyGracefulShutdown(1);
verify(transportListener, never()).transportTerminated();

MockStreamListener streamListener = mockTransportListener.newStreams.pop();
Expand Down Expand Up @@ -1038,8 +1100,8 @@ private Metadata metadata(String... keysAndValues) {
return metadata;
}

private void shutdownAndVerifyGraceful(int lastStreamId) throws IOException {
serverTransport.shutdown();
private void verifyGracefulShutdown(int lastStreamId)
throws IOException {
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
verify(clientFramesRead).goAway(2147483647, ErrorCode.NO_ERROR, ByteString.EMPTY);
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
Expand All @@ -1052,7 +1114,8 @@ private void shutdownAndVerifyGraceful(int lastStreamId) throws IOException {

private void shutdownAndTerminate(int lastStreamId) throws IOException {
assertThat(serverTransport.getActiveStreams().length).isEqualTo(0);
shutdownAndVerifyGraceful(lastStreamId);
serverTransport.shutdown();
verifyGracefulShutdown(lastStreamId);
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isFalse();
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
}
Expand Down

0 comments on commit eac4178

Please sign in to comment.