Skip to content

Commit

Permalink
Merge pull request #135 from GoogleCloudPlatform/multi-endpoint
Browse files Browse the repository at this point in the history
Multi-endpoint feature
  • Loading branch information
nimf committed Jul 12, 2022
2 parents d5e2f70 + 34e77da commit b7e0ef2
Show file tree
Hide file tree
Showing 10 changed files with 1,752 additions and 38 deletions.
1 change: 1 addition & 0 deletions grpc-gcp/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies {
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
implementation "io.grpc:grpc-stub:${grpcVersion}"
implementation "io.opencensus:opencensus-api:${opencensusVersion}"
implementation "com.google.api:api-common:2.1.5"

compileOnly "org.apache.tomcat:annotations-api:6.0.53" // necessary for Java 9+

Expand Down
93 changes: 83 additions & 10 deletions grpc-gcp/src/main/java/com/google/cloud/grpc/GcpManagedChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.grpc.proto.ApiConfig;
import com.google.cloud.grpc.proto.MethodConfig;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.base.Joiner;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.protobuf.Descriptors.FieldDescriptor;
Expand All @@ -48,11 +49,13 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -92,6 +95,10 @@ public class GcpManagedChannel extends ManagedChannel {

@VisibleForTesting final List<ChannelRef> channelRefs = new CopyOnWriteArrayList<>();

private final ExecutorService stateNotificationExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("gcp-mc-state-notifications-%d").build());
private List<Runnable> stateChangeCallbacks = Collections.synchronizedList(new LinkedList<>());

// Metrics configuration.
private MetricRegistry metricRegistry;
private final List<LabelKey> labelKeys = new ArrayList<>();
Expand Down Expand Up @@ -872,6 +879,15 @@ private void recordUnresponsiveDetection(long nanos, long dropCount) {
}
}

@Override
public void notifyWhenStateChanged(ConnectivityState source, Runnable callback) {
if (!getState(false).equals(source)) {
stateNotificationExecutor.execute(callback);
return;
}
stateChangeCallbacks.add(callback);
}

/**
* ChannelStateMonitor subscribes to channel's state changes and informs {@link GcpManagedChannel}
* on any new state. This monitor allows to detect when a channel is not ready and temporarily
Expand Down Expand Up @@ -919,7 +935,14 @@ public void run() {
}
}

private synchronized void executeStateChangeCallbacks() {
List<Runnable> callbacksToTrigger = stateChangeCallbacks;
stateChangeCallbacks = new LinkedList<>();
callbacksToTrigger.forEach(stateNotificationExecutor::execute);
}

void processChannelStateChange(int channelId, ConnectivityState state) {
executeStateChangeCallbacks();
if (!fallbackEnabled) {
return;
}
Expand Down Expand Up @@ -967,10 +990,12 @@ protected ChannelRef getChannelRefForBind() {
ChannelRef channelRef;
if (options.getChannelPoolOptions() != null && options.getChannelPoolOptions().isUseRoundRobinOnBind()) {
channelRef = getChannelRefRoundRobin();
logger.finest(log(
"Channel %d picked for bind operation using round-robin.", channelRef.getId()));
} else {
channelRef = getChannelRef(null);
logger.finest(log("Channel %d picked for bind operation.", channelRef.getId()));
}
logger.finest(log("Channel %d picked for bind operation.", channelRef.getId()));
return channelRef;
}

Expand Down Expand Up @@ -1061,15 +1086,45 @@ private synchronized ChannelRef createNewChannel() {
return channelRef;
}

// Returns first newly created channel or null if there are already some channels in the pool.
@Nullable
private ChannelRef createFirstChannel() {
if (!channelRefs.isEmpty()) {
return null;
}
synchronized (this) {
if (channelRefs.isEmpty()) {
return createNewChannel();
}
}
return null;
}

// Creates new channel if maxSize is not reached.
// Returns new channel or null.
@Nullable
private ChannelRef tryCreateNewChannel() {
if (channelRefs.size() >= maxSize) {
return null;
}
synchronized (this) {
if (channelRefs.size() < maxSize) {
return createNewChannel();
}
}
return null;
}

/**
* Pick a {@link ChannelRef} (and create a new one if necessary). If notReadyFallbackEnabled is
* true in the {@link GcpResiliencyOptions} then instead of a channel in a non-READY state another
* channel in the READY state and having fewer than maximum allowed number of active streams will
* be provided if available.
*/
private ChannelRef pickLeastBusyChannel(boolean forFallback) {
if (channelRefs.isEmpty()) {
return createNewChannel();
ChannelRef first = createFirstChannel();
if (first != null) {
return first;
}

// Pick the least busy channel and the least busy ready and not overloaded channel (this could
Expand All @@ -1095,17 +1150,23 @@ private ChannelRef pickLeastBusyChannel(boolean forFallback) {

if (!fallbackEnabled) {
if (channelRefs.size() < maxSize && minStreams >= maxConcurrentStreamsLowWatermark) {
return createNewChannel();
ChannelRef newChannel = tryCreateNewChannel();
if (newChannel != null) {
return newChannel;
}
}
return channelCandidate;
}

if (channelRefs.size() < maxSize && readyMinStreams >= maxConcurrentStreamsLowWatermark) {
if (!forFallback && readyCandidate == null) {
logger.finest(log("Fallback to newly created channel"));
fallbacksSucceeded.incrementAndGet();
ChannelRef newChannel = tryCreateNewChannel();
if (newChannel != null) {
if (!forFallback && readyCandidate == null) {
logger.finest(log("Fallback to newly created channel %d", newChannel.getId()));
fallbacksSucceeded.incrementAndGet();
}
return newChannel;
}
return createNewChannel();
}

if (readyCandidate != null) {
Expand Down Expand Up @@ -1164,6 +1225,9 @@ public ManagedChannel shutdownNow() {
if (logMetricService != null && !logMetricService.isTerminated()) {
logMetricService.shutdownNow();
}
if (!stateNotificationExecutor.isTerminated()) {
stateNotificationExecutor.shutdownNow();
}
return this;
}

Expand All @@ -1176,6 +1240,7 @@ public ManagedChannel shutdown() {
if (logMetricService != null) {
logMetricService.shutdown();
}
stateNotificationExecutor.shutdown();
return this;
}

Expand All @@ -1197,6 +1262,11 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
//noinspection ResultOfMethodCallIgnored
logMetricService.awaitTermination(awaitTimeNanos, NANOSECONDS);
}
awaitTimeNanos = endTimeNanos - System.nanoTime();
if (awaitTimeNanos > 0) {
//noinspection ResultOfMethodCallIgnored
stateNotificationExecutor.awaitTermination(awaitTimeNanos, NANOSECONDS);
}
return isTerminated();
}

Expand All @@ -1210,7 +1280,7 @@ public boolean isShutdown() {
if (logMetricService != null) {
return logMetricService.isShutdown();
}
return true;
return stateNotificationExecutor.isShutdown();
}

@Override
Expand All @@ -1223,12 +1293,15 @@ public boolean isTerminated() {
if (logMetricService != null) {
return logMetricService.isTerminated();
}
return true;
return stateNotificationExecutor.isTerminated();
}

/** Get the current connectivity state of the channel pool. */
@Override
public ConnectivityState getState(boolean requestConnection) {
if (requestConnection && getNumberOfChannels() == 0) {
createFirstChannel();
}
int ready = 0;
int idle = 0;
int connecting = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.opencensus.metrics.LabelKey;
import io.opencensus.metrics.LabelValue;
import io.opencensus.metrics.MetricRegistry;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -64,10 +65,10 @@ public GcpResiliencyOptions getResiliencyOptions() {
@Override
public String toString() {
return String.format(
"{channelPoolOptions: %s, metricsOptions: %s, resiliencyOptions: %s}",
"{channelPoolOptions: %s, resiliencyOptions: %s, metricsOptions: %s}",
getChannelPoolOptions(),
getMetricsOptions(),
getResiliencyOptions()
getResiliencyOptions(),
getMetricsOptions()
);
}

Expand Down Expand Up @@ -208,8 +209,9 @@ public boolean isUseRoundRobinOnBind() {
@Override
public String toString() {
return String.format(
"{maxSize: %d, concurrentStreamsLowWatermark: %d, useRoundRobinOnBind: %s}",
"{maxSize: %d, minSize: %d, concurrentStreamsLowWatermark: %d, useRoundRobinOnBind: %s}",
getMaxSize(),
getMinSize(),
getConcurrentStreamsLowWatermark(),
isUseRoundRobinOnBind()
);
Expand Down

0 comments on commit b7e0ef2

Please sign in to comment.