From ae0e224307e07125c055aa7db3404afedeca1c7e Mon Sep 17 00:00:00 2001 From: Alexey Scherbakov Date: Fri, 15 Dec 2023 17:12:58 +0300 Subject: [PATCH] IGNITE-17811 Lock table optimizations - Fixes #2720. Signed-off-by: Alexey Scherbakov --- .../cpp/tests/odbc-test/transaction_test.cpp | 2 +- .../benchmark/LockManagerBenchmark.java | 119 +++ .../ignite/distributed/ItLockTableTest.java | 235 ++++++ .../ItTxDistributedTestSingleNode.java | 5 +- ...ributedTestSingleNodeNoCleanupMessage.java | 2 +- .../internal/table/RecordBinaryViewImpl.java | 5 +- .../replicator/PartitionReplicaListener.java | 86 +- .../storage/InternalTableImpl.java | 126 ++- ...titionReplicaListenerIndexLockingTest.java | 2 +- .../PartitionReplicaListenerTest.java | 8 +- .../table/impl/DummyInternalTableImpl.java | 50 +- .../internal/tx/DeadlockPreventionPolicy.java | 5 + .../apache/ignite/internal/tx/LockKey.java | 8 + .../ignite/internal/tx/LockManager.java | 21 +- .../internal/tx/impl/HeapLockManager.java | 309 +++++-- .../tx/impl/HeapUnboundedLockManager.java | 754 ++++++++++++++++++ .../tx/impl/TxCleanupRequestHandler.java | 2 +- .../internal/tx/impl/TxManagerImpl.java | 2 + .../internal/tx/AbstractLockManagerTest.java | 78 +- .../internal/tx/AbstractLockingTest.java | 2 +- .../internal/tx/HeapLockManagerTest.java | 8 +- .../tx/HeapUnboundedLockManagerTest.java | 36 + .../tx/NoWaitDeadlockPreventionTest.java | 2 +- ...NoWaitDeadlockPreventionUnboundedTest.java | 30 + .../tx/NoneDeadlockPreventionTest.java | 2 +- .../NoneDeadlockPreventionUnboundedTest.java | 30 + ...versedDeadlockPreventionUnboundedTest.java | 30 + ...imeoutDeadlockPreventionUnboundedTest.java | 30 + 28 files changed, 1760 insertions(+), 229 deletions(-) create mode 100644 modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/LockManagerBenchmark.java create mode 100644 modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java create mode 100644 modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapUnboundedLockManager.java create mode 100644 modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapUnboundedLockManagerTest.java create mode 100644 modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoWaitDeadlockPreventionUnboundedTest.java create mode 100644 modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoneDeadlockPreventionUnboundedTest.java create mode 100644 modules/transactions/src/test/java/org/apache/ignite/internal/tx/ReversedDeadlockPreventionUnboundedTest.java create mode 100644 modules/transactions/src/test/java/org/apache/ignite/internal/tx/TimeoutDeadlockPreventionUnboundedTest.java diff --git a/modules/platforms/cpp/tests/odbc-test/transaction_test.cpp b/modules/platforms/cpp/tests/odbc-test/transaction_test.cpp index 7b9a4052627..16dace0f3e5 100644 --- a/modules/platforms/cpp/tests/odbc-test/transaction_test.cpp +++ b/modules/platforms/cpp/tests/odbc-test/transaction_test.cpp @@ -575,7 +575,7 @@ TEST_F(transaction_test, transaction_error) { try { insert_test_value(conn2.m_statement, 2, "test_2"); } catch (const odbc_exception &err) { - EXPECT_THAT(err.message, testing::HasSubstr("Failed to acquire a lock due to a conflict")); + EXPECT_THAT(err.message, testing::HasSubstr("Failed to acquire a lock due to a possible deadlock")); // TODO: IGNITE-19944 Propagate SQL errors from engine to driver EXPECT_EQ(err.sql_state, "HY000"); throw; diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/LockManagerBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/LockManagerBenchmark.java new file mode 100644 index 00000000000..1680ff2c34f --- /dev/null +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/LockManagerBenchmark.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.benchmark; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.TestHybridClock; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.tx.LockKey; +import org.apache.ignite.internal.tx.LockManager; +import org.apache.ignite.internal.tx.LockMode; +import org.apache.ignite.internal.tx.impl.HeapLockManager; +import org.apache.ignite.internal.tx.impl.TransactionIdGenerator; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +/** + * Benchmark lock manager. + */ +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +public class LockManagerBenchmark { + private LockManager lockManager; + private TransactionIdGenerator generator; + private HybridClock clock; + + /** + * Initializes session and statement. + */ + @Setup + public void setUp() { + lockManager = new HeapLockManager(); + generator = new TransactionIdGenerator(0); + clock = new TestHybridClock(() -> 0L); + } + + /** + * Closes resources. + */ + @TearDown + public void tearDown() throws Exception { + if (!lockManager.isEmpty()) { + throw new AssertionError("Invalid lock manager state"); + } + } + + /** + * Concurrent active transactions. + */ + @Param({"200"}) + private int concTxns; + + /** + * Take and release some locks. + */ + @Benchmark + @Warmup(iterations = 1, time = 3) + @Measurement(iterations = 1, time = 10) + public void lockCommit() { + List ids = new ArrayList<>(concTxns); + + int c = 0; + + for (int i = 0; i < concTxns; i++) { + UUID txId = generator.transactionIdFor(clock.now()); + ids.add(txId); + lockManager.acquire(txId, new LockKey(0, new RowId(0, new UUID(0, c++))), LockMode.X).join(); + } + + for (UUID id : ids) { + lockManager.releaseAll(id); + } + } + + /** + * Benchmark's entry point. + */ + public static void main(String[] args) throws RunnerException { + // TODO JVM args + Options opt = new OptionsBuilder() + .include(".*" + LockManagerBenchmark.class.getSimpleName() + ".*") + .forks(1) + .threads(1) + .mode(Mode.AverageTime) + .build(); + + new Runner(opt).run(); + } +} diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java new file mode 100644 index 00000000000..ab97070520f --- /dev/null +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.distributed; + +import static org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.placementdriver.PlacementDriver; +import org.apache.ignite.internal.raft.configuration.RaftConfiguration; +import org.apache.ignite.internal.replicator.ReplicaService; +import org.apache.ignite.internal.schema.Column; +import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.configuration.GcConfiguration; +import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.internal.tx.DeadlockPreventionPolicy; +import org.apache.ignite.internal.tx.HybridTimestampTracker; +import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; +import org.apache.ignite.internal.tx.impl.HeapLockManager; +import org.apache.ignite.internal.tx.impl.HeapLockManager.LockState; +import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager; +import org.apache.ignite.internal.tx.impl.TransactionIdGenerator; +import org.apache.ignite.internal.tx.impl.TxManagerImpl; +import org.apache.ignite.internal.type.NativeTypes; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.ClusterService; +import org.apache.ignite.raft.jraft.test.TestUtils; +import org.apache.ignite.table.RecordView; +import org.apache.ignite.table.Tuple; +import org.apache.ignite.tx.Transaction; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Test lock table. + */ +@ExtendWith(ConfigurationExtension.class) +public class ItLockTableTest extends IgniteAbstractTest { + private static final IgniteLogger LOG = Loggers.forClass(ItLockTableTest.class); + + private static int EMP_TABLE_ID = 2; + + private static final int CACHE_SIZE = 10; + + private static final String TABLE_NAME = "test"; + + private static SchemaDescriptor TABLE_SCHEMA = new SchemaDescriptor( + 1, + new Column[]{new Column("id".toUpperCase(), NativeTypes.INT32, false)}, + new Column[]{ + new Column("name".toUpperCase(), NativeTypes.STRING, true), + new Column("salary".toUpperCase(), NativeTypes.DOUBLE, true) + } + ); + + protected TableViewInternal testTable; + + protected final TestInfo testInfo; + + //TODO fsync can be turned on again after https://issues.apache.org/jira/browse/IGNITE-20195 + @InjectConfiguration("mock: { fsync: false }") + protected static RaftConfiguration raftConfiguration; + + @InjectConfiguration + protected static GcConfiguration gcConfig; + + @InjectConfiguration + protected static TransactionConfiguration txConfiguration; + + private ItTxTestCluster txTestCluster; + + private HybridTimestampTracker timestampTracker = new HybridTimestampTracker(); + + /** + * The constructor. + * + * @param testInfo Test info. + */ + public ItLockTableTest(TestInfo testInfo) { + this.testInfo = testInfo; + } + + @BeforeEach + public void before() throws Exception { + txTestCluster = new ItTxTestCluster( + testInfo, + raftConfiguration, + txConfiguration, + workDir, + 1, + 1, + false, + timestampTracker + ) { + @Override + protected TxManagerImpl newTxManager( + ClusterService clusterService, + ReplicaService replicaSvc, + HybridClock clock, + TransactionIdGenerator generator, + ClusterNode node, + PlacementDriver placementDriver + ) { + return new TxManagerImpl( + txConfiguration, + clusterService, + replicaSvc, + new HeapLockManager( + DeadlockPreventionPolicy.NO_OP, + HeapLockManager.SLOTS, + CACHE_SIZE, + new HeapUnboundedLockManager()), + clock, + generator, + placementDriver, + () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS + ); + } + }; + txTestCluster.prepareCluster(); + + testTable = txTestCluster.startTable(TABLE_NAME, EMP_TABLE_ID, TABLE_SCHEMA); + + log.info("Tables have been started"); + } + + @AfterEach + public void after() throws Exception { + txTestCluster.shutdownCluster(); + } + + /** + * Test that a lock table behaves correctly in case of lock cache overflow. + */ + @Test + public void testCollision() { + RecordView view = testTable.recordView(); + + int i = 0; + final int count = 1000; + List txns = new ArrayList<>(); + while (i++ < count) { + Transaction tx = txTestCluster.igniteTransactions().begin(); + view.insertAsync(tx, tuple(i, "x-" + i)); + txns.add(tx); + } + + assertTrue(TestUtils.waitForCondition(() -> { + int total = 0; + HeapLockManager lockManager = (HeapLockManager) txTestCluster.txManagers.get(txTestCluster.localNodeName).lockManager(); + for (int j = 0; j < lockManager.getSlots().length; j++) { + LockState slot = lockManager.getSlots()[j]; + total += slot.waitersCount(); + } + + return total == count && lockManager.available() == 0; + }, 10_000), "Some lockers are missing"); + + int empty = 0; + int coll = 0; + + HeapLockManager lm = (HeapLockManager) txTestCluster.txManagers.get(txTestCluster.localNodeName).lockManager(); + for (int j = 0; j < lm.getSlots().length; j++) { + LockState slot = lm.getSlots()[j]; + int cnt = slot.waitersCount(); + if (cnt == 0) { + empty++; + } + if (cnt > 1) { + coll += cnt; + } + } + + LOG.info("LockTable [emptySlots={} collisions={}]", empty, coll); + + assertTrue(coll > 0); + + List> finishFuts = new ArrayList<>(); + for (Transaction txn : txns) { + finishFuts.add(txn.commitAsync()); + } + + for (CompletableFuture finishFut : finishFuts) { + finishFut.join(); + } + + assertTrue(TestUtils.waitForCondition(() -> { + int total = 0; + HeapLockManager lockManager = (HeapLockManager) txTestCluster.txManagers.get(txTestCluster.localNodeName).lockManager(); + for (int j = 0; j < lockManager.getSlots().length; j++) { + LockState slot = lockManager.getSlots()[j]; + total += slot.waitersCount(); + } + + return total == 0 && lockManager.available() == CACHE_SIZE; + }, 10_000), "Illegal lock manager state"); + } + + private static Tuple tuple(int id, String name) { + return Tuple.create() + .set("id", id) + .set("name", name); + } + + private static Tuple keyTuple(int id) { + return Tuple.create() + .set("id", id); + } +} diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java index 28bb29fdba0..2b4f99ce358 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java @@ -75,10 +75,10 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest { //TODO fsync can be turned on again after https://issues.apache.org/jira/browse/IGNITE-20195 @InjectConfiguration("mock: { fsync: false }") - protected static RaftConfiguration raftConfiguration; + protected RaftConfiguration raftConfiguration; @InjectConfiguration - protected static TransactionConfiguration txConfiguration; + protected TransactionConfiguration txConfiguration; /** * Returns a count of nodes. @@ -153,6 +153,7 @@ public void before() throws Exception { @AfterEach public void after() throws Exception { txTestCluster.shutdownCluster(); + Mockito.framework().clearInlineMocks(); } /** diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java index 4a33e063f25..1f6c7aa1304 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java @@ -253,6 +253,6 @@ public void testTwoReadWriteTransactionsWaitForCleanup() throws TransactionExcep } private static void releaseTxLocks(UUID txId, LockManager lockManager) { - lockManager.locks(txId).forEachRemaining(lockManager::release); + lockManager.releaseAll(txId); } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java index 7ee855b2a3b..f1c943de946 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java @@ -429,9 +429,8 @@ public CompletableFuture streamData(Publisher publisher, @Nullable Objects.requireNonNull(publisher); var partitioner = new TupleStreamerPartitionAwarenessProvider(rowConverter.registry(), tbl.partitions()); - StreamerBatchSender batchSender = (partitionId, items) -> withSchemaSync(null, (schemaVersion) -> { - return this.tbl.upsertAll(mapToBinary(items, schemaVersion, false), partitionId); - }); + StreamerBatchSender batchSender = (partitionId, items) -> withSchemaSync(null, + (schemaVersion) -> this.tbl.upsertAll(mapToBinary(items, schemaVersion, false), partitionId)); return DataStreamer.streamData(publisher, options, batchSender, partitioner); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index 99e806dc6dc..7c0be5ba651 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -1187,17 +1187,19 @@ private CompletableFuture> lookupIndex( BinaryTuple exactKey = request.exactKey().asBinaryTuple(); return lockManager.acquire(txId, new LockKey(indexId), LockMode.IS).thenCompose(idxLock -> { // Index IS lock - return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IS).thenCompose(tblLock -> { // Table IS lock - return lockManager.acquire(txId, new LockKey(indexId, exactKey.byteBuffer()), LockMode.S) - .thenCompose(indRowLock -> { // Hash index bucket S lock - Cursor cursor = (Cursor) cursors.computeIfAbsent(cursorId, id -> indexStorage.get(exactKey)); + return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IS) // Table IS lock + .thenCompose(tblLock -> { + return lockManager.acquire(txId, new LockKey(indexId, exactKey.byteBuffer()), LockMode.S) + .thenCompose(indRowLock -> { // Hash index bucket S lock + Cursor cursor = (Cursor) cursors.computeIfAbsent(cursorId, + id -> indexStorage.get(exactKey)); - var result = new ArrayList(batchCount); + var result = new ArrayList(batchCount); - return continueIndexLookup(txId, cursor, batchCount, result) - .thenApply(ignore -> result); - }); - }); + return continueIndexLookup(txId, cursor, batchCount, result) + .thenApply(ignore -> result); + }); + }); }); } @@ -1230,45 +1232,47 @@ private CompletableFuture> scanSortedIndex( int flags = request.flags(); return lockManager.acquire(txId, new LockKey(indexId), LockMode.IS).thenCompose(idxLock -> { // Index IS lock - return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IS).thenCompose(tblLock -> { // Table IS lock - var comparator = new BinaryTupleComparator(indexStorage.indexDescriptor().columns()); + return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IS) // Table IS lock + .thenCompose(tblLock -> { + var comparator = new BinaryTupleComparator(indexStorage.indexDescriptor().columns()); - Predicate isUpperBoundAchieved = indexRow -> { - if (indexRow == null) { - return true; - } + Predicate isUpperBoundAchieved = indexRow -> { + if (indexRow == null) { + return true; + } - if (upperBound == null) { - return false; - } + if (upperBound == null) { + return false; + } - ByteBuffer buffer = upperBound.byteBuffer(); + ByteBuffer buffer = upperBound.byteBuffer(); - if ((flags & SortedIndexStorage.LESS_OR_EQUAL) != 0) { - byte boundFlags = buffer.get(0); + if ((flags & SortedIndexStorage.LESS_OR_EQUAL) != 0) { + byte boundFlags = buffer.get(0); - buffer.put(0, (byte) (boundFlags | BinaryTupleCommon.EQUALITY_FLAG)); - } + buffer.put(0, (byte) (boundFlags | BinaryTupleCommon.EQUALITY_FLAG)); + } - return comparator.compare(indexRow.indexColumns().byteBuffer(), buffer) >= 0; - }; + return comparator.compare(indexRow.indexColumns().byteBuffer(), buffer) >= 0; + }; - Cursor cursor = (Cursor) cursors.computeIfAbsent(cursorId, - id -> indexStorage.scan( - lowerBound, - // We have to handle upperBound on a level of replication listener, - // for correctness of taking of a range lock. - null, - flags - )); + Cursor cursor = (Cursor) cursors.computeIfAbsent(cursorId, + id -> indexStorage.scan( + lowerBound, + // We have to handle upperBound on a level of replication listener, + // for correctness of taking of a range lock. + null, + flags + )); - SortedIndexLocker indexLocker = (SortedIndexLocker) indexesLockers.get().get(indexId); + SortedIndexLocker indexLocker = (SortedIndexLocker) indexesLockers.get().get(indexId); - var result = new ArrayList(batchCount); + var result = new ArrayList(batchCount); - return continueIndexScan(txId, schemaAwareIndexStorage, indexLocker, cursor, batchCount, result, isUpperBoundAchieved) - .thenApply(ignore -> result); - }); + return continueIndexScan(txId, schemaAwareIndexStorage, indexLocker, cursor, batchCount, result, + isUpperBoundAchieved) + .thenApply(ignore -> result); + }); }); } @@ -1810,7 +1814,7 @@ private static CompletableFuture allOfFuturesExceptionIgnored(List> processReadOnlyDirectMultiEntryAction */ private CompletableFuture processMultiEntryAction(ReadWriteMultiRowReplicaRequest request, String txCoordinatorId) { UUID txId = request.transactionId(); - TablePartitionId committedPartitionId = request.commitPartitionId().asTablePartitionId(); + TablePartitionId commitPartitionId = request.commitPartitionId().asTablePartitionId(); List searchRows = request.binaryRows(); - assert committedPartitionId != null : "Commit partition is null [type=" + request.requestType() + ']'; + assert commitPartitionId != null : "Commit partition is null [type=" + request.requestType() + ']'; switch (request.requestType()) { case RW_DELETE_EXACT_ALL: { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index cfc03ef6012..00300600858 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -111,6 +111,7 @@ import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage; import org.apache.ignite.internal.util.CollectionUtils; +import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.utils.PrimaryReplica; @@ -258,13 +259,15 @@ public String name() { * @param tx The transaction, not null if explicit. * @param fac Replica requests factory. * @param noWriteChecker Used to handle operations producing no updates. + * @param retryOnLockConflict {@code True} to retry on lock conflict. * @return The future. */ private CompletableFuture enlistInTx( BinaryRowEx row, @Nullable InternalTransaction tx, IgniteTriFunction fac, - BiPredicate noWriteChecker + BiPredicate noWriteChecker, + boolean retryOnLockConflict ) { // Check whether proposed tx is read-only. Complete future exceptionally if true. // Attempting to enlist a read-only in a read-write transaction does not corrupt the transaction itself, thus read-write transaction @@ -293,10 +296,10 @@ private CompletableFuture enlistInTx( assert !implicit; fut = trackingInvoke(actualTx, partId, term -> fac.apply(actualTx, partGroupId, term), false, primaryReplicaAndTerm, - noWriteChecker); + noWriteChecker, retryOnLockConflict); } else { fut = enlistWithRetry(actualTx, partId, term -> fac.apply(actualTx, partGroupId, term), ATTEMPTS_TO_ENLIST_PARTITION, - implicit, noWriteChecker); + implicit, noWriteChecker, retryOnLockConflict); } return postEnlist(fut, false, actualTx, implicit); @@ -310,6 +313,7 @@ private CompletableFuture enlistInTx( * @param fac Replica requests factory. * @param reducer Transform reducer. * @param noOpChecker Used to handle no-op operations (producing no updates). + * @param retryOnLockConflict {@code True} to retry on lock conflict. * @return The future. */ private CompletableFuture enlistInTx( @@ -317,9 +321,10 @@ private CompletableFuture enlistInTx( @Nullable InternalTransaction tx, IgnitePentaFunction< Collection, InternalTransaction, ReplicationGroupId, Long, Boolean, ReplicaRequest - > fac, + > fac, Function, CompletableFuture> reducer, - BiPredicate noOpChecker + BiPredicate noOpChecker, + boolean retryOnLockConflict ) { // Check whether proposed tx is read-only. Complete future exceptionally if true. // Attempting to enlist a read-only in a read-write transaction does not corrupt the transaction itself, thus read-write transaction @@ -355,7 +360,7 @@ private CompletableFuture enlistInTx( assert !implicit; fut = trackingInvoke(actualTx, partitionId, term -> fac.apply(rowBatch.requestedRows, actualTx, partGroupId, term, false), - false, primaryReplicaAndTerm, noOpChecker); + false, primaryReplicaAndTerm, noOpChecker, retryOnLockConflict); } else { fut = enlistWithRetry( actualTx, @@ -363,7 +368,8 @@ private CompletableFuture enlistInTx( term -> fac.apply(rowBatch.requestedRows, actualTx, partGroupId, term, full), ATTEMPTS_TO_ENLIST_PARTITION, full, - noOpChecker + noOpChecker, + retryOnLockConflict ); } @@ -433,7 +439,7 @@ private CompletableFuture> enlistCursorInTx( if (primaryReplicaAndTerm != null) { fut = replicaSvc.invoke(primaryReplicaAndTerm.get1(), mapFunc.apply(primaryReplicaAndTerm.get2())); } else { - fut = enlistWithRetry(tx, partId, mapFunc, ATTEMPTS_TO_ENLIST_PARTITION, false, null); + fut = enlistWithRetry(tx, partId, mapFunc, ATTEMPTS_TO_ENLIST_PARTITION, false, null, false); } return postEnlist(fut, false, tx, false); @@ -459,6 +465,7 @@ private CompletableFuture> enlistCursorInTx( * @param attempts Number of attempts. * @param full {@code True} if is a full transaction. * @param noWriteChecker Used to handle operations producing no updates. + * @param retryOnLockConflict {@code True} to retry on lock conflict. * @return The future. */ private CompletableFuture enlistWithRetry( @@ -467,25 +474,31 @@ private CompletableFuture enlistWithRetry( Function mapFunc, int attempts, boolean full, - @Nullable BiPredicate noWriteChecker + @Nullable BiPredicate noWriteChecker, + boolean retryOnLockConflict ) { - return enlist(partId, tx) - .thenCompose(primaryReplicaAndTerm -> trackingInvoke(tx, partId, mapFunc, full, primaryReplicaAndTerm, noWriteChecker)) - .handle((response, e) -> { - if (e == null) { - return completedFuture(response); - } + return (CompletableFuture) enlist(partId, tx) + .thenCompose(primaryReplicaAndTerm -> trackingInvoke(tx, partId, mapFunc, full, primaryReplicaAndTerm, noWriteChecker, + retryOnLockConflict)) + .handle((res0, e) -> { + if (e != null) { + // We can safely retry indefinitely on deadlock prevention. + if (retryOnLockConflict && e.getCause() instanceof LockException) { + return enlistWithRetry(tx, partId, mapFunc, attempts, full, noWriteChecker, true); + } - if (attempts > 0 && e.getCause() instanceof PrimaryReplicaMissException) { - LOG.info("Primary replica for partition {} changed, retrying the request. Remaining attempts: {}", - partId, attempts - 1); + if (attempts > 0 && e.getCause() instanceof PrimaryReplicaMissException) { + LOG.info("Primary replica for partition {} changed, retrying the request. Remaining attempts: {}", + partId, attempts - 1); - return enlistWithRetry(tx, partId, mapFunc, attempts - 1, full, noWriteChecker); - } else { - return CompletableFuture.failedFuture(e); + return enlistWithRetry(tx, partId, mapFunc, attempts - 1, full, noWriteChecker, retryOnLockConflict); + } + + return failedFuture(e); } - }) - .thenCompose(Function.identity()); + + return completedFuture(res0); + }).thenCompose(x -> x); } /** @@ -497,6 +510,7 @@ private CompletableFuture enlistWithRetry( * @param full {@code True} for a full transaction. * @param primaryReplicaAndTerm Replica and term. * @param noWriteChecker Used to handle operations producing no updates. + * @param retryOnLockConflict {@code True} to retry on lock conflics. * @return The future. */ private CompletableFuture trackingInvoke( @@ -505,7 +519,8 @@ private CompletableFuture trackingInvoke( Function mapFunc, boolean full, IgniteBiTuple primaryReplicaAndTerm, - @Nullable BiPredicate noWriteChecker + @Nullable BiPredicate noWriteChecker, + boolean retryOnLockConflict ) { ReplicaRequest request = mapFunc.apply(primaryReplicaAndTerm.get2()); @@ -537,6 +552,17 @@ private CompletableFuture trackingInvoke( } return res; + }).exceptionally(e -> { + if (retryOnLockConflict && e.getCause() instanceof LockException) { + txManager.removeInflight(tx.id()); // Will be retried. + } + + if (e.getCause() instanceof PrimaryReplicaMissException) { + txManager.removeInflight(tx.id()); // Will be retried. + } + + ExceptionUtils.sneakyThrow(e); + return null; // Unreachable. }); } else { return replicaSvc.invoke(primaryReplicaAndTerm.get1(), request); @@ -662,7 +688,7 @@ private CompletableFuture evaluateReadOnlyPrimaryNode( TimeUnit.SECONDS ); - CompletableFuture fut = primaryReplicaFuture.thenCompose(primaryReplica -> { + CompletableFuture fut = primaryReplicaFuture.thenCompose(primaryReplica -> { try { ClusterNode node = clusterNodeResolver.getByConsistentId(primaryReplica.getLeaseholder()); @@ -754,7 +780,8 @@ public CompletableFuture get(BinaryRowEx keyRow, InternalTransaction .timestampLong(clock.nowLong()) .full(tx == null) .build(), - (res, req) -> false + (res, req) -> false, + false ); } @@ -823,7 +850,7 @@ public CompletableFuture> getAll(Collection keyRows BinaryRowEx firstRow = keyRows.iterator().next(); return evaluateReadOnlyRecipientNode(partitionId(firstRow)) - .thenCompose(recipientNode -> getAll(keyRows, tx.readTimestamp(), recipientNode)); + .thenCompose(recipientNode -> getAll(keyRows, tx.readTimestamp(), recipientNode)); } return enlistInTx( @@ -831,7 +858,8 @@ public CompletableFuture> getAll(Collection keyRows tx, (keyRows0, txo, groupId, term, full) -> readWriteMultiRowPkReplicaRequest(RW_GET_ALL, keyRows0, txo, groupId, term, full), InternalTableImpl::collectMultiRowsResponsesWithRestoreOrder, - (res, req) -> false + (res, req) -> false, + false ); } @@ -948,7 +976,8 @@ public CompletableFuture upsert(BinaryRowEx row, InternalTransaction tx) { .timestampLong(clock.nowLong()) .full(tx == null) .build(), - (res, req) -> false + (res, req) -> false, + false ); } @@ -960,7 +989,8 @@ public CompletableFuture upsertAll(Collection rows, InternalT tx, this::upsertAllInternal, RowBatch::allResultFutures, - (res, req) -> false + (res, req) -> false, + false ); } @@ -976,7 +1006,8 @@ public CompletableFuture upsertAll(Collection rows, int parti term -> upsertAllInternal(rows, tx, partGroupId, term, true), ATTEMPTS_TO_ENLIST_PARTITION, true, - null + null, + true // Allow auto retries for data streamer. ); return postEnlist(fut, false, tx, true); // Will be committed in one RTT. @@ -999,7 +1030,8 @@ public CompletableFuture getAndUpsert(BinaryRowEx row, InternalTransa .timestampLong(clock.nowLong()) .full(tx == null) .build(), - (res, req) -> false + (res, req) -> false, + false ); } @@ -1020,7 +1052,8 @@ public CompletableFuture insert(BinaryRowEx row, InternalTransaction tx .timestampLong(clock.nowLong()) .full(tx == null) .build(), - (res, req) -> !res + (res, req) -> !res, + false ); } @@ -1043,7 +1076,8 @@ public CompletableFuture> insertAll(Collection rows // All values are null, this means nothing was deleted. return true; - } + }, + false ); } @@ -1087,7 +1121,8 @@ public CompletableFuture replace(BinaryRowEx row, InternalTransaction t .timestampLong(clock.nowLong()) .full(tx == null) .build(), - (res, req) -> !res + (res, req) -> !res, + false ); } @@ -1112,7 +1147,8 @@ public CompletableFuture replace(BinaryRowEx oldRow, BinaryRowEx newRow .timestampLong(clock.nowLong()) .full(tx == null) .build(), - (res, req) -> !res + (res, req) -> !res, + false ); } @@ -1133,7 +1169,8 @@ public CompletableFuture getAndReplace(BinaryRowEx row, InternalTrans .timestampLong(clock.nowLong()) .full(tx == null) .build(), - (res, req) -> res == null + (res, req) -> res == null, + false ); } @@ -1154,7 +1191,8 @@ public CompletableFuture delete(BinaryRowEx keyRow, InternalTransaction .timestampLong(clock.nowLong()) .full(tx == null) .build(), - (res, req) -> !res + (res, req) -> !res, + false ); } @@ -1175,7 +1213,8 @@ public CompletableFuture deleteExact(BinaryRowEx oldRow, InternalTransa .timestampLong(clock.nowLong()) .full(tx == null) .build(), - (res, req) -> !res + (res, req) -> !res, + false ); } @@ -1196,7 +1235,8 @@ public CompletableFuture getAndDelete(BinaryRowEx row, InternalTransa .timestampLong(clock.nowLong()) .full(tx == null) .build(), - (res, req) -> res == null + (res, req) -> res == null, + false ); } @@ -1219,7 +1259,8 @@ public CompletableFuture> deleteAll(Collection rows // All values are null, this means nothing was deleted. return true; - } + }, + false ); } @@ -1245,7 +1286,8 @@ public CompletableFuture> deleteAllExact( // All values are null, this means nothing was deleted. return true; - } + }, + false ); } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java index 6de0400b36e..cce80387177 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java @@ -278,7 +278,7 @@ public void beforeTest() { ((TestHashIndexStorage) pkStorage.get().storage()).clear(); TEST_MV_PARTITION_STORAGE.clear(); - locks().forEach(LOCK_MANAGER::release); + LOCK_MANAGER.releaseAll(TRANSACTION_ID); } /** Verifies the mode in which the lock was acquired on the index key for a particular operation. */ diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index 8dc8c9ee289..8f4051a4ff7 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -184,6 +184,7 @@ import org.apache.ignite.tx.TransactionException; import org.hamcrest.Matcher; import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -243,7 +244,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } } - lockManager.locks(cmd.txId()).forEachRemaining(lockManager::release); + lockManager.releaseAll(cmd.txId()); } else if (cmd instanceof UpdateCommand) { pendingRows.compute(cmd.txId(), (txId, v) -> { if (v == null) { @@ -509,6 +510,11 @@ public ClusterNode getByConsistentId(String consistentId) { reset(); } + @AfterEach + public void clearMocks() { + Mockito.framework().clearInlineMocks(); + } + private static SchemaDescriptor schemaDescriptorWith(int ver) { return new SchemaDescriptor(ver, new Column[]{ new Column("intKey".toUpperCase(Locale.ROOT), NativeTypes.INT32, false), diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index a078d13eb81..0574317e938 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.placementdriver.PlacementDriver; import org.apache.ignite.internal.placementdriver.TestPlacementDriver; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.Peer; @@ -107,10 +108,12 @@ import org.apache.ignite.network.TopologyService; import org.apache.ignite.tx.TransactionException; import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.TestOnly; /** * Dummy table storage implementation. */ +@TestOnly public class DummyInternalTableImpl extends InternalTableImpl { public static final IgniteLogger LOG = Loggers.forClass(DummyInternalTableImpl.class); @@ -118,8 +121,6 @@ public class DummyInternalTableImpl extends InternalTableImpl { public static final ClusterNode LOCAL_NODE = new ClusterNodeImpl("id", "node", ADDR); - private static final TestPlacementDriver TEST_PLACEMENT_DRIVER = new TestPlacementDriver(LOCAL_NODE); - // 2000 was picked to avoid negative time that we get when building read timestamp // in TxManagerImpl.currentReadTimestamp. // We subtract (ReplicaManager.IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS + HybridTimestamp.CLOCK_SKEW) = (1000 + 7) = 1007 @@ -158,29 +159,24 @@ public class DummyInternalTableImpl extends InternalTableImpl { * @param txConfiguration Transaction configuration. */ public DummyInternalTableImpl(ReplicaService replicaSvc, SchemaDescriptor schema, TransactionConfiguration txConfiguration) { - this(replicaSvc, new TestMvPartitionStorage(0), schema, txConfiguration); + this(replicaSvc, new TestMvPartitionStorage(0), schema, new TestPlacementDriver(LOCAL_NODE), txConfiguration); } /** * Creates a new local table. * * @param replicaSvc Replica service. - * @param txManager Transaction manager. - * @param crossTableUsage If this dummy table is going to be used in cross-table tests, it won't mock the calls of - * ReplicaService by itself. - * @param transactionStateResolver Transaction state resolver. - * @param schema Schema descriptor. - * @param tracker Observable timestamp tracker. + * @param storage Storage. + * @param schema Schema. + * @param txConfiguration Transaction configuration. */ public DummyInternalTableImpl( ReplicaService replicaSvc, - TxManager txManager, - boolean crossTableUsage, - @Nullable TransactionStateResolver transactionStateResolver, + MvPartitionStorage storage, SchemaDescriptor schema, - HybridTimestampTracker tracker + TransactionConfiguration txConfiguration ) { - this(replicaSvc, new TestMvPartitionStorage(0), txManager, crossTableUsage, transactionStateResolver, schema, tracker); + this(replicaSvc, storage, schema, new TestPlacementDriver(LOCAL_NODE), txConfiguration); } /** @@ -195,16 +191,18 @@ public DummyInternalTableImpl( ReplicaService replicaSvc, MvPartitionStorage mvPartStorage, SchemaDescriptor schema, + PlacementDriver placementDriver, TransactionConfiguration txConfiguration ) { this( replicaSvc, mvPartStorage, - txManager(replicaSvc, txConfiguration), + txManager(replicaSvc, placementDriver, txConfiguration), false, null, schema, - new HybridTimestampTracker() + new HybridTimestampTracker(), + placementDriver ); } @@ -219,6 +217,7 @@ public DummyInternalTableImpl( * @param transactionStateResolver Transaction state resolver. * @param schema Schema descriptor. * @param tracker Observable timestamp tracker. + * @param placementDriver Placement driver. */ public DummyInternalTableImpl( ReplicaService replicaSvc, @@ -227,7 +226,8 @@ public DummyInternalTableImpl( boolean crossTableUsage, @Nullable TransactionStateResolver transactionStateResolver, SchemaDescriptor schema, - HybridTimestampTracker tracker + HybridTimestampTracker tracker, + PlacementDriver placementDriver ) { super( "test", @@ -241,7 +241,7 @@ public DummyInternalTableImpl( replicaSvc, CLOCK, tracker, - TEST_PLACEMENT_DRIVER + placementDriver ); RaftGroupService svc = raftGroupServiceByPartitionId.get(PART_ID); @@ -379,7 +379,7 @@ public void result(@Nullable Serializable r) { LOCAL_NODE, new AlwaysSyncedSchemaSyncService(), catalogService, - TEST_PLACEMENT_DRIVER, + new TestPlacementDriver(LOCAL_NODE), mock(ClusterNodeResolver.class) ); @@ -424,9 +424,14 @@ public TxManager txManager() { * Creates a {@link TxManager}. * * @param replicaSvc Replica service to use. + * @param placementDriver Placement driver. * @param txConfiguration Transaction configuration. */ - public static TxManagerImpl txManager(ReplicaService replicaSvc, TransactionConfiguration txConfiguration) { + public static TxManagerImpl txManager( + ReplicaService replicaSvc, + PlacementDriver placementDriver, + TransactionConfiguration txConfiguration + ) { TopologyService topologyService = mock(TopologyService.class); when(topologyService.localMember()).thenReturn(LOCAL_NODE); @@ -442,7 +447,7 @@ public static TxManagerImpl txManager(ReplicaService replicaSvc, TransactionConf new HeapLockManager(), CLOCK, new TransactionIdGenerator(0xdeadbeef), - TEST_PLACEMENT_DRIVER, + placementDriver, () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS ); @@ -494,8 +499,7 @@ public void addIndexToWaitIfAbsent(int indexId) { } /** - * Dummy messaging service for tests purposes. - * It does not provide any messaging functionality, but allows to trigger events. + * Dummy messaging service for tests purposes. It does not provide any messaging functionality, but allows to trigger events. */ private static class DummyMessagingService extends AbstractMessagingService { private final String localNodeName; diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/DeadlockPreventionPolicy.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/DeadlockPreventionPolicy.java index eaf376860e0..7b5b02ce316 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/DeadlockPreventionPolicy.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/DeadlockPreventionPolicy.java @@ -27,6 +27,11 @@ * See also {@link org.apache.ignite.internal.tx.impl.HeapLockManager}. */ public interface DeadlockPreventionPolicy { + /** + * No-op policy which does nothing to prevent deadlocks. + */ + DeadlockPreventionPolicy NO_OP = new DeadlockPreventionPolicy() {}; + /** * Comparator for transaction ids that allows to set transaction priority, if deadlock prevention policy requires this priority. * The transaction with higher id has lower priority. If this comparator is {@code null} then behavior of any transaction diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockKey.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockKey.java index 3ec60e170f7..9569c04459e 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockKey.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockKey.java @@ -17,7 +17,9 @@ package org.apache.ignite.internal.tx; +import java.nio.ByteBuffer; import org.apache.ignite.internal.tostring.S; +import org.apache.ignite.internal.util.HashUtils; /** Lock key. */ public class LockKey { @@ -73,6 +75,12 @@ public boolean equals(Object o) { @Override public int hashCode() { + // Apply more efficient hashing to byte buffers to decrease collisions + if (key instanceof ByteBuffer) { + ByteBuffer key1 = (ByteBuffer) key; + return HashUtils.hash32(HashUtils.hash64(key1, 0, key1.capacity(), contextId != null ? contextId.hashCode() : 0)); + } + int result = contextId != null ? contextId.hashCode() : 0; result = 31 * result + (key != null ? key.hashCode() : 0); return result; diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java index 54e65adea07..65e6132cd62 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockManager.java @@ -36,14 +36,15 @@ public interface LockManager extends EventProducer acquire(UUID txId, LockKey lockKey, LockMode lockMode); + CompletableFuture acquire(UUID txId, LockKey lockKey, LockMode lockMode); /** * Attempts to release the specified lock. * * @param lock Lock to release. */ - public void release(Lock lock); + @TestOnly + void release(Lock lock); /** * Release a lock that is held on the specific mode on the specific key. @@ -60,7 +61,15 @@ public interface LockManager extends EventProducer locks(UUID txId); + @TestOnly + Iterator locks(UUID txId); + + /** + * Release all locks associated with a transaction. + * + * @param txId Tx id. + */ + void releaseAll(UUID txId); /** * Returns a collection of transaction ids that is associated with the specified {@code key}. @@ -69,7 +78,7 @@ public interface LockManager extends EventProducer queue(LockKey key); + Collection queue(LockKey key); /** * Returns a waiter associated with the specified {@code key}. @@ -79,7 +88,7 @@ public interface LockManager extends EventProducerLock waiters are placed in the queue, ordered according to comparator provided by {@link HeapLockManager#deadlockPreventionPolicy}. - * When a new waiter is placed in the queue, it's validated against current lock owner: if there is an owner with a higher transaction id - * lock request is denied. + * When a new waiter is placed in the queue, it's validated against current lock owner: if there is an owner with a higher priority (as + * defined by comparator) lock request is denied. * *

Read lock can be upgraded to write lock (only available for the lowest read-locked entry of * the queue). + * + *

Additionally limits the lock map size. */ public class HeapLockManager extends AbstractEventProducer implements LockManager { - private ConcurrentHashMap locks = new ConcurrentHashMap<>(); + /** + * Table size. TODO make it configurable IGNITE-20694 + */ + public static final int SLOTS = 131072; + /** + * Empty slots. + */ + private final ConcurrentLinkedQueue empty = new ConcurrentLinkedQueue<>(); + + /** + * Mapped slots. + */ + private final ConcurrentHashMap locks; + + /** + * Raw slots. + */ + private final LockState[] slots; + + /** + * The policy. + */ private final DeadlockPreventionPolicy deadlockPreventionPolicy; - /** Executor that is used to fail waiters after timeout. */ + /** + * Executor that is used to fail waiters after timeout. + */ private final Executor delayedExecutor; + /** + * Enlisted transactions. + */ + private final ConcurrentHashMap> txMap = new ConcurrentHashMap<>(1024); + + /** + * Parent lock manager. + * TODO asch Needs optimization https://issues.apache.org/jira/browse/IGNITE-20895 + */ + private final LockManager parentLockManager; + /** * Constructor. */ public HeapLockManager() { - this(new WaitDieDeadlockPreventionPolicy()); + this(new WaitDieDeadlockPreventionPolicy(), SLOTS, SLOTS, new HeapUnboundedLockManager()); + } + + /** + * Constructor. + */ + public HeapLockManager(DeadlockPreventionPolicy deadlockPreventionPolicy) { + this(deadlockPreventionPolicy, SLOTS, SLOTS, new HeapUnboundedLockManager()); } /** * Constructor. * * @param deadlockPreventionPolicy Deadlock prevention policy. + * @param maxSize Raw slots size. + * @param mapSize Lock map size. */ - public HeapLockManager(DeadlockPreventionPolicy deadlockPreventionPolicy) { + public HeapLockManager(DeadlockPreventionPolicy deadlockPreventionPolicy, int maxSize, int mapSize, LockManager parentLockManager) { + if (mapSize > maxSize) { + throw new IllegalArgumentException("maxSize=" + maxSize + " < mapSize=" + mapSize); + } + + this.parentLockManager = Objects.requireNonNull(parentLockManager); this.deadlockPreventionPolicy = deadlockPreventionPolicy; this.delayedExecutor = deadlockPreventionPolicy.waitTimeout() > 0 ? CompletableFuture.delayedExecutor(deadlockPreventionPolicy.waitTimeout(), TimeUnit.MILLISECONDS) : null; + + locks = new ConcurrentHashMap<>(mapSize); + + LockState[] tmp = new LockState[maxSize]; + for (int i = 0; i < tmp.length; i++) { + LockState lockState = new LockState(); + if (i < mapSize) { + empty.add(lockState); + } + tmp[i] = lockState; + } + + slots = tmp; // Atomic init. } @Override public CompletableFuture acquire(UUID txId, LockKey lockKey, LockMode lockMode) { + if (lockKey.contextId() == null) { // Treat this lock as a hierarchy lock. + return parentLockManager.acquire(txId, lockKey, lockMode); + } + while (true) { LockState state = lockState(lockKey); IgniteBiTuple, LockMode> futureTuple = state.tryAcquire(txId, lockMode); if (futureTuple.get1() == null) { - continue; // Obsolete state. + continue; // State is marked for remove, need retry. } LockMode newLockMode = futureTuple.get2(); @@ -107,11 +179,22 @@ public CompletableFuture acquire(UUID txId, LockKey lockKey, LockMode lock } @Override + @TestOnly public void release(Lock lock) { LockState state = lockState(lock.lockKey()); if (state.tryRelease(lock.txId())) { - locks.remove(lock.lockKey(), state); + locks.compute(lock.lockKey(), (k, v) -> { + // Mapping may already change. + if (v != state || !v.markedForRemove) { + return v; + } + + // markedForRemove state should be cleared on entry reuse to avoid race. + v.key = null; + empty.add(v); + return null; + }); } } @@ -120,26 +203,61 @@ public void release(UUID txId, LockKey lockKey, LockMode lockMode) { LockState state = lockState(lockKey); if (state.tryRelease(txId, lockMode)) { - locks.remove(lockKey, state); + locks.compute(lockKey, (k, v) -> { + // Mapping may already change. + if (v != state || !v.markedForRemove) { + return v; + } + + v.key = null; + empty.add(v); + return null; + }); + } + } + + @Override + public void releaseAll(UUID txId) { + ConcurrentLinkedQueue states = this.txMap.remove(txId); + + if (states != null) { + for (LockState state : states) { + if (state.tryRelease(txId)) { + LockKey key = state.key; // State may be already invalidated. + if (key != null) { + locks.compute(key, (k, v) -> { + // Mapping may already change. + if (v != state || !v.markedForRemove) { + return v; + } + + v.key = null; + empty.add(v); + return null; + }); + } + } + } } + + parentLockManager.releaseAll(txId); } @Override public Iterator locks(UUID txId) { - // TODO: IGNITE-17811 Use index or similar instead of full locks set iteration. + ConcurrentLinkedQueue lockStates = txMap.get(txId); + + if (lockStates == null) { + return emptyIterator(); + } + List result = new ArrayList<>(); - for (Map.Entry entry : locks.entrySet()) { - Waiter waiter = entry.getValue().waiter(txId); + for (LockState lockState : lockStates) { + Waiter waiter = lockState.waiter(txId); if (waiter != null) { - result.add( - new Lock( - entry.getKey(), - waiter.lockMode(), - txId - ) - ); + result.add(new Lock(lockState.key, waiter.lockMode(), txId)); } } @@ -152,7 +270,29 @@ public Iterator locks(UUID txId) { * @param key The key. */ private LockState lockState(LockKey key) { - return locks.computeIfAbsent(key, k -> new LockState(deadlockPreventionPolicy, delayedExecutor)); + int h = spread(key.hashCode()); + int index = h & (slots.length - 1); + + LockState[] res = new LockState[1]; + + locks.compute(key, (k, v) -> { + if (v == null) { + if (empty.isEmpty()) { + res[0] = slots[index]; + } else { + v = empty.poll(); + v.markedForRemove = false; + v.key = k; + res[0] = v; + } + } else { + res[0] = v; + } + + return v; + }); + + return res[0]; } /** {@inheritDoc} */ @@ -167,28 +307,36 @@ public Waiter waiter(LockKey key, UUID txId) { return lockState(key).waiter(txId); } + /** {@inheritDoc} */ + @Override + public boolean isEmpty() { + for (LockState slot : slots) { + if (!slot.waiters.isEmpty()) { + return false; + } + } + + return true; + } + /** * A lock state. */ - private class LockState { + public class LockState { /** Waiters. */ private final TreeMap waiters; - private final DeadlockPreventionPolicy deadlockPreventionPolicy; - - /** Delayed executor for waiters timeout callback. */ - private final Executor delayedExecutor; - /** Marked for removal flag. */ - private boolean markedForRemove = false; + private volatile boolean markedForRemove = false; - public LockState(DeadlockPreventionPolicy deadlockPreventionPolicy, Executor delayedExecutor) { + /** Lock key. */ + private volatile LockKey key; + + LockState() { Comparator txComparator = deadlockPreventionPolicy.txIdComparator() != null ? deadlockPreventionPolicy.txIdComparator() : UUID::compareTo; this.waiters = new TreeMap<>(txComparator); - this.deadlockPreventionPolicy = deadlockPreventionPolicy; - this.delayedExecutor = delayedExecutor; } /** @@ -198,7 +346,7 @@ public LockState(DeadlockPreventionPolicy deadlockPreventionPolicy, Executor del * @param lockMode Lock mode. * @return The future or null if state is marked for removal and acquired lock mode. */ - public @Nullable IgniteBiTuple, LockMode> tryAcquire(UUID txId, LockMode lockMode) { + @Nullable IgniteBiTuple, LockMode> tryAcquire(UUID txId, LockMode lockMode) { WaiterImpl waiter = new WaiterImpl(txId, lockMode); synchronized (waiters) { @@ -234,6 +382,11 @@ public LockState(DeadlockPreventionPolicy deadlockPreventionPolicy, Executor del setWaiterTimeout(waiter); } + // Put to wait queue, track. + if (prev == null) { + track(waiter.txId); + } + return new IgniteBiTuple<>(waiter.fut, waiter.lockMode()); } @@ -241,13 +394,22 @@ public LockState(DeadlockPreventionPolicy deadlockPreventionPolicy, Executor del waiters.remove(waiter.txId()); } else if (waiter.hasLockIntent()) { waiter.refuseIntent(); // Restore old lock. + } else { + // Lock granted, track. + if (prev == null) { + track(waiter.txId); + } } } // Notify outside the monitor. waiter.notifyLocked(); - return new IgniteBiTuple(waiter.fut, waiter.lockMode()); + return new IgniteBiTuple<>(waiter.fut, waiter.lockMode()); + } + + public synchronized int waitersCount() { + return waiters.size(); } /** @@ -265,7 +427,7 @@ private boolean isWaiterReadyToNotify(WaiterImpl waiter, boolean skipFail) { conflictFound(waiter.txId(), tmp.txId()); if (!deadlockPreventionPolicy.usePriority() && deadlockPreventionPolicy.waitTimeout() == 0) { - waiter.fail(lockException(waiter.txId(), tmp)); + waiter.fail(lockException(waiter, tmp)); return true; } @@ -284,7 +446,7 @@ private boolean isWaiterReadyToNotify(WaiterImpl waiter, boolean skipFail) { if (skipFail) { return false; } else if (deadlockPreventionPolicy.waitTimeout() == 0) { - waiter.fail(lockException(waiter.txId(), tmp)); + waiter.fail(lockException(waiter, tmp)); return true; } else { @@ -301,13 +463,13 @@ private boolean isWaiterReadyToNotify(WaiterImpl waiter, boolean skipFail) { /** * Create lock exception with given parameters. * - * @param txId Transaction id. - * @param conflictingWaiter Conflicting waiter. + * @param locker Locker. + * @param holder Lock holder. * @return Lock exception. */ - private LockException lockException(UUID txId, WaiterImpl conflictingWaiter) { - return new LockException(ACQUIRE_LOCK_ERR, "Failed to acquire a lock due to a conflict [txId=" - + txId + ", conflictingWaiter=" + conflictingWaiter + ']'); + private LockException lockException(WaiterImpl locker, WaiterImpl holder) { + return new LockException(ACQUIRE_LOCK_ERR, + "Failed to acquire a lock due to a possible deadlock [locker=" + locker + ", holder=" + holder + ']'); } /** @@ -316,7 +478,7 @@ private LockException lockException(UUID txId, WaiterImpl conflictingWaiter) { * @param txId Transaction id. * @return {@code True} if the queue is empty. */ - public boolean tryRelease(UUID txId) { + boolean tryRelease(UUID txId) { Collection toNotify; synchronized (waiters) { @@ -332,20 +494,20 @@ public boolean tryRelease(UUID txId) { } /** - * Releases a specific lock of the key. + * Releases a specific lock of the key, if a key is locked in multiple modes by the same locker. * * @param txId Transaction id. * @param lockMode Lock mode. * @return If the value is true, no one waits of any lock of the key, false otherwise. */ - public boolean tryRelease(UUID txId, LockMode lockMode) { - List toNotify = Collections.emptyList(); + boolean tryRelease(UUID txId, LockMode lockMode) { + List toNotify = emptyList(); synchronized (waiters) { WaiterImpl waiter = waiters.get(txId); if (waiter != null) { - assert lockMode.supremum(lockMode, waiter.lockMode()) == waiter.lockMode() : - "The lock mode is not locked [mode=" + lockMode + ", locked=" + waiter.lockMode() + ']'; + assert LockMode.supremum(lockMode, waiter.lockMode()) == waiter.lockMode() : + "The lock is not locked in specified mode [mode=" + lockMode + ", locked=" + waiter.lockMode() + ']'; LockMode modeFromDowngrade = waiter.recalculateMode(lockMode); @@ -366,8 +528,7 @@ public boolean tryRelease(UUID txId, LockMode lockMode) { } /** - * Releases all locks are held by a specific transaction. - * This method should be invoked synchronously. + * Releases all locks are held by a specific transaction. This method should be invoked synchronously. * * @param txId Transaction id. * @return List of waiters to notify. @@ -376,14 +537,14 @@ private List release(UUID txId) { waiters.remove(txId); if (waiters.isEmpty()) { - markedForRemove = true; + if (key != null) { + markedForRemove = true; + } - return Collections.emptyList(); + return emptyList(); } - List toNotify = unlockCompatibleWaiters(); - - return toNotify; + return unlockCompatibleWaiters(); } /** @@ -393,7 +554,7 @@ private List release(UUID txId) { */ private List unlockCompatibleWaiters() { if (!deadlockPreventionPolicy.usePriority() && deadlockPreventionPolicy.waitTimeout() == 0) { - return Collections.emptyList(); + return emptyList(); } ArrayList toNotify = new ArrayList<>(); @@ -489,6 +650,18 @@ public Waiter waiter(UUID txId) { } } + private void track(UUID txId) { + txMap.compute(txId, (k, v) -> { + if (v == null) { + v = new ConcurrentLinkedQueue<>(); + } + + v.add(this); + + return v; + }); + } + /** * Notifies about the lock conflict found between transactions. * @@ -506,15 +679,13 @@ private void conflictFound(UUID acquirerTx, UUID holderTx) { private static class WaiterImpl implements Comparable, Waiter { /** * Holding locks by type. - * TODO: IGNITE-18350 Abandon the collection in favor of BitSet. */ - private final Map locks = new HashMap<>(); + private final Map locks = new EnumMap<>(LockMode.class); /** * Lock modes are marked as intended, but have not taken yet. This is NOT specific to intention lock modes, such as IS and IX. - * TODO: IGNITE-18350 Abandon the collection in favor of BitSet. */ - private final Set intendedLocks = new HashSet<>(); + private final Set intendedLocks = EnumSet.noneOf(LockMode.class); /** Locked future. */ @IgniteToStringExclude @@ -563,7 +734,7 @@ void addLock(LockMode lockMode, int increment) { * Removes a lock mode. * * @param lockMode Lock mode. - * @return True if the lock mode was removed, false otherwise. + * @return True if the lock is not locked in the passed mode, false otherwise. */ private boolean removeLock(LockMode lockMode) { Integer counter = locks.get(lockMode); @@ -663,6 +834,7 @@ private void notifyLocked() { } else { assert lockMode != null; + // TODO FIXME https://issues.apache.org/jira/browse/IGNITE-20985 fut.complete(null); } } @@ -737,13 +909,20 @@ public int hashCode() { /** {@inheritDoc} */ @Override public String toString() { - return S.toString(WaiterImpl.class, this, "isDone", fut.isDone()); + return S.toString(WaiterImpl.class, this, "granted", fut.isDone()); } } - /** {@inheritDoc} */ - @Override - public boolean isEmpty() { - return locks.isEmpty(); + private static int spread(int h) { + return (h ^ (h >>> 16)) & 0x7fffffff; + } + + @TestOnly + public LockState[] getSlots() { + return slots; + } + + public int available() { + return empty.size(); } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapUnboundedLockManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapUnboundedLockManager.java new file mode 100644 index 00000000000..2aebc53a93d --- /dev/null +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapUnboundedLockManager.java @@ -0,0 +1,754 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx.impl; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR; +import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_TIMEOUT_ERR; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.event.AbstractEventProducer; +import org.apache.ignite.internal.lang.IgniteBiTuple; +import org.apache.ignite.internal.tostring.IgniteToStringExclude; +import org.apache.ignite.internal.tostring.S; +import org.apache.ignite.internal.tx.DeadlockPreventionPolicy; +import org.apache.ignite.internal.tx.Lock; +import org.apache.ignite.internal.tx.LockException; +import org.apache.ignite.internal.tx.LockKey; +import org.apache.ignite.internal.tx.LockManager; +import org.apache.ignite.internal.tx.LockMode; +import org.apache.ignite.internal.tx.Waiter; +import org.apache.ignite.internal.tx.event.LockEvent; +import org.apache.ignite.internal.tx.event.LockEventParameters; +import org.jetbrains.annotations.Nullable; + +/** + * A {@link LockManager} which uses unbounded hashtable implementation. Suitable for holding coarse-grained locks. + */ +public class HeapUnboundedLockManager extends AbstractEventProducer implements LockManager { + /** Locks. */ + private final ConcurrentHashMap locks = new ConcurrentHashMap<>(); + + /** Prevention policy. */ + private final DeadlockPreventionPolicy deadlockPreventionPolicy; + + /** Executor that is used to fail waiters after timeout. */ + private final Executor delayedExecutor; + + /** + * Constructor. + */ + public HeapUnboundedLockManager() { + this(new WaitDieDeadlockPreventionPolicy()); + } + + /** + * Constructor. + * + * @param deadlockPreventionPolicy Deadlock prevention policy. + */ + public HeapUnboundedLockManager(DeadlockPreventionPolicy deadlockPreventionPolicy) { + this.deadlockPreventionPolicy = deadlockPreventionPolicy; + this.delayedExecutor = deadlockPreventionPolicy.waitTimeout() > 0 + ? CompletableFuture.delayedExecutor(deadlockPreventionPolicy.waitTimeout(), TimeUnit.MILLISECONDS) + : null; + } + + @Override + public CompletableFuture acquire(UUID txId, LockKey lockKey, LockMode lockMode) { + while (true) { + LockState state = lockState(lockKey); + + IgniteBiTuple, LockMode> futureTuple = state.tryAcquire(txId, lockMode); + + if (futureTuple.get1() == null) { + continue; // Obsolete state. + } + + LockMode newLockMode = futureTuple.get2(); + + return futureTuple.get1().thenApply(res -> new Lock(lockKey, newLockMode, txId)); + } + } + + @Override + public void release(Lock lock) { + LockState state = lockState(lock.lockKey()); + + if (state.tryRelease(lock.txId())) { + locks.remove(lock.lockKey(), state); + } + } + + @Override + public void release(UUID txId, LockKey lockKey, LockMode lockMode) { + LockState state = lockState(lockKey); + + if (state.tryRelease(txId, lockMode)) { + locks.remove(lockKey, state); + } + } + + @Override + public void releaseAll(UUID txId) { + Iterator locks = locks(txId); + + while (locks.hasNext()) { + Lock lock = locks.next(); + release(lock); + } + } + + @Override + public Iterator locks(UUID txId) { + // Decently ok to do full scan here because coarse-grained locks are a few in quantity. + List result = new ArrayList<>(); + + for (Map.Entry entry : locks.entrySet()) { + Waiter waiter = entry.getValue().waiter(txId); + + if (waiter != null) { + result.add( + new Lock( + entry.getKey(), + waiter.lockMode(), + txId + ) + ); + } + } + + return result.iterator(); + } + + /** + * Returns the lock state for the key. + * + * @param key The key. + */ + private LockState lockState(LockKey key) { + return locks.computeIfAbsent(key, k -> new LockState(deadlockPreventionPolicy, delayedExecutor)); + } + + /** {@inheritDoc} */ + @Override + public Collection queue(LockKey key) { + return lockState(key).queue(); + } + + /** {@inheritDoc} */ + @Override + public Waiter waiter(LockKey key, UUID txId) { + return lockState(key).waiter(txId); + } + + /** + * A lock state. + */ + private class LockState { + /** Waiters. */ + private final TreeMap waiters; + + private final DeadlockPreventionPolicy deadlockPreventionPolicy; + + /** Delayed executor for waiters timeout callback. */ + private final Executor delayedExecutor; + + /** Marked for removal flag. */ + private volatile boolean markedForRemove = false; + + public LockState(DeadlockPreventionPolicy deadlockPreventionPolicy, Executor delayedExecutor) { + Comparator txComparator = + deadlockPreventionPolicy.txIdComparator() != null ? deadlockPreventionPolicy.txIdComparator() : UUID::compareTo; + + this.waiters = new TreeMap<>(txComparator); + this.deadlockPreventionPolicy = deadlockPreventionPolicy; + this.delayedExecutor = delayedExecutor; + } + + /** + * Attempts to acquire a lock for the specified {@code key} in specified lock mode. + * + * @param txId Transaction id. + * @param lockMode Lock mode. + * @return The future or null if state is marked for removal and acquired lock mode. + */ + public @Nullable IgniteBiTuple, LockMode> tryAcquire(UUID txId, LockMode lockMode) { + WaiterImpl waiter = new WaiterImpl(txId, lockMode); + + synchronized (waiters) { + if (markedForRemove) { + return new IgniteBiTuple(null, lockMode); + } + + // We always replace the previous waiter with the new one. If the previous waiter has lock intention then incomplete + // lock future is copied to the new waiter. This guarantees that, if the previous waiter was locked concurrently, then + // it doesn't have any lock intentions, and the future is not copied to the new waiter. Otherwise, if there is lock + // intention, this means that the lock future contained in previous waiter, is not going to be completed and can be + // copied safely. + WaiterImpl prev = waiters.put(txId, waiter); + + // Reenter + if (prev != null) { + if (prev.locked() && prev.lockMode().allowReenter(lockMode)) { + waiter.lock(); + + waiter.upgrade(prev); + + return new IgniteBiTuple(completedFuture(null), prev.lockMode()); + } else { + waiter.upgrade(prev); + + assert prev.lockMode() == waiter.lockMode() : + "Lock modes are incorrect [prev=" + prev.lockMode() + ", new=" + waiter.lockMode() + ']'; + } + } + + if (!isWaiterReadyToNotify(waiter, false)) { + if (deadlockPreventionPolicy.waitTimeout() > 0) { + setWaiterTimeout(waiter); + } + + return new IgniteBiTuple<>(waiter.fut, waiter.lockMode()); + } + + if (!waiter.locked()) { + waiters.remove(waiter.txId()); + } else if (waiter.hasLockIntent()) { + waiter.refuseIntent(); // Restore old lock. + } + } + + // Notify outside the monitor. + waiter.notifyLocked(); + + return new IgniteBiTuple(waiter.fut, waiter.lockMode()); + } + + /** + * Checks current waiter. It can change the internal state of the waiter. + * + * @param waiter Checked waiter. + * @return True if current waiter ready to notify, false otherwise. + */ + private boolean isWaiterReadyToNotify(WaiterImpl waiter, boolean skipFail) { + for (Map.Entry entry : waiters.tailMap(waiter.txId(), false).entrySet()) { + WaiterImpl tmp = entry.getValue(); + LockMode mode = lockedMode(tmp); + + if (mode != null && !mode.isCompatible(waiter.intendedLockMode())) { + conflictFound(waiter.txId(), tmp.txId()); + + if (!deadlockPreventionPolicy.usePriority() && deadlockPreventionPolicy.waitTimeout() == 0) { + waiter.fail(lockException(waiter, tmp)); + + return true; + } + + return false; + } + } + + for (Map.Entry entry : waiters.headMap(waiter.txId()).entrySet()) { + WaiterImpl tmp = entry.getValue(); + LockMode mode = lockedMode(tmp); + + if (mode != null && !mode.isCompatible(waiter.intendedLockMode())) { + conflictFound(waiter.txId(), tmp.txId()); + + if (skipFail) { + return false; + } else if (deadlockPreventionPolicy.waitTimeout() == 0) { + waiter.fail(lockException(waiter, tmp)); + + return true; + } else { + return false; + } + } + } + + waiter.lock(); + + return true; + } + + /** + * Create lock exception with given parameters. + * + * @param locker Locker. + * @param holder Lock holder. + * @return Lock exception. + */ + private LockException lockException(Waiter locker, Waiter holder) { + return new LockException(ACQUIRE_LOCK_ERR, + "Failed to acquire a lock due to a possible deadlock [locker=" + locker + ", holder=" + holder + ']'); + } + + /** + * Attempts to release a lock for the specified {@code key} in exclusive mode. + * + * @param txId Transaction id. + * @return {@code True} if the queue is empty. + */ + public boolean tryRelease(UUID txId) { + Collection toNotify; + + synchronized (waiters) { + toNotify = release(txId); + } + + // Notify outside the monitor. + for (WaiterImpl waiter : toNotify) { + waiter.notifyLocked(); + } + + return markedForRemove; + } + + /** + * Releases a specific lock of the key. + * + * @param txId Transaction id. + * @param lockMode Lock mode. + * @return If the value is true, no one waits of any lock of the key, false otherwise. + */ + public boolean tryRelease(UUID txId, LockMode lockMode) { + List toNotify = Collections.emptyList(); + synchronized (waiters) { + WaiterImpl waiter = waiters.get(txId); + + if (waiter != null) { + assert lockMode.supremum(lockMode, waiter.lockMode()) == waiter.lockMode() : + "The lock mode is not locked [mode=" + lockMode + ", locked=" + waiter.lockMode() + ']'; + + LockMode modeFromDowngrade = waiter.recalculateMode(lockMode); + + if (!waiter.locked() && !waiter.hasLockIntent()) { + toNotify = release(txId); + } else if (modeFromDowngrade != waiter.lockMode()) { + toNotify = unlockCompatibleWaiters(); + } + } + } + + // Notify outside the monitor. + for (WaiterImpl waiter : toNotify) { + waiter.notifyLocked(); + } + + return markedForRemove; + } + + /** + * Releases all locks are held by a specific transaction. + * This method should be invoked synchronously. + * + * @param txId Transaction id. + * @return List of waiters to notify. + */ + private List release(UUID txId) { + waiters.remove(txId); + + if (waiters.isEmpty()) { + markedForRemove = true; + + return Collections.emptyList(); + } + + List toNotify = unlockCompatibleWaiters(); + + return toNotify; + } + + /** + * Unlock compatible waiters. + * + * @return List of waiters to notify. + */ + private List unlockCompatibleWaiters() { + if (!deadlockPreventionPolicy.usePriority() && deadlockPreventionPolicy.waitTimeout() == 0) { + return Collections.emptyList(); + } + + ArrayList toNotify = new ArrayList<>(); + Set toFail = new HashSet<>(); + + for (Map.Entry entry : waiters.entrySet()) { + WaiterImpl tmp = entry.getValue(); + + if (tmp.hasLockIntent() && isWaiterReadyToNotify(tmp, true)) { + assert !tmp.hasLockIntent() : "This waiter in not locked for notification [waiter=" + tmp + ']'; + + toNotify.add(tmp); + } + } + + if (deadlockPreventionPolicy.usePriority() && deadlockPreventionPolicy.waitTimeout() >= 0) { + for (Map.Entry entry : waiters.entrySet()) { + WaiterImpl tmp = entry.getValue(); + + if (tmp.hasLockIntent() && isWaiterReadyToNotify(tmp, false)) { + assert tmp.hasLockIntent() : "Only failed waiter can be notified here [waiter=" + tmp + ']'; + + toNotify.add(tmp); + toFail.add(tmp.txId()); + } + } + + for (UUID failTx : toFail) { + var w = waiters.get(failTx); + + if (w.locked()) { + w.refuseIntent(); + } else { + waiters.remove(failTx); + } + } + } + + return toNotify; + } + + /** + * Makes the waiter fail after specified timeout (in milliseconds), if intended lock was not acquired within this timeout. + * + * @param waiter Waiter. + */ + private void setWaiterTimeout(WaiterImpl waiter) { + delayedExecutor.execute(() -> { + if (!waiter.fut.isDone()) { + waiter.fut.completeExceptionally(new LockException(ACQUIRE_LOCK_TIMEOUT_ERR, "Failed to acquire a lock due to " + + "timeout [txId=" + waiter.txId() + ", waiter=" + waiter + + ", timeout=" + deadlockPreventionPolicy.waitTimeout() + ']')); + } + }); + } + + /** + * Gets a lock mode for this waiter. + * + * @param waiter Waiter. + * @return Lock mode, which is held by the waiter or {@code null}, if the waiter holds nothing. + */ + private LockMode lockedMode(WaiterImpl waiter) { + LockMode mode = null; + + if (waiter.locked()) { + mode = waiter.lockMode(); + } + + return mode; + } + + /** + * Returns a collection of timestamps that is associated with the specified {@code key}. + * + * @return The waiters queue. + */ + public Collection queue() { + synchronized (waiters) { + return new ArrayList<>(waiters.keySet()); + } + } + + /** + * Returns a waiter for the specified {@code key}. + * + * @param txId Transaction id. + * @return The waiter. + */ + public Waiter waiter(UUID txId) { + synchronized (waiters) { + return waiters.get(txId); + } + } + + /** + * Notifies about the lock conflict found between transactions. + * + * @param acquirerTx Transaction which tries to acquire the lock. + * @param holderTx Transaction which holds the lock. + */ + private void conflictFound(UUID acquirerTx, UUID holderTx) { + fireEvent(LockEvent.LOCK_CONFLICT, new LockEventParameters(acquirerTx, holderTx)); + } + } + + /** + * A waiter implementation. + */ + private static class WaiterImpl implements Comparable, Waiter { + /** + * Holding locks by type. + * TODO: IGNITE-18350 Abandon the collection in favor of BitSet. + */ + private final Map locks = new HashMap<>(); + + /** + * Lock modes are marked as intended, but have not taken yet. This is NOT specific to intention lock modes, such as IS and IX. + * TODO: IGNITE-18350 Abandon the collection in favor of BitSet. + */ + private final Set intendedLocks = new HashSet<>(); + + /** Locked future. */ + @IgniteToStringExclude + private CompletableFuture fut; + + /** Waiter transaction id. */ + private final UUID txId; + + /** The lock mode to intend to hold. This is NOT specific to intention lock modes, such as IS and IX. */ + private LockMode intendedLockMode; + + /** The lock mode. */ + private LockMode lockMode; + + /** + * The filed has a value when the waiter couldn't lock a key. + */ + private LockException ex; + + /** + * The constructor. + * + * @param txId Transaction id. + * @param lockMode Lock mode. + */ + WaiterImpl(UUID txId, LockMode lockMode) { + this.fut = new CompletableFuture<>(); + this.txId = txId; + this.intendedLockMode = lockMode; + + locks.put(lockMode, 1); + intendedLocks.add(lockMode); + } + + /** + * Adds a lock mode. + * + * @param lockMode Lock mode. + * @param increment Value to increment amount. + */ + void addLock(LockMode lockMode, int increment) { + locks.merge(lockMode, increment, Integer::sum); + } + + /** + * Removes a lock mode. + * + * @param lockMode Lock mode. + * @return True if the lock mode was removed, false otherwise. + */ + private boolean removeLock(LockMode lockMode) { + Integer counter = locks.get(lockMode); + + if (counter == null || counter < 2) { + locks.remove(lockMode); + + return true; + } else { + locks.put(lockMode, counter - 1); + + return false; + } + } + + /** + * Recalculates lock mode based of all locks which the waiter has taken. + * + * @param modeToRemove Mode without which, the recalculation will happen. + * @return Previous lock mode. + */ + LockMode recalculateMode(LockMode modeToRemove) { + if (!removeLock(modeToRemove)) { + return lockMode; + } + + return recalculate(); + } + + /** + * Recalculates lock supremums. + * + * @return Previous lock mode. + */ + private LockMode recalculate() { + LockMode newIntendedLockMode = null; + LockMode newLockMode = null; + + for (LockMode mode : locks.keySet()) { + assert locks.get(mode) > 0 : "Incorrect lock counter [txId=" + txId + ", mode=" + mode + "]"; + + if (intendedLocks.contains(mode)) { + newIntendedLockMode = newIntendedLockMode == null ? mode : LockMode.supremum(newIntendedLockMode, mode); + } else { + newLockMode = newLockMode == null ? mode : LockMode.supremum(newLockMode, mode); + } + } + + LockMode mode = lockMode; + + lockMode = newLockMode; + intendedLockMode = newLockMode != null && newIntendedLockMode != null ? LockMode.supremum(newLockMode, newIntendedLockMode) + : newIntendedLockMode; + + return mode; + } + + /** + * Merge all locks that were held by another waiter to the current one. + * + * @param other Other waiter. + */ + void upgrade(WaiterImpl other) { + intendedLocks.addAll(other.intendedLocks); + + other.locks.entrySet().forEach(entry -> addLock(entry.getKey(), entry.getValue())); + + recalculate(); + + if (other.hasLockIntent()) { + fut = other.fut; + } + } + + /** + * Removes all locks that were intended to hold. + */ + void refuseIntent() { + for (LockMode mode : intendedLocks) { + locks.remove(mode); + } + + intendedLocks.clear(); + intendedLockMode = null; + } + + /** {@inheritDoc} */ + @Override + public int compareTo(WaiterImpl o) { + return txId.compareTo(o.txId); + } + + /** Notifies a future listeners. */ + private void notifyLocked() { + if (ex != null) { + fut.completeExceptionally(ex); + } else { + assert lockMode != null; + + fut.complete(null); + } + } + + /** {@inheritDoc} */ + @Override + public boolean locked() { + return this.lockMode != null; + } + + /** + * Checks is the waiter has any intended to lock a key. + * + * @return True if the waiter has an intended lock, false otherwise. + */ + public boolean hasLockIntent() { + return this.intendedLockMode != null; + } + + /** {@inheritDoc} */ + @Override + public LockMode lockMode() { + return lockMode; + } + + /** {@inheritDoc} */ + @Override + public LockMode intendedLockMode() { + return intendedLockMode; + } + + /** Grant a lock. */ + private void lock() { + lockMode = intendedLockMode; + + intendedLockMode = null; + + intendedLocks.clear(); + } + + /** + * Fails the lock waiter. + * + * @param e Lock exception. + */ + private void fail(LockException e) { + ex = e; + } + + /** {@inheritDoc} */ + @Override + public UUID txId() { + return txId; + } + + /** {@inheritDoc} */ + @Override + public boolean equals(Object o) { + if (!(o instanceof WaiterImpl)) { + return false; + } + + return compareTo((WaiterImpl) o) == 0; + } + + /** {@inheritDoc} */ + @Override + public int hashCode() { + return txId.hashCode(); + } + + /** {@inheritDoc} */ + @Override + public String toString() { + return S.toString(WaiterImpl.class, this, "isDone", fut.isDone()); + } + } + + /** {@inheritDoc} */ + @Override + public boolean isEmpty() { + return locks.isEmpty(); + } +} diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java index 6671e686032..e79ed8ab4ff 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java @@ -138,7 +138,7 @@ private void processTxCleanup(TxCleanupMessage txCleanupMessage, String senderId } private void releaseTxLocks(UUID txId) { - lockManager.locks(txId).forEachRemaining(lockManager::release); + lockManager.releaseAll(txId); } private NetworkMessage prepareResponse() { diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index bb6467fd640..fcc14cb871a 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -332,6 +332,8 @@ public CompletableFuture finish( Map enlistedGroups, UUID txId ) { + LOG.debug("Finish [commit={}, txId={}, groups={}].", commit, txId, enlistedGroups); + assert enlistedGroups != null; if (enlistedGroups.isEmpty()) { diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java index d719b6c2909..9e4d4367a20 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java @@ -63,11 +63,13 @@ public void before() { protected abstract LockManager newInstance(); + protected abstract LockKey lockKey(); + @Test public void testSingleKeyWrite() { UUID txId1 = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); CompletableFuture fut0 = lockManager.acquire(txId1, key, X); @@ -89,7 +91,7 @@ public void testSingleKeyWriteLock() { UUID txId1 = TestTransactionIds.newTransactionId(); UUID txId2 = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); CompletableFuture fut0 = lockManager.acquire(txId2, key, X); @@ -123,10 +125,10 @@ public void downgradeLockOutOfTurnTest() { UUID txId1 = TestTransactionIds.newTransactionId(); UUID txId2 = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); lockManager.acquire(txId0, key, S).join(); - lockManager.acquire(txId2, key, S).join(); + Lock lock = lockManager.acquire(txId2, key, S).join(); CompletableFuture fut0 = lockManager.acquire(txId0, key, X); assertFalse(fut0.isDone()); @@ -139,8 +141,8 @@ public void downgradeLockOutOfTurnTest() { assertFalse(fut0.isDone()); - lockManager.release(lockManager.locks(txId2).next()); - fut0.thenAccept(lock -> lockManager.release(lock)); + lockManager.release(lock); + fut0.thenAccept(l -> lockManager.release(l)); } @Test @@ -149,7 +151,7 @@ public void upgradeLockImmediatelyTest() { UUID txId1 = TestTransactionIds.newTransactionId(); UUID txId2 = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); CompletableFuture fut = lockManager.acquire(txId0, key, IS); assertTrue(fut.isDone()); @@ -175,7 +177,7 @@ public void testSingleKeyReadWriteLock() { assertTrue(txId3.compareTo(txId2) > 0); assertTrue(txId2.compareTo(txId1) > 0); assertTrue(txId1.compareTo(txId0) > 0); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); CompletableFuture fut3 = lockManager.acquire(txId3, key, S); assertTrue(fut3.isDone()); @@ -220,7 +222,7 @@ public void testSingleKeyReadWriteLock() { public void testSingleKeyReadWriteConflict() { UUID txId0 = TestTransactionIds.newTransactionId(); UUID txId1 = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); // Lock in order CompletableFuture fut0 = lockManager.acquire(txId1, key, S); @@ -252,7 +254,7 @@ public void testSingleKeyReadWriteConflict() { @Test public void testSingleKeyReadWriteConflict2() { UUID[] txId = generate(3); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); // Lock in order CompletableFuture fut0 = lockManager.acquire(txId[1], key, S); @@ -275,7 +277,7 @@ public void testSingleKeyReadWriteConflict3() { UUID txId0 = TestTransactionIds.newTransactionId(); UUID txId1 = TestTransactionIds.newTransactionId(); UUID txId2 = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); // Lock in order CompletableFuture fut0 = lockManager.acquire(txId1, key, S); @@ -301,7 +303,7 @@ public void testSingleKeyReadWriteConflict4() { final UUID txId2 = TestTransactionIds.newTransactionId(); UUID txId3 = TestTransactionIds.newTransactionId(); UUID txId4 = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); CompletableFuture fut4 = lockManager.acquire(txId4, key, S); assertTrue(fut4.isDone()); @@ -320,7 +322,7 @@ public void testSingleKeyReadWriteConflict4() { public void testSingleKeyReadWriteConflict5() { UUID txId0 = TestTransactionIds.newTransactionId(); UUID txId1 = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); lockManager.acquire(txId0, key, X).join(); @@ -332,7 +334,7 @@ public void testConflicts() { UUID txId0 = TestTransactionIds.newTransactionId(); UUID txId1 = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); List> lockModes = new ArrayList<>(); @@ -375,7 +377,7 @@ public void testSingleKeyWriteWriteConflict() { UUID txId0 = TestTransactionIds.newTransactionId(); UUID txId1 = TestTransactionIds.newTransactionId(); UUID txId2 = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); // Lock in order CompletableFuture fut0 = lockManager.acquire(txId1, key, X); @@ -398,7 +400,7 @@ public void testSingleKeyWriteWriteConflict2() { UUID txId0 = TestTransactionIds.newTransactionId(); UUID txId1 = TestTransactionIds.newTransactionId(); UUID txId2 = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); // Lock in order CompletableFuture fut2 = lockManager.acquire(txId2, key, X); @@ -447,7 +449,7 @@ public void testSingleKeyMultithreadedRandom() throws InterruptedException { public void testLockUpgrade() { UUID txId0 = TestTransactionIds.newTransactionId(); UUID txId1 = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); lockManager.acquire(txId0, key, S).join(); @@ -469,7 +471,7 @@ public void testLockUpgrade() { public void testLockUpgrade2() { UUID txId0 = TestTransactionIds.newTransactionId(); UUID txId1 = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); lockManager.acquire(txId0, key, S).join(); @@ -483,7 +485,7 @@ public void testLockUpgrade3() { UUID txId0 = TestTransactionIds.newTransactionId(); UUID txId1 = TestTransactionIds.newTransactionId(); UUID txId2 = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); lockManager.acquire(txId1, key, S).join(); @@ -504,7 +506,7 @@ public void testLockUpgrade3() { public void testLockUpgrade4() { UUID txId0 = TestTransactionIds.newTransactionId(); UUID txId1 = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); lockManager.acquire(txId0, key, S).join(); @@ -525,7 +527,7 @@ public void testLockUpgrade4() { public void testLockUpgrade5() { UUID txId0 = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); for (LockMode lockMode : List.of(IS, IX, SIX, X)) { lockManager.acquire(txId0, key, lockMode).join(); @@ -561,7 +563,7 @@ public void testLockUpgrade5() { @Test public void testReenter() { UUID txId = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); CompletableFuture fut = lockManager.acquire(txId, key, X); assertTrue(fut.isDone()); @@ -593,7 +595,7 @@ public void testAcquireReleasedLock() { UUID txId0 = TestTransactionIds.newTransactionId(); UUID txId1 = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); List> lockModes = new ArrayList<>(); @@ -638,7 +640,7 @@ public void testCompatibleLockModes() { UUID txId0 = TestTransactionIds.newTransactionId(); UUID txId1 = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); List> lockModes = new ArrayList<>(); @@ -673,7 +675,7 @@ public void testCompatibleLockModes() { public void testPossibleDowngradeLockModes() { UUID txId0 = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); for (LockMode lockMode : List.of(SIX, S, IS, IX)) { CompletableFuture fut0 = lockManager.acquire(txId0, key, X); @@ -703,7 +705,7 @@ public void testPossibleDowngradeLockModes() { @Test public void testAcquireRelease() { UUID txId = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); for (LockMode lockMode : LockMode.values()) { lockManager.acquire(txId, key, lockMode); @@ -718,7 +720,7 @@ public void testAcquireRelease() { @Test public void testAcquireReleaseWhenHoldOther() { UUID txId = TestTransactionIds.newTransactionId(); - LockKey key = new LockKey("test"); + LockKey key = lockKey(); for (LockMode holdLockMode : LockMode.values()) { lockManager.acquire(txId, key, holdLockMode); @@ -744,7 +746,7 @@ public void testAcquireReleaseWhenHoldOther() { @Test public void testReleaseThenReleaseWeakerInHierarchy() { - LockKey key = new LockKey("test"); + LockKey key = lockKey(); UUID txId1 = TestTransactionIds.newTransactionId(); UUID txId2 = TestTransactionIds.newTransactionId(); @@ -774,7 +776,7 @@ public void testReleaseThenReleaseWeakerInHierarchy() { @Test public void testReleaseThenNoReleaseWeakerInHierarchy() { - LockKey key = new LockKey("test"); + LockKey key = lockKey(); UUID txId1 = TestTransactionIds.newTransactionId(); UUID txId2 = TestTransactionIds.newTransactionId(); @@ -804,7 +806,7 @@ public void testReleaseThenNoReleaseWeakerInHierarchy() { @Test public void testLockingOverloadAndUpgrade() { - LockKey key = new LockKey("test"); + LockKey key = lockKey(); UUID tx1 = TestTransactionIds.newTransactionId(); UUID tx2 = TestTransactionIds.newTransactionId(); @@ -829,7 +831,7 @@ public void testLockingOverloadAndUpgrade() { @Test public void testLockingOverload() { - LockKey key = new LockKey("test"); + LockKey key = lockKey(); UUID tx1 = TestTransactionIds.newTransactionId(); UUID tx2 = TestTransactionIds.newTransactionId(); @@ -857,7 +859,7 @@ public void testLockingOverload() { @Test public void testFailUpgrade() { - LockKey key = new LockKey("test"); + LockKey key = lockKey(); UUID tx1 = TestTransactionIds.newTransactionId(); UUID tx2 = TestTransactionIds.newTransactionId(); @@ -891,7 +893,7 @@ public void testFailUpgrade() { @Test public void testDowngradeTargetLock() { - LockKey key = new LockKey("test"); + LockKey key = lockKey(); UUID tx1 = TestTransactionIds.newTransactionId(); UUID tx2 = TestTransactionIds.newTransactionId(); @@ -920,7 +922,7 @@ public void testDowngradeTargetLock() { @Test public void testFailWait() { - LockKey key = new LockKey("test"); + LockKey key = lockKey(); UUID tx1 = TestTransactionIds.newTransactionId(); UUID tx2 = TestTransactionIds.newTransactionId(); @@ -947,7 +949,7 @@ public void testFailWait() { @Test public void testWaitInOrder() { - LockKey key = new LockKey("test"); + LockKey key = lockKey(); UUID tx1 = TestTransactionIds.newTransactionId(); UUID tx2 = TestTransactionIds.newTransactionId(); @@ -979,7 +981,7 @@ public void testWaitInOrder() { @Test public void testWaitNotInOrder() { - LockKey key = new LockKey("test"); + LockKey key = lockKey(); UUID tx1 = TestTransactionIds.newTransactionId(); UUID tx2 = TestTransactionIds.newTransactionId(); @@ -1011,7 +1013,7 @@ public void testWaitNotInOrder() { @Test public void testWaitFailNotInOrder() { - LockKey key = new LockKey("test"); + LockKey key = lockKey(); UUID tx1 = TestTransactionIds.newTransactionId(); UUID tx2 = TestTransactionIds.newTransactionId(); @@ -1058,7 +1060,7 @@ private void doTestSingleKeyMultithreaded( LongAdder failedLocks, int mode ) throws InterruptedException { - LockKey key = new LockKey("test"); + LockKey key = lockKey(); Thread[] threads = new Thread[Runtime.getRuntime().availableProcessors() * 2]; diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockingTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockingTest.java index e70565bc76d..89c32432e44 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockingTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockingTest.java @@ -49,7 +49,7 @@ protected LockKey key(Object key) { b.putInt(key.hashCode()); b.position(0); - return new LockKey(b); + return new LockKey(0, b); } protected CompletableFuture xlock(UUID tx, LockKey key) { diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java index 2d08c52bf0a..43b2baacf67 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.tx; import org.apache.ignite.internal.tx.impl.HeapLockManager; +import org.apache.ignite.internal.tx.impl.WaitDieDeadlockPreventionPolicy; /** * Test class for {@link HeapLockManager}. @@ -25,6 +26,11 @@ public class HeapLockManagerTest extends AbstractLockManagerTest { @Override protected LockManager newInstance() { - return new HeapLockManager(); + return new HeapLockManager(new WaitDieDeadlockPreventionPolicy()); + } + + @Override + protected LockKey lockKey() { + return new LockKey(0, "test"); } } diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapUnboundedLockManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapUnboundedLockManagerTest.java new file mode 100644 index 00000000000..24744f75049 --- /dev/null +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapUnboundedLockManagerTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx; + +import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager; +import org.apache.ignite.internal.tx.impl.WaitDieDeadlockPreventionPolicy; + +/** + * Test class for {@link HeapUnboundedLockManager}. + */ +public class HeapUnboundedLockManagerTest extends AbstractLockManagerTest { + @Override + protected LockManager newInstance() { + return new HeapUnboundedLockManager(new WaitDieDeadlockPreventionPolicy()); + } + + @Override + protected LockKey lockKey() { + return new LockKey("test"); + } +} diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoWaitDeadlockPreventionTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoWaitDeadlockPreventionTest.java index 52f258d456e..6319536ee7c 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoWaitDeadlockPreventionTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoWaitDeadlockPreventionTest.java @@ -31,7 +31,7 @@ * another transaction. */ public class NoWaitDeadlockPreventionTest extends AbstractLockingTest { - private DeadlockPreventionPolicy deadlockPreventionPolicy() { + DeadlockPreventionPolicy deadlockPreventionPolicy() { return new DeadlockPreventionPolicy() { @Override public long waitTimeout() { diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoWaitDeadlockPreventionUnboundedTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoWaitDeadlockPreventionUnboundedTest.java new file mode 100644 index 00000000000..c335950e910 --- /dev/null +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoWaitDeadlockPreventionUnboundedTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx; + +import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager; + +/** + * NoWaitDeadlockPreventionUnboundedTest. + */ +public class NoWaitDeadlockPreventionUnboundedTest extends NoWaitDeadlockPreventionTest { + @Override + protected LockManager lockManager() { + return new HeapUnboundedLockManager(deadlockPreventionPolicy()); + } +} diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoneDeadlockPreventionTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoneDeadlockPreventionTest.java index 96515439860..f5a35109ad9 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoneDeadlockPreventionTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoneDeadlockPreventionTest.java @@ -29,7 +29,7 @@ public class NoneDeadlockPreventionTest extends AbstractDeadlockPreventionTest { @Override protected DeadlockPreventionPolicy deadlockPreventionPolicy() { - return new DeadlockPreventionPolicy() { }; + return DeadlockPreventionPolicy.NO_OP; } @Test diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoneDeadlockPreventionUnboundedTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoneDeadlockPreventionUnboundedTest.java new file mode 100644 index 00000000000..4dcdfadd007 --- /dev/null +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/NoneDeadlockPreventionUnboundedTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx; + +import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager; + +/** + * NoneDeadlockPreventionUnboundedTest. + */ +public class NoneDeadlockPreventionUnboundedTest extends NoneDeadlockPreventionTest { + @Override + protected LockManager lockManager() { + return new HeapUnboundedLockManager(deadlockPreventionPolicy()); + } +} diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/ReversedDeadlockPreventionUnboundedTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/ReversedDeadlockPreventionUnboundedTest.java new file mode 100644 index 00000000000..f0f66bb8ea5 --- /dev/null +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/ReversedDeadlockPreventionUnboundedTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx; + +import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager; + +/** + * ReversedDeadlockPreventionUnboundedTest. + */ +public class ReversedDeadlockPreventionUnboundedTest extends ReversedDeadlockPreventionTest { + @Override + protected LockManager lockManager() { + return new HeapUnboundedLockManager(deadlockPreventionPolicy()); + } +} diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TimeoutDeadlockPreventionUnboundedTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TimeoutDeadlockPreventionUnboundedTest.java new file mode 100644 index 00000000000..3bd121abd5d --- /dev/null +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TimeoutDeadlockPreventionUnboundedTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx; + +import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager; + +/** + * TimeoutDeadlockPreventionUnboundedTest. + */ +public class TimeoutDeadlockPreventionUnboundedTest extends TimeoutDeadlockPreventionTest { + @Override + protected LockManager lockManager() { + return new HeapUnboundedLockManager(deadlockPreventionPolicy()); + } +}