Skip to content

Commit

Permalink
feat: long running transaction clean up background task. Adding confi…
Browse files Browse the repository at this point in the history
…guration options for closing inactive transactions. (#2419)
  • Loading branch information
arpan14 committed Aug 1, 2023
1 parent 5a12cd2 commit 423e1a4
Show file tree
Hide file tree
Showing 7 changed files with 1,181 additions and 30 deletions.
Expand Up @@ -51,6 +51,7 @@
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SessionPoolOptions.InactiveTransactionRemovalOptions;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -1279,7 +1280,7 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti
@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
try {
return get().executePartitionedUpdate(stmt, options);
return get(true).executePartitionedUpdate(stmt, options);
} finally {
close();
}
Expand Down Expand Up @@ -1332,6 +1333,10 @@ private PooledSession getOrNull() {

@Override
public PooledSession get() {
return get(false);
}

PooledSession get(final boolean eligibleForLongRunning) {
if (inUse.compareAndSet(false, true)) {
PooledSession res = null;
try {
Expand All @@ -1346,6 +1351,7 @@ public PooledSession get() {
incrementNumSessionsInUse();
checkedOutSessions.add(this);
}
res.eligibleForLongRunning = eligibleForLongRunning;
}
initialized.countDown();
}
Expand All @@ -1366,6 +1372,28 @@ final class PooledSession implements Session {
private volatile SpannerException lastException;
private volatile boolean allowReplacing = true;

/**
* Property to mark if the session is eligible to be long-running. This can only be true if the
* session is executing certain types of transactions (for ex - Partitioned DML) which can be
* long-running. By default, most transaction types are not expected to be long-running and
* hence this value is false.
*/
private volatile boolean eligibleForLongRunning = false;

/**
* Property to mark if the session is no longer part of the session pool. For ex - A session
* which is long-running gets cleaned up and removed from the pool.
*/
private volatile boolean isRemovedFromPool = false;

/**
* Property to mark if a leaked session exception is already logged. Given a session maintainer
* thread runs repeatedly at a defined interval, this property allows us to ensure that an
* exception is logged only once per leaked session. This is to avoid noisy repeated logs around
* session leaks for long-running sessions.
*/
private volatile boolean isLeakedExceptionLogged = false;

@GuardedBy("lock")
private SessionState state;

Expand All @@ -1385,6 +1413,11 @@ void setAllowReplacing(boolean allowReplacing) {
this.allowReplacing = allowReplacing;
}

@VisibleForTesting
void setEligibleForLongRunning(boolean eligibleForLongRunning) {
this.eligibleForLongRunning = eligibleForLongRunning;
}

@Override
public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
return writeWithOptions(mutations).getCommitTimestamp();
Expand Down Expand Up @@ -1485,7 +1518,7 @@ public void close() {
numSessionsInUse--;
numSessionsReleased++;
}
if (lastException != null && isSessionNotFound(lastException)) {
if ((lastException != null && isSessionNotFound(lastException)) || isRemovedFromPool) {
invalidateSession(this);
} else {
if (lastException != null && isDatabaseOrInstanceNotFound(lastException)) {
Expand All @@ -1499,6 +1532,7 @@ public void close() {
}
}
lastException = null;
isRemovedFromPool = false;
if (state != SessionState.CLOSING) {
state = SessionState.AVAILABLE;
}
Expand Down Expand Up @@ -1651,6 +1685,10 @@ private PooledSession pollUninterruptiblyWithTimeout(long timeoutMillis) {
* <li>Keeps alive sessions that have not been used for a user configured time in order to keep
* MinSessions sessions alive in the pool at any time. The keep-alive traffic is smeared out
* over a window of 10 minutes to avoid bursty traffic.
* <li>Removes unexpected long running transactions from the pool. Only certain transaction
* types (for ex - Partitioned DML / Batch Reads) can be long running. This tasks checks the
* sessions which have been inactive for a longer than usual duration (for ex - 60 minutes)
* and removes such sessions from the pool.
* </ul>
*/
final class PoolMaintainer {
Expand All @@ -1659,16 +1697,24 @@ final class PoolMaintainer {
private final Duration windowLength = Duration.ofMillis(TimeUnit.MINUTES.toMillis(10));
// Frequency of the timer loop.
@VisibleForTesting final long loopFrequency = options.getLoopFrequency();
// Number of loop iterations in which we need to to close all the sessions waiting for closure.
// Number of loop iterations in which we need to close all the sessions waiting for closure.
@VisibleForTesting final long numClosureCycles = windowLength.toMillis() / loopFrequency;
private final Duration keepAliveMillis =
Duration.ofMillis(TimeUnit.MINUTES.toMillis(options.getKeepAliveIntervalMinutes()));
// Number of loop iterations in which we need to keep alive all the sessions
@VisibleForTesting final long numKeepAliveCycles = keepAliveMillis.toMillis() / loopFrequency;

Instant lastResetTime = Instant.ofEpochMilli(0);
int numSessionsToClose = 0;
int sessionsToClosePerLoop = 0;
/**
* Variable maintaining the last execution time of the long-running transaction cleanup task.
*
* <p>The long-running transaction cleanup needs to be performed every X minutes. The X minutes
* recurs multiple times within the invocation of the pool maintainer thread. For ex - If the
* main thread runs every 10s and the long-running transaction clean-up needs to be performed
* every 2 minutes, then we need to keep a track of when was the last time that this task
* executed and makes sure we only execute it every 2 minutes and not every 10 seconds.
*/
@VisibleForTesting Instant lastExecutionTime;

boolean closed = false;

@GuardedBy("lock")
Expand All @@ -1678,6 +1724,7 @@ final class PoolMaintainer {
boolean running;

void init() {
lastExecutionTime = clock.instant();
// Scheduled pool maintenance worker.
synchronized (lock) {
scheduledFuture =
Expand Down Expand Up @@ -1723,6 +1770,7 @@ void maintainPool() {
decrementPendingClosures(1);
}
}
removeLongRunningSessions(currTime);
}

private void removeIdleSessions(Instant currTime) {
Expand All @@ -1736,7 +1784,13 @@ private void removeIdleSessions(Instant currTime) {
PooledSession session = iterator.next();
if (session.lastUseTime.isBefore(minLastUseTime)) {
if (session.state != SessionState.CLOSING) {
removeFromPool(session);
boolean isRemoved = removeFromPool(session);
if (isRemoved) {
numIdleSessionsRemoved++;
if (idleSessionRemovedListener != null) {
idleSessionRemovedListener.apply(session);
}
}
iterator.remove();
}
}
Expand Down Expand Up @@ -1792,6 +1846,87 @@ private void replenishPool() {
}
}
}

// cleans up sessions which are unexpectedly long-running.
void removeLongRunningSessions(Instant currentTime) {
try {
if (SessionPool.this.isClosed()) {
return;
}
final InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions =
options.getInactiveTransactionRemovalOptions();
final Instant minExecutionTime =
lastExecutionTime.plus(inactiveTransactionRemovalOptions.getExecutionFrequency());
if (currentTime.isBefore(minExecutionTime)) {
return;
}
lastExecutionTime = currentTime; // update this only after we have decided to execute task
if (options.closeInactiveTransactions()
|| options.warnInactiveTransactions()
|| options.warnAndCloseInactiveTransactions()) {
removeLongRunningSessions(currentTime, inactiveTransactionRemovalOptions);
}
} catch (final Throwable t) {
logger.log(Level.WARNING, "Failed removing long running transactions", t);
}
}

private void removeLongRunningSessions(
final Instant currentTime,
final InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions) {
synchronized (lock) {
final double usedSessionsRatio = getRatioOfSessionsInUse();
if (usedSessionsRatio > inactiveTransactionRemovalOptions.getUsedSessionsRatioThreshold()) {
Iterator<PooledSessionFuture> iterator = checkedOutSessions.iterator();
while (iterator.hasNext()) {
final PooledSessionFuture sessionFuture = iterator.next();
// the below get() call on future object is non-blocking since checkedOutSessions
// collection is populated only when the get() method in {@code PooledSessionFuture} is
// called.
final PooledSession session = sessionFuture.get();
final Duration durationFromLastUse = Duration.between(session.lastUseTime, currentTime);
if (!session.eligibleForLongRunning
&& durationFromLastUse.compareTo(
inactiveTransactionRemovalOptions.getIdleTimeThreshold())
> 0) {
if ((options.warnInactiveTransactions() || options.warnAndCloseInactiveTransactions())
&& !session.isLeakedExceptionLogged) {
if (options.warnAndCloseInactiveTransactions()) {
logger.log(
Level.WARNING,
String.format("Removing long-running session => %s", session.getName()),
sessionFuture.leakedException);
session.isLeakedExceptionLogged = true;
} else if (options.warnInactiveTransactions()) {
logger.log(
Level.WARNING,
String.format(
"Detected long-running session => %s. To automatically remove "
+ "long-running sessions, set SessionOption ActionOnInactiveTransaction "
+ "to WARN_AND_CLOSE by invoking setWarnAndCloseIfInactiveTransactions() method.",
session.getName()),
sessionFuture.leakedException);
session.isLeakedExceptionLogged = true;
}
}
if ((options.closeInactiveTransactions()
|| options.warnAndCloseInactiveTransactions())
&& session.state != SessionState.CLOSING) {
final boolean isRemoved = removeFromPool(session);
if (isRemoved) {
session.isRemovedFromPool = true;
numLeakedSessionsRemoved++;
if (longRunningSessionRemovedListener != null) {
longRunningSessionRemovedListener.apply(session);
}
}
iterator.remove();
}
}
}
}
}
}
}

private enum Position {
Expand Down Expand Up @@ -1872,6 +2007,9 @@ private enum Position {
@GuardedBy("lock")
private long numIdleSessionsRemoved = 0;

@GuardedBy("lock")
private long numLeakedSessionsRemoved = 0;

private AtomicLong numWaiterTimeouts = new AtomicLong();

@GuardedBy("lock")
Expand All @@ -1885,6 +2023,8 @@ private enum Position {

@VisibleForTesting Function<PooledSession, Void> idleSessionRemovedListener;

@VisibleForTesting Function<PooledSession, Void> longRunningSessionRemovedListener;

private final CountDownLatch waitOnMinSessionsLatch;

/**
Expand All @@ -1895,12 +2035,16 @@ private enum Position {
*/
static SessionPool createPool(
SpannerOptions spannerOptions, SessionClient sessionClient, List<LabelValue> labelValues) {
final SessionPoolOptions sessionPoolOptions = spannerOptions.getSessionPoolOptions();

// A clock instance is passed in {@code SessionPoolOptions} in order to allow mocking via tests.
final Clock poolMaintainerClock = sessionPoolOptions.getPoolMaintainerClock();
return createPool(
spannerOptions.getSessionPoolOptions(),
sessionPoolOptions,
spannerOptions.getDatabaseRole(),
((GrpcTransportOptions) spannerOptions.getTransportOptions()).getExecutorFactory(),
sessionClient,
new Clock(),
poolMaintainerClock == null ? new Clock() : poolMaintainerClock,
Metrics.getMetricRegistry(),
labelValues);
}
Expand Down Expand Up @@ -2015,18 +2159,26 @@ int getNumberOfSessionsInUse() {
}
}

void removeFromPool(PooledSession session) {
@VisibleForTesting
double getRatioOfSessionsInUse() {
synchronized (lock) {
final int maxSessions = options.getMaxSessions();
if (maxSessions == 0) {
return 0;
}
return (double) numSessionsInUse / maxSessions;
}
}

boolean removeFromPool(PooledSession session) {
synchronized (lock) {
if (isClosed()) {
decrementPendingClosures(1);
return;
return false;
}
session.markClosing();
allSessions.remove(session);
numIdleSessionsRemoved++;
}
if (idleSessionRemovedListener != null) {
idleSessionRemovedListener.apply(session);
return true;
}
}

Expand All @@ -2036,6 +2188,13 @@ long numIdleSessionsRemoved() {
}
}

@VisibleForTesting
long numLeakedSessionsRemoved() {
synchronized (lock) {
return numLeakedSessionsRemoved;
}
}

@VisibleForTesting
int getNumberOfSessionsInPool() {
synchronized (lock) {
Expand Down

0 comments on commit 423e1a4

Please sign in to comment.