Skip to content

Commit

Permalink
feat: delay transaction start option (#2462)
Browse files Browse the repository at this point in the history
* feat: delay transaction start option

Adds an opt-in to delay the actual start of a read/write transaction
until the first write operation. This reduces lock contention and can
reduce latency for read/write transactions that execute (many) read
operations before any write operations, at the expense of a lower
transaction isolation level. Typical workloads that benefit from this
option are ORMs (e.g. Hibernate). The application must be able to handle
the lower isolation level.

* fix: clirr check

* fix: only create tx manager if needed

* test: add integration tests

* test: skip concurrency tests on emulator

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
olavloite and gcf-owl-bot[bot] committed Jun 2, 2023
1 parent ee6548c commit f1cbd16
Show file tree
Hide file tree
Showing 23 changed files with 36,044 additions and 31,572 deletions.
12 changes: 12 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -347,4 +347,16 @@
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void rollbackToSavepoint(java.lang.String)</method>
</difference>

<!-- Delay start transaction -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setDelayTransactionStartUntilFirstWrite(boolean)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>boolean isDelayTransactionStartUntilFirstWrite()</method>
</difference>
</differences>
Expand Up @@ -99,10 +99,12 @@ public boolean isActive() {
abstract void checkAborted();

/**
* Check that the current transaction actually has a valid underlying transaction. If not, the
* method will throw a {@link SpannerException}.
* Check that the current transaction actually has a valid underlying transaction and creates it
* if necessary. If the transaction does not have a valid underlying transaction and/or is not in
* a state that allows the creation of a transaction, the method will throw a {@link
* SpannerException}.
*/
abstract void checkValidTransaction(CallType callType);
abstract void checkOrCreateValidTransaction(ParsedStatement statement, CallType callType);

/** Returns the {@link ReadContext} that can be used for queries on this transaction. */
abstract ReadContext getReadContext();
Expand All @@ -114,7 +116,7 @@ public ApiFuture<ResultSet> executeQueryAsync(
final AnalyzeMode analyzeMode,
final QueryOption... options) {
Preconditions.checkArgument(statement.isQuery(), "Statement is not a query");
checkValidTransaction(callType);
checkOrCreateValidTransaction(statement, callType);
return executeStatementAsync(
callType,
statement,
Expand Down
Expand Up @@ -563,6 +563,29 @@ default RpcPriority getRPCPriority() {
throw new UnsupportedOperationException("Unimplemented");
}

/**
* Sets whether this connection should delay the actual start of a read/write transaction until
* the first write operation is observed on that transaction. All read operations that are
* executed before the first write operation in the transaction will be executed as if the
* connection was in auto-commit mode. This can reduce locking, especially for transactions that
* execute a large number of reads before any writes, at the expense of a lower transaction
* isolation.
*
* <p>NOTE: This will make read/write transactions non-serializable.
*/
default void setDelayTransactionStartUntilFirstWrite(
boolean delayTransactionStartUntilFirstWrite) {
throw new UnsupportedOperationException("Unimplemented");
}

/**
* @return true if this connection delays the actual start of a read/write transaction until the
* first write operation on that transaction.
*/
default boolean isDelayTransactionStartUntilFirstWrite() {
throw new UnsupportedOperationException("Unimplemented");
}

/**
* Commits the current transaction of this connection. All mutations that have been buffered
* during the current transaction will be written to the database.
Expand Down
Expand Up @@ -193,6 +193,7 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
private boolean autocommit;
private boolean readOnly;
private boolean returnCommitStats;
private boolean delayTransactionStartUntilFirstWrite;

private UnitOfWork currentUnitOfWork = null;
/**
Expand Down Expand Up @@ -239,6 +240,7 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
this.queryOptions = this.queryOptions.toBuilder().mergeFrom(options.getQueryOptions()).build();
this.rpcPriority = options.getRPCPriority();
this.returnCommitStats = options.isReturnCommitStats();
this.delayTransactionStartUntilFirstWrite = options.isDelayTransactionStartUntilFirstWrite();
this.ddlClient = createDdlClient();
setDefaultTransactionOptions();
}
Expand Down Expand Up @@ -744,6 +746,22 @@ public boolean isReturnCommitStats() {
return this.returnCommitStats;
}

@Override
public void setDelayTransactionStartUntilFirstWrite(
boolean delayTransactionStartUntilFirstWrite) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(
!isTransactionStarted(),
"Cannot set DelayTransactionStartUntilFirstWrite while a transaction is active");
this.delayTransactionStartUntilFirstWrite = delayTransactionStartUntilFirstWrite;
}

@Override
public boolean isDelayTransactionStartUntilFirstWrite() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
return this.delayTransactionStartUntilFirstWrite;
}

/** Resets this connection to its default transaction options. */
private void setDefaultTransactionOptions() {
if (transactionStack.isEmpty()) {
Expand Down Expand Up @@ -1376,6 +1394,7 @@ UnitOfWork createNewUnitOfWork() {
case READ_WRITE_TRANSACTION:
return ReadWriteTransaction.newBuilder()
.setDatabaseClient(dbClient)
.setDelayTransactionStartUntilFirstWrite(delayTransactionStartUntilFirstWrite)
.setRetryAbortsInternally(retryAbortsInternally)
.setSavepointSupport(savepointSupport)
.setReturnCommitStats(returnCommitStats)
Expand Down
Expand Up @@ -172,6 +172,7 @@ public String[] getValidValues() {
private static final RpcPriority DEFAULT_RPC_PRIORITY = null;
private static final boolean DEFAULT_RETURN_COMMIT_STATS = false;
private static final boolean DEFAULT_LENIENT = false;
private static final boolean DEFAULT_DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE = false;
private static final boolean DEFAULT_TRACK_SESSION_LEAKS = true;
private static final boolean DEFAULT_TRACK_CONNECTION_LEAKS = true;

Expand Down Expand Up @@ -220,6 +221,9 @@ public String[] getValidValues() {
private static final String DIALECT_PROPERTY_NAME = "dialect";
/** Name of the 'databaseRole' connection property. */
public static final String DATABASE_ROLE_PROPERTY_NAME = "databaseRole";
/** Name of the 'delay transaction start until first write' property. */
public static final String DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE_NAME =
"delayTransactionStartUntilFirstWrite";
/** Name of the 'trackStackTraceOfSessionCheckout' connection property. */
public static final String TRACK_SESSION_LEAKS_PROPERTY_NAME = "trackSessionLeaks";
/** Name of the 'trackStackTraceOfConnectionCreation' connection property. */
Expand Down Expand Up @@ -294,6 +298,14 @@ public String[] getValidValues() {
ConnectionProperty.createStringProperty(
DATABASE_ROLE_PROPERTY_NAME,
"Sets the database role to use for this connection. The default is privileges assigned to IAM role"),
ConnectionProperty.createBooleanProperty(
DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE_NAME,
"Enabling this option will delay the actual start of a read/write transaction until the first write operation is seen in that transaction. "
+ "All reads that happen before the first write in a transaction will instead be executed as if the connection was in auto-commit mode. "
+ "Enabling this option will make read/write transactions lose their SERIALIZABLE isolation level. Read operations that are executed after "
+ "the first write operation in a read/write transaction will be executed using the read/write transaction. Enabling this mode can reduce locking "
+ "and improve performance for applications that can handle the lower transaction isolation semantics.",
DEFAULT_DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE),
ConnectionProperty.createBooleanProperty(
TRACK_SESSION_LEAKS_PROPERTY_NAME,
"Capture the call stack of the thread that checked out a session of the session pool. This will "
Expand Down Expand Up @@ -568,6 +580,7 @@ public static Builder newBuilder() {
private final boolean returnCommitStats;
private final boolean autoConfigEmulator;
private final RpcPriority rpcPriority;
private final boolean delayTransactionStartUntilFirstWrite;
private final boolean trackSessionLeaks;
private final boolean trackConnectionLeaks;

Expand Down Expand Up @@ -614,6 +627,7 @@ private ConnectionOptions(Builder builder) {
this.usePlainText = this.autoConfigEmulator || parseUsePlainText(this.uri);
this.host = determineHost(matcher, autoConfigEmulator, usePlainText);
this.rpcPriority = parseRPCPriority(this.uri);
this.delayTransactionStartUntilFirstWrite = parseDelayTransactionStartUntilFirstWrite(this.uri);
this.trackSessionLeaks = parseTrackSessionLeaks(this.uri);
this.trackConnectionLeaks = parseTrackConnectionLeaks(this.uri);

Expand Down Expand Up @@ -867,6 +881,14 @@ static boolean parseLenient(String uri) {
return value != null ? Boolean.parseBoolean(value) : DEFAULT_LENIENT;
}

@VisibleForTesting
static boolean parseDelayTransactionStartUntilFirstWrite(String uri) {
String value = parseUriProperty(uri, DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE_NAME);
return value != null
? Boolean.parseBoolean(value)
: DEFAULT_DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE;
}

@VisibleForTesting
static boolean parseTrackSessionLeaks(String uri) {
String value = parseUriProperty(uri, TRACK_SESSION_LEAKS_PROPERTY_NAME);
Expand Down Expand Up @@ -1119,6 +1141,14 @@ RpcPriority getRPCPriority() {
return rpcPriority;
}

/**
* Whether connections created by this {@link ConnectionOptions} should delay the actual start of
* a read/write transaction until the first write operation.
*/
boolean isDelayTransactionStartUntilFirstWrite() {
return delayTransactionStartUntilFirstWrite;
}

boolean isTrackConnectionLeaks() {
return this.trackConnectionLeaks;
}
Expand Down
Expand Up @@ -76,6 +76,11 @@ interface ConnectionStatementExecutor {

StatementResult statementShowReturnCommitStats();

StatementResult statementSetDelayTransactionStartUntilFirstWrite(
Boolean delayTransactionStartUntilFirstWrite);

StatementResult statementShowDelayTransactionStartUntilFirstWrite();

StatementResult statementSetStatementTag(String tag);

StatementResult statementShowStatementTag();
Expand Down
Expand Up @@ -25,6 +25,7 @@
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_AUTOCOMMIT;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_AUTOCOMMIT_DML_MODE;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_DEFAULT_TRANSACTION_ISOLATION;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_OPTIMIZER_STATISTICS_PACKAGE;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_OPTIMIZER_VERSION;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_READONLY;
Expand All @@ -41,6 +42,7 @@
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_AUTOCOMMIT_DML_MODE;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_COMMIT_RESPONSE;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_COMMIT_TIMESTAMP;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_OPTIMIZER_STATISTICS_PACKAGE;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_OPTIMIZER_VERSION;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_READONLY;
Expand Down Expand Up @@ -313,6 +315,22 @@ public StatementResult statementShowReturnCommitStats() {
SHOW_RETURN_COMMIT_STATS);
}

@Override
public StatementResult statementSetDelayTransactionStartUntilFirstWrite(
Boolean delayTransactionStartUntilFirstWrite) {
getConnection().setDelayTransactionStartUntilFirstWrite(delayTransactionStartUntilFirstWrite);
return noResult(SET_DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE);
}

@Override
public StatementResult statementShowDelayTransactionStartUntilFirstWrite() {
return resultSet(
String.format(
"%sDELAY_TRANSACTION_START_UNTIL_FIRST_WRITE", getNamespace(connection.getDialect())),
getConnection().isDelayTransactionStartUntilFirstWrite(),
SHOW_DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE);
}

@Override
public StatementResult statementSetStatementTag(String tag) {
getConnection().setStatementTag("".equals(tag) ? null : tag);
Expand Down
Expand Up @@ -57,7 +57,7 @@ public void retry(AbortedException aborted) throws AbortedException {
.invokeInterceptors(
RUN_BATCH_STATEMENT, StatementExecutionStep.RETRY_STATEMENT, transaction);
try {
transaction.getReadContext().batchUpdate(statements);
transaction.getTransactionContext().batchUpdate(statements);
} catch (AbortedException e) {
// Propagate abort to force a new retry.
throw e;
Expand Down
Expand Up @@ -53,7 +53,7 @@ public void retry(AbortedException aborted) throws AbortedException {
transaction
.getStatementExecutor()
.invokeInterceptors(statement, StatementExecutionStep.RETRY_STATEMENT, transaction);
transaction.getReadContext().executeUpdate(statement.getStatement());
transaction.getTransactionContext().executeUpdate(statement.getStatement());
} catch (AbortedException e) {
// Propagate abort to force a new retry.
throw e;
Expand Down
Expand Up @@ -96,7 +96,7 @@ void checkAborted() {
}

@Override
void checkValidTransaction(CallType callType) {
void checkOrCreateValidTransaction(ParsedStatement statement, CallType callType) {
if (transaction == null) {
transaction = dbClient.readOnlyTransaction(readOnlyStaleness);
}
Expand Down

0 comments on commit f1cbd16

Please sign in to comment.