Skip to content

Commit

Permalink
Merge pull request #127 from nimf/rr-bind
Browse files Browse the repository at this point in the history
Add optional round-robin channel selection for affinity binding calls.
  • Loading branch information
nimf committed May 12, 2022
2 parents 230513e + 7f4b3bd commit 9233b35
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,12 @@ public void sendMessage(ReqT message) {
&& delegateChannel.getChannelRef(keys.get(0)) != null) {
key = keys.get(0);
}
delegateChannelRef = delegateChannel.getChannelRef(key);

if (affinity != null && affinity.getCommand().equals(AffinityConfig.Command.BIND)) {
delegateChannelRef = delegateChannel.getChannelRefForBind();
} else {
delegateChannelRef = delegateChannel.getChannelRef(key);
}
delegateChannelRef.activeStreamsCountIncr();

// Create the client call and do the previous operations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.grpc.proto.ApiConfig;
import com.google.cloud.grpc.proto.MethodConfig;
import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.MessageOrBuilder;
import io.grpc.CallOptions;
Expand Down Expand Up @@ -62,6 +63,9 @@ public class GcpManagedChannel extends ManagedChannel {
static final int DEFAULT_MAX_CHANNEL = 10;
static final int DEFAULT_MAX_STREAM = 100;

@GuardedBy("this")
private Integer bindingIndex = -1;

private final ManagedChannelBuilder<?> delegateChannelBuilder;
private final GcpManagedChannelOptions options;
private final boolean fallbackEnabled;
Expand Down Expand Up @@ -772,6 +776,37 @@ public int getMaxActiveStreams() {
return channelRefs.stream().mapToInt(ChannelRef::getActiveStreamsCount).max().orElse(0);
}

/**
* Returns a {@link ChannelRef} from the pool for a binding call.
* If round-robin on bind is enabled, uses {@link #getChannelRefRoundRobin()}
* otherwise {@link #getChannelRef(String)}
*
* @return {@link ChannelRef} channel to use for a call.
*/
protected ChannelRef getChannelRefForBind() {
if (options.getChannelPoolOptions() != null && options.getChannelPoolOptions().isUseRoundRobinOnBind()) {
return getChannelRefRoundRobin();
}
return getChannelRef(null);
}

/**
* Returns a {@link ChannelRef} from the pool in round-robin manner.
* Creates a new channel in the pool until the pool reaches its max size.
*
* @return {@link ChannelRef}
*/
protected synchronized ChannelRef getChannelRefRoundRobin() {
if (channelRefs.size() < maxSize) {
return createNewChannel();
}
bindingIndex++;
if (bindingIndex >= channelRefs.size()) {
bindingIndex = 0;
}
return channelRefs.get(bindingIndex);
}

/**
* Pick a {@link ChannelRef} (and create a new one if necessary). If notReadyFallbackEnabled is
* true in the {@link GcpResiliencyOptions} then instead of a channel in a non-READY state another
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,13 @@ public static class GcpChannelPoolOptions {
// If every channel in the pool has at least this amount of concurrent streams then a new channel will be created
// in the pool unless the pool reached its maximum size.
private final int concurrentStreamsLowWatermark;
// Use round-robin channel selection for affinity binding calls.
private final boolean useRoundRobinOnBind;

public GcpChannelPoolOptions(Builder builder) {
maxSize = builder.maxSize;
concurrentStreamsLowWatermark = builder.concurrentStreamsLowWatermark;
useRoundRobinOnBind = builder.useRoundRobinOnBind;
}

public int getMaxSize() {
Expand All @@ -179,9 +182,14 @@ public static GcpChannelPoolOptions.Builder newBuilder(GcpChannelPoolOptions opt
return new GcpChannelPoolOptions.Builder(options);
}

public boolean isUseRoundRobinOnBind() {
return useRoundRobinOnBind;
}

public static class Builder {
private int maxSize = GcpManagedChannel.DEFAULT_MAX_CHANNEL;
private int concurrentStreamsLowWatermark = GcpManagedChannel.DEFAULT_MAX_STREAM;
private boolean useRoundRobinOnBind = false;

public Builder() {}

Expand All @@ -192,6 +200,7 @@ public Builder(GcpChannelPoolOptions options) {
}
this.maxSize = options.getMaxSize();
this.concurrentStreamsLowWatermark = options.getConcurrentStreamsLowWatermark();
this.useRoundRobinOnBind = options.isUseRoundRobinOnBind();
}

public GcpChannelPoolOptions build() {
Expand Down Expand Up @@ -221,6 +230,16 @@ public Builder setConcurrentStreamsLowWatermark(int concurrentStreamsLowWatermar
this.concurrentStreamsLowWatermark = concurrentStreamsLowWatermark;
return this;
}

/**
* Enables/disables using round-robin channel selection for affinity binding calls.
*
* @param enabled If true, use round-robin channel selection for affinity binding calls.
*/
public Builder setUseRoundRobinOnBind(boolean enabled) {
this.useRoundRobinOnBind = enabled;
return this;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public final class SpannerIntegrationTest {
private static final ManagedChannelBuilder builder =
ManagedChannelBuilder.forAddress(SPANNER_TARGET, 443);
private GcpManagedChannel gcpChannel;
private GcpManagedChannel gcpChannelBRR;

@BeforeClass
public static void beforeClass() {
Expand Down Expand Up @@ -242,10 +243,35 @@ private SpannerStub getSpannerStub() {

/** A wrapper of checking the status of each channelRef in the gcpChannel. */
private void checkChannelRefs(int channels, int streams, int affinities) {
assertEquals(channels, gcpChannel.channelRefs.size());
checkChannelRefs(gcpChannel, channels, streams, affinities);
}

private void checkChannelRefs(GcpManagedChannel gcpChannel, int channels, int streams, int affinities) {
assertEquals("Channel pool size mismatch.", channels, gcpChannel.channelRefs.size());
for (int i = 0; i < channels; i++) {
assertEquals(streams, gcpChannel.channelRefs.get(i).getActiveStreamsCount());
assertEquals(affinities, gcpChannel.channelRefs.get(i).getAffinityCount());
assertEquals(
String.format("Channel %d streams mismatch.", i),
streams, gcpChannel.channelRefs.get(i).getActiveStreamsCount()
);
assertEquals(
String.format("Channel %d affinities mismatch.", i),
affinities,
gcpChannel.channelRefs.get(i).getAffinityCount()
);
}
}

private void checkChannelRefs(int[] streams, int[] affinities) {
for (int i = 0; i < streams.length; i++) {
assertEquals(
String.format("Channel %d streams mismatch.", i),
streams[i], gcpChannel.channelRefs.get(i).getActiveStreamsCount()
);
assertEquals(
String.format("Channel %d affinities mismatch.", i),
affinities[i],
gcpChannel.channelRefs.get(i).getAffinityCount()
);
}
}

Expand Down Expand Up @@ -292,10 +318,12 @@ private void deleteAsyncSessions(SpannerStub stub, List<String> respNames) throw

/** Helper Functions for FutureStub. */
private SpannerFutureStub getSpannerFutureStub() {
return getSpannerFutureStub(gcpChannel);
}

private SpannerFutureStub getSpannerFutureStub(GcpManagedChannel gcpChannel) {
GoogleCredentials creds = getCreds();
SpannerFutureStub stub =
SpannerGrpc.newFutureStub(gcpChannel).withCallCredentials(MoreCallCredentials.from(creds));
return stub;
return SpannerGrpc.newFutureStub(gcpChannel).withCallCredentials(MoreCallCredentials.from(creds));
}

private List<String> createFutureSessions(SpannerFutureStub stub) throws Exception {
Expand Down Expand Up @@ -341,19 +369,33 @@ private void deleteFutureSessions(SpannerFutureStub stub, List<String> futureNam
@Rule public ExpectedException expectedEx = ExpectedException.none();

@Before
public void setupChannel() {
public void setupChannels() {
File configFile =
new File(SpannerIntegrationTest.class.getClassLoader().getResource(API_FILE).getFile());
gcpChannel =
(GcpManagedChannel)
GcpManagedChannelBuilder.forDelegateBuilder(builder)
.withApiConfigJsonFile(configFile)
.build();
gcpChannelBRR =
(GcpManagedChannel)
GcpManagedChannelBuilder.forDelegateBuilder(builder)
.withApiConfigJsonFile(configFile)
.withOptions(GcpManagedChannelOptions.newBuilder()
.withChannelPoolOptions(
GcpManagedChannelOptions.GcpChannelPoolOptions.newBuilder()
.setMaxSize(MAX_CHANNEL)
.setConcurrentStreamsLowWatermark(MAX_STREAM)
.setUseRoundRobinOnBind(true)
.build())
.build())
.build();
}

@After
public void shutdownChannel() {
public void shutdownChannels() {
gcpChannel.shutdownNow();
gcpChannelBRR.shutdownNow();
}

@Test
Expand Down Expand Up @@ -403,6 +445,107 @@ public void testBatchCreateSessionsBlocking() throws Exception {
checkChannelRefs(MAX_CHANNEL, 0, 0);
}

@Test
public void testSessionsCreatedUsingRoundRobin() throws Exception {
SpannerFutureStub stub = getSpannerFutureStub(gcpChannelBRR);
List<ListenableFuture<Session>> futures = new ArrayList<>();
assertEquals(ConnectivityState.IDLE, gcpChannelBRR.getState(false));

// Should create one session per channel.
CreateSessionRequest req = CreateSessionRequest.newBuilder().setDatabase(DATABASE_PATH).build();
for (int i = 0; i < MAX_CHANNEL; i++) {
ListenableFuture<Session> future = stub.createSession(req);
futures.add(future);
}
// If round-robin in use as expected, then each channel should have 1 active stream with the CreateSession request.
checkChannelRefs(gcpChannelBRR, MAX_CHANNEL, 1, 0);

// Collecting futures results.
String lastSession = "";
for (ListenableFuture<Session> future : futures) {
lastSession = future.get().getName();
}
// Since createSession will bind the key, check the number of keys bound with channels.
// Each channel should have 1 affinity key.
assertEquals(MAX_CHANNEL, gcpChannelBRR.affinityKeyToChannelRef.size());
checkChannelRefs(gcpChannelBRR, MAX_CHANNEL, 0, 1);

// Create a different request with the lastSession created.
ListenableFuture<ResultSet> responseFuture =
stub.executeSql(
ExecuteSqlRequest.newBuilder()
.setSession(lastSession)
.setSql("select * FROM Users")
.build());
// The ChannelRef which is bound with the lastSession.
GcpManagedChannel.ChannelRef currentChannel =
gcpChannelBRR.affinityKeyToChannelRef.get(lastSession);
// Verify the channel is in use.
assertEquals(1, currentChannel.getActiveStreamsCount());

// Create another 1 session per channel sequentially.
// Without the round-robin it won't use the currentChannel as it has more active streams (1) than other channels.
// But with round-robin each channel should get one create session request.
for (int i = 0; i < MAX_CHANNEL; i++) {
ListenableFuture<Session> future = stub.createSession(req);
future.get();
}
ResultSet response = responseFuture.get();

// If round-robin in use, then each channel should now have 2 active stream with the CreateSession request.
checkChannelRefs(gcpChannelBRR, MAX_CHANNEL, 0, 2);
}

@Test
public void testSessionsCreatedWithoutRoundRobin() throws Exception {
SpannerFutureStub stub = getSpannerFutureStub();
List<ListenableFuture<Session>> futures = new ArrayList<>();
assertEquals(ConnectivityState.IDLE, gcpChannel.getState(false));

// Should create one session per channel.
CreateSessionRequest req = CreateSessionRequest.newBuilder().setDatabase(DATABASE_PATH).build();
for (int i = 0; i < MAX_CHANNEL; i++) {
ListenableFuture<Session> future = stub.createSession(req);
futures.add(future);
}
// Each channel should have 1 active stream with the CreateSession request because we create them concurrently.
checkChannelRefs(gcpChannel, MAX_CHANNEL, 1, 0);

// Collecting futures results.
String lastSession = "";
for (ListenableFuture<Session> future : futures) {
lastSession = future.get().getName();
}
// Since createSession will bind the key, check the number of keys bound with channels.
// Each channel should have 1 affinity key.
assertEquals(MAX_CHANNEL, gcpChannel.affinityKeyToChannelRef.size());
checkChannelRefs(MAX_CHANNEL, 0, 1);

// Create a different request with the lastSession created.
ListenableFuture<ResultSet> responseFuture =
stub.executeSql(
ExecuteSqlRequest.newBuilder()
.setSession(lastSession)
.setSql("select * FROM Users")
.build());
// The ChannelRef which is bound with the lastSession.
GcpManagedChannel.ChannelRef currentChannel =
gcpChannel.affinityKeyToChannelRef.get(lastSession);
// Verify the channel is in use.
assertEquals(1, currentChannel.getActiveStreamsCount());

// Create another 1 session per channel sequentially.
// Without the round-robin it won't use the currentChannel as it has more active streams (1) than other channels.
for (int i = 0; i < MAX_CHANNEL; i++) {
ListenableFuture<Session> future = stub.createSession(req);
future.get();
}
ResultSet response = responseFuture.get();

// Without round-robin the first channel will get all additional 3 sessions.
checkChannelRefs(new int[]{0, 0, 0}, new int[]{4, 1, 1});
}

@Test
public void testListSessionsFuture() throws Exception {
SpannerFutureStub stub = getSpannerFutureStub();
Expand Down

0 comments on commit 9233b35

Please sign in to comment.