Skip to content

Commit

Permalink
okhttp: Allow keepalive scheduled executor to be overridden
Browse files Browse the repository at this point in the history
Users should be able to inject all executors. The transport shouldn't be
hard-coded to create the TIMER_SERVICE, especially since a scheduler is
already available to the builder.
  • Loading branch information
ejona86 committed Apr 15, 2022
1 parent 8862dca commit 592a227
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 14 deletions.
16 changes: 8 additions & 8 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
Expand Up @@ -723,8 +723,8 @@ public SslSocketFactoryResult withCallCredentials(CallCredentials callCreds) {
static final class OkHttpTransportFactory implements ClientTransportFactory {
private final ObjectPool<Executor> executorPool;
final Executor executor;
private final ObjectPool<ScheduledExecutorService> timeoutServicePool;
private final ScheduledExecutorService timeoutService;
private final ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool;
final ScheduledExecutorService scheduledExecutorService;
final TransportTracer.Factory transportTracerFactory;
final SocketFactory socketFactory;
@Nullable final SSLSocketFactory sslSocketFactory;
Expand All @@ -744,7 +744,7 @@ static final class OkHttpTransportFactory implements ClientTransportFactory {

private OkHttpTransportFactory(
ObjectPool<Executor> executorPool,
ObjectPool<ScheduledExecutorService> timeoutServicePool,
ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool,
@Nullable SocketFactory socketFactory,
@Nullable SSLSocketFactory sslSocketFactory,
@Nullable HostnameVerifier hostnameVerifier,
Expand All @@ -760,8 +760,8 @@ private OkHttpTransportFactory(
boolean useGetForSafeMethods) {
this.executorPool = executorPool;
this.executor = executorPool.getObject();
this.timeoutServicePool = timeoutServicePool;
this.timeoutService = timeoutServicePool.getObject();
this.scheduledExecutorServicePool = scheduledExecutorServicePool;
this.scheduledExecutorService = scheduledExecutorServicePool.getObject();
this.socketFactory = socketFactory;
this.sslSocketFactory = sslSocketFactory;
this.hostnameVerifier = hostnameVerifier;
Expand Down Expand Up @@ -812,7 +812,7 @@ public void run() {

@Override
public ScheduledExecutorService getScheduledExecutorService() {
return timeoutService;
return scheduledExecutorService;
}

@Nullable
Expand All @@ -825,7 +825,7 @@ public SwapChannelCredentialsResult swapChannelCredentials(ChannelCredentials ch
}
ClientTransportFactory factory = new OkHttpTransportFactory(
executorPool,
timeoutServicePool,
scheduledExecutorServicePool,
socketFactory,
result.factory,
hostnameVerifier,
Expand All @@ -850,7 +850,7 @@ public void close() {
closed = true;

executorPool.returnObject(executor);
timeoutServicePool.returnObject(timeoutService);
scheduledExecutorServicePool.returnObject(scheduledExecutorService);
}
}
}
Expand Up @@ -17,7 +17,6 @@
package io.grpc.okhttp;

import static com.google.common.base.Preconditions.checkState;
import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE;
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO;

Expand Down Expand Up @@ -52,7 +51,6 @@
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
import io.grpc.internal.SerializingExecutor;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer;
import io.grpc.okhttp.ExceptionHandlingFrameWriter.TransportExceptionHandler;
Expand Down Expand Up @@ -162,6 +160,7 @@ private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() {
private final Executor executor;
// Wrap on executor, to guarantee some operations be executed serially.
private final SerializingExecutor serializingExecutor;
private final ScheduledExecutorService scheduler;
private final int maxMessageSize;
private int connectionUnacknowledgedBytesRead;
private ClientFrameHandler clientFrameHandler;
Expand Down Expand Up @@ -191,7 +190,6 @@ private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() {
@GuardedBy("lock")
private final Deque<OkHttpClientStream> pendingStreams = new LinkedList<>();
private final ConnectionSpec connectionSpec;
private ScheduledExecutorService scheduler;
private KeepAliveManager keepAliveManager;
private boolean enableKeepAlive;
private long keepAliveTimeNanos;
Expand Down Expand Up @@ -262,6 +260,8 @@ private OkHttpClientTransport(
this.initialWindowSize = transportFactory.flowControlWindow;
this.executor = Preconditions.checkNotNull(transportFactory.executor, "executor");
serializingExecutor = new SerializingExecutor(transportFactory.executor);
this.scheduler = Preconditions.checkNotNull(
transportFactory.scheduledExecutorService, "scheduledExecutorService");
// Client initiated streams are odd, server initiated ones are even. Server should not need to
// use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
nextStreamId = 3;
Expand Down Expand Up @@ -472,7 +472,6 @@ public Runnable start(Listener listener) {
this.listener = Preconditions.checkNotNull(listener, "listener");

if (enableKeepAlive) {
scheduler = SharedResourceHolder.get(TIMER_SERVICE);
keepAliveManager = new KeepAliveManager(
new ClientKeepAlivePinger(this), scheduler, keepAliveTimeNanos, keepAliveTimeoutNanos,
keepAliveWithoutCalls);
Expand Down Expand Up @@ -949,8 +948,6 @@ private void stopIfNecessary() {

if (keepAliveManager != null) {
keepAliveManager.onTransportTermination();
// KeepAliveManager should stop using the scheduler after onTransportTermination gets called.
scheduler = SharedResourceHolder.release(TIMER_SERVICE, scheduler);
}

if (ping != null) {
Expand Down

0 comments on commit 592a227

Please sign in to comment.