Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use xanthic cache facade #618

Merged
merged 8 commits into from Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 3 additions & 4 deletions build.gradle.kts
Expand Up @@ -24,7 +24,6 @@ allprojects {
this as StandardJavadocDocletOptions
links(
"https://javadoc.io/doc/org.jetbrains/annotations/23.0.0",
"https://javadoc.io/doc/com.github.ben-manes.caffeine/caffeine/2.9.3",
"https://javadoc.io/doc/commons-configuration/commons-configuration/1.10",
"https://javadoc.io/doc/com.github.vladimir-bukhtoyarov/bucket4j-core/7.6.0",
// "https://javadoc.io/doc/com.squareup.okhttp3/okhttp/4.9.3", // blocked by https://github.com/square/okhttp/issues/6450
Expand Down Expand Up @@ -79,9 +78,6 @@ subprojects {
// Annotations
api(group = "org.jetbrains", name = "annotations", version = "23.0.0")

// Caching
api(group = "com.github.ben-manes.caffeine", name = "caffeine", version = "2.9.3")

// Apache Commons
api(group = "commons-configuration", name = "commons-configuration", version = "1.10")

Expand Down Expand Up @@ -125,6 +121,9 @@ subprojects {
api(group = "commons-io", name = "commons-io", version = "2.11.0")
api(group = "org.apache.commons", name = "commons-lang3", version = "3.12.0")

// Cache BOM
api(platform("io.github.xanthic.cache:cache-bom:0.1.0"))

// Logging
api(group = "org.slf4j", name = "slf4j-api", version = "1.7.36")

Expand Down
2 changes: 1 addition & 1 deletion chat/build.gradle.kts
Expand Up @@ -4,7 +4,7 @@ dependencies {
api(group = "com.github.vladimir-bukhtoyarov", name = "bucket4j-core")

// Cache
api(group = "com.github.ben-manes.caffeine", name = "caffeine")
api(group = "io.github.xanthic.cache", name = "cache-provider-caffeine")

// Twitch4J Modules
api(project(":twitch4j-common"))
Expand Down
39 changes: 21 additions & 18 deletions chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java
@@ -1,9 +1,5 @@
package com.github.twitch4j.chat;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.philippheuer.credentialmanager.CredentialManager;
import com.github.philippheuer.credentialmanager.domain.OAuth2Credential;
import com.github.philippheuer.events4j.core.EventManager;
Expand Down Expand Up @@ -31,6 +27,9 @@
import com.github.twitch4j.util.IBackoffStrategy;
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.xanthic.cache.api.Cache;
import io.github.xanthic.cache.api.domain.ExpiryType;
import io.github.xanthic.cache.core.CacheApi;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -295,9 +294,11 @@ public TwitchChat(WebsocketConnection websocketConnection, EventManager eventMan
this.validateOnConnect = validateOnConnect;

// init per channel message buckets by channel name
this.bucketByChannelName = Caffeine.newBuilder()
.expireAfterAccess(Math.max(perChannelRateLimit.getRefillPeriodNanos(), Duration.ofSeconds(30L).toNanos()), TimeUnit.NANOSECONDS)
.build();
this.bucketByChannelName = CacheApi.create(spec -> {
spec.maxSize(2048L);
spec.expiryType(ExpiryType.POST_ACCESS);
spec.expiryTime(Duration.ofNanos(Math.max(perChannelRateLimit.getRefillPeriodNanos(), Duration.ofSeconds(30L).toNanos())));
});

// init connection
if (websocketConnection == null) {
Expand Down Expand Up @@ -429,11 +430,13 @@ public TwitchChat(WebsocketConnection websocketConnection, EventManager eventMan
// Initialize joinAttemptsByChannelName (on an attempt expiring without explicit removal, we retry with exponential backoff)
if (maxJoinRetries > 0) {
final long initialWait = Math.max(chatJoinTimeout, 0);
this.joinAttemptsByChannelName = Caffeine.newBuilder()
.expireAfterWrite(initialWait, TimeUnit.MILLISECONDS)
.scheduler(Scheduler.forScheduledExecutorService(taskExecutor)) // required for prompt removals on java 8
.<String, Integer>evictionListener((name, attempts, cause) -> {
if (cause == RemovalCause.EXPIRED && name != null && attempts != null) {
this.joinAttemptsByChannelName = CacheApi.create(spec -> {
spec.maxSize(2048L);
spec.expiryType(ExpiryType.POST_WRITE);
spec.expiryTime(Duration.ofMillis(initialWait));
spec.executor(taskExecutor);
spec.removalListener((name, attempts, cause) -> {
if (cause.isEviction()) {
if (attempts < maxJoinRetries) {
taskExecutor.schedule(() -> {
if (currentChannels.contains(name)) {
Expand All @@ -446,14 +449,14 @@ public TwitchChat(WebsocketConnection websocketConnection, EventManager eventMan
log.warn("Chat connection exhausted retries when attempting to join channel: {}", name);
}
}
})
.build();
});
});
} else {
this.joinAttemptsByChannelName = Caffeine.newBuilder().maximumSize(0).build(); // optimization
this.joinAttemptsByChannelName = CacheApi.create(spec -> spec.maxSize(0L)); // optimization
}

// Remove successfully joined channels from joinAttemptsByChannelName (as further retries are not needed)
Consumer<AbstractChannelEvent> joinListener = e -> joinAttemptsByChannelName.invalidate(e.getChannel().getName().toLowerCase());
Consumer<AbstractChannelEvent> joinListener = e -> joinAttemptsByChannelName.remove(e.getChannel().getName().toLowerCase());
eventManager.onEvent(ChannelStateEvent.class, joinListener::accept);
eventManager.onEvent(ChannelNoticeEvent.class, joinListener::accept);
eventManager.onEvent(UserStateEvent.class, joinListener::accept);
Expand Down Expand Up @@ -696,7 +699,7 @@ protected void issueJoin(String channelName, int attempts) {
BucketUtils.scheduleAgainstBucket(ircJoinBucket, taskExecutor, () -> {
String name = channelName.toLowerCase();
queueCommand("JOIN #" + name);
joinAttemptsByChannelName.asMap().merge(name, attempts, Math::max); // mark that a join has been initiated to track later success or failure state
joinAttemptsByChannelName.merge(name, attempts, Math::max); // mark that a join has been initiated to track later success or failure state
});
}

Expand Down Expand Up @@ -888,7 +891,7 @@ public TMIConnectionState getConnectionState() {
}

private Bucket getChannelMessageBucket(@NotNull String channelName) {
return bucketByChannelName.get(channelName.toLowerCase(), k -> BucketUtils.createBucket(perChannelRateLimit));
return bucketByChannelName.computeIfAbsent(channelName.toLowerCase(), k -> BucketUtils.createBucket(perChannelRateLimit));
}

}
26 changes: 13 additions & 13 deletions chat/src/test/java/com/github/twitch4j/chat/ChatJoinRetryTest.java
Expand Up @@ -57,13 +57,13 @@ public void testChannelJoinSuccess() {
TestUtils.sleepFor(50);

// check that we kept track of the join attempt in joinAttemptsByChannelName
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");

// fake a successful join
twitchChat.getEventManager().publish(testIRCMessageEvent("@emote-only=0;followers-only=-1;r9k=0;rituals=0;room-id=149223493;slow=0;subs-only=0 :tmi.twitch.tv ROOMSTATE #"+FAKE_CHANNEL_NAME));

// should be gone from joinAttemptsByChannelName after successful join
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after successful join");
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after successful join");
Assertions.assertTrue(twitchChat.currentChannels.contains(FAKE_CHANNEL_NAME), "channel should remain in currentChannels after successful join");

// cleanup
Expand All @@ -82,11 +82,11 @@ public void testChannelJoinFailedByExpire() {
TestUtils.sleepFor(50);

// check that we kept track of the join attempt in joinAttemptsByChannelName
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");

// should see a ChannelRemovedPostJoinFailureEvent and entry should have expired
verify(twitchChat.getEventManager(), timeout(30_000)).publish(new ChannelJoinFailureEvent(FAKE_CHANNEL_NAME, ChannelJoinFailureEvent.Reason.RETRIES_EXHAUSTED));
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after it expired");
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after it expired");
Assertions.assertFalse(twitchChat.currentChannels.contains(FAKE_CHANNEL_NAME), "channel should be gone from currentChannels after retries have been exhausted");

// cleanup
Expand All @@ -105,16 +105,16 @@ public void testChannelJoinFailedBeforeSuccess() {
TestUtils.sleepFor(50);

// check that we kept track of the join attempt in joinAttemptsByChannelName
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");

// wait for 2 failed join attempts
await().until(() -> Integer.valueOf(2).equals(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME)));
await().until(() -> Integer.valueOf(2).equals(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME)));

// fake a successful join
twitchChat.getEventManager().publish(testIRCMessageEvent("@emote-only=0;followers-only=-1;r9k=0;rituals=0;room-id=149223493;slow=0;subs-only=0 :tmi.twitch.tv ROOMSTATE #"+FAKE_CHANNEL_NAME));

// should be gone from joinAttemptsByChannelName after successful join
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after successful join");
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after successful join");
Assertions.assertTrue(twitchChat.currentChannels.contains(FAKE_CHANNEL_NAME), "channel should remain in currentChannels after successful join");

// cleanup
Expand All @@ -133,13 +133,13 @@ public void testChannelJoinFailedByBannedUser() {
TestUtils.sleepFor(50);

// check that we kept track of the join attempt in joinAttemptsByChannelName
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");

// fake a banned notice
twitchChat.getEventManager().publish(testIRCMessageEvent("@msg-id=msg_banned :tmi.twitch.tv NOTICE #"+FAKE_CHANNEL_NAME+" :You are permanently banned from talking in "+FAKE_CHANNEL_NAME+"."));

// should be gone from joinAttemptsByChannelName because of the ban notice
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after the ban notice");
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after the ban notice");

// should have received an event about the removal of the channel
verify(twitchChat.getEventManager(), timeout(30_000)).publish(new ChannelJoinFailureEvent(FAKE_CHANNEL_NAME, ChannelJoinFailureEvent.Reason.USER_BANNED));
Expand All @@ -161,13 +161,13 @@ public void testChannelJoinFailedChannelSuspended() {
TestUtils.sleepFor(50);

// check that we kept track of the join attempt in joinAttemptsByChannelName
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");

// fake a banned notice
twitchChat.getEventManager().publish(testIRCMessageEvent("@msg-id=msg_channel_suspended :tmi.twitch.tv NOTICE #"+FAKE_CHANNEL_NAME+" :This channel has been suspended."));

// should be gone from joinAttemptsByChannelName because of the ban notice
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after the channel suspension notice");
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after the channel suspension notice");

// should have received an event about the removal of the channel
verify(twitchChat.getEventManager(), timeout(30_000)).publish(new ChannelJoinFailureEvent(FAKE_CHANNEL_NAME, ChannelJoinFailureEvent.Reason.CHANNEL_SUSPENDED));
Expand All @@ -189,13 +189,13 @@ public void testChannelJoinFailedTOS() {
TestUtils.sleepFor(50);

// check that we kept track of the join attempt in joinAttemptsByChannelName
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");
Assertions.assertNotNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be in joinAttemptsByChannelName while the join attempt is in an unknown state");

// fake a banned notice
twitchChat.getEventManager().publish(testIRCMessageEvent("@msg-id=tos_ban :tmi.twitch.tv NOTICE #"+FAKE_CHANNEL_NAME+" :The community has closed channel "+FAKE_CHANNEL_NAME+" due to Terms of Service violations."));

// should be gone from joinAttemptsByChannelName because of the ban notice
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.getIfPresent(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after the channel suspension notice");
Assertions.assertNull(twitchChat.joinAttemptsByChannelName.get(FAKE_CHANNEL_NAME), "channel should be gone from joinAttemptsByChannelName after the channel suspension notice");

// should have received an event about the removal of the channel
verify(twitchChat.getEventManager(), timeout(30_000)).publish(new ChannelJoinFailureEvent(FAKE_CHANNEL_NAME, ChannelJoinFailureEvent.Reason.CHANNEL_SUSPENDED));
Expand Down
2 changes: 1 addition & 1 deletion eventsub-common/build.gradle.kts
Expand Up @@ -5,7 +5,7 @@ dependencies {
api(group = "com.fasterxml.jackson.datatype", name = "jackson-datatype-jsr310")

// Cache
api(group = "com.github.ben-manes.caffeine", name = "caffeine")
api(group = "io.github.xanthic.cache", name = "cache-provider-caffeine")

// Twitch4J Modules
api(project(":twitch4j-common"))
Expand Down
@@ -1,8 +1,9 @@
package com.github.twitch4j.eventsub.util;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.twitch4j.common.util.CryptoUtils;
import io.github.xanthic.cache.api.Cache;
import io.github.xanthic.cache.api.domain.ExpiryType;
import io.github.xanthic.cache.core.CacheApi;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -31,7 +32,11 @@ public class EventSubVerifier {
/**
* The Twitch-Eventsub-Message-Id's that have been observed during {@link #RECENT_EVENT}
*/
private final Cache<String, Boolean> RECENT_MESSAGE_IDS = Caffeine.newBuilder().expireAfterWrite(RECENT_EVENT).build();
private final Cache<String, Boolean> RECENT_MESSAGE_IDS = CacheApi.create(spec -> {
spec.expiryType(ExpiryType.POST_WRITE);
spec.expiryTime(RECENT_EVENT);
spec.maxSize(65_536L);
});

/**
* Twitch's prefix for Twitch-Eventsub-Message-Signature
Expand Down Expand Up @@ -64,7 +69,7 @@ public class EventSubVerifier {
* @return whether the message id has not been observed recently
*/
public boolean verifyMessageId(String messageId) {
return messageId != null && !messageId.isEmpty() && RECENT_MESSAGE_IDS.asMap().putIfAbsent(messageId, Boolean.TRUE) == null;
return messageId != null && !messageId.isEmpty() && RECENT_MESSAGE_IDS.putIfAbsent(messageId, Boolean.TRUE) == null;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion graphql/build.gradle.kts
Expand Up @@ -13,7 +13,7 @@ dependencies {
api(group = "com.netflix.hystrix", name = "hystrix-core")

// Caching
api(group = "com.github.ben-manes.caffeine", name = "caffeine")
api(group = "io.github.xanthic.cache", name = "cache-provider-caffeine")

// Twitch4J Modules
api(project(":twitch4j-common"))
Expand Down
Expand Up @@ -2,8 +2,6 @@

import com.apollographql.apollo.ApolloClient;
import com.apollographql.apollo.internal.batch.BatchConfig;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.philippheuer.credentialmanager.domain.OAuth2Credential;
import com.github.philippheuer.events4j.core.EventManager;
import com.github.twitch4j.common.annotation.Unofficial;
Expand All @@ -16,10 +14,14 @@
import com.github.twitch4j.graphql.internal.type.UnbanRequestStatus;
import com.github.twitch4j.graphql.internal.type.UnbanRequestsSortOrder;
import com.github.twitch4j.graphql.internal.type.UpdateCommunityPointsCommunityGoalInput;
import io.github.xanthic.cache.api.Cache;
import io.github.xanthic.cache.api.domain.ExpiryType;
import io.github.xanthic.cache.core.CacheApi;
import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
import okhttp3.Request;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -97,7 +99,11 @@ public TwitchGraphQL(String baseUrl, String userAgent, EventManager eventManager
this.proxyConfig = proxyConfig;
this.batchingEnabled = batchingEnabled;
this.timeout = timeout;
this.clientsByCredential = Caffeine.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build();
this.clientsByCredential = CacheApi.create(spec -> {
spec.maxSize(64L);
spec.expiryType(ExpiryType.POST_ACCESS);
spec.expiryTime(Duration.ofMinutes(5L));
});
}

/**
Expand All @@ -109,7 +115,7 @@ public TwitchGraphQL(String baseUrl, String userAgent, EventManager eventManager
private ApolloClient getApolloClient(OAuth2Credential credential) {
if (credential == null) credential = defaultToken;
final String accessToken = credential != null && credential.getAccessToken() != null ? credential.getAccessToken() : "";
return clientsByCredential.get(accessToken, s -> {
return clientsByCredential.computeIfAbsent(accessToken, s -> {
// Http Client
OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder()
.callTimeout(timeout, TimeUnit.MILLISECONDS)
Expand Down