Skip to content

Commit

Permalink
IGNITE-17811 Lock table optimizations - Fixes #2720.
Browse files Browse the repository at this point in the history
Signed-off-by: Alexey Scherbakov <alexey.scherbakoff@gmail.com>
  • Loading branch information
ascherbakoff committed Dec 15, 2023
1 parent 0bf09bd commit ae0e224
Show file tree
Hide file tree
Showing 28 changed files with 1,760 additions and 229 deletions.
2 changes: 1 addition & 1 deletion modules/platforms/cpp/tests/odbc-test/transaction_test.cpp
Expand Up @@ -575,7 +575,7 @@ TEST_F(transaction_test, transaction_error) {
try {
insert_test_value(conn2.m_statement, 2, "test_2");
} catch (const odbc_exception &err) {
EXPECT_THAT(err.message, testing::HasSubstr("Failed to acquire a lock due to a conflict"));
EXPECT_THAT(err.message, testing::HasSubstr("Failed to acquire a lock due to a possible deadlock"));
// TODO: IGNITE-19944 Propagate SQL errors from engine to driver
EXPECT_EQ(err.sql_state, "HY000");
throw;
Expand Down
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.benchmark;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.TestHybridClock;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.tx.LockKey;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.LockMode;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

/**
* Benchmark lock manager.
*/
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class LockManagerBenchmark {
private LockManager lockManager;
private TransactionIdGenerator generator;
private HybridClock clock;

/**
* Initializes session and statement.
*/
@Setup
public void setUp() {
lockManager = new HeapLockManager();
generator = new TransactionIdGenerator(0);
clock = new TestHybridClock(() -> 0L);
}

/**
* Closes resources.
*/
@TearDown
public void tearDown() throws Exception {
if (!lockManager.isEmpty()) {
throw new AssertionError("Invalid lock manager state");
}
}

/**
* Concurrent active transactions.
*/
@Param({"200"})
private int concTxns;

/**
* Take and release some locks.
*/
@Benchmark
@Warmup(iterations = 1, time = 3)
@Measurement(iterations = 1, time = 10)
public void lockCommit() {
List<UUID> ids = new ArrayList<>(concTxns);

int c = 0;

for (int i = 0; i < concTxns; i++) {
UUID txId = generator.transactionIdFor(clock.now());
ids.add(txId);
lockManager.acquire(txId, new LockKey(0, new RowId(0, new UUID(0, c++))), LockMode.X).join();
}

for (UUID id : ids) {
lockManager.releaseAll(id);
}
}

/**
* Benchmark's entry point.
*/
public static void main(String[] args) throws RunnerException {
// TODO JVM args
Options opt = new OptionsBuilder()
.include(".*" + LockManagerBenchmark.class.getSimpleName() + ".*")
.forks(1)
.threads(1)
.mode(Mode.AverageTime)
.build();

new Runner(opt).run();
}
}
@@ -0,0 +1,235 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.distributed;

import static org.apache.ignite.internal.replicator.ReplicaManager.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.configuration.GcConfiguration;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.DeadlockPreventionPolicy;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager.LockState;
import org.apache.ignite.internal.tx.impl.HeapUnboundedLockManager;
import org.apache.ignite.internal.tx.impl.TransactionIdGenerator;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.Transaction;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;

/**
* Test lock table.
*/
@ExtendWith(ConfigurationExtension.class)
public class ItLockTableTest extends IgniteAbstractTest {
private static final IgniteLogger LOG = Loggers.forClass(ItLockTableTest.class);

private static int EMP_TABLE_ID = 2;

private static final int CACHE_SIZE = 10;

private static final String TABLE_NAME = "test";

private static SchemaDescriptor TABLE_SCHEMA = new SchemaDescriptor(
1,
new Column[]{new Column("id".toUpperCase(), NativeTypes.INT32, false)},
new Column[]{
new Column("name".toUpperCase(), NativeTypes.STRING, true),
new Column("salary".toUpperCase(), NativeTypes.DOUBLE, true)
}
);

protected TableViewInternal testTable;

protected final TestInfo testInfo;

//TODO fsync can be turned on again after https://issues.apache.org/jira/browse/IGNITE-20195
@InjectConfiguration("mock: { fsync: false }")
protected static RaftConfiguration raftConfiguration;

@InjectConfiguration
protected static GcConfiguration gcConfig;

@InjectConfiguration
protected static TransactionConfiguration txConfiguration;

private ItTxTestCluster txTestCluster;

private HybridTimestampTracker timestampTracker = new HybridTimestampTracker();

/**
* The constructor.
*
* @param testInfo Test info.
*/
public ItLockTableTest(TestInfo testInfo) {
this.testInfo = testInfo;
}

@BeforeEach
public void before() throws Exception {
txTestCluster = new ItTxTestCluster(
testInfo,
raftConfiguration,
txConfiguration,
workDir,
1,
1,
false,
timestampTracker
) {
@Override
protected TxManagerImpl newTxManager(
ClusterService clusterService,
ReplicaService replicaSvc,
HybridClock clock,
TransactionIdGenerator generator,
ClusterNode node,
PlacementDriver placementDriver
) {
return new TxManagerImpl(
txConfiguration,
clusterService,
replicaSvc,
new HeapLockManager(
DeadlockPreventionPolicy.NO_OP,
HeapLockManager.SLOTS,
CACHE_SIZE,
new HeapUnboundedLockManager()),
clock,
generator,
placementDriver,
() -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS
);
}
};
txTestCluster.prepareCluster();

testTable = txTestCluster.startTable(TABLE_NAME, EMP_TABLE_ID, TABLE_SCHEMA);

log.info("Tables have been started");
}

@AfterEach
public void after() throws Exception {
txTestCluster.shutdownCluster();
}

/**
* Test that a lock table behaves correctly in case of lock cache overflow.
*/
@Test
public void testCollision() {
RecordView<Tuple> view = testTable.recordView();

int i = 0;
final int count = 1000;
List<Transaction> txns = new ArrayList<>();
while (i++ < count) {
Transaction tx = txTestCluster.igniteTransactions().begin();
view.insertAsync(tx, tuple(i, "x-" + i));
txns.add(tx);
}

assertTrue(TestUtils.waitForCondition(() -> {
int total = 0;
HeapLockManager lockManager = (HeapLockManager) txTestCluster.txManagers.get(txTestCluster.localNodeName).lockManager();
for (int j = 0; j < lockManager.getSlots().length; j++) {
LockState slot = lockManager.getSlots()[j];
total += slot.waitersCount();
}

return total == count && lockManager.available() == 0;
}, 10_000), "Some lockers are missing");

int empty = 0;
int coll = 0;

HeapLockManager lm = (HeapLockManager) txTestCluster.txManagers.get(txTestCluster.localNodeName).lockManager();
for (int j = 0; j < lm.getSlots().length; j++) {
LockState slot = lm.getSlots()[j];
int cnt = slot.waitersCount();
if (cnt == 0) {
empty++;
}
if (cnt > 1) {
coll += cnt;
}
}

LOG.info("LockTable [emptySlots={} collisions={}]", empty, coll);

assertTrue(coll > 0);

List<CompletableFuture<?>> finishFuts = new ArrayList<>();
for (Transaction txn : txns) {
finishFuts.add(txn.commitAsync());
}

for (CompletableFuture<?> finishFut : finishFuts) {
finishFut.join();
}

assertTrue(TestUtils.waitForCondition(() -> {
int total = 0;
HeapLockManager lockManager = (HeapLockManager) txTestCluster.txManagers.get(txTestCluster.localNodeName).lockManager();
for (int j = 0; j < lockManager.getSlots().length; j++) {
LockState slot = lockManager.getSlots()[j];
total += slot.waitersCount();
}

return total == 0 && lockManager.available() == CACHE_SIZE;
}, 10_000), "Illegal lock manager state");
}

private static Tuple tuple(int id, String name) {
return Tuple.create()
.set("id", id)
.set("name", name);
}

private static Tuple keyTuple(int id) {
return Tuple.create()
.set("id", id);
}
}
Expand Up @@ -75,10 +75,10 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {

//TODO fsync can be turned on again after https://issues.apache.org/jira/browse/IGNITE-20195
@InjectConfiguration("mock: { fsync: false }")
protected static RaftConfiguration raftConfiguration;
protected RaftConfiguration raftConfiguration;

@InjectConfiguration
protected static TransactionConfiguration txConfiguration;
protected TransactionConfiguration txConfiguration;

/**
* Returns a count of nodes.
Expand Down Expand Up @@ -153,6 +153,7 @@ public void before() throws Exception {
@AfterEach
public void after() throws Exception {
txTestCluster.shutdownCluster();
Mockito.framework().clearInlineMocks();
}

/**
Expand Down
Expand Up @@ -253,6 +253,6 @@ public void testTwoReadWriteTransactionsWaitForCleanup() throws TransactionExcep
}

private static void releaseTxLocks(UUID txId, LockManager lockManager) {
lockManager.locks(txId).forEachRemaining(lockManager::release);
lockManager.releaseAll(txId);
}
}

0 comments on commit ae0e224

Please sign in to comment.