Skip to content

Commit

Permalink
feat: add ability to obtain listen request from pubsub response event (
Browse files Browse the repository at this point in the history
  • Loading branch information
iProdigy committed Mar 24, 2022
1 parent be942b7 commit 6e43891
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 22 deletions.
3 changes: 0 additions & 3 deletions pubsub/build.gradle.kts
Expand Up @@ -6,9 +6,6 @@ dependencies {
// Jackson (JSON)
api(group = "com.fasterxml.jackson.datatype", name = "jackson-datatype-jsr310")

// Cache
api(group = "com.github.ben-manes.caffeine", name = "caffeine")

// Annotations
api(group = "org.jetbrains", name = "annotations")

Expand Down
Expand Up @@ -2,6 +2,7 @@

import com.github.twitch4j.pubsub.domain.PubSubRequest;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

Expand All @@ -12,6 +13,7 @@
* @see TwitchPubSub#unsubscribeFromTopic(PubSubSubscription)
*/
@RequiredArgsConstructor
@EqualsAndHashCode
public class PubSubSubscription {
@Getter(AccessLevel.PACKAGE)
private final PubSubRequest request;
Expand Down
14 changes: 12 additions & 2 deletions pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -112,7 +113,7 @@ public class TwitchPubSub implements ITwitchPubSub {
/**
* Command Queue
*/
protected final BlockingQueue<String> commandQueue = new ArrayBlockingQueue<>(200);
protected final BlockingQueue<String> commandQueue = new ArrayBlockingQueue<>(128);

/**
* Holds the subscribed topics in case we need to reconnect
Expand Down Expand Up @@ -748,7 +749,16 @@ public void onTextMessage(WebSocket ws, String text) {
}

} else if (message.getType().equals(PubSubType.RESPONSE)) {
eventManager.publish(new PubSubListenResponseEvent(message.getNonce(), message.getError()));
Supplier<PubSubRequest> findListenRequest = () -> {
for (PubSubRequest topic : subscribedTopics) {
if (topic != null && StringUtils.equals(message.getNonce(), topic.getNonce())) {
return topic;
}
}
return null;
};

eventManager.publish(new PubSubListenResponseEvent(message.getNonce(), message.getError(), findListenRequest));

// topic subscription success or failed, response to listen command
// System.out.println(message.toString());
Expand Down
@@ -1,7 +1,5 @@
package com.github.twitch4j.pubsub;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.twitch4j.common.pool.TwitchModuleConnectionPool;
import com.github.twitch4j.common.util.CryptoUtils;
import com.github.twitch4j.pubsub.domain.PubSubRequest;
Expand All @@ -11,7 +9,6 @@
import org.apache.commons.lang3.StringUtils;

import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;

/**
Expand All @@ -26,10 +23,6 @@ public class TwitchPubSubConnectionPool extends TwitchModuleConnectionPool<Twitc

private final String threadPrefix = "twitch4j-pool-" + RandomStringUtils.random(4, true, true) + "-pubsub-";

private final Cache<String, PubSubSubscription> subscriptionsByNonce = Caffeine.newBuilder()
.expireAfterWrite(30, TimeUnit.SECONDS)
.build();

@Override
public PubSubSubscription listenOnTopic(PubSubRequest request) {
return this.subscribe(request);
Expand All @@ -44,10 +37,8 @@ public boolean unsubscribeFromTopic(PubSubSubscription subscription) {
public PubSubSubscription subscribe(PubSubRequest pubSubRequest) {
final int topics = getTopicCount(pubSubRequest);
if (topics <= 0) return null;
final String nonce = injectNonce(pubSubRequest);
final PubSubSubscription subscription = super.subscribe(pubSubRequest);
subscriptionsByNonce.put(nonce, subscription);
return subscription;
injectNonce(pubSubRequest);
return super.subscribe(pubSubRequest);
}

@Override
Expand All @@ -62,10 +53,8 @@ protected TwitchPubSub createConnection() {

// Reclaim topic headroom upon a failed subscription
client.getEventManager().onEvent("twitch4j-pubsub-pool-nonce-tracker", PubSubListenResponseEvent.class, e -> {
if (StringUtils.isNotEmpty(e.getNonce())) {
PubSubSubscription subscription = subscriptionsByNonce.asMap().remove(e.getNonce());
if (e.hasError() && subscription != null)
unsubscribe(subscription);
if (e.hasError()) {
e.getListenRequest().map(PubSubSubscription::new).ifPresent(this::unsubscribe);
}
});

Expand Down Expand Up @@ -104,10 +93,9 @@ protected int getSubscriptionSize(PubSubRequest pubSubRequest) {
return getTopicCount(pubSubRequest);
}

private static String injectNonce(PubSubRequest req) {
private static void injectNonce(PubSubRequest req) {
if (StringUtils.isBlank(req.getNonce()))
req.setNonce(CryptoUtils.generateNonce(30));
return req.getNonce();
}

private static int getTopicCount(PubSubRequest req) {
Expand Down
@@ -1,9 +1,16 @@
package com.github.twitch4j.pubsub.events;

import com.github.twitch4j.common.events.TwitchEvent;
import com.github.twitch4j.pubsub.domain.PubSubRequest;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import lombok.Value;

import java.util.Optional;
import java.util.function.Supplier;

@Value
@EqualsAndHashCode(callSuper = false)
public class PubSubListenResponseEvent extends TwitchEvent {
Expand All @@ -18,6 +25,19 @@ public class PubSubListenResponseEvent extends TwitchEvent {
*/
String error;

@ToString.Exclude
@Getter(AccessLevel.PRIVATE)
Supplier<PubSubRequest> listenRequestSupplier;

/**
* @return the listen request associated with this response.
* @implNote The current implementation requires unique nonce's across requests (which is the default behavior for the listen methods provided by the library).
* @implSpec This method involves an O(n) operation where n <= 50, so it is best to only call it once when needed.
*/
public Optional<PubSubRequest> getListenRequest() {
return Optional.ofNullable(listenRequestSupplier.get());
}

public boolean hasError() {
return error != null && error.length() > 0;
}
Expand Down

0 comments on commit 6e43891

Please sign in to comment.