Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add session pool option for modelling a timeout around session acquisition. #2641

Merged
merged 13 commits into from Sep 28, 2023
Merged
6 changes: 3 additions & 3 deletions README.md
Expand Up @@ -57,13 +57,13 @@ implementation 'com.google.cloud:google-cloud-spanner'
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-spanner:6.47.0'
implementation 'com.google.cloud:google-cloud-spanner:6.48.0'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.47.0"
libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.48.0"
```
<!-- {x-version-update-end} -->

Expand Down Expand Up @@ -431,7 +431,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-spanner/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-spanner.svg
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.47.0
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.48.0
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
Expand Down
Expand Up @@ -63,7 +63,6 @@
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ForwardingListenableFuture;
import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -120,17 +119,6 @@ class SessionPool {
private static final Logger logger = Logger.getLogger(SessionPool.class.getName());
private static final Tracer tracer = Tracing.getTracer();
static final String WAIT_FOR_SESSION = "SessionPool.WaitForSession";
static final ImmutableSet<ErrorCode> SHOULD_STOP_PREPARE_SESSIONS_ERROR_CODES =
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
ImmutableSet.of(
ErrorCode.UNKNOWN,
ErrorCode.INVALID_ARGUMENT,
ErrorCode.PERMISSION_DENIED,
ErrorCode.UNAUTHENTICATED,
ErrorCode.RESOURCE_EXHAUSTED,
ErrorCode.FAILED_PRECONDITION,
ErrorCode.OUT_OF_RANGE,
ErrorCode.UNIMPLEMENTED,
ErrorCode.INTERNAL);

/**
* If the {@link SessionPoolOptions#getWaitForMinSessions()} duration is greater than zero, waits
Expand Down Expand Up @@ -1675,7 +1663,8 @@ public PooledSession get() {
while (true) {
Span span = tracer.spanBuilder(WAIT_FOR_SESSION).startSpan();
try (Scope waitScope = tracer.withSpan(span)) {
PooledSession s = pollUninterruptiblyWithTimeout(currentTimeout);
PooledSession s =
pollUninterruptiblyWithTimeout(currentTimeout, options.getAcquireSessionTimeout());
if (s == null) {
// Set the status to DEADLINE_EXCEEDED and retry.
numWaiterTimeouts.incrementAndGet();
Expand All @@ -1685,6 +1674,11 @@ public PooledSession get() {
return s;
}
} catch (Exception e) {
if (e instanceof SpannerException
&& ErrorCode.RESOURCE_EXHAUSTED.equals(((SpannerException) e).getErrorCode())) {
numWaiterTimeouts.incrementAndGet();
tracer.getCurrentSpan().setStatus(Status.RESOURCE_EXHAUSTED);
}
TraceUtil.setWithFailure(span, e);
throw e;
} finally {
Expand All @@ -1693,15 +1687,26 @@ public PooledSession get() {
}
}

private PooledSession pollUninterruptiblyWithTimeout(long timeoutMillis) {
private PooledSession pollUninterruptiblyWithTimeout(
long timeoutMillis, Duration acquireSessionTimeout) {
boolean interrupted = false;
try {
while (true) {
try {
return waiter.get(timeoutMillis, TimeUnit.MILLISECONDS);
return acquireSessionTimeout == null
? waiter.get(timeoutMillis, TimeUnit.MILLISECONDS)
: waiter.get(acquireSessionTimeout.toMillis(), TimeUnit.MILLISECONDS);
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
} catch (InterruptedException e) {
interrupted = true;
} catch (TimeoutException e) {
if (acquireSessionTimeout != null) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.RESOURCE_EXHAUSTED,
"Timed out after waiting "
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
+ acquireSessionTimeout.toMillis()
+ "ms for acquiring session. To mitigate error SessionPoolOptions#setAcquireSessionTimeout(Duration) to set a higher timeout"
+ " or increase the number of sessions in the session pool.");
}
return null;
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause());
Expand Down
Expand Up @@ -52,9 +52,16 @@ public class SessionPoolOptions {
private final ActionOnSessionLeak actionOnSessionLeak;
private final boolean trackStackTraceOfSessionCheckout;
private final InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions;
private final long initialWaitForSessionTimeoutMillis;

/**
* Use {@link #acquireSessionTimeout} instead to specify the total duration to wait while
* acquiring session for a transaction.
*/
@Deprecated private final long initialWaitForSessionTimeoutMillis;

private final boolean autoDetectDialect;
private final Duration waitForMinSessions;
private final Duration acquireSessionTimeout;

/** Property for allowing mocking of session maintenance clock. */
private final Clock poolMaintainerClock;
Expand All @@ -78,6 +85,7 @@ private SessionPoolOptions(Builder builder) {
this.removeInactiveSessionAfter = builder.removeInactiveSessionAfter;
this.autoDetectDialect = builder.autoDetectDialect;
this.waitForMinSessions = builder.waitForMinSessions;
this.acquireSessionTimeout = builder.acquireSessionTimeout;
this.inactiveTransactionRemovalOptions = builder.inactiveTransactionRemovalOptions;
this.poolMaintainerClock = builder.poolMaintainerClock;
}
Expand Down Expand Up @@ -105,6 +113,7 @@ public boolean equals(Object o) {
&& Objects.equals(this.removeInactiveSessionAfter, other.removeInactiveSessionAfter)
&& Objects.equals(this.autoDetectDialect, other.autoDetectDialect)
&& Objects.equals(this.waitForMinSessions, other.waitForMinSessions)
&& Objects.equals(this.acquireSessionTimeout, other.acquireSessionTimeout)
&& Objects.equals(
this.inactiveTransactionRemovalOptions, other.inactiveTransactionRemovalOptions)
&& Objects.equals(this.poolMaintainerClock, other.poolMaintainerClock);
Expand All @@ -128,6 +137,7 @@ public int hashCode() {
this.removeInactiveSessionAfter,
this.autoDetectDialect,
this.waitForMinSessions,
this.acquireSessionTimeout,
this.inactiveTransactionRemovalOptions,
this.poolMaintainerClock);
}
Expand Down Expand Up @@ -239,6 +249,11 @@ Duration getWaitForMinSessions() {
return waitForMinSessions;
}

@VisibleForTesting
Duration getAcquireSessionTimeout() {
return acquireSessionTimeout;
}

public static Builder newBuilder() {
return new Builder();
}
Expand Down Expand Up @@ -424,6 +439,7 @@ public static class Builder {
private Duration removeInactiveSessionAfter = Duration.ofMinutes(55L);
private boolean autoDetectDialect = false;
private Duration waitForMinSessions = Duration.ZERO;
private Duration acquireSessionTimeout = Duration.ofSeconds(60);

private Clock poolMaintainerClock;

Expand All @@ -446,6 +462,7 @@ private Builder(SessionPoolOptions options) {
this.removeInactiveSessionAfter = options.removeInactiveSessionAfter;
this.autoDetectDialect = options.autoDetectDialect;
this.waitForMinSessions = options.waitForMinSessions;
this.acquireSessionTimeout = options.acquireSessionTimeout;
this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions;
this.poolMaintainerClock = options.poolMaintainerClock;
}
Expand Down Expand Up @@ -538,6 +555,10 @@ public Builder setFailIfPoolExhausted() {
/**
* If all sessions are in use and there is no more room for creating new sessions, block for a
* session to become available. Default behavior is same.
*
* <p>By default the requests are blocked for 60s and will fail with a `SpannerException` with
* error code `ResourceExhausted` if this timeout is exceeded. If you wish to block for a
* different period use the option {@link Builder#setAcquireSessionTimeout(Duration)} ()}
*/
public Builder setBlockIfPoolExhausted() {
this.actionOnExhaustion = ActionOnExhaustion.BLOCK;
Expand Down Expand Up @@ -695,6 +716,25 @@ public Builder setWaitForMinSessions(Duration waitForMinSessions) {
return this;
}

/**
* If greater than zero, we wait for said duration when no sessions are available in the {@link
* SessionPool}. The default is a 60s timeout. Set the value to null to disable the timeout.
*/
public Builder setAcquireSessionTimeout(Duration acquireSessionTimeout) {
try {
if (acquireSessionTimeout != null) {
Preconditions.checkArgument(
acquireSessionTimeout.toMillis() > 0,
"acquireSessionTimeout should be greater than 0 ns");
}
} catch (ArithmeticException ex) {
throw new IllegalArgumentException(
"acquireSessionTimeout in millis should be lesser than Long.MAX_VALUE");
}
this.acquireSessionTimeout = acquireSessionTimeout;
arpan14 marked this conversation as resolved.
Show resolved Hide resolved
return this;
}

/** Build a SessionPoolOption object */
public SessionPoolOptions build() {
validate();
Expand Down