Skip to content

Commit

Permalink
perf: remove custom transport executor (#2366)
Browse files Browse the repository at this point in the history
* perf: remove custom transport executor

Remove the custom executor provider that is set on the underlying generated
clients and instead use the internal core gRPC executor provider. The latter
is a shared executor provider for all gRPC channels that creates threads on
demand. This prevents the creation of unnecessary threads at startup, and can
reduce overall thread usage for applications that create multiple Spanner
instances during their lifetime.

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
olavloite and gcf-owl-bot[bot] committed Apr 12, 2023
1 parent b02f584 commit e27dbe5
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 253 deletions.
Expand Up @@ -23,7 +23,6 @@
import com.google.api.core.InternalApi;
import com.google.api.core.NanoClock;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcCallContext;
Expand Down Expand Up @@ -185,7 +184,6 @@
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -200,8 +198,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -211,53 +207,6 @@
/** Implementation of Cloud Spanner remote calls using Gapic libraries. */
@InternalApi
public class GapicSpannerRpc implements SpannerRpc {

/**
* {@link ExecutorProvider} that keeps track of the executors that are created and shuts these
* down when the {@link SpannerRpc} is closed.
*/
private static final class ManagedInstantiatingExecutorProvider implements ExecutorProvider {

// 4 Gapic clients * 4 channels per client.
private static final int DEFAULT_MIN_THREAD_COUNT = 16;
private final List<ScheduledExecutorService> executors = new LinkedList<>();
private final ThreadFactory threadFactory;

private ManagedInstantiatingExecutorProvider(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}

@Override
public boolean shouldAutoClose() {
return false;
}

@Override
public ScheduledExecutorService getExecutor() {
int numCpus = Runtime.getRuntime().availableProcessors();
int numThreads = Math.max(DEFAULT_MIN_THREAD_COUNT, numCpus);
ScheduledExecutorService executor =
new ScheduledThreadPoolExecutor(numThreads, threadFactory);
synchronized (this) {
executors.add(executor);
}
return executor;
}

/** Shuts down all executors that have been created by this {@link ExecutorProvider}. */
private synchronized void shutdown() {
for (ScheduledExecutorService executor : executors) {
executor.shutdown();
}
}

private void awaitTermination() throws InterruptedException {
for (ScheduledExecutorService executor : executors) {
executor.awaitTermination(10L, TimeUnit.SECONDS);
}
}
}

private static final PathTemplate PROJECT_NAME_TEMPLATE =
PathTemplate.create("projects/{project}");
private static final PathTemplate OPERATION_NAME_TEMPLATE =
Expand All @@ -277,7 +226,6 @@ private void awaitTermination() throws InterruptedException {
CLIENT_LIBRARY_LANGUAGE + "/" + GaxProperties.getLibraryVersion(GapicSpannerRpc.class);
private static final String API_FILE = "grpc-gcp-apiconfig.json";

private final ManagedInstantiatingExecutorProvider executorProvider;
private boolean rpcIsClosed;
private final SpannerStub spannerStub;
private final SpannerStub partitionedDmlStub;
Expand Down Expand Up @@ -356,13 +304,6 @@ public GapicSpannerRpc(final SpannerOptions options) {
this.compressorName = options.getCompressorName();

if (initializeStubs) {
// Create a managed executor provider.
this.executorProvider =
new ManagedInstantiatingExecutorProvider(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(options.getTransportChannelExecutorThreadNameFormat())
.build());
// First check if SpannerOptions provides a TransportChannelProvider. Create one
// with information gathered from SpannerOptions if none is provided
InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder =
Expand All @@ -373,11 +314,6 @@ public GapicSpannerRpc(final SpannerOptions options) {
.setMaxInboundMetadataSize(MAX_METADATA_SIZE)
.setPoolSize(options.getNumChannels())

// Before updating this method to setExecutor, please verify with a code owner on
// the lowest version of gax-grpc that needs to be supported. Currently v1.47.17,
// which doesn't support the setExecutor variant.
.setExecutorProvider(executorProvider)

// Set a keepalive time of 120 seconds to help long running
// commit GRPC calls succeed
.setKeepAliveTime(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS))
Expand Down Expand Up @@ -536,7 +472,6 @@ public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCalla
this.databaseAdminStubSettings = null;
this.spannerWatchdog = null;
this.partitionedDmlRetrySettings = null;
this.executorProvider = null;
}
}

Expand Down Expand Up @@ -1932,15 +1867,13 @@ public void shutdown() {
this.instanceAdminStub.close();
this.databaseAdminStub.close();
this.spannerWatchdog.shutdown();
this.executorProvider.shutdown();

try {
this.spannerStub.awaitTermination(10L, TimeUnit.SECONDS);
this.partitionedDmlStub.awaitTermination(10L, TimeUnit.SECONDS);
this.instanceAdminStub.awaitTermination(10L, TimeUnit.SECONDS);
this.databaseAdminStub.awaitTermination(10L, TimeUnit.SECONDS);
this.spannerWatchdog.awaitTermination(10L, TimeUnit.SECONDS);
this.executorProvider.awaitTermination();
} catch (InterruptedException e) {
throw SpannerExceptionFactory.propagateInterrupt(e);
}
Expand All @@ -1954,7 +1887,6 @@ public void shutdownNow() {
this.instanceAdminStub.close();
this.databaseAdminStub.close();
this.spannerWatchdog.shutdown();
this.executorProvider.shutdown();

this.spannerStub.shutdownNow();
this.partitionedDmlStub.shutdownNow();
Expand Down
Expand Up @@ -54,7 +54,10 @@ public ScheduledExecutorService get() {

@Override
public void release(ScheduledExecutorService executor) {
executor.shutdown();
try {
executor.shutdown();
} catch (Throwable ignore) {
}
}
}

Expand Down
Expand Up @@ -923,14 +923,22 @@ public void testCustomAsyncExecutorProvider() {
@Test
public void testDefaultNumChannelsWithGrpcGcpExtensionEnabled() {
SpannerOptions options =
SpannerOptions.newBuilder().setProjectId("test-project").enableGrpcGcpExtension().build();
SpannerOptions.newBuilder()
.setProjectId("test-project")
.setCredentials(NoCredentials.getInstance())
.enableGrpcGcpExtension()
.build();

assertEquals(SpannerOptions.GRPC_GCP_ENABLED_DEFAULT_CHANNELS, options.getNumChannels());
}

@Test
public void testDefaultNumChannelsWithGrpcGcpExtensionDisabled() {
SpannerOptions options = SpannerOptions.newBuilder().setProjectId("test-project").build();
SpannerOptions options =
SpannerOptions.newBuilder()
.setProjectId("test-project")
.setCredentials(NoCredentials.getInstance())
.build();

assertEquals(SpannerOptions.DEFAULT_CHANNELS, options.getNumChannels());
}
Expand All @@ -943,6 +951,7 @@ public void testNumChannelsWithGrpcGcpExtensionEnabled() {
SpannerOptions options1 =
SpannerOptions.newBuilder()
.setProjectId("test-project")
.setCredentials(NoCredentials.getInstance())
.setNumChannels(numChannels)
.enableGrpcGcpExtension()
.build();
Expand All @@ -954,6 +963,7 @@ public void testNumChannelsWithGrpcGcpExtensionEnabled() {
SpannerOptions options2 =
SpannerOptions.newBuilder()
.setProjectId("test-project")
.setCredentials(NoCredentials.getInstance())
.enableGrpcGcpExtension()
.setNumChannels(numChannels)
.build();
Expand All @@ -972,12 +982,19 @@ public void checkCreatedInstanceWhenGrpcGcpExtensionDisabled() {
Spanner spanner2 = options1.getService();

assertNotSame(spanner1, spanner2);

spanner1.close();
spanner2.close();
}

@Test
public void checkCreatedInstanceWhenGrpcGcpExtensionEnabled() {
SpannerOptions options =
SpannerOptions.newBuilder().setProjectId("test-project").enableGrpcGcpExtension().build();
SpannerOptions.newBuilder()
.setProjectId("test-project")
.setCredentials(NoCredentials.getInstance())
.enableGrpcGcpExtension()
.build();
SpannerOptions options1 = options.toBuilder().build();
assertEquals(true, options.isGrpcGcpExtensionEnabled());
assertEquals(options.isGrpcGcpExtensionEnabled(), options1.isGrpcGcpExtensionEnabled());
Expand All @@ -986,5 +1003,8 @@ public void checkCreatedInstanceWhenGrpcGcpExtensionEnabled() {
Spanner spanner2 = options1.getService();

assertNotSame(spanner1, spanner2);

spanner1.close();
spanner2.close();
}
}

0 comments on commit e27dbe5

Please sign in to comment.