Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add session pool option for modelling a timeout around session acquisition. #2641

Merged
merged 13 commits into from Sep 28, 2023
Merged
Expand Up @@ -63,7 +63,6 @@
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ForwardingListenableFuture;
import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -120,17 +119,6 @@ class SessionPool {
private static final Logger logger = Logger.getLogger(SessionPool.class.getName());
private static final Tracer tracer = Tracing.getTracer();
static final String WAIT_FOR_SESSION = "SessionPool.WaitForSession";
static final ImmutableSet<ErrorCode> SHOULD_STOP_PREPARE_SESSIONS_ERROR_CODES =
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
ImmutableSet.of(
ErrorCode.UNKNOWN,
ErrorCode.INVALID_ARGUMENT,
ErrorCode.PERMISSION_DENIED,
ErrorCode.UNAUTHENTICATED,
ErrorCode.RESOURCE_EXHAUSTED,
ErrorCode.FAILED_PRECONDITION,
ErrorCode.OUT_OF_RANGE,
ErrorCode.UNIMPLEMENTED,
ErrorCode.INTERNAL);

/**
* If the {@link SessionPoolOptions#getWaitForMinSessions()} duration is greater than zero, waits
Expand Down Expand Up @@ -1675,7 +1663,8 @@ public PooledSession get() {
while (true) {
Span span = tracer.spanBuilder(WAIT_FOR_SESSION).startSpan();
try (Scope waitScope = tracer.withSpan(span)) {
PooledSession s = pollUninterruptiblyWithTimeout(currentTimeout);
PooledSession s =
pollUninterruptiblyWithTimeout(currentTimeout, options.getAcquireSessionTimeout());
if (s == null) {
// Set the status to DEADLINE_EXCEEDED and retry.
numWaiterTimeouts.incrementAndGet();
Expand All @@ -1685,6 +1674,11 @@ public PooledSession get() {
return s;
}
} catch (Exception e) {
if (e instanceof SpannerException
&& ErrorCode.RESOURCE_EXHAUSTED.equals(((SpannerException) e).getErrorCode())) {
numWaiterTimeouts.incrementAndGet();
tracer.getCurrentSpan().setStatus(Status.RESOURCE_EXHAUSTED);
}
TraceUtil.setWithFailure(span, e);
throw e;
} finally {
Expand All @@ -1693,15 +1687,27 @@ public PooledSession get() {
}
}

private PooledSession pollUninterruptiblyWithTimeout(long timeoutMillis) {
private PooledSession pollUninterruptiblyWithTimeout(
long timeoutMillis, Duration acquireSessionTimeout) {
boolean interrupted = false;
try {
while (true) {
try {
return waiter.get(timeoutMillis, TimeUnit.MILLISECONDS);
return acquireSessionTimeout == null
? waiter.get(timeoutMillis, TimeUnit.MILLISECONDS)
: waiter.get(acquireSessionTimeout.toMillis(), TimeUnit.MILLISECONDS);
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
} catch (InterruptedException e) {
interrupted = true;
} catch (TimeoutException e) {
if (acquireSessionTimeout != null) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.RESOURCE_EXHAUSTED,
"Timed out after waiting "
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
+ acquireSessionTimeout.toMillis()
+ "ms for acquiring session. To mitigate error increase the timeout duration at"
+ " session pool option `acquireSessionTimeout` or increase the "
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
+ "number of sessions in the session pool.");
}
return null;
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause());
Expand Down
Expand Up @@ -52,9 +52,16 @@ public class SessionPoolOptions {
private final ActionOnSessionLeak actionOnSessionLeak;
private final boolean trackStackTraceOfSessionCheckout;
private final InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions;
private final long initialWaitForSessionTimeoutMillis;

/**
* Use {@link #acquireSessionTimeout} instead to specify the total duration to wait while
* acquiring session for a transaction.
*/
@Deprecated private final long initialWaitForSessionTimeoutMillis;

private final boolean autoDetectDialect;
private final Duration waitForMinSessions;
private final Duration acquireSessionTimeout;

/** Property for allowing mocking of session maintenance clock. */
private final Clock poolMaintainerClock;
Expand All @@ -78,6 +85,7 @@ private SessionPoolOptions(Builder builder) {
this.removeInactiveSessionAfter = builder.removeInactiveSessionAfter;
this.autoDetectDialect = builder.autoDetectDialect;
this.waitForMinSessions = builder.waitForMinSessions;
this.acquireSessionTimeout = builder.acquireSessionTimeout;
this.inactiveTransactionRemovalOptions = builder.inactiveTransactionRemovalOptions;
this.poolMaintainerClock = builder.poolMaintainerClock;
}
Expand Down Expand Up @@ -105,6 +113,7 @@ public boolean equals(Object o) {
&& Objects.equals(this.removeInactiveSessionAfter, other.removeInactiveSessionAfter)
&& Objects.equals(this.autoDetectDialect, other.autoDetectDialect)
&& Objects.equals(this.waitForMinSessions, other.waitForMinSessions)
&& Objects.equals(this.acquireSessionTimeout, other.acquireSessionTimeout)
&& Objects.equals(
this.inactiveTransactionRemovalOptions, other.inactiveTransactionRemovalOptions)
&& Objects.equals(this.poolMaintainerClock, other.poolMaintainerClock);
Expand All @@ -128,6 +137,7 @@ public int hashCode() {
this.removeInactiveSessionAfter,
this.autoDetectDialect,
this.waitForMinSessions,
this.acquireSessionTimeout,
this.inactiveTransactionRemovalOptions,
this.poolMaintainerClock);
}
Expand Down Expand Up @@ -239,6 +249,11 @@ Duration getWaitForMinSessions() {
return waitForMinSessions;
}

@VisibleForTesting
Duration getAcquireSessionTimeout() {
return acquireSessionTimeout;
}

public static Builder newBuilder() {
return new Builder();
}
Expand Down Expand Up @@ -424,6 +439,7 @@ public static class Builder {
private Duration removeInactiveSessionAfter = Duration.ofMinutes(55L);
private boolean autoDetectDialect = false;
private Duration waitForMinSessions = Duration.ZERO;
private Duration acquireSessionTimeout = Duration.ofSeconds(60);

private Clock poolMaintainerClock;

Expand All @@ -446,6 +462,7 @@ private Builder(SessionPoolOptions options) {
this.removeInactiveSessionAfter = options.removeInactiveSessionAfter;
this.autoDetectDialect = options.autoDetectDialect;
this.waitForMinSessions = options.waitForMinSessions;
this.acquireSessionTimeout = options.acquireSessionTimeout;
this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions;
this.poolMaintainerClock = options.poolMaintainerClock;
}
Expand Down Expand Up @@ -538,6 +555,10 @@ public Builder setFailIfPoolExhausted() {
/**
* If all sessions are in use and there is no more room for creating new sessions, block for a
* session to become available. Default behavior is same.
*
* <p>By default the requests are blocked for 60s and will fail with a `SpannerException` with
* error code `ResourceExhausted` if this timeout is exceeded. If you wish to block for a
* different period use the option {@link Builder#setAcquireSessionTimeout(Duration)} ()}
*/
public Builder setBlockIfPoolExhausted() {
this.actionOnExhaustion = ActionOnExhaustion.BLOCK;
Expand Down Expand Up @@ -695,6 +716,26 @@ public Builder setWaitForMinSessions(Duration waitForMinSessions) {
return this;
}

/**
* If greater than zero, we wait for said duration when no sessions are available in the {@link
* SessionPool}. When no configuration is passed, the default is a 60s timeout. To avoid setting
* a default value, set the value as null.
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
*/
public Builder setAcquireSessionTimeout(Duration acquireSessionTimeout) {
try {
if (acquireSessionTimeout != null) {
Preconditions.checkArgument(
acquireSessionTimeout.toMillis() > 0,
"acquireSessionTimeout should be greater than 0 ns");
}
} catch (ArithmeticException ex) {
throw new IllegalArgumentException(
"acquireSessionTimeout in millis should be lesser than Long.MAX_VALUE");
}
this.acquireSessionTimeout = acquireSessionTimeout;
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
return this;
}

/** Build a SessionPoolOption object */
public SessionPoolOptions build() {
validate();
Expand Down
@@ -0,0 +1,210 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spanner;

import static com.google.cloud.spanner.MockSpannerTestUtil.READ_ONE_KEY_VALUE_RESULTSET;
import static com.google.cloud.spanner.MockSpannerTestUtil.READ_ONE_KEY_VALUE_STATEMENT;
import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.cloud.NoCredentials;
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Server;
import io.grpc.inprocess.InProcessServerBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.MethodSorters;
import org.threeten.bp.Duration;

@Category(SlowTest.class)
@RunWith(JUnit4.class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
public class BatchCreateSessionsSlowTest {
private static final String TEST_PROJECT = "my-project";
private static final String TEST_DATABASE_ROLE = "my-role";
private static MockSpannerServiceImpl mockSpanner;
private static Server server;
private static LocalChannelProvider channelProvider;
private Spanner spanner;

@BeforeClass
public static void startStaticServer() throws IOException {
mockSpanner = new MockSpannerServiceImpl();
mockSpanner.setAbortProbability(0.0D); // We don't want any unpredictable aborted transactions.
mockSpanner.putStatementResult(
StatementResult.query(SELECT1, MockSpannerTestUtil.SELECT1_RESULTSET));
mockSpanner.putStatementResult(
StatementResult.query(READ_ONE_KEY_VALUE_STATEMENT, READ_ONE_KEY_VALUE_RESULTSET));

String uniqueName = InProcessServerBuilder.generateName();
server =
InProcessServerBuilder.forName(uniqueName)
// We need to use a real executor for timeouts to occur.
.scheduledExecutorService(new ScheduledThreadPoolExecutor(1))
.addService(mockSpanner)
.build()
.start();
channelProvider = LocalChannelProvider.create(uniqueName);
}

@AfterClass
public static void stopServer() throws InterruptedException {
server.shutdown();
server.awaitTermination();
}

@Before
public void setUp() {
spanner =
SpannerOptions.newBuilder()
.setProjectId(TEST_PROJECT)
.setDatabaseRole(TEST_DATABASE_ROLE)
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance())
.setSessionPoolOption(SessionPoolOptions.newBuilder().setFailOnSessionLeak().build())
.build()
.getService();
}

@After
public void tearDown() {
mockSpanner.unfreeze();
spanner.close();
mockSpanner.reset();
mockSpanner.removeAllExecutionTimes();
}

@Test
public void testBatchCreateSessionsTimesOut_whenDeadlineExceeded() throws Exception {
// Simulate a minimum execution time of 1000 milliseconds for the BatchCreateSessions RPC.
mockSpanner.setBatchCreateSessionsExecutionTime(
SimulatedExecutionTime.ofMinimumAndRandomTime(1000, 0));
SpannerOptions.Builder builder =
SpannerOptions.newBuilder()
.setProjectId("my-project")
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance());
// Set the timeout and retry settings for BatchCreateSessions to a simple
// single-attempt-and-timeout after 100ms.
builder
.getSpannerStubSettingsBuilder()
.batchCreateSessionsSettings()
.setSimpleTimeoutNoRetries(Duration.ofMillis(100));

try (Spanner spanner = builder.build().getService()) {
DatabaseId databaseId = DatabaseId.of("my-project", "my-instance", "my-database");
DatabaseClient client = spanner.getDatabaseClient(databaseId);

ListeningExecutorService service =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1000));
List<ListenableFuture<Void>> futures = new ArrayList<>(5000);
AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < 5000; i++) {
final int index = i;
futures.add(
service.submit(
() -> {
// The following call is non-blocking and will not generate an exception.
ResultSet rs = client.singleUse().executeQuery(SELECT1);
// Actually trying to get any results will cause an exception.
// The DEADLINE_EXCEEDED error of the BatchCreateSessions RPC is in this case
// propagated to
// the application.
SpannerException e = assertThrows(SpannerException.class, rs::next);
assertEquals(ErrorCode.DEADLINE_EXCEEDED, e.getErrorCode());
System.out.printf("finished test %d\n", counter.incrementAndGet());

return null;
}));
}
service.shutdown();
assertEquals(5000, Futures.allAsList(futures).get().size());
}
}

@Test
public void testBatchCreateSessionsTimesOut_whenResourceExhausted() throws Exception {
// Simulate a minimum execution time of 2000 milliseconds for the BatchCreateSessions RPC.
mockSpanner.setBatchCreateSessionsExecutionTime(
SimulatedExecutionTime.ofMinimumAndRandomTime(2000, 0));
// Add a timeout for the max amount of time (60ms) that a request waits when a session is
// unavailable.
SessionPoolOptions sessionPoolOptions =
SessionPoolOptions.newBuilder().setAcquireSessionTimeout(Duration.ofMillis(60)).build();
SpannerOptions.Builder builder =
SpannerOptions.newBuilder()
.setProjectId("my-project")
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance())
.setSessionPoolOption(sessionPoolOptions);
// Set the timeout and retry settings for BatchCreateSessions to a simple
// single-attempt-and-timeout after 1000ms. This will ensure that session acquisition timeout of
// 60ms will kick for all requests before the overall request RPC timeout is breached.
builder
.getSpannerStubSettingsBuilder()
.batchCreateSessionsSettings()
.setSimpleTimeoutNoRetries(Duration.ofMillis(1000));

try (Spanner spanner = builder.build().getService()) {
DatabaseId databaseId = DatabaseId.of("my-project", "my-instance", "my-database");
DatabaseClient client = spanner.getDatabaseClient(databaseId);

ListeningExecutorService service =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1000));
List<ListenableFuture<Void>> futures = new ArrayList<>(5000);
AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < 5000; i++) {
final int index = i;
futures.add(
service.submit(
() -> {
// The following call is non-blocking and will not generate an exception.
ResultSet rs = client.singleUse().executeQuery(SELECT1);
// Actually trying to get any results will cause an exception.
// When number of requests > MAX_SESSIONS, post setAcquireSessionTimeout
// a few requests will timeout with RESOURCE_EXHAUSTED error.
SpannerException e = assertThrows(SpannerException.class, rs::next);
assertEquals(ErrorCode.RESOURCE_EXHAUSTED, e.getErrorCode());
System.out.printf("finished test %d\n", counter.incrementAndGet());

return null;
}));
}
service.shutdown();
assertEquals(5000, Futures.allAsList(futures).get().size());
}
}
}