Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

okhttp: add max connection idle at OkHttpServerBuilder #9494

Merged
merged 4 commits into from
Aug 29, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 28 additions & 0 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java
Expand Up @@ -16,6 +16,8 @@

package io.grpc.okhttp;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.DoNotCall;
Expand Down Expand Up @@ -62,6 +64,10 @@
public final class OkHttpServerBuilder extends ForwardingServerBuilder<OkHttpServerBuilder> {
private static final Logger log = Logger.getLogger(OkHttpServerBuilder.class.getName());
private static final int DEFAULT_FLOW_CONTROL_WINDOW = 65535;

static final long MAX_CONNECTION_IDLE_NANOS_DISABLED = Long.MAX_VALUE;
private static final long MIN_MAX_CONNECTION_IDLE_NANO = TimeUnit.SECONDS.toNanos(1L);

private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L);
private static final ObjectPool<Executor> DEFAULT_TRANSPORT_EXECUTOR_POOL =
OkHttpChannelBuilder.DEFAULT_TRANSPORT_EXECUTOR_POOL;
Expand Down Expand Up @@ -110,6 +116,7 @@ public static OkHttpServerBuilder forPort(SocketAddress address, ServerCredentia
int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW;
int maxInboundMetadataSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE;
int maxInboundMessageSize = GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
long maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED;

@VisibleForTesting
OkHttpServerBuilder(
Expand Down Expand Up @@ -178,6 +185,27 @@ public OkHttpServerBuilder keepAliveTime(long keepAliveTime, TimeUnit timeUnit)
return this;
}

/**
* Sets a custom max connection idle time, connection being idle for longer than which will be
* gracefully terminated. Idleness duration is defined since the most recent time the number of
* outstanding RPCs became zero or the connection establishment. An unreasonably small value might
* be increased. {@code Long.MAX_VALUE} nano seconds or an unreasonably large value will disable
* max connection idle.
*/
@Override
public OkHttpServerBuilder maxConnectionIdle(long maxConnectionIdle, TimeUnit timeUnit) {
checkArgument(maxConnectionIdle > 0L, "max connection idle must be positive: %s",
maxConnectionIdle);
maxConnectionIdleInNanos = timeUnit.toNanos(maxConnectionIdle);
if (maxConnectionIdleInNanos >= AS_LARGE_AS_INFINITE) {
maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED;
}
if (maxConnectionIdleInNanos < MIN_MAX_CONNECTION_IDLE_NANO) {
maxConnectionIdleInNanos = MIN_MAX_CONNECTION_IDLE_NANO;
}
return this;
}

/**
* Sets a time waiting for read activity after sending a keepalive ping. If the time expires
* without any read activity on the connection, the connection is considered dead. An unreasonably
Expand Down
20 changes: 20 additions & 0 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java
Expand Up @@ -16,6 +16,8 @@

package io.grpc.okhttp;

import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -28,6 +30,7 @@
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.MaxConnectionIdleManager;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SerializingExecutor;
import io.grpc.internal.ServerTransport;
Expand Down Expand Up @@ -91,6 +94,7 @@ final class OkHttpServerTransport implements ServerTransport,
private ScheduledExecutorService scheduledExecutorService;
private Attributes attributes;
private KeepAliveManager keepAliveManager;
private MaxConnectionIdleManager maxConnectionIdleManager;

private final Object lock = new Object();
@GuardedBy("lock")
Expand Down Expand Up @@ -189,6 +193,11 @@ private void startIo(SerializingExecutor serializingExecutor) {
keepAliveManager.onTransportStarted();
}

if (config.maxConnectionIdleNanos != MAX_CONNECTION_IDLE_NANOS_DISABLED) {
maxConnectionIdleManager = new MaxConnectionIdleManager(config.maxConnectionIdleNanos);
maxConnectionIdleManager.start(this::shutdown, scheduledExecutorService);
}

transportExecutor.execute(
new FrameHandler(variant.newReader(Okio.buffer(Okio.source(socket)), false)));
} catch (Error | IOException | RuntimeException ex) {
Expand Down Expand Up @@ -311,6 +320,9 @@ private void terminated() {
if (keepAliveManager != null) {
keepAliveManager.onTransportTermination();
}
if (maxConnectionIdleManager != null) {
maxConnectionIdleManager.onTransportTermination();
}
transportExecutor = config.transportExecutorPool.returnObject(transportExecutor);
scheduledExecutorService =
config.scheduledExecutorServicePool.returnObject(scheduledExecutorService);
Expand Down Expand Up @@ -369,6 +381,9 @@ public OutboundFlowController.StreamState[] getActiveStreams() {
void streamClosed(int streamId, boolean flush) {
synchronized (lock) {
streams.remove(streamId);
if (maxConnectionIdleManager != null && streams.isEmpty()) {
maxConnectionIdleManager.onTransportIdle();
}
if (gracefulShutdown && streams.isEmpty()) {
frameWriter.close();
} else {
Expand Down Expand Up @@ -433,6 +448,7 @@ static final class Config {
final int flowControlWindow;
final int maxInboundMessageSize;
final int maxInboundMetadataSize;
final long maxConnectionIdleNanos;

public Config(
OkHttpServerBuilder builder,
Expand All @@ -452,6 +468,7 @@ public Config(
flowControlWindow = builder.flowControlWindow;
maxInboundMessageSize = builder.maxInboundMessageSize;
maxInboundMetadataSize = builder.maxInboundMetadataSize;
maxConnectionIdleNanos = builder.maxConnectionIdleInNanos;
}
}

Expand Down Expand Up @@ -697,6 +714,9 @@ public void headers(boolean outFinished,
authority == null ? null : asciiString(authority),
statsTraceCtx,
tracer);
if (maxConnectionIdleManager != null && streams.isEmpty()) {
maxConnectionIdleManager.onTransportActive();
}
streams.put(streamId, stream);
listener.streamCreated(streamForApp, method, metadata);
stream.onStreamAllocated();
Expand Down
52 changes: 46 additions & 6 deletions okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java
Expand Up @@ -90,6 +90,7 @@
public class OkHttpServerTransportTest {
private static final int TIME_OUT_MS = 2000;
private static final int INITIAL_WINDOW_SIZE = 65535;
private static final long MAX_CONNECTION_IDLE = TimeUnit.SECONDS.toNanos(1);

private MockServerTransportListener mockTransportListener = new MockServerTransportListener();
private ServerTransportListener transportListener
Expand All @@ -105,10 +106,11 @@ public class OkHttpServerTransportTest {
private ExecutorService threadPool = Executors.newCachedThreadPool();
private HandshakerSocketFactory handshakerSocketFactory
= mock(HandshakerSocketFactory.class, delegatesTo(new PlaintextHandshakerSocketFactory()));
private final FakeClock fakeClock = new FakeClock();
private OkHttpServerBuilder serverBuilder
= new OkHttpServerBuilder(new InetSocketAddress(1234), handshakerSocketFactory)
.executor(new FakeClock().getScheduledExecutorService()) // Executor unused
.scheduledExecutorService(new FakeClock().getScheduledExecutorService())
.scheduledExecutorService(fakeClock.getScheduledExecutorService())
.transportExecutor(new Executor() {
@Override public void execute(Runnable runnable) {
if (runnable instanceof OkHttpServerTransport.FrameHandler) {
Expand All @@ -119,7 +121,8 @@ public class OkHttpServerTransportTest {
}
}
})
.flowControlWindow(INITIAL_WINDOW_SIZE);
.flowControlWindow(INITIAL_WINDOW_SIZE)
.maxConnectionIdle(MAX_CONNECTION_IDLE, TimeUnit.NANOSECONDS);

@Rule public final Timeout globalTimeout = Timeout.seconds(10);

Expand All @@ -146,6 +149,40 @@ public void startThenShutdown() throws Exception {
shutdownAndTerminate(/*lastStreamId=*/ 0);
}

@Test
public void maxConnectionIdleTimer() throws Exception {
initTransport();
handshake();
clientFrameWriter.headers(1, Arrays.asList(
HTTP_SCHEME_HEADER,
METHOD_HEADER,
new Header(Header.TARGET_AUTHORITY, "example.com:80"),
new Header(Header.TARGET_PATH, "/com.example/SimpleService.doit"),
CONTENT_TYPE_HEADER,
TE_HEADER));
clientFrameWriter.synStream(true, false, 1, -1, Arrays.asList(
new Header("some-client-sent-trailer", "trailer-value")));
pingPong();

MockStreamListener streamListener = mockTransportListener.newStreams.pop();
assertThat(streamListener.messages.peek()).isNull();
assertThat(streamListener.halfClosedCalled).isTrue();

streamListener.stream.close(Status.OK, new Metadata());

List<Header> responseTrailers = Arrays.asList(
new Header(":status", "200"),
CONTENT_TYPE_HEADER,
new Header("grpc-status", "0"));
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
verify(clientFramesRead)
.headers(false, true, 1, -1, responseTrailers, HeadersMode.HTTP_20_HEADERS);

fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
fakeClock.forwardNanos(MAX_CONNECTION_IDLE);
maybeShutdownAndVerifyGraceful(1, false);
}

@Test
public void startThenShutdownTwice() throws Exception {
initTransport();
Expand Down Expand Up @@ -316,7 +353,7 @@ public void activeRpc_delaysShutdownTermination() throws Exception {
clientFrameWriter.data(true, 1, requestMessageFrame, (int) requestMessageFrame.size());
pingPong();

shutdownAndVerifyGraceful(1);
maybeShutdownAndVerifyGraceful(1, true);
verify(transportListener, never()).transportTerminated();

MockStreamListener streamListener = mockTransportListener.newStreams.pop();
Expand Down Expand Up @@ -1038,8 +1075,11 @@ private Metadata metadata(String... keysAndValues) {
return metadata;
}

private void shutdownAndVerifyGraceful(int lastStreamId) throws IOException {
serverTransport.shutdown();
private void maybeShutdownAndVerifyGraceful(int lastStreamId, boolean shutdown)
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
throws IOException {
if (shutdown) {
serverTransport.shutdown();
}
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
verify(clientFramesRead).goAway(2147483647, ErrorCode.NO_ERROR, ByteString.EMPTY);
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
Expand All @@ -1052,7 +1092,7 @@ private void shutdownAndVerifyGraceful(int lastStreamId) throws IOException {

private void shutdownAndTerminate(int lastStreamId) throws IOException {
assertThat(serverTransport.getActiveStreams().length).isEqualTo(0);
shutdownAndVerifyGraceful(lastStreamId);
maybeShutdownAndVerifyGraceful(lastStreamId, true);
assertThat(clientFrameReader.nextFrame(clientFramesRead)).isFalse();
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
}
Expand Down