Skip to content

Commit

Permalink
refactor: use xanthic cache facade (#618)
Browse files Browse the repository at this point in the history
* refactor(chat): switch to xanthic

* refactor(eventsub): switch to xanthic

* refactor(gql): switch to xanthic

* refactor(helix): switch to xanthic

* refactor(twitch4j): switch to xanthic

* chore: use xanthic bom

* chore(deps): update xanthic to 0.1.0 stable
  • Loading branch information
iProdigy committed Aug 9, 2022
1 parent 8b79f45 commit 48e1917
Show file tree
Hide file tree
Showing 13 changed files with 166 additions and 123 deletions.
7 changes: 3 additions & 4 deletions build.gradle.kts
Expand Up @@ -24,7 +24,6 @@ allprojects {
this as StandardJavadocDocletOptions
links(
"https://javadoc.io/doc/org.jetbrains/annotations/23.0.0",
"https://javadoc.io/doc/com.github.ben-manes.caffeine/caffeine/2.9.3",
"https://javadoc.io/doc/commons-configuration/commons-configuration/1.10",
"https://javadoc.io/doc/com.github.vladimir-bukhtoyarov/bucket4j-core/7.6.0",
// "https://javadoc.io/doc/com.squareup.okhttp3/okhttp/4.9.3", // blocked by https://github.com/square/okhttp/issues/6450
Expand Down Expand Up @@ -79,9 +78,6 @@ subprojects {
// Annotations
api(group = "org.jetbrains", name = "annotations", version = "23.0.0")

// Caching
api(group = "com.github.ben-manes.caffeine", name = "caffeine", version = "2.9.3")

// Apache Commons
api(group = "commons-configuration", name = "commons-configuration", version = "1.10")

Expand Down Expand Up @@ -125,6 +121,9 @@ subprojects {
api(group = "commons-io", name = "commons-io", version = "2.11.0")
api(group = "org.apache.commons", name = "commons-lang3", version = "3.12.0")

// Cache BOM
api(platform("io.github.xanthic.cache:cache-bom:0.1.0"))

// Logging
api(group = "org.slf4j", name = "slf4j-api", version = "1.7.36")

Expand Down
2 changes: 1 addition & 1 deletion chat/build.gradle.kts
Expand Up @@ -4,7 +4,7 @@ dependencies {
api(group = "com.github.vladimir-bukhtoyarov", name = "bucket4j-core")

// Cache
api(group = "com.github.ben-manes.caffeine", name = "caffeine")
api(group = "io.github.xanthic.cache", name = "cache-provider-caffeine")

// Twitch4J Modules
api(project(":twitch4j-common"))
Expand Down
39 changes: 21 additions & 18 deletions chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java
@@ -1,9 +1,5 @@
package com.github.twitch4j.chat;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.philippheuer.credentialmanager.CredentialManager;
import com.github.philippheuer.credentialmanager.domain.OAuth2Credential;
import com.github.philippheuer.events4j.core.EventManager;
Expand Down Expand Up @@ -31,6 +27,9 @@
import com.github.twitch4j.util.IBackoffStrategy;
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.xanthic.cache.api.Cache;
import io.github.xanthic.cache.api.domain.ExpiryType;
import io.github.xanthic.cache.core.CacheApi;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -295,9 +294,11 @@ public TwitchChat(WebsocketConnection websocketConnection, EventManager eventMan
this.validateOnConnect = validateOnConnect;

// init per channel message buckets by channel name
this.bucketByChannelName = Caffeine.newBuilder()
.expireAfterAccess(Math.max(perChannelRateLimit.getRefillPeriodNanos(), Duration.ofSeconds(30L).toNanos()), TimeUnit.NANOSECONDS)
.build();
this.bucketByChannelName = CacheApi.create(spec -> {
spec.maxSize(2048L);
spec.expiryType(ExpiryType.POST_ACCESS);
spec.expiryTime(Duration.ofNanos(Math.max(perChannelRateLimit.getRefillPeriodNanos(), Duration.ofSeconds(30L).toNanos())));
});

// init connection
if (websocketConnection == null) {
Expand Down Expand Up @@ -429,11 +430,13 @@ public TwitchChat(WebsocketConnection websocketConnection, EventManager eventMan
// Initialize joinAttemptsByChannelName (on an attempt expiring without explicit removal, we retry with exponential backoff)
if (maxJoinRetries > 0) {
final long initialWait = Math.max(chatJoinTimeout, 0);
this.joinAttemptsByChannelName = Caffeine.newBuilder()
.expireAfterWrite(initialWait, TimeUnit.MILLISECONDS)
.scheduler(Scheduler.forScheduledExecutorService(taskExecutor)) // required for prompt removals on java 8
.<String, Integer>evictionListener((name, attempts, cause) -> {
if (cause == RemovalCause.EXPIRED && name != null && attempts != null) {
this.joinAttemptsByChannelName = CacheApi.create(spec -> {
spec.maxSize(2048L);
spec.expiryType(ExpiryType.POST_WRITE);
spec.expiryTime(Duration.ofMillis(initialWait));
spec.executor(taskExecutor);
spec.removalListener((name, attempts, cause) -> {
if (cause.isEviction()) {
if (attempts < maxJoinRetries) {
taskExecutor.schedule(() -> {
if (currentChannels.contains(name)) {
Expand All @@ -446,14 +449,14 @@ public TwitchChat(WebsocketConnection websocketConnection, EventManager eventMan
log.warn("Chat connection exhausted retries when attempting to join channel: {}", name);
}
}
})
.build();
});
});
} else {
this.joinAttemptsByChannelName = Caffeine.newBuilder().maximumSize(0).build(); // optimization
this.joinAttemptsByChannelName = CacheApi.create(spec -> spec.maxSize(0L)); // optimization
}

// Remove successfully joined channels from joinAttemptsByChannelName (as further retries are not needed)
Consumer<AbstractChannelEvent> joinListener = e -> joinAttemptsByChannelName.invalidate(e.getChannel().getName().toLowerCase());
Consumer<AbstractChannelEvent> joinListener = e -> joinAttemptsByChannelName.remove(e.getChannel().getName().toLowerCase());
eventManager.onEvent(ChannelStateEvent.class, joinListener::accept);
eventManager.onEvent(ChannelNoticeEvent.class, joinListener::accept);
eventManager.onEvent(UserStateEvent.class, joinListener::accept);
Expand Down Expand Up @@ -696,7 +699,7 @@ protected void issueJoin(String channelName, int attempts) {
BucketUtils.scheduleAgainstBucket(ircJoinBucket, taskExecutor, () -> {
String name = channelName.toLowerCase();
queueCommand("JOIN #" + name);
joinAttemptsByChannelName.asMap().merge(name, attempts, Math::max); // mark that a join has been initiated to track later success or failure state
joinAttemptsByChannelName.merge(name, attempts, Math::max); // mark that a join has been initiated to track later success or failure state
});
}

Expand Down Expand Up @@ -888,7 +891,7 @@ public TMIConnectionState getConnectionState() {
}

private Bucket getChannelMessageBucket(@NotNull String channelName) {
return bucketByChannelName.get(channelName.toLowerCase(), k -> BucketUtils.createBucket(perChannelRateLimit));
return bucketByChannelName.computeIfAbsent(channelName.toLowerCase(), k -> BucketUtils.createBucket(perChannelRateLimit));
}

}
26 changes: 13 additions & 13 deletions chat/src/test/java/com/github/twitch4j/chat/ChatJoinRetryTest.java
Expand Up @@ -57,13 +57,13 @@ public void testChannelJoinSuccess() {
TestUtils.sleepFor(50);

// check that we kept track of the join attempt in joinAttemptsByChannelName
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");

// fake a successful join
twitchChat.getEventManager().publish(testIRCMessageEvent("@emote-only=0;followers-only=-1;r9k=0;rituals=0;room-id=149223493;slow=0;subs-only=0 :tmi.twitch.tv ROOMSTATE #"+FAKE_CHANNEL_NAME));

// should be gone from joinAttemptsByChannelName after successful join
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after successful join");
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after successful join");
Assertions.assertTrue(twitchChat.currentChannels.contains(FAKE_CHANNEL_NAME), "channel should remain in currentChannels after successful join");

// cleanup
Expand All @@ -82,11 +82,11 @@ public void testChannelJoinFailedByExpire() {
TestUtils.sleepFor(50);

// check that we kept track of the join attempt in joinAttemptsByChannelName
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");

// should see a ChannelRemovedPostJoinFailureEvent and entry should have expired
verify(twitchChat.getEventManager(), timeout(30_000)).publish(new ChannelJoinFailureEvent(FAKE_CHANNEL_NAME, ChannelJoinFailureEvent.Reason.RETRIES_EXHAUSTED));
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after it expired");
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after it expired");
Assertions.assertFalse(twitchChat.currentChannels.contains(FAKE_CHANNEL_NAME), "channel should be gone from currentChannels after retries have been exhausted");

// cleanup
Expand All @@ -105,16 +105,16 @@ public void testChannelJoinFailedBeforeSuccess() {
TestUtils.sleepFor(50);

// check that we kept track of the join attempt in joinAttemptsByChannelName
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");

// wait for 2 failed join attempts
await().until(() -> Integer.valueOf(2).equals(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME)));
await().until(() -> Integer.valueOf(2).equals(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME)));

// fake a successful join
twitchChat.getEventManager().publish(testIRCMessageEvent("@emote-only=0;followers-only=-1;r9k=0;rituals=0;room-id=149223493;slow=0;subs-only=0 :tmi.twitch.tv ROOMSTATE #"+FAKE_CHANNEL_NAME));

// should be gone from joinAttemptsByChannelName after successful join
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after successful join");
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after successful join");
Assertions.assertTrue(twitchChat.currentChannels.contains(FAKE_CHANNEL_NAME), "channel should remain in currentChannels after successful join");

// cleanup
Expand All @@ -133,13 +133,13 @@ public void testChannelJoinFailedByBannedUser() {
TestUtils.sleepFor(50);

// check that we kept track of the join attempt in joinAttemptsByChannelName
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");

// fake a banned notice
twitchChat.getEventManager().publish(testIRCMessageEvent("@msg-id=msg_banned :tmi.twitch.tv NOTICE #"+FAKE_CHANNEL_NAME+" :You are permanently banned from talking in "+FAKE_CHANNEL_NAME+"."));

// should be gone from joinAttemptsByChannelName because of the ban notice
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after the ban notice");
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after the ban notice");

// should have received an event about the removal of the channel
verify(twitchChat.getEventManager(), timeout(30_000)).publish(new ChannelJoinFailureEvent(FAKE_CHANNEL_NAME, ChannelJoinFailureEvent.Reason.USER_BANNED));
Expand All @@ -161,13 +161,13 @@ public void testChannelJoinFailedChannelSuspended() {
TestUtils.sleepFor(50);

// check that we kept track of the join attempt in joinAttemptsByChannelName
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");

// fake a banned notice
twitchChat.getEventManager().publish(testIRCMessageEvent("@msg-id=msg_channel_suspended :tmi.twitch.tv NOTICE #"+FAKE_CHANNEL_NAME+" :This channel has been suspended."));

// should be gone from joinAttemptsByChannelName because of the ban notice
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after the channel suspension notice");
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after the channel suspension notice");

// should have received an event about the removal of the channel
verify(twitchChat.getEventManager(), timeout(30_000)).publish(new ChannelJoinFailureEvent(FAKE_CHANNEL_NAME, ChannelJoinFailureEvent.Reason.CHANNEL_SUSPENDED));
Expand All @@ -189,13 +189,13 @@ public void testChannelJoinFailedTOS() {
TestUtils.sleepFor(50);

// check that we kept track of the join attempt in joinAttemptsByChannelName
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");

// fake a banned notice
twitchChat.getEventManager().publish(testIRCMessageEvent("@msg-id=tos_ban :tmi.twitch.tv NOTICE #"+FAKE_CHANNEL_NAME+" :The community has closed channel "+FAKE_CHANNEL_NAME+" due to Terms of Service violations."));

// should be gone from joinAttemptsByChannelName because of the ban notice
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after the channel suspension notice");
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after the channel suspension notice");

// should have received an event about the removal of the channel
verify(twitchChat.getEventManager(), timeout(30_000)).publish(new ChannelJoinFailureEvent(FAKE_CHANNEL_NAME, ChannelJoinFailureEvent.Reason.CHANNEL_SUSPENDED));
Expand Down
2 changes: 1 addition & 1 deletion eventsub-common/build.gradle.kts
Expand Up @@ -5,7 +5,7 @@ dependencies {
api(group = "com.fasterxml.jackson.datatype", name = "jackson-datatype-jsr310")

// Cache
api(group = "com.github.ben-manes.caffeine", name = "caffeine")
api(group = "io.github.xanthic.cache", name = "cache-provider-caffeine")

// Twitch4J Modules
api(project(":twitch4j-common"))
Expand Down
@@ -1,8 +1,9 @@
package com.github.twitch4j.eventsub.util;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.twitch4j.common.util.CryptoUtils;
import io.github.xanthic.cache.api.Cache;
import io.github.xanthic.cache.api.domain.ExpiryType;
import io.github.xanthic.cache.core.CacheApi;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -31,7 +32,11 @@ public class EventSubVerifier {
/**
* The Twitch-Eventsub-Message-Id's that have been observed during {@link #RECENT_EVENT}
*/
private final Cache<String, Boolean> RECENT_MESSAGE_IDS = Caffeine.newBuilder().expireAfterWrite(RECENT_EVENT).build();
private final Cache<String, Boolean> RECENT_MESSAGE_IDS = CacheApi.create(spec -> {
spec.expiryType(ExpiryType.POST_WRITE);
spec.expiryTime(RECENT_EVENT);
spec.maxSize(65_536L);
});

/**
* Twitch's prefix for Twitch-Eventsub-Message-Signature
Expand Down Expand Up @@ -64,7 +69,7 @@ public class EventSubVerifier {
* @return whether the message id has not been observed recently
*/
public boolean verifyMessageId(String messageId) {
return messageId != null && !messageId.isEmpty() && RECENT_MESSAGE_IDS.asMap().putIfAbsent(messageId, Boolean.TRUE) == null;
return messageId != null && !messageId.isEmpty() && RECENT_MESSAGE_IDS.putIfAbsent(messageId, Boolean.TRUE) == null;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion graphql/build.gradle.kts
Expand Up @@ -13,7 +13,7 @@ dependencies {
api(group = "com.netflix.hystrix", name = "hystrix-core")

// Caching
api(group = "com.github.ben-manes.caffeine", name = "caffeine")
api(group = "io.github.xanthic.cache", name = "cache-provider-caffeine")

// Twitch4J Modules
api(project(":twitch4j-common"))
Expand Down
Expand Up @@ -2,8 +2,6 @@

import com.apollographql.apollo.ApolloClient;
import com.apollographql.apollo.internal.batch.BatchConfig;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.philippheuer.credentialmanager.domain.OAuth2Credential;
import com.github.philippheuer.events4j.core.EventManager;
import com.github.twitch4j.common.annotation.Unofficial;
Expand All @@ -16,10 +14,14 @@
import com.github.twitch4j.graphql.internal.type.UnbanRequestStatus;
import com.github.twitch4j.graphql.internal.type.UnbanRequestsSortOrder;
import com.github.twitch4j.graphql.internal.type.UpdateCommunityPointsCommunityGoalInput;
import io.github.xanthic.cache.api.Cache;
import io.github.xanthic.cache.api.domain.ExpiryType;
import io.github.xanthic.cache.core.CacheApi;
import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
import okhttp3.Request;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -97,7 +99,11 @@ public TwitchGraphQL(String baseUrl, String userAgent, EventManager eventManager
this.proxyConfig = proxyConfig;
this.batchingEnabled = batchingEnabled;
this.timeout = timeout;
this.clientsByCredential = Caffeine.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build();
this.clientsByCredential = CacheApi.create(spec -> {
spec.maxSize(64L);
spec.expiryType(ExpiryType.POST_ACCESS);
spec.expiryTime(Duration.ofMinutes(5L));
});
}

/**
Expand All @@ -109,7 +115,7 @@ public TwitchGraphQL(String baseUrl, String userAgent, EventManager eventManager
private ApolloClient getApolloClient(OAuth2Credential credential) {
if (credential == null) credential = defaultToken;
final String accessToken = credential != null && credential.getAccessToken() != null ? credential.getAccessToken() : "";
return clientsByCredential.get(accessToken, s -> {
return clientsByCredential.computeIfAbsent(accessToken, s -> {
// Http Client
OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder()
.callTimeout(timeout, TimeUnit.MILLISECONDS)
Expand Down

0 comments on commit 48e1917

Please sign in to comment.