Skip to content

Commit

Permalink
spring-projectsGH-3444: Custom TTL per LOCK in RedisLockRegistry and …
Browse files Browse the repository at this point in the history
…JdbcLockRegistry
  • Loading branch information
EddieChoCho committed Mar 29, 2024
1 parent c155d5d commit 5e6ae90
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.support.locks;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/**
* A {@link Lock} implementing this interface supports the spring distributed locks with custom time-to-live value per lock
*
* @author Eddie Cho
*
* @since 6.3
*/
public interface CustomTtlLock extends Lock {

/**
* Attempt to acquire a lock with a specific time-to-live
* @param time the maximum time to wait for the lock unit
* @param unit the time unit of the time argument
* @param customTtl the specific time-to-live for the lock status data
* @param customTtlUnit the time unit of the customTtl argument
* @return true if the lock was acquired and false if the waiting time elapsed before the lock was acquired
* @throws InterruptedException -
* if the current thread is interrupted while acquiring the lock (and interruption of lock acquisition is supported)
*/
boolean tryLock(long time, TimeUnit unit, long customTtl, TimeUnit customTtlUnit) throws InterruptedException;

/**
* Attempt to acquire a lock with a specific time-to-live
* @param customTtl the specific time-to-live for the lock status data
* @param customTtlUnit the time unit of the customTtl argument
*/
void lock(long customTtl, TimeUnit customTtlUnit);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* A {@link LockRegistry} implementing this interface supports the CustomTtlLock
*
* @author Eddie Cho
*
* @since 6.3
*/
package org.springframework.integration.support.locks;

public interface CustomTtlLockRegistry extends LockRegistry {

CustomTtlLock obtainCustomTtlLock(Object lockKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.integration.support.locks.CustomTtlLock;
import org.springframework.integration.support.locks.CustomTtlLockRegistry;
import org.springframework.integration.support.locks.ExpirableLockRegistry;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -90,7 +92,7 @@
* @since 4.0
*
*/
public final class RedisLockRegistry implements ExpirableLockRegistry, DisposableBean {
public final class RedisLockRegistry implements ExpirableLockRegistry, CustomTtlLockRegistry, DisposableBean {

private static final Log LOGGER = LogFactory.getLog(RedisLockRegistry.class);

Expand Down Expand Up @@ -225,6 +227,11 @@ public void setRedisLockType(RedisLockType redisLockType) {

@Override
public Lock obtain(Object lockKey) {
return this.obtainCustomTtlLock(lockKey);
}

@Override
public CustomTtlLock obtainCustomTtlLock(Object lockKey) {
Assert.isInstanceOf(String.class, lockKey);
String path = (String) lockKey;
this.lock.lock();
Expand Down Expand Up @@ -296,7 +303,7 @@ private Function<String, RedisLock> getRedisLockConstructor(RedisLockType redisL
};
}

private abstract class RedisLock implements Lock {
private abstract class RedisLock implements CustomTtlLock {

private static final String OBTAIN_LOCK_SCRIPT = """
local lockClientId = redis.call('GET', KEYS[1])
Expand Down Expand Up @@ -334,11 +341,12 @@ public long getLockedAt() {
/**
* Attempt to acquire a lock in redis.
* @param time the maximum time(milliseconds) to wait for the lock, -1 infinity
* @param expireAfter the time-to-live(milliseconds) for the lock status data
* @return true if the lock was acquired and false if the waiting time elapsed before the lock was acquired
* @throws InterruptedException –
* if the current thread is interrupted while acquiring the lock (and interruption of lock acquisition is supported)
*/
protected abstract boolean tryRedisLockInner(long time) throws ExecutionException, InterruptedException;
protected abstract boolean tryRedisLockInner(long time, long expireAfter) throws ExecutionException, InterruptedException;

/**
* Unlock the lock using the unlink method in redis.
Expand All @@ -352,10 +360,16 @@ public long getLockedAt() {

@Override
public final void lock() {
this.lock(RedisLockRegistry.this.expireAfter, TimeUnit.MILLISECONDS);
}

@Override
public void lock(long customTtl, TimeUnit customTtlUnit) {
this.localLock.lock();
while (true) {
try {
if (tryRedisLock(-1L)) {
long customTtlInMilliseconds = TimeUnit.MILLISECONDS.convert(customTtl, customTtlUnit);
if (tryRedisLock(-1L, customTtlInMilliseconds)) {
return;
}
}
Expand All @@ -382,7 +396,7 @@ public final void lockInterruptibly() throws InterruptedException {
this.localLock.lockInterruptibly();
while (true) {
try {
if (tryRedisLock(-1L)) {
if (tryRedisLock(-1L, RedisLockRegistry.this.expireAfter)) {
return;
}
}
Expand Down Expand Up @@ -411,12 +425,18 @@ public final boolean tryLock() {

@Override
public final boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return this.tryLock(time, unit, RedisLockRegistry.this.expireAfter, TimeUnit.MILLISECONDS);
}

@Override
public boolean tryLock(long time, TimeUnit unit, long customTtl, TimeUnit customTtlUnit) throws InterruptedException {
if (!this.localLock.tryLock(time, unit)) {
return false;
}
try {
long waitTime = TimeUnit.MILLISECONDS.convert(time, unit);
boolean acquired = tryRedisLock(waitTime);
long customTtlInMilliseconds = TimeUnit.MILLISECONDS.convert(customTtl, customTtlUnit);
boolean acquired = tryRedisLock(waitTime, customTtlInMilliseconds);
if (!acquired) {
this.localLock.unlock();
}
Expand All @@ -429,8 +449,8 @@ public final boolean tryLock(long time, TimeUnit unit) throws InterruptedExcepti
return false;
}

private boolean tryRedisLock(long time) throws ExecutionException, InterruptedException {
final boolean acquired = tryRedisLockInner(time);
private boolean tryRedisLock(long time, long expireAfter) throws ExecutionException, InterruptedException {
final boolean acquired = tryRedisLockInner(time, expireAfter);
if (acquired) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Acquired lock; " + this);
Expand All @@ -440,11 +460,11 @@ private boolean tryRedisLock(long time) throws ExecutionException, InterruptedEx
return acquired;
}

protected final Boolean obtainLock() {
protected final Boolean obtainLock(long expireAfter) {
return RedisLockRegistry.this.redisTemplate
.execute(OBTAIN_LOCK_REDIS_SCRIPT, Collections.singletonList(this.lockKey),
RedisLockRegistry.this.clientId,
String.valueOf(RedisLockRegistry.this.expireAfter));
String.valueOf(expireAfter));
}

@Override
Expand Down Expand Up @@ -598,8 +618,8 @@ private RedisPubSubLock(String path) {
}

@Override
protected boolean tryRedisLockInner(long time) throws ExecutionException, InterruptedException {
return subscribeLock(time);
protected boolean tryRedisLockInner(long time, long expireAfter) throws ExecutionException, InterruptedException {
return subscribeLock(time, expireAfter);
}

@Override
Expand All @@ -618,9 +638,9 @@ private boolean removeLockKeyWithScript(RedisScript<Boolean> redisScript) {
RedisLockRegistry.this.clientId, RedisLockRegistry.this.unLockChannelKey));
}

private boolean subscribeLock(long time) throws ExecutionException, InterruptedException {
private boolean subscribeLock(long time, long expireAfter) throws ExecutionException, InterruptedException {
final long expiredTime = System.currentTimeMillis() + time;
if (obtainLock()) {
if (obtainLock(expireAfter)) {
return true;
}

Expand All @@ -635,7 +655,7 @@ private boolean subscribeLock(long time) throws ExecutionException, InterruptedE
Future<String> future =
RedisLockRegistry.this.unlockNotifyMessageListener.subscribeLock(this.lockKey);
//DCL
if (obtainLock()) {
if (obtainLock(expireAfter)) {
return true;
}
try {
Expand All @@ -645,7 +665,7 @@ private boolean subscribeLock(long time) throws ExecutionException, InterruptedE
}
catch (TimeoutException ignore) {
}
if (obtainLock()) {
if (obtainLock(expireAfter)) {
return true;
}
}
Expand Down Expand Up @@ -737,18 +757,18 @@ private RedisSpinLock(String path) {
}

@Override
protected boolean tryRedisLockInner(long time) throws InterruptedException {
protected boolean tryRedisLockInner(long time, long expireAfter) throws InterruptedException {
long now = System.currentTimeMillis();
if (time == -1L) {
while (!obtainLock()) {
while (!obtainLock(expireAfter)) {
Thread.sleep(100); //NOSONAR
}
return true;
}
else {
long expire = now + TimeUnit.MILLISECONDS.convert(time, TimeUnit.MILLISECONDS);
boolean acquired;
while (!(acquired = obtainLock()) && System.currentTimeMillis() < expire) { //NOSONAR
while (!(acquired = obtainLock(expireAfter)) && System.currentTimeMillis() < expire) { //NOSONAR
Thread.sleep(100); //NOSONAR
}
return acquired;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.integration.redis.RedisContainerTest;
import org.springframework.integration.redis.util.RedisLockRegistry.RedisLockType;
import org.springframework.integration.support.locks.CustomTtlLock;
import org.springframework.integration.test.util.TestUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;

/**
Expand All @@ -64,6 +66,7 @@
* @author Unseok Kim
* @author Artem Vozhdayenko
* @author Anton Gabov
* @author Eddie Cho
*
* @since 4.0
*
Expand Down Expand Up @@ -115,6 +118,64 @@ void testLock(RedisLockType testRedisLockType) {
registry.destroy();
}

@ParameterizedTest
@EnumSource(RedisLockType.class)
void testLockWithCustomTtl(RedisLockType testRedisLockType) throws InterruptedException {
RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey, 100);
registry.setRedisLockType(testRedisLockType);
for (int i = 0; i < 3; i++) {
CustomTtlLock lock = registry.obtainCustomTtlLock("foo");
lock.lock(500, TimeUnit.MILLISECONDS);
try {
assertThat(getRedisLockRegistryLocks(registry)).hasSize(1);
Thread.sleep(400);
}
finally {
lock.unlock();
}
}
registry.expireUnusedOlderThan(-1000);
assertThat(getRedisLockRegistryLocks(registry)).isEmpty();
registry.destroy();
}

@ParameterizedTest
@EnumSource(RedisLockType.class)
void testTryLockWithCustomTtl(RedisLockType testRedisLockType) throws InterruptedException {
RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey, 100);
registry.setRedisLockType(testRedisLockType);
for (int i = 0; i < 3; i++) {
CustomTtlLock lock = registry.obtainCustomTtlLock("foo");
lock.tryLock(100, TimeUnit.MILLISECONDS, 500, TimeUnit.MILLISECONDS);
try {
assertThat(getRedisLockRegistryLocks(registry)).hasSize(1);
Thread.sleep(400);
}
finally {
lock.unlock();
}
}
registry.expireUnusedOlderThan(-1000);
assertThat(getRedisLockRegistryLocks(registry)).isEmpty();
registry.destroy();
}

@ParameterizedTest
@EnumSource(RedisLockType.class)
void testUnlock_lockStatusIsExpired_IllegalStateExceptionWillBeThrown(RedisLockType testRedisLockType) throws InterruptedException {
RedisLockRegistry registry = new RedisLockRegistry(redisConnectionFactory, this.registryKey, 100);
registry.setRedisLockType(testRedisLockType);
Lock lock = registry.obtain("foo");
try {
lock.lock();
Thread.sleep(200);
}
finally {
assertThatThrownBy(lock::unlock).isInstanceOf(IllegalStateException.class);
}
registry.destroy();
}

@ParameterizedTest
@EnumSource(RedisLockType.class)
void testLockInterruptibly(RedisLockType testRedisLockType) throws Exception {
Expand Down

0 comments on commit 5e6ae90

Please sign in to comment.