Skip to content

Commit

Permalink
feat: add a flag to add / remove routing cookie from callable chain (#…
Browse files Browse the repository at this point in the history
…2032)

Test and rollback plan: [go/cbt-routing-cookie-rollback](http://goto.google.com/cbt-routing-cookie-rollback)

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)
- [ ] Rollback plan is reviewed and LGTMed 

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the [samples format](
https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md).
  • Loading branch information
mutianf committed Dec 18, 2023
1 parent ccc2764 commit 201e631
Show file tree
Hide file tree
Showing 7 changed files with 351 additions and 64 deletions.
Expand Up @@ -55,14 +55,14 @@ Metadata injectCookiesInRequestHeaders(Metadata headers) {
* COOKIE_KEY_PREFIX to cookies. Values in trailers will override the value set in initial
* metadata for the same keys.
*/
void extractCookiesFromMetadata(@Nullable Metadata trailers) {
if (trailers == null) {
void extractCookiesFromMetadata(@Nullable Metadata metadata) {
if (metadata == null) {
return;
}
for (String key : trailers.keys()) {
for (String key : metadata.keys()) {
if (key.startsWith(COOKIE_KEY_PREFIX)) {
Metadata.Key<String> metadataKey = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
String value = trailers.get(metadataKey);
String value = metadata.get(metadataKey);
cookies.put(metadataKey, value);
}
}
Expand Down
Expand Up @@ -37,6 +37,7 @@
import com.google.api.gax.rpc.RequestParamsExtractor;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.OpencensusTracerFactory;
import com.google.api.gax.tracing.SpanName;
Expand Down Expand Up @@ -185,11 +186,14 @@ public static EnhancedBigtableStubSettings finalizeSettings(
// workaround JWT audience issues
patchCredentials(builder);

// patch cookies interceptor
InstantiatingGrpcChannelProvider.Builder transportProvider = null;
if (builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider) {
transportProvider =
((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder();
InstantiatingGrpcChannelProvider.Builder transportProvider =
builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider
? ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder()
: null;

if (builder.getEnableRoutingCookie() && transportProvider != null) {
// TODO: this also need to be added to BigtableClientFactory
// patch cookies interceptor
transportProvider.setInterceptorProvider(() -> ImmutableList.of(new CookiesInterceptor()));
}

Expand Down Expand Up @@ -371,11 +375,7 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
new TracedServerStreamingCallable<>(
readRowsUserCallable, clientContext.getTracerFactory(), span);

// CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry
// attempts won't see a CookieHolder.
ServerStreamingCallable<Query, RowT> withCookie = new CookiesServerStreamingCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -411,9 +411,7 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
new TracedUnaryCallable<>(
firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow"));

UnaryCallable<Query, RowT> withCookie = new CookiesUnaryCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -485,7 +483,7 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
new ReadRowsRetryCompletedCallable<>(withBigtableTracer);

ServerStreamingCallable<ReadRowsRequest, RowT> retrying2 =
Callables.retrying(retrying1, innerSettings, clientContext);
withRetries(retrying1, innerSettings);

return new FilterMarkerRowsCallable<>(retrying2, rowAdapter);
}
Expand Down Expand Up @@ -568,7 +566,7 @@ public Map<String, String> extract(
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> retryable =
Callables.retrying(withBigtableTracer, settings.sampleRowKeysSettings(), clientContext);
withRetries(withBigtableTracer, settings.sampleRowKeysSettings());

return createUserFacingUnaryCallable(
methodName, new SampleRowKeysCallable(retryable, requestContext));
Expand Down Expand Up @@ -607,7 +605,7 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
Callables.retrying(withBigtableTracer, settings.mutateRowSettings(), clientContext);
withRetries(withBigtableTracer, settings.mutateRowSettings());

return createUserFacingUnaryCallable(
methodName, new MutateRowCallable(retrying, requestContext));
Expand All @@ -631,19 +629,25 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
UnaryCallable<MutateRowsRequest, Void> baseCallable = createMutateRowsBaseCallable();

UnaryCallable<MutateRowsRequest, Void> withCookie = baseCallable;

if (settings.getEnableRoutingCookie()) {
withCookie = new CookiesUnaryCallable<>(baseCallable);
}

UnaryCallable<MutateRowsRequest, Void> flowControlCallable = null;
if (settings.bulkMutateRowsSettings().isLatencyBasedThrottlingEnabled()) {
flowControlCallable =
new DynamicFlowControlCallable(
baseCallable,
withCookie,
bulkMutationFlowController,
bulkMutationDynamicFlowControlStats,
settings.bulkMutateRowsSettings().getTargetRpcLatencyMs(),
FLOW_CONTROL_ADJUSTING_INTERVAL_MS);
}
UnaryCallable<BulkMutation, Void> userFacing =
new BulkMutateRowsUserFacingCallable(
flowControlCallable != null ? flowControlCallable : baseCallable, requestContext);
flowControlCallable != null ? flowControlCallable : withCookie, requestContext);

SpanName spanName = getSpanName("MutateRows");

Expand All @@ -654,9 +658,7 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
new TracedUnaryCallable<>(
tracedBatcherUnaryCallable, clientContext.getTracerFactory(), spanName);

UnaryCallable<BulkMutation, Void> withCookie = new CookiesUnaryCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -810,7 +812,7 @@ public Map<String, String> extract(
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<CheckAndMutateRowRequest, CheckAndMutateRowResponse> retrying =
Callables.retrying(withBigtableTracer, settings.checkAndMutateRowSettings(), clientContext);
withRetries(withBigtableTracer, settings.checkAndMutateRowSettings());

return createUserFacingUnaryCallable(
methodName, new CheckAndMutateRowCallable(retrying, requestContext));
Expand Down Expand Up @@ -851,8 +853,7 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
new BigtableTracerUnaryCallable<>(withStatsHeaders);

UnaryCallable<ReadModifyWriteRowRequest, ReadModifyWriteRowResponse> retrying =
Callables.retrying(
withBigtableTracer, settings.readModifyWriteRowSettings(), clientContext);
withRetries(withBigtableTracer, settings.readModifyWriteRowSettings());

return createUserFacingUnaryCallable(
methodName, new ReadModifyWriteRowCallable(retrying, requestContext));
Expand Down Expand Up @@ -932,16 +933,13 @@ public Map<String, String> extract(
new BigtableTracerStreamingCallable<>(watched);

ServerStreamingCallable<String, ByteStringRange> retrying =
Callables.retrying(withBigtableTracer, innerSettings, clientContext);
withRetries(withBigtableTracer, innerSettings);

SpanName span = getSpanName("GenerateInitialChangeStreamPartitions");
ServerStreamingCallable<String, ByteStringRange> traced =
new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span);

ServerStreamingCallable<String, ByteStringRange> withCookie =
new CookiesServerStreamingCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -1010,7 +1008,7 @@ public Map<String, String> extract(
new BigtableTracerStreamingCallable<>(watched);

ServerStreamingCallable<ReadChangeStreamRequest, ChangeStreamRecordT> readChangeStreamCallable =
Callables.retrying(withBigtableTracer, innerSettings, clientContext);
withRetries(withBigtableTracer, innerSettings);

ServerStreamingCallable<ReadChangeStreamQuery, ChangeStreamRecordT>
readChangeStreamUserCallable =
Expand All @@ -1021,10 +1019,7 @@ public Map<String, String> extract(
new TracedServerStreamingCallable<>(
readChangeStreamUserCallable, clientContext.getTracerFactory(), span);

ServerStreamingCallable<ReadChangeStreamQuery, ChangeStreamRecordT> withCookie =
new CookiesServerStreamingCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand All @@ -1037,11 +1032,7 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
UnaryCallable<RequestT, ResponseT> traced =
new TracedUnaryCallable<>(inner, clientContext.getTracerFactory(), getSpanName(methodName));

// CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry
// attempts won't see a CookieHolder.
UnaryCallable<RequestT, ResponseT> withCookie = new CookiesUnaryCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

private UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> createPingAndWarmCallable() {
Expand All @@ -1062,6 +1053,27 @@ public Map<String, String> extract(PingAndWarmRequest request) {
Collections.emptySet());
return pingAndWarm.withDefaultCallContext(clientContext.getDefaultCallContext());
}

private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> withRetries(
UnaryCallable<RequestT, ResponseT> innerCallable, UnaryCallSettings<?, ?> unaryCallSettings) {
UnaryCallable<RequestT, ResponseT> retrying =
Callables.retrying(innerCallable, unaryCallSettings, clientContext);
if (settings.getEnableRoutingCookie()) {
return new CookiesUnaryCallable<>(retrying);
}
return retrying;
}

private <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT> withRetries(
ServerStreamingCallable<RequestT, ResponseT> innerCallable,
ServerStreamingCallSettings<RequestT, ResponseT> serverStreamingCallSettings) {
ServerStreamingCallable<RequestT, ResponseT> retrying =
Callables.retrying(innerCallable, serverStreamingCallSettings, clientContext);
if (settings.getEnableRoutingCookie()) {
return new CookiesServerStreamingCallable<>(retrying);
}
return retrying;
}
// </editor-fold>

// <editor-fold desc="Callable accessors">
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.data.v2.stub;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.batching.BatchingCallSettings;
import com.google.api.gax.batching.BatchingSettings;
Expand Down Expand Up @@ -211,6 +212,7 @@ public class EnhancedBigtableStubSettings extends StubSettings<EnhancedBigtableS
private final boolean isRefreshingChannel;
private ImmutableList<String> primedTableIds;
private final Map<String, String> jwtAudienceMapping;
private final boolean enableRoutingCookie;

private final ServerStreamingCallSettings<Query, Row> readRowsSettings;
private final UnaryCallSettings<Query, Row> readRowSettings;
Expand Down Expand Up @@ -252,6 +254,7 @@ private EnhancedBigtableStubSettings(Builder builder) {
isRefreshingChannel = builder.isRefreshingChannel;
primedTableIds = builder.primedTableIds;
jwtAudienceMapping = builder.jwtAudienceMapping;
enableRoutingCookie = builder.enableRoutingCookie;

// Per method settings.
readRowsSettings = builder.readRowsSettings.build();
Expand Down Expand Up @@ -313,6 +316,15 @@ public Map<String, String> getJwtAudienceMapping() {
return jwtAudienceMapping;
}

/**
* Gets if routing cookie is enabled. If true, client will retry a request with extra metadata
* server sent back.
*/
@BetaApi("Routing cookie is not currently stable and may change in the future")
public boolean getEnableRoutingCookie() {
return enableRoutingCookie;
}

/** Returns a builder for the default ChannelProvider for this service. */
public static InstantiatingGrpcChannelProvider.Builder defaultGrpcTransportProviderBuilder() {
return BigtableStubSettings.defaultGrpcTransportProviderBuilder()
Expand Down Expand Up @@ -595,6 +607,7 @@ public static class Builder extends StubSettings.Builder<EnhancedBigtableStubSet
private boolean isRefreshingChannel;
private ImmutableList<String> primedTableIds;
private Map<String, String> jwtAudienceMapping;
private boolean enableRoutingCookie;

private final ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings;
private final UnaryCallSettings.Builder<Query, Row> readRowSettings;
Expand Down Expand Up @@ -627,6 +640,7 @@ private Builder() {
primedTableIds = ImmutableList.of();
jwtAudienceMapping = DEFAULT_JWT_AUDIENCE_MAPPING;
setCredentialsProvider(defaultCredentialsProviderBuilder().build());
this.enableRoutingCookie = true;

// Defaults provider
BigtableStubSettings.Builder baseDefaults = BigtableStubSettings.newBuilder();
Expand Down Expand Up @@ -745,6 +759,7 @@ private Builder(EnhancedBigtableStubSettings settings) {
isRefreshingChannel = settings.isRefreshingChannel;
primedTableIds = settings.primedTableIds;
jwtAudienceMapping = settings.jwtAudienceMapping;
enableRoutingCookie = settings.enableRoutingCookie;

// Per method settings.
readRowsSettings = settings.readRowsSettings.toBuilder();
Expand Down Expand Up @@ -893,6 +908,25 @@ public Map<String, String> getJwtAudienceMapping() {
return jwtAudienceMapping;
}

/**
* Sets if routing cookie is enabled. If true, client will retry a request with extra metadata
* server sent back.
*/
@BetaApi("Routing cookie is not currently stable and may change in the future")
public Builder setEnableRoutingCookie(boolean enableRoutingCookie) {
this.enableRoutingCookie = enableRoutingCookie;
return this;
}

/**
* Gets if routing cookie is enabled. If true, client will retry a request with extra metadata
* server sent back.
*/
@BetaApi("Routing cookie is not currently stable and may change in the future")
public boolean getEnableRoutingCookie() {
return enableRoutingCookie;
}

/** Returns the builder for the settings used for calls to readRows. */
public ServerStreamingCallSettings.Builder<Query, Row> readRowsSettings() {
return readRowsSettings;
Expand Down Expand Up @@ -1019,6 +1053,7 @@ public String toString() {
.add("isRefreshingChannel", isRefreshingChannel)
.add("primedTableIds", primedTableIds)
.add("jwtAudienceMapping", jwtAudienceMapping)
.add("enableRoutingCookie", enableRoutingCookie)
.add("readRowsSettings", readRowsSettings)
.add("readRowSettings", readRowSettings)
.add("sampleRowKeysSettings", sampleRowKeysSettings)
Expand Down
Expand Up @@ -61,6 +61,7 @@
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -322,6 +323,7 @@ public void reversed() {
}

@Test
@Ignore("Test taking too long to run, ignore for now")
public void reversedWithForcedResumption() throws IOException, InterruptedException {
assume()
.withMessage("reverse scans are not supported in the emulator")
Expand Down

0 comments on commit 201e631

Please sign in to comment.