Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add session pool option for modelling a timeout around session acquisition. #2641

Merged
merged 13 commits into from Sep 28, 2023
Merged
Expand Up @@ -63,7 +63,6 @@
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ForwardingListenableFuture;
import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -120,17 +119,6 @@ class SessionPool {
private static final Logger logger = Logger.getLogger(SessionPool.class.getName());
private static final Tracer tracer = Tracing.getTracer();
static final String WAIT_FOR_SESSION = "SessionPool.WaitForSession";
static final ImmutableSet<ErrorCode> SHOULD_STOP_PREPARE_SESSIONS_ERROR_CODES =
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
ImmutableSet.of(
ErrorCode.UNKNOWN,
ErrorCode.INVALID_ARGUMENT,
ErrorCode.PERMISSION_DENIED,
ErrorCode.UNAUTHENTICATED,
ErrorCode.RESOURCE_EXHAUSTED,
ErrorCode.FAILED_PRECONDITION,
ErrorCode.OUT_OF_RANGE,
ErrorCode.UNIMPLEMENTED,
ErrorCode.INTERNAL);

/**
* If the {@link SessionPoolOptions#getWaitForMinSessions()} duration is greater than zero, waits
Expand Down Expand Up @@ -1675,7 +1663,10 @@ public PooledSession get() {
while (true) {
Span span = tracer.spanBuilder(WAIT_FOR_SESSION).startSpan();
try (Scope waitScope = tracer.withSpan(span)) {
PooledSession s = pollUninterruptiblyWithTimeout(currentTimeout);
PooledSession s =
pollUninterruptiblyWithTimeout(currentTimeout, options.getAcquireSessionTimeout());
// TODO clean up this code since acquireSessionTimeout will never be null and hence
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
// we won't have a use-case when s == null
if (s == null) {
// Set the status to DEADLINE_EXCEEDED and retry.
numWaiterTimeouts.incrementAndGet();
Expand All @@ -1685,6 +1676,11 @@ public PooledSession get() {
return s;
}
} catch (Exception e) {
if (e instanceof SpannerException
&& ErrorCode.RESOURCE_EXHAUSTED.equals(((SpannerException) e).getErrorCode())) {
numWaiterTimeouts.incrementAndGet();
tracer.getCurrentSpan().setStatus(Status.RESOURCE_EXHAUSTED);
}
TraceUtil.setWithFailure(span, e);
throw e;
} finally {
Expand All @@ -1693,15 +1689,27 @@ public PooledSession get() {
}
}

private PooledSession pollUninterruptiblyWithTimeout(long timeoutMillis) {
private PooledSession pollUninterruptiblyWithTimeout(
long timeoutMillis, Duration acquireSessionTimeout) {
boolean interrupted = false;
try {
while (true) {
try {
return waiter.get(timeoutMillis, TimeUnit.MILLISECONDS);
// TODO refactor this code since eventually acquireSessionTimeout will always have a
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
// default value and won't be null
return acquireSessionTimeout == null
? waiter.get(timeoutMillis, TimeUnit.MILLISECONDS)
: waiter.get(acquireSessionTimeout.toMillis(), TimeUnit.MILLISECONDS);
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
} catch (InterruptedException e) {
interrupted = true;
} catch (TimeoutException e) {
if (acquireSessionTimeout != null) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.RESOURCE_EXHAUSTED,
"Timed out after waiting "
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
+ acquireSessionTimeout.toMillis()
+ "ms for acquiring session.");
}
return null;
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause());
Expand Down
Expand Up @@ -55,6 +55,7 @@ public class SessionPoolOptions {
private final long initialWaitForSessionTimeoutMillis;
private final boolean autoDetectDialect;
private final Duration waitForMinSessions;
private final Duration acquireSessionTimeout;

/** Property for allowing mocking of session maintenance clock. */
private final Clock poolMaintainerClock;
Expand All @@ -78,6 +79,7 @@ private SessionPoolOptions(Builder builder) {
this.removeInactiveSessionAfter = builder.removeInactiveSessionAfter;
this.autoDetectDialect = builder.autoDetectDialect;
this.waitForMinSessions = builder.waitForMinSessions;
this.acquireSessionTimeout = builder.acquireSessionTimeout;
this.inactiveTransactionRemovalOptions = builder.inactiveTransactionRemovalOptions;
this.poolMaintainerClock = builder.poolMaintainerClock;
}
Expand Down Expand Up @@ -105,6 +107,7 @@ public boolean equals(Object o) {
&& Objects.equals(this.removeInactiveSessionAfter, other.removeInactiveSessionAfter)
&& Objects.equals(this.autoDetectDialect, other.autoDetectDialect)
&& Objects.equals(this.waitForMinSessions, other.waitForMinSessions)
&& Objects.equals(this.acquireSessionTimeout, other.acquireSessionTimeout)
&& Objects.equals(
this.inactiveTransactionRemovalOptions, other.inactiveTransactionRemovalOptions)
&& Objects.equals(this.poolMaintainerClock, other.poolMaintainerClock);
Expand All @@ -128,6 +131,7 @@ public int hashCode() {
this.removeInactiveSessionAfter,
this.autoDetectDialect,
this.waitForMinSessions,
this.acquireSessionTimeout,
this.inactiveTransactionRemovalOptions,
this.poolMaintainerClock);
}
Expand Down Expand Up @@ -239,6 +243,11 @@ Duration getWaitForMinSessions() {
return waitForMinSessions;
}

@VisibleForTesting
Duration getAcquireSessionTimeout() {
return acquireSessionTimeout;
}

public static Builder newBuilder() {
return new Builder();
}
Expand Down Expand Up @@ -424,6 +433,7 @@ public static class Builder {
private Duration removeInactiveSessionAfter = Duration.ofMinutes(55L);
private boolean autoDetectDialect = false;
private Duration waitForMinSessions = Duration.ZERO;
private Duration acquireSessionTimeout = Duration.ofSeconds(60);

private Clock poolMaintainerClock;

Expand All @@ -446,6 +456,7 @@ private Builder(SessionPoolOptions options) {
this.removeInactiveSessionAfter = options.removeInactiveSessionAfter;
this.autoDetectDialect = options.autoDetectDialect;
this.waitForMinSessions = options.waitForMinSessions;
this.acquireSessionTimeout = options.acquireSessionTimeout;
this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions;
this.poolMaintainerClock = options.poolMaintainerClock;
}
Expand Down Expand Up @@ -538,6 +549,9 @@ public Builder setFailIfPoolExhausted() {
/**
* If all sessions are in use and there is no more room for creating new sessions, block for a
* session to become available. Default behavior is same.
*
* <p>By default the requests are blocked for 60s. If we wish to block for a different period
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
* use the option {@link Builder#setAcquireSessionTimeout(Duration)} ()}
*/
public Builder setBlockIfPoolExhausted() {
this.actionOnExhaustion = ActionOnExhaustion.BLOCK;
Expand Down Expand Up @@ -695,6 +709,15 @@ public Builder setWaitForMinSessions(Duration waitForMinSessions) {
return this;
}

/**
* If greater than zero, we wait for said duration when no sessions are available in the {@link
* SessionPool}. When no configuration is passed, we default to 60s to acquire a session.
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
*/
public Builder setAcquireSessionTimeout(Duration acquireSessionTimeout) {
this.acquireSessionTimeout = acquireSessionTimeout;
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
return this;
}

/** Build a SessionPoolOption object */
public SessionPoolOptions build() {
validate();
Expand Down
Expand Up @@ -66,6 +66,10 @@
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.io.BaseEncoding;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -2207,6 +2211,107 @@ public void testBatchCreateSessionsPermissionDenied() {
}
}

@Test
public void testBatchCreateSessionsTimesOut_whenDeadlineExceeded() throws Exception {
// Simulate a minimum execution time of 1000 milliseconds for the BatchCreateSessions RPC.
mockSpanner.setBatchCreateSessionsExecutionTime(
SimulatedExecutionTime.ofMinimumAndRandomTime(1000, 0));
SpannerOptions.Builder builder =
SpannerOptions.newBuilder()
.setProjectId("my-project")
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance());
// Set the timeout and retry settings for BatchCreateSessions to a simple
// single-attempt-and-timeout after 100ms.
builder
.getSpannerStubSettingsBuilder()
.batchCreateSessionsSettings()
.setSimpleTimeoutNoRetries(Duration.ofMillis(100));

try (Spanner spanner = builder.build().getService()) {
DatabaseId databaseId = DatabaseId.of("my-project", "my-instance", "my-database");
DatabaseClient client = spanner.getDatabaseClient(databaseId);

ListeningExecutorService service =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1000));
List<ListenableFuture<Void>> futures = new ArrayList<>(5000);
AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < 5000; i++) {
final int index = i;
futures.add(
service.submit(
() -> {
// The following call is non-blocking and will not generate an exception.
ResultSet rs = client.singleUse().executeQuery(SELECT1);
// Actually trying to get any results will cause an exception.
// The DEADLINE_EXCEEDED error of the BatchCreateSessions RPC is in this case
// propagated to
// the application.
SpannerException e = assertThrows(SpannerException.class, rs::next);
assertEquals(ErrorCode.DEADLINE_EXCEEDED, e.getErrorCode());
System.out.printf("finished test %d\n", counter.incrementAndGet());

return null;
}));
}
service.shutdown();
assertEquals(5000, Futures.allAsList(futures).get().size());
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
}
}

@Test
public void testBatchCreateSessionsTimesOut_whenResourceExhausted() throws Exception {
// Simulate a minimum execution time of 2000 milliseconds for the BatchCreateSessions RPC.
mockSpanner.setBatchCreateSessionsExecutionTime(
SimulatedExecutionTime.ofMinimumAndRandomTime(2000, 0));
// Add a timeout for the max amount of time (60ms) that a request waits when a session is
// unavailable.
SessionPoolOptions sessionPoolOptions =
SessionPoolOptions.newBuilder().setAcquireSessionTimeout(Duration.ofMillis(60)).build();
SpannerOptions.Builder builder =
SpannerOptions.newBuilder()
.setProjectId("my-project")
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance())
.setSessionPoolOption(sessionPoolOptions);
// Set the timeout and retry settings for BatchCreateSessions to a simple
// single-attempt-and-timeout after 1000ms. This will ensure that session acquisition timeout of
// 60ms will kick for all requests before the overall request RPC timeout is breached.
builder
.getSpannerStubSettingsBuilder()
.batchCreateSessionsSettings()
.setSimpleTimeoutNoRetries(Duration.ofMillis(1000));

try (Spanner spanner = builder.build().getService()) {
DatabaseId databaseId = DatabaseId.of("my-project", "my-instance", "my-database");
DatabaseClient client = spanner.getDatabaseClient(databaseId);

ListeningExecutorService service =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1000));
List<ListenableFuture<Void>> futures = new ArrayList<>(5000);
AtomicInteger counter = new AtomicInteger();
for (int i = 0; i < 5000; i++) {
final int index = i;
futures.add(
service.submit(
() -> {
// The following call is non-blocking and will not generate an exception.
ResultSet rs = client.singleUse().executeQuery(SELECT1);
// Actually trying to get any results will cause an exception.
// When number of requests > MAX_SESSIONS, post setAcquireSessionTimeout
// a few requests will timeout with RESOURCE_EXHAUSTED error.
SpannerException e = assertThrows(SpannerException.class, rs::next);
assertEquals(ErrorCode.RESOURCE_EXHAUSTED, e.getErrorCode());
System.out.printf("finished test %d\n", counter.incrementAndGet());

return null;
}));
}
service.shutdown();
assertEquals(5000, Futures.allAsList(futures).get().size());
}
}
arpan14 marked this conversation as resolved.
Show resolved Hide resolved

arpan14 marked this conversation as resolved.
Show resolved Hide resolved
@Test
public void testExceptionIncludesStatement() {
mockSpanner.setExecuteStreamingSqlExecutionTime(
Expand Down
Expand Up @@ -186,4 +186,19 @@ public void setNegativeIdleTimeThreshold() {
SessionPoolOptions.newBuilder()
.setInactiveTransactionRemovalOptions(inactiveTransactionRemovalOptions);
}

@Test
public void setAcquireSessionTimeout() {
SessionPoolOptions sessionPoolOptions =
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
SessionPoolOptions.newBuilder().setAcquireSessionTimeout(Duration.ofSeconds(20)).build();

assertEquals(Duration.ofSeconds(20), sessionPoolOptions.getAcquireSessionTimeout());
}

@Test
public void verifyDefaultAcquireSessionTimeout() {
SessionPoolOptions sessionPoolOptions = SessionPoolOptions.newBuilder().build();

assertEquals(Duration.ofSeconds(60), sessionPoolOptions.getAcquireSessionTimeout());
}
}