From 5b37927584afc8ba032769c1a23769866e22d6fc Mon Sep 17 00:00:00 2001 From: Sidd Date: Fri, 4 Mar 2022 16:15:08 -0800 Subject: [PATCH 1/3] feat: add PubSubListenResponseEvent#getListenRequest --- .../github/twitch4j/pubsub/TwitchPubSub.java | 12 +++++++++++- .../events/PubSubListenResponseEvent.java | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java b/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java index 56d39befd..9712a0314 100644 --- a/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java +++ b/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java @@ -43,6 +43,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -748,7 +749,16 @@ public void onTextMessage(WebSocket ws, String text) { } } else if (message.getType().equals(PubSubType.RESPONSE)) { - eventManager.publish(new PubSubListenResponseEvent(message.getNonce(), message.getError())); + Supplier findListenRequest = () -> { + for (PubSubRequest topic : subscribedTopics) { + if (topic != null && StringUtils.equals(message.getNonce(), topic.getNonce())) { + return topic; + } + } + return null; + }; + + eventManager.publish(new PubSubListenResponseEvent(message.getNonce(), message.getError(), findListenRequest)); // topic subscription success or failed, response to listen command // System.out.println(message.toString()); diff --git a/pubsub/src/main/java/com/github/twitch4j/pubsub/events/PubSubListenResponseEvent.java b/pubsub/src/main/java/com/github/twitch4j/pubsub/events/PubSubListenResponseEvent.java index 034a3816f..c4abcd385 100644 --- a/pubsub/src/main/java/com/github/twitch4j/pubsub/events/PubSubListenResponseEvent.java +++ b/pubsub/src/main/java/com/github/twitch4j/pubsub/events/PubSubListenResponseEvent.java @@ -1,9 +1,15 @@ package com.github.twitch4j.pubsub.events; import com.github.twitch4j.common.events.TwitchEvent; +import com.github.twitch4j.pubsub.domain.PubSubRequest; +import lombok.AccessLevel; import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.Value; +import java.util.Optional; +import java.util.function.Supplier; + @Value @EqualsAndHashCode(callSuper = false) public class PubSubListenResponseEvent extends TwitchEvent { @@ -18,6 +24,18 @@ public class PubSubListenResponseEvent extends TwitchEvent { */ String error; + @Getter(AccessLevel.PRIVATE) + Supplier listenRequestSupplier; + + /** + * @return the listen request associated with this response. + * @implNote The current implementation requires unique nonce's across requests (which is the default behavior for the listen methods provided by the library). + * @implSpec This method involves an O(n) operation where n <= 50, so it is best to only call it once when needed. + */ + public Optional getListenRequest() { + return Optional.ofNullable(listenRequestSupplier.get()); + } + public boolean hasError() { return error != null && error.length() > 0; } From b5ca0956f02709e56f91d80b440b107a9cdf2ad7 Mon Sep 17 00:00:00 2001 From: Sidd Date: Fri, 4 Mar 2022 16:49:09 -0800 Subject: [PATCH 2/3] refactor: utilize getListenRequest in TwitchPubSubConnectionPool --- pubsub/build.gradle.kts | 3 --- .../twitch4j/pubsub/PubSubSubscription.java | 2 ++ .../github/twitch4j/pubsub/TwitchPubSub.java | 2 +- .../pubsub/TwitchPubSubConnectionPool.java | 22 +++++-------------- 4 files changed, 8 insertions(+), 21 deletions(-) diff --git a/pubsub/build.gradle.kts b/pubsub/build.gradle.kts index 6a71820a3..2cefce134 100644 --- a/pubsub/build.gradle.kts +++ b/pubsub/build.gradle.kts @@ -6,9 +6,6 @@ dependencies { // Jackson (JSON) api(group = "com.fasterxml.jackson.datatype", name = "jackson-datatype-jsr310") - // Cache - api(group = "com.github.ben-manes.caffeine", name = "caffeine") - // Annotations api(group = "org.jetbrains", name = "annotations") diff --git a/pubsub/src/main/java/com/github/twitch4j/pubsub/PubSubSubscription.java b/pubsub/src/main/java/com/github/twitch4j/pubsub/PubSubSubscription.java index 56a4cabdd..871ab607a 100644 --- a/pubsub/src/main/java/com/github/twitch4j/pubsub/PubSubSubscription.java +++ b/pubsub/src/main/java/com/github/twitch4j/pubsub/PubSubSubscription.java @@ -2,6 +2,7 @@ import com.github.twitch4j.pubsub.domain.PubSubRequest; import lombok.AccessLevel; +import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -12,6 +13,7 @@ * @see TwitchPubSub#unsubscribeFromTopic(PubSubSubscription) */ @RequiredArgsConstructor +@EqualsAndHashCode public class PubSubSubscription { @Getter(AccessLevel.PACKAGE) private final PubSubRequest request; diff --git a/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java b/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java index 9712a0314..ca4bb39e5 100644 --- a/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java +++ b/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java @@ -113,7 +113,7 @@ public class TwitchPubSub implements ITwitchPubSub { /** * Command Queue */ - protected final BlockingQueue commandQueue = new ArrayBlockingQueue<>(200); + protected final BlockingQueue commandQueue = new ArrayBlockingQueue<>(128); /** * Holds the subscribed topics in case we need to reconnect diff --git a/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSubConnectionPool.java b/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSubConnectionPool.java index 2a301b70d..51d2065a4 100644 --- a/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSubConnectionPool.java +++ b/pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSubConnectionPool.java @@ -1,7 +1,5 @@ package com.github.twitch4j.pubsub; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; import com.github.twitch4j.common.pool.TwitchModuleConnectionPool; import com.github.twitch4j.common.util.CryptoUtils; import com.github.twitch4j.pubsub.domain.PubSubRequest; @@ -11,7 +9,6 @@ import org.apache.commons.lang3.StringUtils; import java.util.Collection; -import java.util.concurrent.TimeUnit; import java.util.stream.StreamSupport; /** @@ -26,10 +23,6 @@ public class TwitchPubSubConnectionPool extends TwitchModuleConnectionPool subscriptionsByNonce = Caffeine.newBuilder() - .expireAfterWrite(30, TimeUnit.SECONDS) - .build(); - @Override public PubSubSubscription listenOnTopic(PubSubRequest request) { return this.subscribe(request); @@ -44,10 +37,8 @@ public boolean unsubscribeFromTopic(PubSubSubscription subscription) { public PubSubSubscription subscribe(PubSubRequest pubSubRequest) { final int topics = getTopicCount(pubSubRequest); if (topics <= 0) return null; - final String nonce = injectNonce(pubSubRequest); - final PubSubSubscription subscription = super.subscribe(pubSubRequest); - subscriptionsByNonce.put(nonce, subscription); - return subscription; + injectNonce(pubSubRequest); + return super.subscribe(pubSubRequest); } @Override @@ -62,10 +53,8 @@ protected TwitchPubSub createConnection() { // Reclaim topic headroom upon a failed subscription client.getEventManager().onEvent("twitch4j-pubsub-pool-nonce-tracker", PubSubListenResponseEvent.class, e -> { - if (StringUtils.isNotEmpty(e.getNonce())) { - PubSubSubscription subscription = subscriptionsByNonce.asMap().remove(e.getNonce()); - if (e.hasError() && subscription != null) - unsubscribe(subscription); + if (e.hasError()) { + e.getListenRequest().map(PubSubSubscription::new).ifPresent(this::unsubscribe); } }); @@ -104,10 +93,9 @@ protected int getSubscriptionSize(PubSubRequest pubSubRequest) { return getTopicCount(pubSubRequest); } - private static String injectNonce(PubSubRequest req) { + private static void injectNonce(PubSubRequest req) { if (StringUtils.isBlank(req.getNonce())) req.setNonce(CryptoUtils.generateNonce(30)); - return req.getNonce(); } private static int getTopicCount(PubSubRequest req) { From 7b9a1cccd813982b737a7124447d7b985982566b Mon Sep 17 00:00:00 2001 From: Sidd Date: Tue, 22 Mar 2022 21:45:44 -0700 Subject: [PATCH 3/3] refactor: remove listenRequestSupplier from toString --- .../twitch4j/pubsub/events/PubSubListenResponseEvent.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pubsub/src/main/java/com/github/twitch4j/pubsub/events/PubSubListenResponseEvent.java b/pubsub/src/main/java/com/github/twitch4j/pubsub/events/PubSubListenResponseEvent.java index c4abcd385..358d70c4d 100644 --- a/pubsub/src/main/java/com/github/twitch4j/pubsub/events/PubSubListenResponseEvent.java +++ b/pubsub/src/main/java/com/github/twitch4j/pubsub/events/PubSubListenResponseEvent.java @@ -5,6 +5,7 @@ import lombok.AccessLevel; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.ToString; import lombok.Value; import java.util.Optional; @@ -24,6 +25,7 @@ public class PubSubListenResponseEvent extends TwitchEvent { */ String error; + @ToString.Exclude @Getter(AccessLevel.PRIVATE) Supplier listenRequestSupplier;