Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add ability to obtain listen request from pubsub response event #546

Merged
merged 3 commits into from Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 0 additions & 3 deletions pubsub/build.gradle.kts
Expand Up @@ -6,9 +6,6 @@ dependencies {
// Jackson (JSON)
api(group = "com.fasterxml.jackson.datatype", name = "jackson-datatype-jsr310")

// Cache
api(group = "com.github.ben-manes.caffeine", name = "caffeine")

// Annotations
api(group = "org.jetbrains", name = "annotations")

Expand Down
Expand Up @@ -2,6 +2,7 @@

import com.github.twitch4j.pubsub.domain.PubSubRequest;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

Expand All @@ -12,6 +13,7 @@
* @see TwitchPubSub#unsubscribeFromTopic(PubSubSubscription)
*/
@RequiredArgsConstructor
@EqualsAndHashCode
public class PubSubSubscription {
@Getter(AccessLevel.PACKAGE)
private final PubSubRequest request;
Expand Down
14 changes: 12 additions & 2 deletions pubsub/src/main/java/com/github/twitch4j/pubsub/TwitchPubSub.java
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -112,7 +113,7 @@ public class TwitchPubSub implements ITwitchPubSub {
/**
* Command Queue
*/
protected final BlockingQueue<String> commandQueue = new ArrayBlockingQueue<>(200);
protected final BlockingQueue<String> commandQueue = new ArrayBlockingQueue<>(128);

/**
* Holds the subscribed topics in case we need to reconnect
Expand Down Expand Up @@ -748,7 +749,16 @@ public void onTextMessage(WebSocket ws, String text) {
}

} else if (message.getType().equals(PubSubType.RESPONSE)) {
eventManager.publish(new PubSubListenResponseEvent(message.getNonce(), message.getError()));
Supplier<PubSubRequest> findListenRequest = () -> {
for (PubSubRequest topic : subscribedTopics) {
if (topic != null && StringUtils.equals(message.getNonce(), topic.getNonce())) {
return topic;
}
}
return null;
};

eventManager.publish(new PubSubListenResponseEvent(message.getNonce(), message.getError(), findListenRequest));

// topic subscription success or failed, response to listen command
// System.out.println(message.toString());
Expand Down
@@ -1,7 +1,5 @@
package com.github.twitch4j.pubsub;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.twitch4j.common.pool.TwitchModuleConnectionPool;
import com.github.twitch4j.common.util.CryptoUtils;
import com.github.twitch4j.pubsub.domain.PubSubRequest;
Expand All @@ -11,7 +9,6 @@
import org.apache.commons.lang3.StringUtils;

import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;

/**
Expand All @@ -26,10 +23,6 @@ public class TwitchPubSubConnectionPool extends TwitchModuleConnectionPool<Twitc

private final String threadPrefix = "twitch4j-pool-" + RandomStringUtils.random(4, true, true) + "-pubsub-";

private final Cache<String, PubSubSubscription> subscriptionsByNonce = Caffeine.newBuilder()
.expireAfterWrite(30, TimeUnit.SECONDS)
.build();

@Override
public PubSubSubscription listenOnTopic(PubSubRequest request) {
return this.subscribe(request);
Expand All @@ -44,10 +37,8 @@ public boolean unsubscribeFromTopic(PubSubSubscription subscription) {
public PubSubSubscription subscribe(PubSubRequest pubSubRequest) {
final int topics = getTopicCount(pubSubRequest);
if (topics <= 0) return null;
final String nonce = injectNonce(pubSubRequest);
final PubSubSubscription subscription = super.subscribe(pubSubRequest);
subscriptionsByNonce.put(nonce, subscription);
return subscription;
injectNonce(pubSubRequest);
return super.subscribe(pubSubRequest);
}

@Override
Expand All @@ -62,10 +53,8 @@ protected TwitchPubSub createConnection() {

// Reclaim topic headroom upon a failed subscription
client.getEventManager().onEvent("twitch4j-pubsub-pool-nonce-tracker", PubSubListenResponseEvent.class, e -> {
if (StringUtils.isNotEmpty(e.getNonce())) {
PubSubSubscription subscription = subscriptionsByNonce.asMap().remove(e.getNonce());
if (e.hasError() && subscription != null)
unsubscribe(subscription);
if (e.hasError()) {
e.getListenRequest().map(PubSubSubscription::new).ifPresent(this::unsubscribe);
}
});

Expand Down Expand Up @@ -104,10 +93,9 @@ protected int getSubscriptionSize(PubSubRequest pubSubRequest) {
return getTopicCount(pubSubRequest);
}

private static String injectNonce(PubSubRequest req) {
private static void injectNonce(PubSubRequest req) {
if (StringUtils.isBlank(req.getNonce()))
req.setNonce(CryptoUtils.generateNonce(30));
return req.getNonce();
}

private static int getTopicCount(PubSubRequest req) {
Expand Down
@@ -1,9 +1,16 @@
package com.github.twitch4j.pubsub.events;

import com.github.twitch4j.common.events.TwitchEvent;
import com.github.twitch4j.pubsub.domain.PubSubRequest;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import lombok.Value;

import java.util.Optional;
import java.util.function.Supplier;

@Value
@EqualsAndHashCode(callSuper = false)
public class PubSubListenResponseEvent extends TwitchEvent {
Expand All @@ -18,6 +25,19 @@ public class PubSubListenResponseEvent extends TwitchEvent {
*/
String error;

@ToString.Exclude
@Getter(AccessLevel.PRIVATE)
Supplier<PubSubRequest> listenRequestSupplier;

/**
* @return the listen request associated with this response.
* @implNote The current implementation requires unique nonce's across requests (which is the default behavior for the listen methods provided by the library).
* @implSpec This method involves an O(n) operation where n <= 50, so it is best to only call it once when needed.
*/
public Optional<PubSubRequest> getListenRequest() {
return Optional.ofNullable(listenRequestSupplier.get());
}

public boolean hasError() {
return error != null && error.length() > 0;
}
Expand Down