Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Allow channel pool to refresh its channels periodically #805

Merged
merged 16 commits into from Nov 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 44 additions & 0 deletions gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelFactory.java
@@ -0,0 +1,44 @@
/*
* Copyright 2019 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.grpc;

import com.google.api.core.InternalApi;
import io.grpc.ManagedChannel;
import java.io.IOException;

/**
* This interface represents a factory for creating one ManagedChannel
*
* <p>This is public only for technical reasons, for advanced usage.
*/
@InternalApi("For internal use by google-cloud-java clients only")
public interface ChannelFactory {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
ManagedChannel createSingleChannel() throws IOException;
}
39 changes: 37 additions & 2 deletions gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java
Expand Up @@ -34,7 +34,10 @@
import io.grpc.ClientCall;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -48,12 +51,45 @@ class ChannelPool extends ManagedChannel {
private final AtomicInteger indexTicker = new AtomicInteger();
private final String authority;

/**
* Factory method to create a non-refreshing channel pool
*
* @param poolSize number of channels in the pool
* @param channelFactory method to create the channels
* @return ChannelPool of non refreshing channels
*/
static ChannelPool create(int poolSize, final ChannelFactory channelFactory) throws IOException {
List<ManagedChannel> channels = new ArrayList<>();
for (int i = 0; i < poolSize; i++) {
channels.add(channelFactory.createSingleChannel());
}
return new ChannelPool(channels);
}

/**
* Factory method to create a refreshing channel pool
*
* @param poolSize number of channels in the pool
* @param channelFactory method to create the channels
* @param executorService periodically refreshes the channels
* @return ChannelPool of refreshing channels
*/
static ChannelPool createRefreshing(
int poolSize, final ChannelFactory channelFactory, ScheduledExecutorService executorService)
throws IOException {
List<ManagedChannel> channels = new ArrayList<>();
for (int i = 0; i < poolSize; i++) {
channels.add(new RefreshingManagedChannel(channelFactory, executorService));
}
return new ChannelPool(channels);
}

/**
* Initializes the channel pool. Assumes that all channels have the same authority.
*
* @param channels a List of channels to pool.
*/
ChannelPool(List<ManagedChannel> channels) {
private ChannelPool(List<ManagedChannel> channels) {
this.channels = ImmutableList.copyOf(channels);
authority = channels.get(0).authority();
}
Expand All @@ -73,7 +109,6 @@ public String authority() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions) {

return getNextChannel().newCall(methodDescriptor, callOptions);
}

Expand Down
43 changes: 43 additions & 0 deletions gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPrimer.java
@@ -0,0 +1,43 @@
/*
* Copyright 2019 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.grpc;

import com.google.api.core.InternalApi;
import io.grpc.ManagedChannel;

/**
* An interface to prepare a ManagedChannel for normal requests by priming the channel
*
* <p>This is public only for technical reasons, for advanced usage.
*/
@InternalApi("For internal use by google-cloud-java clients only")
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
public interface ChannelPrimer {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
void primeChannel(ManagedChannel managedChannel);
}
Expand Up @@ -31,6 +31,7 @@

import com.google.api.core.ApiFunction;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
Expand All @@ -40,6 +41,7 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.Credentials;
import com.google.auth.oauth2.ComputeEngineCredentials;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -70,6 +72,8 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP
static final String DIRECT_PATH_ENV_VAR = "GOOGLE_CLOUD_ENABLE_DIRECT_PATH";
static final long DIRECT_PATH_KEEP_ALIVE_TIME_SECONDS = 3600;
static final long DIRECT_PATH_KEEP_ALIVE_TIMEOUT_SECONDS = 20;
// reduce the thundering herd problem of too many channels trying to (re)connect at the same time
static final int MAX_POOL_SIZE = 1000;
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

private final int processorCount;
private final ExecutorProvider executorProvider;
Expand All @@ -84,6 +88,7 @@ public final class InstantiatingGrpcChannelProvider implements TransportChannelP
@Nullable private final Boolean keepAliveWithoutCalls;
@Nullable private final Integer poolSize;
@Nullable private final Credentials credentials;
@Nullable private final ChannelPrimer channelPrimer;

@Nullable
private final ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> channelConfigurator;
Expand All @@ -103,6 +108,7 @@ private InstantiatingGrpcChannelProvider(Builder builder) {
this.poolSize = builder.poolSize;
this.channelConfigurator = builder.channelConfigurator;
this.credentials = builder.credentials;
this.channelPrimer = builder.channelPrimer;
}

@Override
Expand Down Expand Up @@ -188,19 +194,22 @@ public TransportChannel getTransportChannel() throws IOException {
}

private TransportChannel createChannel() throws IOException {
ManagedChannel outerChannel;

if (poolSize == null || poolSize == 1) {
outerChannel = createSingleChannel();
int realPoolSize = MoreObjects.firstNonNull(poolSize, 1);
ChannelFactory channelFactory =
new ChannelFactory() {
public ManagedChannel createSingleChannel() throws IOException {
return InstantiatingGrpcChannelProvider.this.createSingleChannel();
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
}
};
ManagedChannel outerChannel;
if (channelPrimer != null) {
outerChannel =
ChannelPool.createRefreshing(
realPoolSize, channelFactory, executorProvider.getExecutor());
} else {
ImmutableList.Builder<ManagedChannel> channels = ImmutableList.builder();

for (int i = 0; i < poolSize; i++) {
channels.add(createSingleChannel());
}
outerChannel = new ChannelPool(channels.build());
outerChannel = ChannelPool.create(realPoolSize, channelFactory);
}

return GrpcTransportChannel.create(outerChannel);
}

Expand Down Expand Up @@ -293,7 +302,11 @@ private ManagedChannel createSingleChannel() throws IOException {
builder = channelConfigurator.apply(builder);
}

return builder.build();
ManagedChannel managedChannel = builder.build();
if (channelPrimer != null) {
channelPrimer.primeChannel(managedChannel);
}
return managedChannel;
}

/** The endpoint to be used for the channel. */
Expand Down Expand Up @@ -350,6 +363,7 @@ public static final class Builder {
@Nullable private Integer poolSize;
@Nullable private ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> channelConfigurator;
@Nullable private Credentials credentials;
@Nullable private ChannelPrimer channelPrimer;

private Builder() {
processorCount = Runtime.getRuntime().availableProcessors();
Expand All @@ -371,6 +385,7 @@ private Builder(InstantiatingGrpcChannelProvider provider) {
this.poolSize = provider.poolSize;
this.channelConfigurator = provider.channelConfigurator;
this.credentials = provider.credentials;
this.channelPrimer = provider.channelPrimer;
}

/** Sets the number of available CPUs, used internally for testing. */
Expand Down Expand Up @@ -509,6 +524,8 @@ public int getPoolSize() {
*/
public Builder setPoolSize(int poolSize) {
Preconditions.checkArgument(poolSize > 0, "Pool size must be positive");
Preconditions.checkArgument(
poolSize <= MAX_POOL_SIZE, "Pool size must be less than %d", MAX_POOL_SIZE);
this.poolSize = poolSize;
return this;
}
Expand All @@ -534,6 +551,21 @@ public Builder setCredentials(Credentials credentials) {
return this;
}

/**
* By setting a channelPrimer, the ChannelPool created by the provider will be refreshing
* ChannelPool. channelPrimer will be invoked periodically when the channels are refreshed
*
tonytanger marked this conversation as resolved.
Show resolved Hide resolved
* <p>This is public only for technical reasons, for advanced usage.
*
* @param channelPrimer invoked when the channels are refreshed
* @return builder for the provider
*/
@InternalApi("For internal use by google-cloud-java clients only")
public Builder setChannelPrimer(ChannelPrimer channelPrimer) {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
this.channelPrimer = channelPrimer;
return this;
}

public InstantiatingGrpcChannelProvider build() {
return new InstantiatingGrpcChannelProvider(this);
}
Expand Down