diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java index 55acf44abb..f4b59505b0 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java @@ -140,6 +140,7 @@ import com.google.spanner.executor.v1.SpannerAsyncActionResponse; import com.google.spanner.executor.v1.StartBatchTransactionAction; import com.google.spanner.executor.v1.StartTransactionAction; +import com.google.spanner.executor.v1.TransactionExecutionOptions; import com.google.spanner.executor.v1.UpdateCloudBackupAction; import com.google.spanner.executor.v1.UpdateCloudDatabaseDdlAction; import com.google.spanner.executor.v1.UpdateCloudInstanceAction; @@ -227,13 +228,16 @@ private static class ReadWriteTransaction { private Mode finishMode; private SpannerException error; private final String transactionSeed; + private final boolean optimistic; // Set to true when the transaction runner completed, one of these three could happen: runner // committed, abandoned or threw an error. private boolean runnerCompleted; - public ReadWriteTransaction(DatabaseClient dbClient, String transactionSeed) { + public ReadWriteTransaction( + DatabaseClient dbClient, String transactionSeed, boolean optimistic) { this.dbClient = dbClient; this.transactionSeed = transactionSeed; + this.optimistic = optimistic; this.runnerCompleted = false; } @@ -318,7 +322,10 @@ public void startRWTransaction() throws Exception { Runnable runnable = () -> { try { - runner = dbClient.readWriteTransaction(); + runner = + optimistic + ? dbClient.readWriteTransaction(Options.optimisticLock()) + : dbClient.readWriteTransaction(); LOGGER.log(Level.INFO, String.format("Ready to run callable %s\n", transactionSeed)); runner.run(callable); transactionSucceeded(runner.getCommitTimestamp().toProto()); @@ -537,7 +544,8 @@ public synchronized void startReadOnlyTxn( } /** Start a read-write transaction. */ - public synchronized void startReadWriteTxn(DatabaseClient dbClient, Metadata metadata) + public synchronized void startReadWriteTxn( + DatabaseClient dbClient, Metadata metadata, TransactionExecutionOptions options) throws Exception { if ((rwTxn != null) || (roTxn != null) || (batchTxn != null)) { throw SpannerExceptionFactory.newSpannerException( @@ -548,7 +556,7 @@ public synchronized void startReadWriteTxn(DatabaseClient dbClient, Metadata met String.format( "There's no active transaction, safe to create rwTxn: %s\n", getTransactionSeed())); this.metadata = metadata; - rwTxn = new ReadWriteTransaction(dbClient, transactionSeed); + rwTxn = new ReadWriteTransaction(dbClient, transactionSeed, options.getOptimistic()); LOGGER.log( Level.INFO, String.format( @@ -2246,7 +2254,7 @@ private Status executeStartTxn( Level.INFO, "Starting read-write transaction %s\n", executionContext.getTransactionSeed()); - executionContext.startReadWriteTxn(dbClient, metadata); + executionContext.startReadWriteTxn(dbClient, metadata, action.getExecutionOptions()); } executionContext.setDatabaseClient(dbClient); executionContext.initReadState();