Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
Allow channel pool to refresh its channels periodically (#805)
Browse files Browse the repository at this point in the history
* add refresh capability to channelpool

* add tests

* fix minor issues

* respond to comments and add synchronization to newcall and channel swap

* clean up concurrency test

* Move refreshing logic to a wrapper class of managed channel

* change channel pool test to remove reliance on timer

* working solution

* Move shutdown logic to sub class to safely perform to ensure non-blocking shutdown

* Add comments

* Inline jitter calculation

* Add comments and documentation

* Respond to comments

* add comment on synchronization

* simplified channel pool creation logic

* Add for internal use only comment
  • Loading branch information
tonytanger authored and igorbernstein2 committed Nov 21, 2019
1 parent fad1d50 commit b059473
Show file tree
Hide file tree
Showing 12 changed files with 1,142 additions and 22 deletions.
44 changes: 44 additions & 0 deletions gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelFactory.java
@@ -0,0 +1,44 @@
/*
* Copyright 2019 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.grpc;

import com.google.api.core.InternalApi;
import io.grpc.ManagedChannel;
import java.io.IOException;

/**
* This interface represents a factory for creating one ManagedChannel
*
* <p>This is public only for technical reasons, for advanced usage.
*/
@InternalApi("For internal use by google-cloud-java clients only")
public interface ChannelFactory {
ManagedChannel createSingleChannel() throws IOException;
}
39 changes: 37 additions & 2 deletions gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java
Expand Up @@ -34,7 +34,10 @@
import io.grpc.ClientCall;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -48,12 +51,45 @@ class ChannelPool extends ManagedChannel {
private final AtomicInteger indexTicker = new AtomicInteger();
private final String authority;

/**
* Factory method to create a non-refreshing channel pool
*
* @param poolSize number of channels in the pool
* @param channelFactory method to create the channels
* @return ChannelPool of non refreshing channels
*/
static ChannelPool create(int poolSize, final ChannelFactory channelFactory) throws IOException {
List<ManagedChannel> channels = new ArrayList<>();
for (int i = 0; i < poolSize; i++) {
channels.add(channelFactory.createSingleChannel());
}
return new ChannelPool(channels);
}

/**
* Factory method to create a refreshing channel pool
*
* @param poolSize number of channels in the pool
* @param channelFactory method to create the channels
* @param executorService periodically refreshes the channels
* @return ChannelPool of refreshing channels
*/
static ChannelPool createRefreshing(
int poolSize, final ChannelFactory channelFactory, ScheduledExecutorService executorService)
throws IOException {
List<ManagedChannel> channels = new ArrayList<>();
for (int i = 0; i < poolSize; i++) {
channels.add(new RefreshingManagedChannel(channelFactory, executorService));
}
return new ChannelPool(channels);
}

/**
* Initializes the channel pool. Assumes that all channels have the same authority.
*
* @param channels a List of channels to pool.
*/
ChannelPool(List<ManagedChannel> channels) {
private ChannelPool(List<ManagedChannel> channels) {
this.channels = ImmutableList.copyOf(channels);
authority = channels.get(0).authority();
}
Expand All @@ -73,7 +109,6 @@ public String authority() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions) {

return getNextChannel().newCall(methodDescriptor, callOptions);
}

Expand Down
43 changes: 43 additions & 0 deletions gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPrimer.java
@@ -0,0 +1,43 @@
/*
* Copyright 2019 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.grpc;

import com.google.api.core.InternalApi;
import io.grpc.ManagedChannel;

/**
* An interface to prepare a ManagedChannel for normal requests by priming the channel
*
* <p>This is public only for technical reasons, for advanced usage.
*/
@InternalApi("For internal use by google-cloud-java clients only")
public interface ChannelPrimer {
void primeChannel(ManagedChannel managedChannel);
}
Expand Up @@ -31,6 +31,7 @@

import com.google.api.core.ApiFunction;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
Expand All @@ -40,6 +41,7 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.Credentials;
import com.google.auth.oauth2.ComputeEngineCredentials;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -70,6 +72,8 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP
static final String DIRECT_PATH_ENV_VAR = "GOOGLE_CLOUD_ENABLE_DIRECT_PATH";
static final long DIRECT_PATH_KEEP_ALIVE_TIME_SECONDS = 3600;
static final long DIRECT_PATH_KEEP_ALIVE_TIMEOUT_SECONDS = 20;
// reduce the thundering herd problem of too many channels trying to (re)connect at the same time
static final int MAX_POOL_SIZE = 1000;

private final int processorCount;
private final ExecutorProvider executorProvider;
Expand All @@ -84,6 +88,7 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP
@Nullable private final Boolean keepAliveWithoutCalls;
@Nullable private final Integer poolSize;
@Nullable private final Credentials credentials;
@Nullable private final ChannelPrimer channelPrimer;

@Nullable
private final ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> channelConfigurator;
Expand All @@ -103,6 +108,7 @@ private InstantiatingGrpcChannelProvider(Builder builder) {
this.poolSize = builder.poolSize;
this.channelConfigurator = builder.channelConfigurator;
this.credentials = builder.credentials;
this.channelPrimer = builder.channelPrimer;
}

@Override
Expand Down Expand Up @@ -188,19 +194,22 @@ public TransportChannel getTransportChannel() throws IOException {
}

private TransportChannel createChannel() throws IOException {
ManagedChannel outerChannel;

if (poolSize == null || poolSize == 1) {
outerChannel = createSingleChannel();
int realPoolSize = MoreObjects.firstNonNull(poolSize, 1);
ChannelFactory channelFactory =
new ChannelFactory() {
public ManagedChannel createSingleChannel() throws IOException {
return InstantiatingGrpcChannelProvider.this.createSingleChannel();
}
};
ManagedChannel outerChannel;
if (channelPrimer != null) {
outerChannel =
ChannelPool.createRefreshing(
realPoolSize, channelFactory, executorProvider.getExecutor());
} else {
ImmutableList.Builder<ManagedChannel> channels = ImmutableList.builder();

for (int i = 0; i < poolSize; i++) {
channels.add(createSingleChannel());
}
outerChannel = new ChannelPool(channels.build());
outerChannel = ChannelPool.create(realPoolSize, channelFactory);
}

return GrpcTransportChannel.create(outerChannel);
}

Expand Down Expand Up @@ -293,7 +302,11 @@ private ManagedChannel createSingleChannel() throws IOException {
builder = channelConfigurator.apply(builder);
}

return builder.build();
ManagedChannel managedChannel = builder.build();
if (channelPrimer != null) {
channelPrimer.primeChannel(managedChannel);
}
return managedChannel;
}

/** The endpoint to be used for the channel. */
Expand Down Expand Up @@ -350,6 +363,7 @@ public static final class Builder {
@Nullable private Integer poolSize;
@Nullable private ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> channelConfigurator;
@Nullable private Credentials credentials;
@Nullable private ChannelPrimer channelPrimer;

private Builder() {
processorCount = Runtime.getRuntime().availableProcessors();
Expand All @@ -371,6 +385,7 @@ private Builder(InstantiatingGrpcChannelProvider provider) {
this.poolSize = provider.poolSize;
this.channelConfigurator = provider.channelConfigurator;
this.credentials = provider.credentials;
this.channelPrimer = provider.channelPrimer;
}

/** Sets the number of available CPUs, used internally for testing. */
Expand Down Expand Up @@ -509,6 +524,8 @@ public int getPoolSize() {
*/
public Builder setPoolSize(int poolSize) {
Preconditions.checkArgument(poolSize > 0, "Pool size must be positive");
Preconditions.checkArgument(
poolSize <= MAX_POOL_SIZE, "Pool size must be less than %d", MAX_POOL_SIZE);
this.poolSize = poolSize;
return this;
}
Expand All @@ -534,6 +551,21 @@ public Builder setCredentials(Credentials credentials) {
return this;
}

/**
* By setting a channelPrimer, the ChannelPool created by the provider will be refreshing
* ChannelPool. channelPrimer will be invoked periodically when the channels are refreshed
*
* <p>This is public only for technical reasons, for advanced usage.
*
* @param channelPrimer invoked when the channels are refreshed
* @return builder for the provider
*/
@InternalApi("For internal use by google-cloud-java clients only")
public Builder setChannelPrimer(ChannelPrimer channelPrimer) {
this.channelPrimer = channelPrimer;
return this;
}

public InstantiatingGrpcChannelProvider build() {
return new InstantiatingGrpcChannelProvider(this);
}
Expand Down

0 comments on commit b059473

Please sign in to comment.