diff --git a/pubsub/build.gradle.kts b/pubsub/build.gradle.kts index 6a71820a3..2cefce134 100644 --- a/pubsub/build.gradle.kts +++ b/pubsub/build.gradle.kts @@ -6,9 +6,6 @@ dependencies { // Jackson (JSON) api(group = "com.fasterxml.jackson.datatype", name = "jackson-datatype-jsr310") - // Cache - api(group = "com.github.ben-manes.caffeine", name = "caffeine") - // Annotations api(group = "org.jetbrains", name = "annotations") diff --git a/pubsub/src/main/java/com/github/twitch4j/pubsub/PubSubSubscription.java b/pubsub/src/main/java/com/github/twitch4j/pubsub/PubSubSubscription.java index 56a4cabdd..871ab607a 100644 --- a/pubsub/src/main/java/com/github/twitch4j/pubsub/PubSubSubscription.java +++ b/pubsub/src/main/java/com/github/twitch4j/pubsub/PubSubSubscription.java @@ -2,6 +2,7 @@ import com.github.twitch4j.pubsub.domain.PubSubRequest; import lombok.AccessLevel; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -12,6 +13,7 @@ * @see TwitchPubSub#unsubscribeFromTopic(PubSubSubscription) */ @RequiredArgsConstructor +@EqualsAndHashCode public class PubSubSubscription { @Getter(AccessLevel.PACKAGE) private final PubSubRequest request; diff --git a/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java b/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java index 56d39befd..ca4bb39e5 100644 --- a/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java +++ b/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java @@ -43,6 +43,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -112,7 +113,7 @@ public class TwitchPubSub implements ITwitchPubSub { /** * Command Queue */ - protected final BlockingQueue commandQueue = new ArrayBlockingQueue<>(200); + protected final BlockingQueue commandQueue = new ArrayBlockingQueue<>(128); /** * Holds the subscribed topics in case we need to reconnect @@ -748,7 +749,16 @@ public void onTextMessage(WebSocket ws, String text) { } } else if (message.getType().equals(PubSubType.RESPONSE)) { - eventManager.publish(new PubSubListenResponseEvent(message.getNonce(), message.getError())); + Supplier findListenRequest = () -> { + for (PubSubRequest topic : subscribedTopics) { + if (topic != null && StringUtils.equals(message.getNonce(), topic.getNonce())) { + return topic; + } + } + return null; + }; + + eventManager.publish(new PubSubListenResponseEvent(message.getNonce(), message.getError(), findListenRequest)); // topic subscription success or failed, response to listen command // System.out.println(message.toString()); diff --git a/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSubConnectionPool.java b/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSubConnectionPool.java index 2a301b70d..51d2065a4 100644 --- a/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSubConnectionPool.java +++ b/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSubConnectionPool.java @@ -1,7 +1,5 @@ package com.github.twitch4j.pubsub; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; import com.github.twitch4j.common.pool.TwitchModuleConnectionPool; import com.github.twitch4j.common.util.CryptoUtils; import com.github.twitch4j.pubsub.domain.PubSubRequest; @@ -11,7 +9,6 @@ import org.apache.commons.lang3.StringUtils; import java.util.Collection; -import java.util.concurrent.TimeUnit; import java.util.stream.StreamSupport; /** @@ -26,10 +23,6 @@ public class TwitchPubSubConnectionPool extends TwitchModuleConnectionPool subscriptionsByNonce = Caffeine.newBuilder() - .expireAfterWrite(30, TimeUnit.SECONDS) - .build(); - @Override public PubSubSubscription listenOnTopic(PubSubRequest request) { return this.subscribe(request); @@ -44,10 +37,8 @@ public boolean unsubscribeFromTopic(PubSubSubscription subscription) { public PubSubSubscription subscribe(PubSubRequest pubSubRequest) { final int topics = getTopicCount(pubSubRequest); if (topics <= 0) return null; - final String nonce = injectNonce(pubSubRequest); - final PubSubSubscription subscription = super.subscribe(pubSubRequest); - subscriptionsByNonce.put(nonce, subscription); - return subscription; + injectNonce(pubSubRequest); + return super.subscribe(pubSubRequest); } @Override @@ -62,10 +53,8 @@ protected TwitchPubSub createConnection() { // Reclaim topic headroom upon a failed subscription client.getEventManager().onEvent("twitch4j-pubsub-pool-nonce-tracker", PubSubListenResponseEvent.class, e -> { - if (StringUtils.isNotEmpty(e.getNonce())) { - PubSubSubscription subscription = subscriptionsByNonce.asMap().remove(e.getNonce()); - if (e.hasError() && subscription != null) - unsubscribe(subscription); + if (e.hasError()) { + e.getListenRequest().map(PubSubSubscription::new).ifPresent(this::unsubscribe); } }); @@ -104,10 +93,9 @@ protected int getSubscriptionSize(PubSubRequest pubSubRequest) { return getTopicCount(pubSubRequest); } - private static String injectNonce(PubSubRequest req) { + private static void injectNonce(PubSubRequest req) { if (StringUtils.isBlank(req.getNonce())) req.setNonce(CryptoUtils.generateNonce(30)); - return req.getNonce(); } private static int getTopicCount(PubSubRequest req) { diff --git a/pubsub/src/main/java/com/github/twitch4j/pubsub/events/PubSubListenResponseEvent.java b/pubsub/src/main/java/com/github/twitch4j/pubsub/events/PubSubListenResponseEvent.java index 034a3816f..358d70c4d 100644 --- a/pubsub/src/main/java/com/github/twitch4j/pubsub/events/PubSubListenResponseEvent.java +++ b/pubsub/src/main/java/com/github/twitch4j/pubsub/events/PubSubListenResponseEvent.java @@ -1,9 +1,16 @@ package com.github.twitch4j.pubsub.events; import com.github.twitch4j.common.events.TwitchEvent; +import com.github.twitch4j.pubsub.domain.PubSubRequest; +import lombok.AccessLevel; import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; import lombok.Value; +import java.util.Optional; +import java.util.function.Supplier; + @Value @EqualsAndHashCode(callSuper = false) public class PubSubListenResponseEvent extends TwitchEvent { @@ -18,6 +25,19 @@ public class PubSubListenResponseEvent extends TwitchEvent { */ String error; + @ToString.Exclude + @Getter(AccessLevel.PRIVATE) + Supplier listenRequestSupplier; + + /** + * @return the listen request associated with this response. + * @implNote The current implementation requires unique nonce's across requests (which is the default behavior for the listen methods provided by the library). + * @implSpec This method involves an O(n) operation where n <= 50, so it is best to only call it once when needed. + */ + public Optional getListenRequest() { + return Optional.ofNullable(listenRequestSupplier.get()); + } + public boolean hasError() { return error != null && error.length() > 0; }