diff --git a/bucket4j-parent/pom.xml b/bucket4j-parent/pom.xml index ab42f6d7..378f6978 100644 --- a/bucket4j-parent/pom.xml +++ b/bucket4j-parent/pom.xml @@ -66,6 +66,7 @@ 5.0.2.RELEASE 2.9.0 3.16.8 + 6.1.8.RELEASE 2.9.3 diff --git a/bucket4j-redis/pom.xml b/bucket4j-redis/pom.xml index cbd016ed..fb30e411 100644 --- a/bucket4j-redis/pom.xml +++ b/bucket4j-redis/pom.xml @@ -43,12 +43,12 @@ test-jar test - - - - - - + + io.lettuce + lettuce-core + ${lettuce.version} + provided + diff --git a/bucket4j-redis/src/main/java/io/github/bucket4j/redis/lettuce/cas/LettuceBasedProxyManager.java b/bucket4j-redis/src/main/java/io/github/bucket4j/redis/lettuce/cas/LettuceBasedProxyManager.java new file mode 100644 index 00000000..7d146aae --- /dev/null +++ b/bucket4j-redis/src/main/java/io/github/bucket4j/redis/lettuce/cas/LettuceBasedProxyManager.java @@ -0,0 +1,173 @@ +/*- + * ========================LICENSE_START================================= + * Bucket4j + * %% + * Copyright (C) 2015 - 2022 Vladimir Bukhtoyarov + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * =========================LICENSE_END================================== + */ + +package io.github.bucket4j.redis.lettuce.cas; + +import io.github.bucket4j.distributed.proxy.ClientSideConfig; +import io.github.bucket4j.distributed.proxy.generic.compare_and_swap.AbstractCompareAndSwapBasedProxyManager; +import io.github.bucket4j.distributed.proxy.generic.compare_and_swap.AsyncCompareAndSwapOperation; +import io.github.bucket4j.distributed.proxy.generic.compare_and_swap.CompareAndSwapOperation; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisException; +import io.lettuce.core.RedisFuture; +import io.lettuce.core.ScriptOutputType; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.codec.ByteArrayCodec; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public class LettuceBasedProxyManager extends AbstractCompareAndSwapBasedProxyManager { + + private final RedisAsyncCommands commands; + private final long ttlMillis; + + public LettuceBasedProxyManager(RedisAsyncCommands redisAsyncCommands, ClientSideConfig clientSideConfig, Duration ttl) { + super(clientSideConfig); + Objects.requireNonNull(redisAsyncCommands); + this.commands = redisAsyncCommands; + this.ttlMillis = ttl.toMillis(); + } + + public LettuceBasedProxyManager(RedisAsyncCommands redisAsyncCommands, Duration ttl) { + this(redisAsyncCommands, ClientSideConfig.getDefault(), ttl); + } + + public LettuceBasedProxyManager(StatefulRedisConnection statefulRedisConnection, ClientSideConfig clientSideConfig, Duration ttl) { + this(statefulRedisConnection.async(), clientSideConfig, ttl); + } + + public LettuceBasedProxyManager(StatefulRedisConnection statefulRedisConnection, Duration ttl) { + this(statefulRedisConnection, ClientSideConfig.getDefault(), ttl); + } + + public LettuceBasedProxyManager(RedisClient redisClient, Duration ttl) { + this(redisClient, ClientSideConfig.getDefault(), ttl); + } + + public LettuceBasedProxyManager(RedisClient redisClient, ClientSideConfig clientSideConfig, Duration ttl) { + this(redisClient.connect(ByteArrayCodec.INSTANCE), clientSideConfig, ttl); + } + + @Override + protected CompareAndSwapOperation beginCompareAndSwapOperation(byte[] key) { + byte[][] keys = {key}; + return new CompareAndSwapOperation() { + @Override + public Optional getStateData() { + RedisFuture stateFuture = commands.get(key); + return Optional.ofNullable(getFutureValue(stateFuture)); + } + + @Override + public boolean compareAndSwap(byte[] originalData, byte[] newData) { + return getFutureValue(compareAndSwapFuture(key, keys, originalData, newData)); + } + }; + } + + @Override + protected AsyncCompareAndSwapOperation beginAsyncCompareAndSwapOperation(byte[] key) { + byte[][] keys = {key}; + return new AsyncCompareAndSwapOperation() { + @Override + public CompletableFuture> getStateData() { + RedisFuture stateFuture = commands.get(key); + return convertToCompletableFuture(stateFuture) + .thenApply((byte[] resultBytes) -> Optional.ofNullable(resultBytes)); + } + + @Override + public CompletableFuture compareAndSwap(byte[] originalData, byte[] newData) { + return convertToCompletableFuture(compareAndSwapFuture(key, keys, originalData, newData)); + } + }; + } + + @Override + public void removeProxy(byte[] key) { + RedisFuture future = commands.del(key); + getFutureValue(future); + } + + @Override + protected CompletableFuture removeAsync(byte[] key) { + RedisFuture future = commands.del(key); + return convertToCompletableFuture(future).thenApply(bytes -> null); + } + + @Override + public boolean isAsyncModeSupported() { + return true; + } + + private RedisFuture compareAndSwapFuture(byte[] key, byte[][] keys, byte[] originalData, byte[] newData) { + if (originalData == null) { + // nulls are prohibited as values, so "replace" must not be used in such cases + String script = "return redis.call('set', KEYS[1], ARGV[1], 'nx', 'px', ARGV[2])"; + byte[][] params = {newData, encodeLong(ttlMillis)}; + return commands.eval(script, ScriptOutputType.BOOLEAN, keys, params); + } else { + String script = + "if redis.call('get', KEYS[1]) == ARGV[1] then " + + "redis.call('psetex', KEYS[1], ARGV[3], ARGV[2]); " + + "return 1; " + + "else " + + "return 0; " + + "end"; + byte[][] params = {originalData, newData, encodeLong(ttlMillis)}; + return commands.eval(script, ScriptOutputType.BOOLEAN, keys, params); + } + } + + private CompletableFuture convertToCompletableFuture(RedisFuture redissonFuture) { + CompletableFuture jdkFuture = new CompletableFuture<>(); + redissonFuture.whenComplete((result, error) -> { + if (error != null) { + jdkFuture.completeExceptionally(error); + } else { + jdkFuture.complete(result); + } + }); + return jdkFuture; + } + + private V getFutureValue(RedisFuture value) { + try { + return value.get(); + } catch (InterruptedException e) { + value.cancel(true); + Thread.currentThread().interrupt(); + throw new RedisException(e); + } catch (ExecutionException e) { + throw e.getCause() instanceof RedisException ? (RedisException) e.getCause() : + new RedisException("Unexpected exception while processing command", e.getCause()); + } + } + + private byte[] encodeLong(Long value) { + return ("" + value).getBytes(StandardCharsets.UTF_8); + } +} \ No newline at end of file diff --git a/bucket4j-redis/src/test/java/io/github/bucket4j/redis/lettuce/cas/LettuceBasedProxyManagerTest.java b/bucket4j-redis/src/test/java/io/github/bucket4j/redis/lettuce/cas/LettuceBasedProxyManagerTest.java new file mode 100644 index 00000000..79e4bc74 --- /dev/null +++ b/bucket4j-redis/src/test/java/io/github/bucket4j/redis/lettuce/cas/LettuceBasedProxyManagerTest.java @@ -0,0 +1,60 @@ +package io.github.bucket4j.redis.lettuce.cas; + +import io.github.bucket4j.distributed.proxy.ClientSideConfig; +import io.github.bucket4j.distributed.proxy.ProxyManager; +import io.github.bucket4j.tck.AbstractDistributedBucketTest; +import io.lettuce.core.RedisClient; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.testcontainers.containers.GenericContainer; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.UUID; + +public class LettuceBasedProxyManagerTest extends AbstractDistributedBucketTest { + + private static GenericContainer container; + private static RedisClient redisClient; + + @BeforeClass + public static void setup() { + container = startRedisContainer(); + redisClient = createLettuceClient(container); + } + + @AfterClass + public static void shutdown() { + if (redisClient != null) { + redisClient.shutdown(); + } + if (container != null) { + container.close(); + } + } + + private static RedisClient createLettuceClient(GenericContainer container) { + String redisHost = container.getHost(); + Integer redisPort = container.getMappedPort(6379); + String redisUrl = "redis://" + redisHost + ":" + redisPort; + + return RedisClient.create(redisUrl); + } + + private static GenericContainer startRedisContainer() { + GenericContainer genericContainer = new GenericContainer("redis:7.0.2").withExposedPorts(6379); + genericContainer.start(); + return genericContainer; + } + + @Override + protected ProxyManager getProxyManager() { + return new LettuceBasedProxyManager(redisClient, ClientSideConfig.getDefault(), Duration.ofMinutes(10)); + } + + @Override + protected byte[] generateRandomKey() { + return UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8); + } + +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index dd52f445..140288d5 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.github.vladimir-bukhtoyarov bucket4j - 7.5.0 + 7.6.0 pom