Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

okhttp: Improve internals for executor handling #9073

Merged
merged 2 commits into from Apr 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
64 changes: 31 additions & 33 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
Expand Up @@ -37,13 +37,15 @@
import io.grpc.internal.AtomicBackoff;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.ManagedChannelImplBuilder;
import io.grpc.internal.ManagedChannelImplBuilder.ChannelBuilderDefaultPortProvider;
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourceHolder.Resource;
import io.grpc.internal.SharedResourcePool;
import io.grpc.internal.TransportTracer;
import io.grpc.okhttp.internal.CipherSuite;
import io.grpc.okhttp.internal.ConnectionSpec;
Expand Down Expand Up @@ -137,6 +139,8 @@ public void close(Executor executor) {
((ExecutorService) executor).shutdown();
}
};
private static final ObjectPool<Executor> DEFAULT_TRANSPORT_EXECUTOR_POOL =
SharedResourcePool.forResource(SHARED_EXECUTOR);

/** Creates a new builder for the given server host and port. */
public static OkHttpChannelBuilder forAddress(String host, int port) {
Expand Down Expand Up @@ -168,8 +172,9 @@ public static OkHttpChannelBuilder forTarget(String target, ChannelCredentials c
return new OkHttpChannelBuilder(target, creds, result.callCredentials, result.factory);
}

private Executor transportExecutor;
private ScheduledExecutorService scheduledExecutorService;
private ObjectPool<Executor> transportExecutorPool = DEFAULT_TRANSPORT_EXECUTOR_POOL;
private ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool =
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);

private SocketFactory socketFactory;
private SSLSocketFactory sslSocketFactory;
Expand Down Expand Up @@ -247,7 +252,11 @@ OkHttpChannelBuilder setTransportTracerFactory(TransportTracer.Factory transport
* to shutdown the executor when appropriate.
*/
public OkHttpChannelBuilder transportExecutor(@Nullable Executor transportExecutor) {
this.transportExecutor = transportExecutor;
if (transportExecutor == null) {
this.transportExecutorPool = DEFAULT_TRANSPORT_EXECUTOR_POOL;
} else {
this.transportExecutorPool = new FixedObjectPool<>(transportExecutor);
}
return this;
}

Expand Down Expand Up @@ -468,8 +477,8 @@ public OkHttpChannelBuilder useTransportSecurity() {
*/
public OkHttpChannelBuilder scheduledExecutorService(
ScheduledExecutorService scheduledExecutorService) {
this.scheduledExecutorService =
checkNotNull(scheduledExecutorService, "scheduledExecutorService");
this.scheduledExecutorServicePool =
new FixedObjectPool<>(checkNotNull(scheduledExecutorService, "scheduledExecutorService"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I take it that "It's an optional parameter" in the JavaDoc is referring to scheduledExecutorService. It's not optional so the JavaDoc should change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is meaning, "you don't have to call this method on the builder."

There is oddity here. Compare how scheduledExecutorService() handles null vs transportExecutor(). But that was existing, so I didn't touch it.

return this;
}

Expand Down Expand Up @@ -508,8 +517,8 @@ public OkHttpChannelBuilder maxInboundMessageSize(int max) {
OkHttpTransportFactory buildTransportFactory() {
boolean enableKeepAlive = keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED;
return new OkHttpTransportFactory(
transportExecutor,
scheduledExecutorService,
transportExecutorPool,
scheduledExecutorServicePool,
socketFactory,
createSslSocketFactory(),
hostnameVerifier,
Expand Down Expand Up @@ -712,9 +721,10 @@ public SslSocketFactoryResult withCallCredentials(CallCredentials callCreds) {
*/
@Internal
static final class OkHttpTransportFactory implements ClientTransportFactory {
private final ObjectPool<Executor> executorPool;
final Executor executor;
private final boolean usingSharedExecutor;
private final boolean usingSharedScheduler;
private final ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool;
final ScheduledExecutorService scheduledExecutorService;
final TransportTracer.Factory transportTracerFactory;
final SocketFactory socketFactory;
@Nullable final SSLSocketFactory sslSocketFactory;
Expand All @@ -729,13 +739,12 @@ static final class OkHttpTransportFactory implements ClientTransportFactory {
final int flowControlWindow;
private final boolean keepAliveWithoutCalls;
final int maxInboundMetadataSize;
private final ScheduledExecutorService timeoutService;
final boolean useGetForSafeMethods;
private boolean closed;

private OkHttpTransportFactory(
Executor executor,
@Nullable ScheduledExecutorService timeoutService,
ObjectPool<Executor> executorPool,
ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool,
@Nullable SocketFactory socketFactory,
@Nullable SSLSocketFactory sslSocketFactory,
@Nullable HostnameVerifier hostnameVerifier,
Expand All @@ -749,9 +758,10 @@ private OkHttpTransportFactory(
int maxInboundMetadataSize,
TransportTracer.Factory transportTracerFactory,
boolean useGetForSafeMethods) {
usingSharedScheduler = timeoutService == null;
this.timeoutService = usingSharedScheduler
? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : timeoutService;
this.executorPool = executorPool;
this.executor = executorPool.getObject();
this.scheduledExecutorServicePool = scheduledExecutorServicePool;
this.scheduledExecutorService = scheduledExecutorServicePool.getObject();
this.socketFactory = socketFactory;
this.sslSocketFactory = sslSocketFactory;
this.hostnameVerifier = hostnameVerifier;
Expand All @@ -766,15 +776,8 @@ private OkHttpTransportFactory(
this.maxInboundMetadataSize = maxInboundMetadataSize;
this.useGetForSafeMethods = useGetForSafeMethods;

usingSharedExecutor = executor == null;
this.transportTracerFactory =
Preconditions.checkNotNull(transportTracerFactory, "transportTracerFactory");
if (usingSharedExecutor) {
// The executor was unspecified, using the shared executor.
this.executor = SharedResourceHolder.get(SHARED_EXECUTOR);
} else {
this.executor = executor;
}
}

@Override
Expand Down Expand Up @@ -809,7 +812,7 @@ public void run() {

@Override
public ScheduledExecutorService getScheduledExecutorService() {
return timeoutService;
return scheduledExecutorService;
}

@Nullable
Expand All @@ -821,8 +824,8 @@ public SwapChannelCredentialsResult swapChannelCredentials(ChannelCredentials ch
return null;
}
ClientTransportFactory factory = new OkHttpTransportFactory(
executor,
timeoutService,
executorPool,
scheduledExecutorServicePool,
socketFactory,
result.factory,
hostnameVerifier,
Expand All @@ -846,13 +849,8 @@ public void close() {
}
closed = true;

if (usingSharedScheduler) {
SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService);
}

if (usingSharedExecutor) {
SharedResourceHolder.release(SHARED_EXECUTOR, executor);
}
executorPool.returnObject(executor);
scheduledExecutorServicePool.returnObject(scheduledExecutorService);
}
}
}
Expand Up @@ -17,7 +17,6 @@
package io.grpc.okhttp;

import static com.google.common.base.Preconditions.checkState;
import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE;
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO;

Expand Down Expand Up @@ -52,7 +51,6 @@
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
import io.grpc.internal.SerializingExecutor;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer;
import io.grpc.okhttp.ExceptionHandlingFrameWriter.TransportExceptionHandler;
Expand Down Expand Up @@ -162,6 +160,7 @@ private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() {
private final Executor executor;
// Wrap on executor, to guarantee some operations be executed serially.
private final SerializingExecutor serializingExecutor;
private final ScheduledExecutorService scheduler;
private final int maxMessageSize;
private int connectionUnacknowledgedBytesRead;
private ClientFrameHandler clientFrameHandler;
Expand Down Expand Up @@ -191,7 +190,6 @@ private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() {
@GuardedBy("lock")
private final Deque<OkHttpClientStream> pendingStreams = new LinkedList<>();
private final ConnectionSpec connectionSpec;
private ScheduledExecutorService scheduler;
private KeepAliveManager keepAliveManager;
private boolean enableKeepAlive;
private long keepAliveTimeNanos;
Expand Down Expand Up @@ -262,6 +260,8 @@ private OkHttpClientTransport(
this.initialWindowSize = transportFactory.flowControlWindow;
this.executor = Preconditions.checkNotNull(transportFactory.executor, "executor");
serializingExecutor = new SerializingExecutor(transportFactory.executor);
this.scheduler = Preconditions.checkNotNull(
transportFactory.scheduledExecutorService, "scheduledExecutorService");
// Client initiated streams are odd, server initiated ones are even. Server should not need to
// use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
nextStreamId = 3;
Expand Down Expand Up @@ -472,7 +472,6 @@ public Runnable start(Listener listener) {
this.listener = Preconditions.checkNotNull(listener, "listener");

if (enableKeepAlive) {
scheduler = SharedResourceHolder.get(TIMER_SERVICE);
keepAliveManager = new KeepAliveManager(
new ClientKeepAlivePinger(this), scheduler, keepAliveTimeNanos, keepAliveTimeoutNanos,
keepAliveWithoutCalls);
Expand Down Expand Up @@ -949,8 +948,6 @@ private void stopIfNecessary() {

if (keepAliveManager != null) {
keepAliveManager.onTransportTermination();
// KeepAliveManager should stop using the scheduler after onTransportTermination gets called.
scheduler = SharedResourceHolder.release(TIMER_SERVICE, scheduler);
}

if (ping != null) {
Expand Down