Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional round-robin channel selection for affinity binding calls. #127

Merged
merged 1 commit into from
May 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,12 @@ public void sendMessage(ReqT message) {
&& delegateChannel.getChannelRef(keys.get(0)) != null) {
key = keys.get(0);
}
delegateChannelRef = delegateChannel.getChannelRef(key);

if (affinity != null && affinity.getCommand().equals(AffinityConfig.Command.BIND)) {
delegateChannelRef = delegateChannel.getChannelRefForBind();
} else {
delegateChannelRef = delegateChannel.getChannelRef(key);
}
delegateChannelRef.activeStreamsCountIncr();

// Create the client call and do the previous operations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.grpc.proto.ApiConfig;
import com.google.cloud.grpc.proto.MethodConfig;
import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.MessageOrBuilder;
import io.grpc.CallOptions;
Expand Down Expand Up @@ -62,6 +63,9 @@ public class GcpManagedChannel extends ManagedChannel {
static final int DEFAULT_MAX_CHANNEL = 10;
static final int DEFAULT_MAX_STREAM = 100;

@GuardedBy("this")
private Integer bindingIndex = -1;

private final ManagedChannelBuilder<?> delegateChannelBuilder;
private final GcpManagedChannelOptions options;
private final boolean fallbackEnabled;
Expand Down Expand Up @@ -772,6 +776,37 @@ public int getMaxActiveStreams() {
return channelRefs.stream().mapToInt(ChannelRef::getActiveStreamsCount).max().orElse(0);
}

/**
* Returns a {@link ChannelRef} from the pool for a binding call.
* If round-robin on bind is enabled, uses {@link #getChannelRefRoundRobin()}
* otherwise {@link #getChannelRef(String)}
*
* @return {@link ChannelRef} channel to use for a call.
*/
protected ChannelRef getChannelRefForBind() {
if (options.getChannelPoolOptions() != null && options.getChannelPoolOptions().isUseRoundRobinOnBind()) {
return getChannelRefRoundRobin();
}
return getChannelRef(null);
}

/**
* Returns a {@link ChannelRef} from the pool in round-robin manner.
* Creates a new channel in the pool until the pool reaches its max size.
*
* @return {@link ChannelRef}
*/
protected synchronized ChannelRef getChannelRefRoundRobin() {
if (channelRefs.size() < maxSize) {
return createNewChannel();
}
bindingIndex++;
if (bindingIndex >= channelRefs.size()) {
bindingIndex = 0;
}
return channelRefs.get(bindingIndex);
}

/**
* Pick a {@link ChannelRef} (and create a new one if necessary). If notReadyFallbackEnabled is
* true in the {@link GcpResiliencyOptions} then instead of a channel in a non-READY state another
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,13 @@ public static class GcpChannelPoolOptions {
// If every channel in the pool has at least this amount of concurrent streams then a new channel will be created
// in the pool unless the pool reached its maximum size.
private final int concurrentStreamsLowWatermark;
// Use round-robin channel selection for affinity binding calls.
private final boolean useRoundRobinOnBind;

public GcpChannelPoolOptions(Builder builder) {
maxSize = builder.maxSize;
concurrentStreamsLowWatermark = builder.concurrentStreamsLowWatermark;
useRoundRobinOnBind = builder.useRoundRobinOnBind;
}

public int getMaxSize() {
Expand All @@ -179,9 +182,14 @@ public static GcpChannelPoolOptions.Builder newBuilder(GcpChannelPoolOptions opt
return new GcpChannelPoolOptions.Builder(options);
}

public boolean isUseRoundRobinOnBind() {
return useRoundRobinOnBind;
}

public static class Builder {
private int maxSize = GcpManagedChannel.DEFAULT_MAX_CHANNEL;
private int concurrentStreamsLowWatermark = GcpManagedChannel.DEFAULT_MAX_STREAM;
private boolean useRoundRobinOnBind = false;

public Builder() {}

Expand All @@ -192,6 +200,7 @@ public Builder(GcpChannelPoolOptions options) {
}
this.maxSize = options.getMaxSize();
this.concurrentStreamsLowWatermark = options.getConcurrentStreamsLowWatermark();
this.useRoundRobinOnBind = options.isUseRoundRobinOnBind();
}

public GcpChannelPoolOptions build() {
Expand Down Expand Up @@ -221,6 +230,16 @@ public Builder setConcurrentStreamsLowWatermark(int concurrentStreamsLowWatermar
this.concurrentStreamsLowWatermark = concurrentStreamsLowWatermark;
return this;
}

/**
* Enables/disables using round-robin channel selection for affinity binding calls.
*
* @param enabled If true, use round-robin channel selection for affinity binding calls.
*/
public Builder setUseRoundRobinOnBind(boolean enabled) {
this.useRoundRobinOnBind = enabled;
return this;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public final class SpannerIntegrationTest {
private static final ManagedChannelBuilder builder =
ManagedChannelBuilder.forAddress(SPANNER_TARGET, 443);
private GcpManagedChannel gcpChannel;
private GcpManagedChannel gcpChannelBRR;

@BeforeClass
public static void beforeClass() {
Expand Down Expand Up @@ -242,10 +243,35 @@ private SpannerStub getSpannerStub() {

/** A wrapper of checking the status of each channelRef in the gcpChannel. */
private void checkChannelRefs(int channels, int streams, int affinities) {
assertEquals(channels, gcpChannel.channelRefs.size());
checkChannelRefs(gcpChannel, channels, streams, affinities);
}

private void checkChannelRefs(GcpManagedChannel gcpChannel, int channels, int streams, int affinities) {
assertEquals("Channel pool size mismatch.", channels, gcpChannel.channelRefs.size());
for (int i = 0; i < channels; i++) {
assertEquals(streams, gcpChannel.channelRefs.get(i).getActiveStreamsCount());
assertEquals(affinities, gcpChannel.channelRefs.get(i).getAffinityCount());
assertEquals(
String.format("Channel %d streams mismatch.", i),
streams, gcpChannel.channelRefs.get(i).getActiveStreamsCount()
);
assertEquals(
String.format("Channel %d affinities mismatch.", i),
affinities,
gcpChannel.channelRefs.get(i).getAffinityCount()
);
}
}

private void checkChannelRefs(int[] streams, int[] affinities) {
for (int i = 0; i < streams.length; i++) {
assertEquals(
String.format("Channel %d streams mismatch.", i),
streams[i], gcpChannel.channelRefs.get(i).getActiveStreamsCount()
);
assertEquals(
String.format("Channel %d affinities mismatch.", i),
affinities[i],
gcpChannel.channelRefs.get(i).getAffinityCount()
);
}
}

Expand Down Expand Up @@ -292,10 +318,12 @@ private void deleteAsyncSessions(SpannerStub stub, List<String> respNames) throw

/** Helper Functions for FutureStub. */
private SpannerFutureStub getSpannerFutureStub() {
return getSpannerFutureStub(gcpChannel);
}

private SpannerFutureStub getSpannerFutureStub(GcpManagedChannel gcpChannel) {
GoogleCredentials creds = getCreds();
SpannerFutureStub stub =
SpannerGrpc.newFutureStub(gcpChannel).withCallCredentials(MoreCallCredentials.from(creds));
return stub;
return SpannerGrpc.newFutureStub(gcpChannel).withCallCredentials(MoreCallCredentials.from(creds));
}

private List<String> createFutureSessions(SpannerFutureStub stub) throws Exception {
Expand Down Expand Up @@ -341,19 +369,33 @@ private void deleteFutureSessions(SpannerFutureStub stub, List<String> futureNam
@Rule public ExpectedException expectedEx = ExpectedException.none();

@Before
public void setupChannel() {
public void setupChannels() {
File configFile =
new File(SpannerIntegrationTest.class.getClassLoader().getResource(API_FILE).getFile());
gcpChannel =
(GcpManagedChannel)
GcpManagedChannelBuilder.forDelegateBuilder(builder)
.withApiConfigJsonFile(configFile)
.build();
gcpChannelBRR =
(GcpManagedChannel)
GcpManagedChannelBuilder.forDelegateBuilder(builder)
.withApiConfigJsonFile(configFile)
.withOptions(GcpManagedChannelOptions.newBuilder()
.withChannelPoolOptions(
GcpManagedChannelOptions.GcpChannelPoolOptions.newBuilder()
.setMaxSize(MAX_CHANNEL)
.setConcurrentStreamsLowWatermark(MAX_STREAM)
.setUseRoundRobinOnBind(true)
.build())
.build())
.build();
}

@After
public void shutdownChannel() {
public void shutdownChannels() {
gcpChannel.shutdownNow();
gcpChannelBRR.shutdownNow();
}

@Test
Expand Down Expand Up @@ -403,6 +445,107 @@ public void testBatchCreateSessionsBlocking() throws Exception {
checkChannelRefs(MAX_CHANNEL, 0, 0);
}

@Test
public void testSessionsCreatedUsingRoundRobin() throws Exception {
SpannerFutureStub stub = getSpannerFutureStub(gcpChannelBRR);
List<ListenableFuture<Session>> futures = new ArrayList<>();
assertEquals(ConnectivityState.IDLE, gcpChannelBRR.getState(false));

// Should create one session per channel.
CreateSessionRequest req = CreateSessionRequest.newBuilder().setDatabase(DATABASE_PATH).build();
for (int i = 0; i < MAX_CHANNEL; i++) {
ListenableFuture<Session> future = stub.createSession(req);
futures.add(future);
}
// If round-robin in use as expected, then each channel should have 1 active stream with the CreateSession request.
checkChannelRefs(gcpChannelBRR, MAX_CHANNEL, 1, 0);

// Collecting futures results.
String lastSession = "";
for (ListenableFuture<Session> future : futures) {
lastSession = future.get().getName();
}
// Since createSession will bind the key, check the number of keys bound with channels.
// Each channel should have 1 affinity key.
assertEquals(MAX_CHANNEL, gcpChannelBRR.affinityKeyToChannelRef.size());
checkChannelRefs(gcpChannelBRR, MAX_CHANNEL, 0, 1);

// Create a different request with the lastSession created.
ListenableFuture<ResultSet> responseFuture =
stub.executeSql(
ExecuteSqlRequest.newBuilder()
.setSession(lastSession)
.setSql("select * FROM Users")
.build());
// The ChannelRef which is bound with the lastSession.
GcpManagedChannel.ChannelRef currentChannel =
gcpChannelBRR.affinityKeyToChannelRef.get(lastSession);
// Verify the channel is in use.
assertEquals(1, currentChannel.getActiveStreamsCount());

// Create another 1 session per channel sequentially.
// Without the round-robin it won't use the currentChannel as it has more active streams (1) than other channels.
// But with round-robin each channel should get one create session request.
for (int i = 0; i < MAX_CHANNEL; i++) {
ListenableFuture<Session> future = stub.createSession(req);
future.get();
}
ResultSet response = responseFuture.get();

// If round-robin in use, then each channel should now have 2 active stream with the CreateSession request.
checkChannelRefs(gcpChannelBRR, MAX_CHANNEL, 0, 2);
}

@Test
public void testSessionsCreatedWithoutRoundRobin() throws Exception {
SpannerFutureStub stub = getSpannerFutureStub();
List<ListenableFuture<Session>> futures = new ArrayList<>();
assertEquals(ConnectivityState.IDLE, gcpChannel.getState(false));

// Should create one session per channel.
CreateSessionRequest req = CreateSessionRequest.newBuilder().setDatabase(DATABASE_PATH).build();
for (int i = 0; i < MAX_CHANNEL; i++) {
ListenableFuture<Session> future = stub.createSession(req);
futures.add(future);
}
// Each channel should have 1 active stream with the CreateSession request because we create them concurrently.
checkChannelRefs(gcpChannel, MAX_CHANNEL, 1, 0);

// Collecting futures results.
String lastSession = "";
for (ListenableFuture<Session> future : futures) {
lastSession = future.get().getName();
}
// Since createSession will bind the key, check the number of keys bound with channels.
// Each channel should have 1 affinity key.
assertEquals(MAX_CHANNEL, gcpChannel.affinityKeyToChannelRef.size());
checkChannelRefs(MAX_CHANNEL, 0, 1);

// Create a different request with the lastSession created.
ListenableFuture<ResultSet> responseFuture =
stub.executeSql(
ExecuteSqlRequest.newBuilder()
.setSession(lastSession)
.setSql("select * FROM Users")
.build());
// The ChannelRef which is bound with the lastSession.
GcpManagedChannel.ChannelRef currentChannel =
gcpChannel.affinityKeyToChannelRef.get(lastSession);
// Verify the channel is in use.
assertEquals(1, currentChannel.getActiveStreamsCount());

// Create another 1 session per channel sequentially.
// Without the round-robin it won't use the currentChannel as it has more active streams (1) than other channels.
for (int i = 0; i < MAX_CHANNEL; i++) {
ListenableFuture<Session> future = stub.createSession(req);
future.get();
}
ResultSet response = responseFuture.get();

// Without round-robin the first channel will get all additional 3 sessions.
checkChannelRefs(new int[]{0, 0, 0}, new int[]{4, 1, 1});
}

@Test
public void testListSessionsFuture() throws Exception {
SpannerFutureStub stub = getSpannerFutureStub();
Expand Down