feat: long running transaction clean up background task. Adding configuration options for closing inactive transactions. #2419

Merged
merged 57 commits into from Aug 1, 2023
edc5bbf
fix: prevent illegal negative timeout values into thread sleep() meth…
arpan14 Feb 6, 2023
49a85df
Merge pull request #1 from arpan14/retryerror
arpan14 Feb 8, 2023
4cd497b
Fixing lint issues.
arpan14 Feb 8, 2023
4a6aa8e
Merge branch 'googleapis:main' into main
arpan14 Mar 13, 2023
b2aa09d
Merge branch 'googleapis:main' into main
arpan14 Mar 15, 2023
8d6d71e
Merge branch 'googleapis:main' into main
arpan14 May 9, 2023
34502d1
feat: long running transaction clean up background task. Adding confi…
arpan14 Apr 21, 2023
660cbcf
fix: linting issues and tests.
arpan14 May 9, 2023
82f9a09
fix: avoid refactoring existing tests.
arpan14 May 9, 2023
65a42f7
fix:lint issues and unit tests.
arpan14 May 9, 2023
d76b0b1
docs:adding documentation.
arpan14 May 10, 2023
fcf1565
fix:linting issues.
arpan14 May 10, 2023
7858d7c
Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/Se…
arpan14 Jun 6, 2023
342eed8
Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/Se…
arpan14 Jun 6, 2023
227dd27
Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/Se…
arpan14 Jun 6, 2023
ab05e08
Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/Se…
arpan14 Jun 6, 2023
3f07bf6
Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/Se…
arpan14 Jun 6, 2023
cbc93e6
Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/Se…
arpan14 Jun 6, 2023
5f36519
Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/Se…
arpan14 Jun 6, 2023
cf5f8a3
Apply suggestions from code review
arpan14 Jun 6, 2023
3469bed
fix:comments on PR.
arpan14 Jun 6, 2023
c27eaa9
fix:rename variables.
arpan14 Jun 6, 2023
ded6d75
fix:variable name rename.
arpan14 Jun 6, 2023
b4b4033
Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/Se…
arpan14 Jun 6, 2023
4423664
Revert "Update google-cloud-spanner/src/main/java/com/google/cloud/sp…
arpan14 Jun 6, 2023
a9da9dd
fix:variable name
arpan14 Jun 6, 2023
6239429
fix:removed lock.
arpan14 Jun 12, 2023
acdc9ac
fix:add handling to prevent duplicate session leak logs.
arpan14 Jun 13, 2023
480c719
fix:avoid long delays in integration tests.
arpan14 Jun 19, 2023
019e100
fix:lint issues.
arpan14 Jun 19, 2023
ff144ae
fix:pr comments around pool options, mock spanner.
arpan14 Jun 20, 2023
efe888f
fix: cleaned up session being returned back to the pool.
arpan14 Jun 22, 2023
d0d2c91
fix:mock timers to avoid flaky tests.
arpan14 Jun 25, 2023
9f0a100
fix:flaky tests.
arpan14 Jun 25, 2023
b96e940
fix:flaky tests.
arpan14 Jun 25, 2023
ecd2f4b
feat: support a new option to silently close inactive transactions wi…
arpan14 Jun 25, 2023
6b72583
fix:variable names.
arpan14 Jun 26, 2023
8ca552b
Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/Se…
arpan14 Jun 28, 2023
ed84ad1
Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/Se…
arpan14 Jun 28, 2023
a1b4b5f
Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/Se…
arpan14 Jun 28, 2023
c011d4c
Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/Se…
arpan14 Jun 28, 2023
6c76a05
Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/Se…
arpan14 Jun 28, 2023
dee86d0
fix:review comments.
arpan14 Jul 4, 2023
b3b45e2
Merge branch 'googleapis:main' into session-leak-pr
arpan14 Jul 6, 2023
e82c54f
Merge branch 'googleapis:main' into session-leak-pr
arpan14 Jul 10, 2023
c3e39ff
fix:add more unit tests for session pool options.
arpan14 Jul 10, 2023
db37f9d
Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/Se…
arpan14 Jul 11, 2023
4edaa52
docs:update as per PR comment.
arpan14 Jul 11, 2023
416ca61
Merge branch 'googleapis:main' into session-leak-pr
arpan14 Jul 18, 2023
ee3ec84
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 25, 2023
943c2c5
Merge branch 'googleapis:main' into session-leak-pr
arpan14 Jul 25, 2023
a4faf05
fix: solve race conditions for PDML transactions.
arpan14 Jul 25, 2023
3f1e752
fix: make method scope to be package-private.
arpan14 Jul 25, 2023
d1f599e
test: add benchmark for long-running sessions clean up task.
arpan14 Jul 25, 2023
c41fbd7
fix: disable the feature by default.
arpan14 Aug 1, 2023
ea39b03
Merge branch 'googleapis:main' into session-leak-pr
arpan14 Aug 1, 2023
d9ce080
fix: remove benchmark code.
arpan14 Aug 1, 2023
@@ -51,6 +51,7 @@
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SessionPoolOptions.InactiveTransactionRemovalOptions;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.common.annotations.VisibleForTesting;
@@ -1279,7 +1280,7 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti
@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
try {
return get().executePartitionedUpdate(stmt, options);
return get(true).executePartitionedUpdate(stmt, options);
} finally {
close();
}
@@ -1332,6 +1333,10 @@ private PooledSession getOrNull() {

@Override
public PooledSession get() {
return get(false);
}

PooledSession get(final boolean eligibleForLongRunning) {
if (inUse.compareAndSet(false, true)) {
PooledSession res = null;
try {
@@ -1346,6 +1351,7 @@ public PooledSession get() {
incrementNumSessionsInUse();
checkedOutSessions.add(this);
}
res.eligibleForLongRunning = eligibleForLongRunning;
}
initialized.countDown();
}
@@ -1366,6 +1372,28 @@ final class PooledSession implements Session {
private volatile SpannerException lastException;
private volatile boolean allowReplacing = true;

/**
* Property to mark if the session is eligible to be long-running. This can only be true if the
* session is executing certain types of transactions (for ex - Partitioned DML) which can be
* long-running. By default, most transaction types are not expected to be long-running and
* hence this value is false.
*/
private volatile boolean eligibleForLongRunning = false;

/**
* Property to mark if the session is no longer part of the session pool. For ex - A session
* which is long-running gets cleaned up and removed from the pool.
*/
private volatile boolean isRemovedFromPool = false;

/**
* Property to mark if a leaked session exception is already logged. Given a session maintainer
* thread runs repeatedly at a defined interval, this property allows us to ensure that an
* exception is logged only once per leaked session. This is to avoid noisy repeated logs around
* session leaks for long-running sessions.
*/
private volatile boolean isLeakedExceptionLogged = false;

@GuardedBy("lock")
private SessionState state;

@@ -1385,6 +1413,11 @@ void setAllowReplacing(boolean allowReplacing) {
this.allowReplacing = allowReplacing;
}

@VisibleForTesting
void setEligibleForLongRunning(boolean eligibleForLongRunning) {
this.eligibleForLongRunning = eligibleForLongRunning;
}

@Override
public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
return writeWithOptions(mutations).getCommitTimestamp();
@@ -1485,7 +1518,7 @@ public void close() {
numSessionsInUse--;
numSessionsReleased++;
}
if (lastException != null && isSessionNotFound(lastException)) {
if ((lastException != null && isSessionNotFound(lastException)) || isRemovedFromPool) {
invalidateSession(this);
} else {
if (lastException != null && isDatabaseOrInstanceNotFound(lastException)) {
@@ -1499,6 +1532,7 @@ public void close() {
}
}
lastException = null;
isRemovedFromPool = false;
if (state != SessionState.CLOSING) {
state = SessionState.AVAILABLE;
}
@@ -1651,6 +1685,10 @@ private PooledSession pollUninterruptiblyWithTimeout(long timeoutMillis) {
* <li>Keeps alive sessions that have not been used for a user configured time in order to keep
* MinSessions sessions alive in the pool at any time. The keep-alive traffic is smeared out
* over a window of 10 minutes to avoid bursty traffic.
* <li>Removes unexpected long running transactions from the pool. Only certain transaction
* types (for ex - Partitioned DML / Batch Reads) can be long running. This task checks the
* sessions which have been inactive for a longer than usual duration (for ex - 60 minutes)
* and removes such sessions from the pool.
* </ul>
*/
final class PoolMaintainer {
@@ -1659,16 +1697,24 @@ final class PoolMaintainer {
private final Duration windowLength = Duration.ofMillis(TimeUnit.MINUTES.toMillis(10));
// Frequency of the timer loop.
@VisibleForTesting final long loopFrequency = options.getLoopFrequency();
// Number of loop iterations in which we need to to close all the sessions waiting for closure.
// Number of loop iterations in which we need to close all the sessions waiting for closure.
@VisibleForTesting final long numClosureCycles = windowLength.toMillis() / loopFrequency;
private final Duration keepAliveMillis =
Duration.ofMillis(TimeUnit.MINUTES.toMillis(options.getKeepAliveIntervalMinutes()));
// Number of loop iterations in which we need to keep alive all the sessions
@VisibleForTesting final long numKeepAliveCycles = keepAliveMillis.toMillis() / loopFrequency;

Instant lastResetTime = Instant.ofEpochMilli(0);
int numSessionsToClose = 0;
int sessionsToClosePerLoop = 0;
/**
* Variable maintaining the last execution time of the long-running transaction cleanup task.
*
* <p>The long-running transaction cleanup needs to be performed every X minutes. The X minutes
* recurs multiple times within the invocation of the pool maintainer thread. For ex - If the
* main thread runs every 10s and the long-running transaction clean-up needs to be performed
* every 2 minutes, then we need to keep track of when this task last executed and make
* sure we only execute it every 2 minutes and not every 10 seconds.
*/
@VisibleForTesting Instant lastExecutionTime;

boolean closed = false;

@GuardedBy("lock")
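The `lastExecutionTime` bookkeeping described in the Javadoc above can be illustrated in isolation. The following is a minimal sketch, not the code from this PR: `PeriodicTaskGate` and `tryAcquire` are hypothetical names, and the 10 second / 2 minute figures are taken from the comment's own example.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of the Spanner client: it gates a slower task
// that is driven from a faster maintenance loop.
final class PeriodicTaskGate {
  private final Duration executionFrequency;
  private Instant lastExecutionTime;

  PeriodicTaskGate(Instant startTime, Duration executionFrequency) {
    this.lastExecutionTime = startTime;
    this.executionFrequency = executionFrequency;
  }

  /** Returns true, and records currentTime, only if the task is due. */
  boolean tryAcquire(Instant currentTime) {
    Instant minExecutionTime = lastExecutionTime.plus(executionFrequency);
    if (currentTime.isBefore(minExecutionTime)) {
      return false; // not due yet; lastExecutionTime stays untouched
    }
    lastExecutionTime = currentTime; // updated only once we decide to run
    return true;
  }
}
```

With a loop ticking every 10 seconds and a frequency of 2 minutes, `tryAcquire` would return true on every twelfth tick. Passing the current time in as a parameter, rather than reading a clock inside the gate, keeps the sketch easy to drive from a test.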
@@ -1678,6 +1724,7 @@ final class PoolMaintainer {
boolean running;

void init() {
lastExecutionTime = clock.instant();
// Scheduled pool maintenance worker.
synchronized (lock) {
scheduledFuture =
@@ -1723,6 +1770,7 @@ void maintainPool() {
decrementPendingClosures(1);
}
}
removeLongRunningSessions(currTime);
}

private void removeIdleSessions(Instant currTime) {
@@ -1736,7 +1784,13 @@ private void removeIdleSessions(Instant currTime) {
PooledSession session = iterator.next();
if (session.lastUseTime.isBefore(minLastUseTime)) {
if (session.state != SessionState.CLOSING) {
removeFromPool(session);
boolean isRemoved = removeFromPool(session);
if (isRemoved) {
numIdleSessionsRemoved++;
if (idleSessionRemovedListener != null) {
idleSessionRemovedListener.apply(session);
}
}
iterator.remove();
}
}
@@ -1792,6 +1846,87 @@ private void replenishPool() {
}
}
}

// cleans up sessions which are unexpectedly long-running.
void removeLongRunningSessions(Instant currentTime) {
try {
if (SessionPool.this.isClosed()) {
return;
}
final InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions =
options.getInactiveTransactionRemovalOptions();
final Instant minExecutionTime =
lastExecutionTime.plus(inactiveTransactionRemovalOptions.getExecutionFrequency());
if (currentTime.isBefore(minExecutionTime)) {
return;
}
lastExecutionTime = currentTime; // update this only after we have decided to execute task
if (options.closeInactiveTransactions()
|| options.warnInactiveTransactions()
|| options.warnAndCloseInactiveTransactions()) {
removeLongRunningSessions(currentTime, inactiveTransactionRemovalOptions);
}
} catch (final Throwable t) {
logger.log(Level.WARNING, "Failed removing long running transactions", t);
}
}

private void removeLongRunningSessions(
final Instant currentTime,
final InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions) {
synchronized (lock) {
final double usedSessionsRatio = getRatioOfSessionsInUse();
if (usedSessionsRatio > inactiveTransactionRemovalOptions.getUsedSessionsRatioThreshold()) {
Iterator<PooledSessionFuture> iterator = checkedOutSessions.iterator();
while (iterator.hasNext()) {
final PooledSessionFuture sessionFuture = iterator.next();
// the below get() call on future object is non-blocking since checkedOutSessions
// collection is populated only when the get() method in {@code PooledSessionFuture} is
// called.
final PooledSession session = sessionFuture.get();
final Duration durationFromLastUse = Duration.between(session.lastUseTime, currentTime);
if (!session.eligibleForLongRunning
&& durationFromLastUse.compareTo(
inactiveTransactionRemovalOptions.getIdleTimeThreshold())
> 0) {
if ((options.warnInactiveTransactions() || options.warnAndCloseInactiveTransactions())
&& !session.isLeakedExceptionLogged) {
if (options.warnAndCloseInactiveTransactions()) {
logger.log(
Level.WARNING,
String.format("Removing long-running session => %s", session.getName()),
sessionFuture.leakedException);
session.isLeakedExceptionLogged = true;
} else if (options.warnInactiveTransactions()) {
logger.log(
Level.WARNING,
String.format(
"Detected long-running session => %s. To automatically remove "
+ "long-running sessions, set SessionOption ActionOnInactiveTransaction "
+ "to WARN_AND_CLOSE by invoking setWarnAndCloseIfInactiveTransactions() method.",
session.getName()),
sessionFuture.leakedException);
session.isLeakedExceptionLogged = true;
}
}
if ((options.closeInactiveTransactions()
|| options.warnAndCloseInactiveTransactions())
&& session.state != SessionState.CLOSING) {
final boolean isRemoved = removeFromPool(session);
if (isRemoved) {
session.isRemovedFromPool = true;
numLeakedSessionsRemoved++;
if (longRunningSessionRemovedListener != null) {
longRunningSessionRemovedListener.apply(session);
}
}
iterator.remove();
}
}
}
}
}
}
}

private enum Position {
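For readers skimming the hunk above, the selection rule in `removeLongRunningSessions` can be reduced to a standalone sketch. It assumes simplified stand-in types (`LongRunningScanSketch` and `CheckedOutSession` are hypothetical, not Spanner client classes), approximates the sessions-in-use count by the size of the checked-out list, and models only the warn-and-close behaviour. The 0.95 ratio used in the usage note is an illustrative value, not one stated in this diff.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified stand-ins for illustration only; not the Spanner client classes.
final class LongRunningScanSketch {

  static final class CheckedOutSession {
    final String name;
    final Instant lastUseTime;
    final boolean eligibleForLongRunning;
    boolean leakLogged; // ensures the warning is printed once per session

    CheckedOutSession(String name, Instant lastUseTime, boolean eligibleForLongRunning) {
      this.name = name;
      this.lastUseTime = lastUseTime;
      this.eligibleForLongRunning = eligibleForLongRunning;
    }
  }

  /** Removes sessions idle beyond the threshold and returns their names. */
  static List<String> removeLongRunning(
      List<CheckedOutSession> checkedOut,
      int maxSessions,
      double usedSessionsRatioThreshold,
      Duration idleTimeThreshold,
      Instant currentTime) {
    List<String> removed = new ArrayList<>();
    // Assumption: sessions in use are approximated by the checked-out list size.
    double usedRatio = maxSessions == 0 ? 0 : (double) checkedOut.size() / maxSessions;
    if (usedRatio <= usedSessionsRatioThreshold) {
      return removed; // pool is not under pressure, so the scan is skipped
    }
    Iterator<CheckedOutSession> iterator = checkedOut.iterator();
    while (iterator.hasNext()) {
      CheckedOutSession session = iterator.next();
      Duration sinceLastUse = Duration.between(session.lastUseTime, currentTime);
      if (!session.eligibleForLongRunning && sinceLastUse.compareTo(idleTimeThreshold) > 0) {
        if (!session.leakLogged) {
          System.out.println("Removing long-running session => " + session.name);
          session.leakLogged = true;
        }
        iterator.remove();
        removed.add(session.name);
      }
    }
    return removed;
  }
}
```

Called with three checked-out sessions, `maxSessions = 3`, a ratio threshold of 0.95 and a 60 minute idle threshold, the sketch would remove only a session that is both ineligible for long-running work and idle for more than an hour. A session marked eligible (the Partitioned DML case) or one used recently stays in the list.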
@@ -1872,6 +2007,9 @@ private enum Position {
@GuardedBy("lock")
private long numIdleSessionsRemoved = 0;

@GuardedBy("lock")
private long numLeakedSessionsRemoved = 0;

private AtomicLong numWaiterTimeouts = new AtomicLong();

@GuardedBy("lock")
@@ -1885,6 +2023,8 @@ private enum Position {

@VisibleForTesting Function<PooledSession, Void> idleSessionRemovedListener;

@VisibleForTesting Function<PooledSession, Void> longRunningSessionRemovedListener;

private final CountDownLatch waitOnMinSessionsLatch;

/**
@@ -1895,12 +2035,16 @@ private enum Position {
*/
static SessionPool createPool(
SpannerOptions spannerOptions, SessionClient sessionClient, List<LabelValue> labelValues) {
final SessionPoolOptions sessionPoolOptions = spannerOptions.getSessionPoolOptions();

// A clock instance is passed in {@code SessionPoolOptions} in order to allow mocking via tests.
final Clock poolMaintainerClock = sessionPoolOptions.getPoolMaintainerClock();
return createPool(
spannerOptions.getSessionPoolOptions(),
sessionPoolOptions,
spannerOptions.getDatabaseRole(),
((GrpcTransportOptions) spannerOptions.getTransportOptions()).getExecutorFactory(),
sessionClient,
new Clock(),
poolMaintainerClock == null ? new Clock() : poolMaintainerClock,
Metrics.getMetricRegistry(),
labelValues);
}
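The null fallback in `createPool` above exists so tests can supply their own clock. A minimal sketch of that pattern follows, assuming hypothetical names throughout (`ClockInjectionSketch`, `PoolClock`, `FakeClock`, `resolve`); it does not reproduce the client library's `Clock` class.

```java
import java.time.Instant;

// Illustration of clock injection; all names here are hypothetical.
final class ClockInjectionSketch {

  // Default clock: reads the system time.
  static class PoolClock {
    Instant instant() {
      return Instant.now();
    }
  }

  // Test clock: time moves only when the test changes currentTimeMillis.
  static final class FakeClock extends PoolClock {
    volatile long currentTimeMillis;

    @Override
    Instant instant() {
      return Instant.ofEpochMilli(currentTimeMillis);
    }
  }

  // Same shape as the fallback in the hunk: use the configured clock if present.
  static PoolClock resolve(PoolClock configured) {
    return configured == null ? new PoolClock() : configured;
  }
}
```

A test could then advance `currentTimeMillis` past the idle threshold and run the maintainer once, instead of sleeping for the real duration.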
@@ -2015,18 +2159,26 @@ int getNumberOfSessionsInUse() {
}
}

void removeFromPool(PooledSession session) {
@VisibleForTesting
double getRatioOfSessionsInUse() {
synchronized (lock) {
final int maxSessions = options.getMaxSessions();
if (maxSessions == 0) {
return 0;
}
return (double) numSessionsInUse / maxSessions;
}
}

boolean removeFromPool(PooledSession session) {
synchronized (lock) {
if (isClosed()) {
decrementPendingClosures(1);
return;
return false;
}
session.markClosing();
allSessions.remove(session);
numIdleSessionsRemoved++;
}
if (idleSessionRemovedListener != null) {
idleSessionRemovedListener.apply(session);
return true;
}
}
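Two details of `getRatioOfSessionsInUse` above are easy to miss: the guard for `maxSessions == 0` and the cast to `double` before dividing. A small standalone sketch, with the hypothetical name `UsageRatioSketch` and the counters passed as plain parameters instead of being read under the pool lock:

```java
// Illustration only; the real method reads its inputs under the pool lock.
final class UsageRatioSketch {

  static double ratioOfSessionsInUse(int numSessionsInUse, int maxSessions) {
    if (maxSessions == 0) {
      return 0; // avoids dividing by zero when the pool allows no sessions
    }
    // The cast forces floating-point division; int / int would truncate to 0
    // for any pool that is not completely full.
    return (double) numSessionsInUse / maxSessions;
  }
}
```

Without the cast, a pool with 3 of 4 sessions in use would report a ratio of 0 and never cross a fractional threshold.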

@@ -2036,6 +2188,13 @@ long numIdleSessionsRemoved() {
}
}

@VisibleForTesting
long numLeakedSessionsRemoved() {
synchronized (lock) {
return numLeakedSessionsRemoved;
}
}

@VisibleForTesting
int getNumberOfSessionsInPool() {
synchronized (lock) {