From a1707a9e85be6c1cdd2a1570fdd30ed46fed2e83 Mon Sep 17 00:00:00 2001 From: Sidd Date: Wed, 4 May 2022 10:42:22 -0700 Subject: [PATCH] feat: comply with undocumented helix rate limits (#561) * feat: allow broad helix bandwidth customization * feat: stricter limit for helix ban and unban user * refactor: simplify request channel id extraction * feat: deplete ban bucket on 429 * feat: implement createClip rate limit * feat: implement blocked terms rate limit * refactor: move util methods to common module * refactor: create TwitchHelixRateLimitTracker * refactor: create TwitchHelixTokenManager * fix: ensure error response body stream is closed * fix: comply with slower-than-documented helix refill rate --- .../com/github/twitch4j/chat/TwitchChat.java | 30 ++-- .../twitch4j/chat/TwitchChatBuilder.java | 9 +- .../chat/util/TwitchChatLimitHelper.java | 34 ++-- .../twitch4j/common/util/BucketUtils.java | 91 +++++++++++ .../twitch4j/common/util/SneakySupplier.java | 32 ++++ .../common/util/TwitchLimitRegistry.java | 11 +- .../util/TwitchExtensionsErrorDecoder.java | 10 +- .../twitch4j/helix/TwitchHelixBuilder.java | 31 +++- .../helix/TwitchHelixErrorDecoder.java | 34 ++-- .../helix/domain/SendPubSubMessageInput.java | 2 +- .../TwitchHelixClientIdInterceptor.java | 135 ++-------------- .../helix/interceptor/TwitchHelixDecoder.java | 28 +++- .../interceptor/TwitchHelixHttpClient.java | 95 ++++++++--- .../TwitchHelixRateLimitTracker.java | 150 ++++++++++++++++++ .../interceptor/TwitchHelixTokenManager.java | 120 ++++++++++++++ .../kraken/TwitchKrakenErrorDecoder.java | 16 +- .../TwitchMessagingInterfaceErrorDecoder.java | 10 +- 17 files changed, 612 insertions(+), 226 deletions(-) create mode 100644 common/src/main/java/com/github/twitch4j/common/util/BucketUtils.java create mode 100644 common/src/main/java/com/github/twitch4j/common/util/SneakySupplier.java create mode 100644 rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixRateLimitTracker.java create mode 100644 rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixTokenManager.java diff --git a/chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java b/chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java index 5ebb40e63..45128d35a 100644 --- a/chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java +++ b/chat/src/main/java/com/github/twitch4j/chat/TwitchChat.java @@ -21,6 +21,7 @@ import com.github.twitch4j.chat.events.channel.IRCMessageEvent; import com.github.twitch4j.chat.events.channel.UserStateEvent; import com.github.twitch4j.common.config.ProxyConfig; +import com.github.twitch4j.common.util.BucketUtils; import com.github.twitch4j.common.util.CryptoUtils; import com.github.twitch4j.common.util.EscapeUtils; import com.github.twitch4j.common.util.ExponentialBackoffStrategy; @@ -674,7 +675,7 @@ protected void sendCommand(String command, String... args) { * @param command raw irc command */ public boolean sendRaw(String command) { - return ircMessageBucket.asScheduler().consume(1, taskExecutor).thenRunAsync(() -> queueCommand(command), taskExecutor) != null; + return BucketUtils.scheduleAgainstBucket(ircMessageBucket, taskExecutor, () -> queueCommand(command)) != null; } /** @@ -750,14 +751,11 @@ private void issueJoin(String channelName) { } protected void issueJoin(String channelName, int attempts) { - ircJoinBucket.asScheduler().consume(1, taskExecutor).thenRunAsync( - () -> { - String name = channelName.toLowerCase(); - queueCommand("JOIN #" + name); - joinAttemptsByChannelName.asMap().merge(name, attempts, Math::max); // mark that a join has been initiated to track later success or failure state - }, - taskExecutor - ); + BucketUtils.scheduleAgainstBucket(ircJoinBucket, taskExecutor, () -> { + String name = channelName.toLowerCase(); + queueCommand("JOIN #" + name); + joinAttemptsByChannelName.asMap().merge(name, attempts, Math::max); // mark that a join has been initiated to track later success or failure state + }); } /** @@ -790,9 +788,10 @@ public boolean leaveChannel(String channelName) { } private void issuePart(String channelName) { - ircJoinBucket.asScheduler().consume(1, taskExecutor).thenRunAsync( - () -> queueCommand("PART #" + channelName.toLowerCase()), - taskExecutor + BucketUtils.scheduleAgainstBucket( + ircJoinBucket, + taskExecutor, + () -> queueCommand("PART #" + channelName.toLowerCase()) ); } @@ -833,9 +832,10 @@ public boolean sendMessage(String channel, String message, Map t */ public void sendPrivateMessage(String targetUser, String message) { log.debug("Adding private message for user [{}] with content [{}] to the queue.", targetUser, message); - ircWhisperBucket.asScheduler().consume(1, taskExecutor).thenRunAsync( - () -> queueCommand(String.format("PRIVMSG #%s :/w %s %s", chatCredential.getUserName().toLowerCase(), targetUser, message)), - taskExecutor + BucketUtils.scheduleAgainstBucket( + ircWhisperBucket, + taskExecutor, + () -> queueCommand(String.format("PRIVMSG #%s :/w %s %s", chatCredential.getUserName().toLowerCase(), targetUser, message)) ); } diff --git a/chat/src/main/java/com/github/twitch4j/chat/TwitchChatBuilder.java b/chat/src/main/java/com/github/twitch4j/chat/TwitchChatBuilder.java index 197f4fa43..59a884079 100644 --- a/chat/src/main/java/com/github/twitch4j/chat/TwitchChatBuilder.java +++ b/chat/src/main/java/com/github/twitch4j/chat/TwitchChatBuilder.java @@ -12,6 +12,7 @@ import com.github.twitch4j.common.config.ProxyConfig; import com.github.twitch4j.common.config.Twitch4JGlobal; import com.github.twitch4j.common.enums.TwitchLimitType; +import com.github.twitch4j.common.util.BucketUtils; import com.github.twitch4j.common.util.EventManagerUtils; import com.github.twitch4j.common.util.ThreadUtils; import com.github.twitch4j.common.util.TwitchLimitRegistry; @@ -269,16 +270,16 @@ public TwitchChat build() { } if (ircMessageBucket == null) - ircMessageBucket = userId == null ? TwitchChatLimitHelper.createBucket(this.chatRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_MESSAGE_LIMIT, Collections.singletonList(chatRateLimit)); + ircMessageBucket = userId == null ? BucketUtils.createBucket(this.chatRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_MESSAGE_LIMIT, Collections.singletonList(chatRateLimit)); if (ircWhisperBucket == null) - ircWhisperBucket = userId == null ? TwitchChatLimitHelper.createBucket(this.whisperRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_WHISPER_LIMIT, Arrays.asList(whisperRateLimit)); + ircWhisperBucket = userId == null ? BucketUtils.createBucket(this.whisperRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_WHISPER_LIMIT, Arrays.asList(whisperRateLimit)); if (ircJoinBucket == null) - ircJoinBucket = userId == null ? TwitchChatLimitHelper.createBucket(this.joinRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_JOIN_LIMIT, Collections.singletonList(joinRateLimit)); + ircJoinBucket = userId == null ? BucketUtils.createBucket(this.joinRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_JOIN_LIMIT, Collections.singletonList(joinRateLimit)); if (ircAuthBucket == null) - ircAuthBucket = userId == null ? TwitchChatLimitHelper.createBucket(this.authRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_AUTH_LIMIT, Collections.singletonList(authRateLimit)); + ircAuthBucket = userId == null ? BucketUtils.createBucket(this.authRateLimit) : TwitchLimitRegistry.getInstance().getOrInitializeBucket(userId, TwitchLimitType.CHAT_AUTH_LIMIT, Collections.singletonList(authRateLimit)); log.debug("TwitchChat: Initializing Module ..."); return new TwitchChat(this.eventManager, this.credentialManager, this.chatAccount, this.baseUrl, this.sendCredentialToThirdPartyHost, this.commandPrefixes, this.chatQueueSize, this.ircMessageBucket, this.ircWhisperBucket, this.ircJoinBucket, this.ircAuthBucket, this.scheduledThreadPoolExecutor, this.chatQueueTimeout, this.proxyConfig, this.autoJoinOwnChannel, this.enableMembershipEvents, this.botOwnerIds, this.removeChannelOnJoinFailure, this.maxJoinRetries, this.chatJoinTimeout, this.wsPingPeriod); diff --git a/chat/src/main/java/com/github/twitch4j/chat/util/TwitchChatLimitHelper.java b/chat/src/main/java/com/github/twitch4j/chat/util/TwitchChatLimitHelper.java index c7acdae92..25035ac9e 100644 --- a/chat/src/main/java/com/github/twitch4j/chat/util/TwitchChatLimitHelper.java +++ b/chat/src/main/java/com/github/twitch4j/chat/util/TwitchChatLimitHelper.java @@ -1,9 +1,9 @@ package com.github.twitch4j.chat.util; import com.github.twitch4j.common.enums.TwitchLimitType; +import com.github.twitch4j.common.util.BucketUtils; import io.github.bucket4j.Bandwidth; import io.github.bucket4j.Bucket; -import io.github.bucket4j.local.LocalBucketBuilder; import lombok.experimental.UtilityClass; import java.time.Duration; @@ -103,24 +103,34 @@ public class TwitchChatLimitHelper { */ public final Bandwidth VERIFIED_AUTH_LIMIT = Bandwidth.simple(200, Duration.ofSeconds(10)).withId(AUTH_BANDWIDTH_ID); + /** + * @param limit bandwidth + * @return bucket + * @deprecated in favor of BucketUtils + */ + @Deprecated public Bucket createBucket(Bandwidth limit) { - return Bucket.builder().addLimit(limit).build(); + return BucketUtils.createBucket(limit); } + /** + * @param limits bandwidths + * @return bucket + * @deprecated in favor of BucketUtils + */ + @Deprecated public Bucket createBucket(Bandwidth... limits) { - LocalBucketBuilder builder = Bucket.builder(); - for (Bandwidth limit : limits) { - builder.addLimit(limit); - } - return builder.build(); + return BucketUtils.createBucket(limits); } + /** + * @param limits bandwidths + * @return bucket + * @deprecated in favor of BucketUtils + */ + @Deprecated public Bucket createBucket(Iterable limits) { - LocalBucketBuilder builder = Bucket.builder(); - for (Bandwidth limit : limits) { - builder.addLimit(limit); - } - return builder.build(); + return BucketUtils.createBucket(limits); } } diff --git a/common/src/main/java/com/github/twitch4j/common/util/BucketUtils.java b/common/src/main/java/com/github/twitch4j/common/util/BucketUtils.java new file mode 100644 index 000000000..f28162235 --- /dev/null +++ b/common/src/main/java/com/github/twitch4j/common/util/BucketUtils.java @@ -0,0 +1,91 @@ +package com.github.twitch4j.common.util; + +import io.github.bucket4j.Bandwidth; +import io.github.bucket4j.Bucket; +import io.github.bucket4j.local.LocalBucketBuilder; +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; + +public class BucketUtils { + + /** + * Creates a bucket with the specified bandwidth. + * + * @param limit the bandwidth + * @return the bucket + */ + @NotNull + public static Bucket createBucket(@NotNull Bandwidth limit) { + return Bucket.builder().addLimit(limit).build(); + } + + /** + * Creates a bucket with the specified bandwidths. + * + * @param limits the bandwidths + * @return the bucket + */ + @NotNull + public static Bucket createBucket(@NotNull Bandwidth... limits) { + LocalBucketBuilder builder = Bucket.builder(); + for (Bandwidth limit : limits) { + builder.addLimit(limit); + } + return builder.build(); + } + + /** + * Creates a bucket with the specified bandwidths. + * + * @param limits the bandwidths + * @return the bucket + */ + @NotNull + public static Bucket createBucket(@NotNull Iterable limits) { + LocalBucketBuilder builder = Bucket.builder(); + for (Bandwidth limit : limits) { + builder.addLimit(limit); + } + return builder.build(); + } + + /** + * Performs the callable after a token has been consumed from the bucket using the executor. + *

+ * Note: ExecutionException should be inspected if the passed action can throw an exception. + * + * @param bucket rate limit bucket + * @param executor scheduled executor service for async calls + * @param call task that requires a bucket point + * @return the future result of the call + */ + @NotNull + public static CompletableFuture scheduleAgainstBucket(@NotNull Bucket bucket, @NotNull ScheduledExecutorService executor, @NotNull Callable call) { + if (bucket.tryConsume(1L)) + return CompletableFuture.supplyAsync(new SneakySupplier<>(call)); + + return bucket.asScheduler().consume(1L, executor).thenApplyAsync(v -> new SneakySupplier<>(call).get()); + } + + /** + * Runs the action after a token has been consumed from the bucket using the executor. + *

+ * Note: while the executor is used to consume the bucket token, the action is performed on the fork-join common pool, by default. + * + * @param bucket rate limit bucket + * @param executor scheduled executor service for async calls + * @param action runnable that requires a bucket point + * @return a future to track completion progress + */ + @NotNull + public static CompletableFuture scheduleAgainstBucket(@NotNull Bucket bucket, @NotNull ScheduledExecutorService executor, @NotNull Runnable action) { + if (bucket.tryConsume(1L)) + return CompletableFuture.runAsync(action); + + return bucket.asScheduler().consume(1L, executor).thenRunAsync(action); + } + +} diff --git a/common/src/main/java/com/github/twitch4j/common/util/SneakySupplier.java b/common/src/main/java/com/github/twitch4j/common/util/SneakySupplier.java new file mode 100644 index 000000000..928bf1be6 --- /dev/null +++ b/common/src/main/java/com/github/twitch4j/common/util/SneakySupplier.java @@ -0,0 +1,32 @@ +package com.github.twitch4j.common.util; + +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.Callable; +import java.util.function.Supplier; + +/** + * A supplier that can sneakily throw exceptions. + *

+ * This class should be used sparingly (to avoid hackiness) and carefully (to ensure bubbled exceptions are properly handled). + * + * @param the return type of values provided by the supplier + */ +@RequiredArgsConstructor +public final class SneakySupplier implements Supplier { + + /** + * The action to compute the supplied value, possibly throwing an exception. + */ + @NotNull + private final Callable callable; + + @Override + @SneakyThrows + public T get() { + return callable.call(); + } + +} diff --git a/common/src/main/java/com/github/twitch4j/common/util/TwitchLimitRegistry.java b/common/src/main/java/com/github/twitch4j/common/util/TwitchLimitRegistry.java index 8644fd6a1..69f779896 100644 --- a/common/src/main/java/com/github/twitch4j/common/util/TwitchLimitRegistry.java +++ b/common/src/main/java/com/github/twitch4j/common/util/TwitchLimitRegistry.java @@ -5,7 +5,6 @@ import io.github.bucket4j.Bucket; import io.github.bucket4j.BucketConfiguration; import io.github.bucket4j.TokensInheritanceStrategy; -import io.github.bucket4j.local.LocalBucketBuilder; import lombok.NonNull; import java.util.Collections; @@ -51,7 +50,7 @@ public void setLimit(@NonNull String userId, @NonNull TwitchLimitType limitType, return bucket; } - return constructBucket(limit); + return BucketUtils.createBucket(limit); }); } @@ -76,19 +75,13 @@ public Optional getBucket(@NonNull String userId, @NonNull TwitchLimitTy * @return the shared rate limit bucket for this user and limit type */ public Bucket getOrInitializeBucket(@NonNull String userId, @NonNull TwitchLimitType limitType, @NonNull List limit) { - return getBucketsByUser(userId).computeIfAbsent(limitType, l -> constructBucket(limit)); + return getBucketsByUser(userId).computeIfAbsent(limitType, l -> BucketUtils.createBucket(limit)); } private Map getBucketsByUser(String userId) { return limits.computeIfAbsent(userId, s -> Collections.synchronizedMap(new EnumMap<>(TwitchLimitType.class))); } - private static Bucket constructBucket(List limits) { - LocalBucketBuilder builder = Bucket.builder(); - limits.forEach(builder::addLimit); - return builder.build(); - } - /** * @return the single thread-safe instance of the limit registry. */ diff --git a/rest-extensions/src/main/java/com/github/twitch4j/extensions/util/TwitchExtensionsErrorDecoder.java b/rest-extensions/src/main/java/com/github/twitch4j/extensions/util/TwitchExtensionsErrorDecoder.java index e6abb64c2..c8734d198 100644 --- a/rest-extensions/src/main/java/com/github/twitch4j/extensions/util/TwitchExtensionsErrorDecoder.java +++ b/rest-extensions/src/main/java/com/github/twitch4j/extensions/util/TwitchExtensionsErrorDecoder.java @@ -14,6 +14,7 @@ import org.apache.commons.lang3.exception.ContextedRuntimeException; import java.io.IOException; +import java.io.InputStream; import java.nio.charset.StandardCharsets; @RequiredArgsConstructor @@ -39,8 +40,8 @@ public class TwitchExtensionsErrorDecoder implements ErrorDecoder { public Exception decode(String methodKey, Response response) { Exception ex; - try { - String responseBody = response.body() == null ? "" : IOUtils.toString(response.body().asInputStream(), StandardCharsets.UTF_8.name()); + try (InputStream is = response.body() == null ? null : response.body().asInputStream()) { + String responseBody = is == null ? "" : IOUtils.toString(is, StandardCharsets.UTF_8); if (response.status() == 401) { ex = new UnauthorizedException() @@ -53,7 +54,7 @@ public Exception decode(String methodKey, Response response) { .addContextValue("requestMethod", response.request().httpMethod()) .addContextValue("responseBody", responseBody); } else if (response.status() == 429) { - ex = new ContextedRuntimeException("To many requests!") + ex = new ContextedRuntimeException("Too many requests!") .addContextValue("requestUrl", response.request().url()) .addContextValue("requestMethod", response.request().httpMethod()) .addContextValue("responseBody", responseBody); @@ -70,7 +71,8 @@ public Exception decode(String methodKey, Response response) { .addContextValue("responseBody", responseBody) .addContextValue("errorType", error.getError()) .addContextValue("errorStatus", error.getStatus()) - .addContextValue("errorType", error.getMessage()); + .addContextValue("errorType", error.getMessage()) + .addContextValue("errorMessage", error.getMessage()); } } catch (IOException fallbackToDefault) { ex = defaultDecoder.decode(methodKey, response); diff --git a/rest-helix/src/main/java/com/github/twitch4j/helix/TwitchHelixBuilder.java b/rest-helix/src/main/java/com/github/twitch4j/helix/TwitchHelixBuilder.java index 0cd8b4439..3869cf99e 100644 --- a/rest-helix/src/main/java/com/github/twitch4j/helix/TwitchHelixBuilder.java +++ b/rest-helix/src/main/java/com/github/twitch4j/helix/TwitchHelixBuilder.java @@ -8,9 +8,11 @@ import com.github.twitch4j.common.util.TypeConvert; import com.github.twitch4j.helix.domain.CustomReward; import com.github.twitch4j.helix.interceptor.CustomRewardEncodeMixIn; +import com.github.twitch4j.helix.interceptor.TwitchHelixTokenManager; import com.github.twitch4j.helix.interceptor.TwitchHelixClientIdInterceptor; import com.github.twitch4j.helix.interceptor.TwitchHelixDecoder; import com.github.twitch4j.helix.interceptor.TwitchHelixHttpClient; +import com.github.twitch4j.helix.interceptor.TwitchHelixRateLimitTracker; import com.netflix.config.ConfigurationManager; import feign.Logger; import feign.Request; @@ -20,10 +22,13 @@ import feign.jackson.JacksonEncoder; import feign.okhttp.OkHttpClient; import feign.slf4j.Slf4jLogger; +import io.github.bucket4j.Bandwidth; +import io.github.bucket4j.Refill; import lombok.*; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; +import java.time.Duration; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -48,6 +53,11 @@ public class TwitchHelixBuilder { */ public static final String MOCK_BASE_URL = "http://localhost:8080/mock"; + /** + * @see Helix Rate Limit Reference + */ + public static final Bandwidth DEFAULT_BANDWIDTH = Bandwidth.classic(800, Refill.greedy(600, Duration.ofMinutes(1))); + /** * Client Id */ @@ -108,6 +118,12 @@ public class TwitchHelixBuilder { @With private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = null; + /** + * Custom Rate Limit to use for Helix calls + */ + @With + private Bandwidth apiRateLimit = DEFAULT_BANDWIDTH; + /** * Initialize the builder * @@ -149,16 +165,21 @@ public TwitchHelix build() { if (scheduledThreadPoolExecutor == null) scheduledThreadPoolExecutor = ThreadUtils.getDefaultScheduledThreadPoolExecutor("twitch4j-" + RandomStringUtils.random(4, true, true), 1); + // Enforce non-null rate limit bandwidth + if (apiRateLimit == null) + apiRateLimit = DEFAULT_BANDWIDTH; + // Feign - TwitchHelixClientIdInterceptor interceptor = new TwitchHelixClientIdInterceptor(this); + TwitchHelixTokenManager tokenManager = new TwitchHelixTokenManager(clientId, clientSecret, defaultAuthToken); + TwitchHelixRateLimitTracker rateLimitTracker = new TwitchHelixRateLimitTracker(apiRateLimit, tokenManager); return HystrixFeign.builder() - .client(new TwitchHelixHttpClient(new OkHttpClient(clientBuilder.build()), scheduledThreadPoolExecutor, interceptor, timeout)) + .client(new TwitchHelixHttpClient(new OkHttpClient(clientBuilder.build()), scheduledThreadPoolExecutor, tokenManager, rateLimitTracker, timeout)) .encoder(new JacksonEncoder(serializer)) - .decoder(new TwitchHelixDecoder(mapper, interceptor)) + .decoder(new TwitchHelixDecoder(mapper, rateLimitTracker)) .logger(new Slf4jLogger()) .logLevel(logLevel) - .errorDecoder(new TwitchHelixErrorDecoder(new JacksonDecoder())) - .requestInterceptor(interceptor) + .errorDecoder(new TwitchHelixErrorDecoder(new JacksonDecoder(), rateLimitTracker)) + .requestInterceptor(new TwitchHelixClientIdInterceptor(userAgent, tokenManager)) .options(new Request.Options(timeout / 3, TimeUnit.MILLISECONDS, timeout, TimeUnit.MILLISECONDS, true)) .retryer(new Retryer.Default(500, timeout, 2)) .target(TwitchHelix.class, baseUrl); diff --git a/rest-helix/src/main/java/com/github/twitch4j/helix/TwitchHelixErrorDecoder.java b/rest-helix/src/main/java/com/github/twitch4j/helix/TwitchHelixErrorDecoder.java index d4693ec68..dac1068d4 100644 --- a/rest-helix/src/main/java/com/github/twitch4j/helix/TwitchHelixErrorDecoder.java +++ b/rest-helix/src/main/java/com/github/twitch4j/helix/TwitchHelixErrorDecoder.java @@ -5,7 +5,9 @@ import com.github.twitch4j.common.exception.UnauthorizedException; import com.github.twitch4j.common.util.TypeConvert; import com.github.twitch4j.helix.domain.TwitchHelixError; +import com.github.twitch4j.helix.interceptor.TwitchHelixRateLimitTracker; import feign.Request; +import feign.RequestTemplate; import feign.Response; import feign.RetryableException; import feign.codec.Decoder; @@ -15,6 +17,7 @@ import org.apache.commons.lang3.exception.ContextedRuntimeException; import java.io.IOException; +import java.io.InputStream; import java.nio.charset.StandardCharsets; @Slf4j @@ -23,6 +26,9 @@ public class TwitchHelixErrorDecoder implements ErrorDecoder { // Decoder final Decoder decoder; + // Rate Limit Tracker + final TwitchHelixRateLimitTracker rateLimitTracker; + // Error Decoder final ErrorDecoder defaultDecoder = new ErrorDecoder.Default(); @@ -32,25 +38,27 @@ public class TwitchHelixErrorDecoder implements ErrorDecoder { /** * Constructor * - * @param decoder Feign Decoder + * @param decoder Feign Decoder + * @param rateLimitTracker Helix Rate Limit Tracker */ - public TwitchHelixErrorDecoder(Decoder decoder) { + public TwitchHelixErrorDecoder(Decoder decoder, TwitchHelixRateLimitTracker rateLimitTracker) { this.decoder = decoder; + this.rateLimitTracker = rateLimitTracker; } /** * Overwrite the Decode Method to handle custom error cases * * @param methodKey Method Key - * @param response Response + * @param response Response * @return Exception */ @Override public Exception decode(String methodKey, Response response) { - Exception ex = null; + Exception ex; - try { - String responseBody = response.body() == null ? "" : IOUtils.toString(response.body().asInputStream(), StandardCharsets.UTF_8.name()); + try (InputStream is = response.body() == null ? null : response.body().asInputStream()) { + String responseBody = is == null ? "" : IOUtils.toString(is, StandardCharsets.UTF_8); if (response.status() == 401) { ex = new UnauthorizedException() @@ -63,10 +71,17 @@ public Exception decode(String methodKey, Response response) { .addContextValue("requestMethod", response.request().httpMethod()) .addContextValue("responseBody", responseBody); } else if (response.status() == 429) { - ex = new ContextedRuntimeException("To many requests!") + ex = new ContextedRuntimeException("Too many requests!") .addContextValue("requestUrl", response.request().url()) .addContextValue("requestMethod", response.request().httpMethod()) - .addContextValue("responseBody", responseBody);; + .addContextValue("responseBody", responseBody); + + // Deplete ban bucket on 429 (to be safe) + RequestTemplate template = response.request().requestTemplate(); + if (template.path().endsWith("/moderation/bans")) { + String channelId = template.queries().get("broadcaster_id").iterator().next(); + rateLimitTracker.markDepletedBanBucket(channelId); + } } else if (response.status() == 503) { // If you get an HTTP 503 (Service Unavailable) error, retry once. // If that retry also results in an HTTP 503, there probably is something wrong with the downstream service. @@ -80,7 +95,8 @@ public Exception decode(String methodKey, Response response) { .addContextValue("responseBody", responseBody) .addContextValue("errorType", error.getError()) .addContextValue("errorStatus", error.getStatus()) - .addContextValue("errorType", error.getMessage()); + .addContextValue("errorType", error.getMessage()) + .addContextValue("errorMessage", error.getMessage()); } } catch (IOException fallbackToDefault) { ex = defaultDecoder.decode(methodKey, response); diff --git a/rest-helix/src/main/java/com/github/twitch4j/helix/domain/SendPubSubMessageInput.java b/rest-helix/src/main/java/com/github/twitch4j/helix/domain/SendPubSubMessageInput.java index 155d2a14e..99e240592 100644 --- a/rest-helix/src/main/java/com/github/twitch4j/helix/domain/SendPubSubMessageInput.java +++ b/rest-helix/src/main/java/com/github/twitch4j/helix/domain/SendPubSubMessageInput.java @@ -42,7 +42,7 @@ public class SendPubSubMessageInput { /** * Strings for valid PubSub targets. - * Valid values: "broadcast", "global", "whisper-" + * Valid values: "broadcast", "global", "{@literal whisper-}" */ @Singular @JsonProperty("target") diff --git a/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixClientIdInterceptor.java b/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixClientIdInterceptor.java index 767a8cc68..d2e430c25 100644 --- a/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixClientIdInterceptor.java +++ b/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixClientIdInterceptor.java @@ -1,25 +1,11 @@ package com.github.twitch4j.helix.interceptor; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; import com.github.philippheuer.credentialmanager.domain.OAuth2Credential; -import com.github.twitch4j.auth.providers.TwitchIdentityProvider; -import com.github.twitch4j.helix.TwitchHelixBuilder; import feign.RequestInterceptor; import feign.RequestTemplate; -import io.github.bucket4j.Bandwidth; -import io.github.bucket4j.Bucket; -import io.github.bucket4j.Bucket4j; -import lombok.AccessLevel; -import lombok.Getter; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import java.time.Duration; -import java.util.Optional; -import java.util.concurrent.TimeUnit; - /** * Injects ClientId Header, the User Agent and other common headers into each API Request */ @@ -30,62 +16,21 @@ public class TwitchHelixClientIdInterceptor implements RequestInterceptor { public static final String BEARER_PREFIX = "Bearer "; /** - * @see Helix Rate Limit Reference + * User Agent */ - private static final Bandwidth DEFAULT_BANDWIDTH = Bandwidth.simple(800, Duration.ofMinutes(1)); - - /** - * Reference to the Client Builder - */ - private final TwitchHelixBuilder twitchAPIBuilder; - - /** - * Reference to the twitch identity provider - */ - @Setter - private TwitchIdentityProvider twitchIdentityProvider; + private final String userAgent; /** * Access token cache */ - @Getter(value = AccessLevel.PROTECTED) - private final Cache accessTokenCache = Caffeine.newBuilder() - .expireAfterAccess(15, TimeUnit.MINUTES) - .maximumSize(10_000) - .build(); - - /** - * Rate limit buckets by user/app - */ - private final Cache buckets = Caffeine.newBuilder() - .expireAfterAccess(1, TimeUnit.MINUTES) - .build(); - - /** - * The default app access token that is used if no oauth was passed by the user - */ - private volatile OAuth2Credential defaultAuthToken; - - /** - * The default client id, typically associated with {@link TwitchHelixClientIdInterceptor#defaultAuthToken} - */ - private volatile String defaultClientId; + private final TwitchHelixTokenManager tokenManager; /** * Constructor - * - * @param twitchHelixBuilder Twitch Client Builder */ - public TwitchHelixClientIdInterceptor(TwitchHelixBuilder twitchHelixBuilder) { - this.twitchAPIBuilder = twitchHelixBuilder; - twitchIdentityProvider = new TwitchIdentityProvider(twitchHelixBuilder.getClientId(), twitchHelixBuilder.getClientSecret(), null); - this.defaultClientId = twitchAPIBuilder.getClientId(); - this.defaultAuthToken = twitchHelixBuilder.getDefaultAuthToken(); - if (defaultAuthToken != null) - twitchIdentityProvider.getAdditionalCredentialInformation(defaultAuthToken).ifPresent(oauth -> { - this.defaultClientId = (String) oauth.getContext().get("client_id"); - accessTokenCache.put(oauth.getAccessToken(), oauth); - }); + public TwitchHelixClientIdInterceptor(String userAgent, TwitchHelixTokenManager tokenManager) { + this.userAgent = userAgent; + this.tokenManager = tokenManager; } /** @@ -95,19 +40,16 @@ public TwitchHelixClientIdInterceptor(TwitchHelixBuilder twitchHelixBuilder) { */ @Override public void apply(RequestTemplate template) { - String clientId = this.defaultClientId; + String clientId = tokenManager.getDefaultClientId(); // if a oauth token is passed is has to match that client id, default to global client id otherwise (for ie. token verification) if (template.headers().containsKey(AUTH_HEADER)) { String oauthToken = template.headers().get(AUTH_HEADER).iterator().next().substring(BEARER_PREFIX.length()); if (oauthToken.isEmpty()) { - String clientSecret = twitchAPIBuilder.getClientSecret(); - if (defaultAuthToken == null && (StringUtils.isEmpty(clientId) || StringUtils.isEmpty(clientSecret) || clientSecret.charAt(0) == '*')) - throw new RuntimeException("Necessary OAuth token was missing from Helix call, without the means to generate one!"); - try { - oauthToken = getOrCreateAuthToken().getAccessToken(); + oauthToken = tokenManager.getDefaultAuthToken().getAccessToken(); + clientId = tokenManager.getDefaultClientId(); } catch (Exception e) { throw new RuntimeException("Failed to generate an app access token as no oauth token was passed to this Helix call", e); } @@ -115,20 +57,8 @@ public void apply(RequestTemplate template) { template.removeHeader(AUTH_HEADER); template.header(AUTH_HEADER, BEARER_PREFIX + oauthToken); } else if (!StringUtils.contains(oauthToken, '.')) { - OAuth2Credential verifiedCredential = accessTokenCache.getIfPresent(oauthToken); - if (verifiedCredential == null) { - log.debug("Getting matching client-id for authorization token {}", oauthToken.substring(0, 5)); - - Optional requestedCredential = twitchIdentityProvider.getAdditionalCredentialInformation(new OAuth2Credential("twitch", oauthToken)); - if (!requestedCredential.isPresent()) { - throw new RuntimeException("Failed to get the client_id for the provided authentication token, the authentication token may be invalid!"); - } - - verifiedCredential = requestedCredential.get(); - accessTokenCache.put(oauthToken, verifiedCredential); - } - - clientId = (String) verifiedCredential.getContext().get("client_id"); + OAuth2Credential verifiedCredential = tokenManager.getOrPopulateCache(oauthToken); + clientId = TwitchHelixTokenManager.extractClientId(verifiedCredential); } log.debug("Setting new client-id {} for token {}", clientId, oauthToken.substring(0, 5)); @@ -137,48 +67,7 @@ public void apply(RequestTemplate template) { // set headers if (!template.headers().containsKey("Client-Id")) template.header("Client-Id", clientId); - template.header("User-Agent", twitchAPIBuilder.getUserAgent()); - } - - public void updateRemaining(String token, int remaining) { - OAuth2Credential credential = accessTokenCache.getIfPresent(token); - if (credential == null) return; - - String key = getKey(credential); - if (key == null) return; - - Bucket bucket = getOrInitializeBucket(key); - long diff = bucket.getAvailableTokens() - remaining; - if (diff > 0) bucket.tryConsumeAsMuchAsPossible(diff); - } - - public void clearDefaultToken() { - this.defaultAuthToken = null; - } - - protected String getKey(OAuth2Credential credential) { - String clientId = (String) credential.getContext().get("client_id"); - return clientId == null ? null : credential.getUserId() == null ? clientId : clientId + "-" + credential.getUserId(); - } - - protected Bucket getOrInitializeBucket(String key) { - return buckets.get(key, k -> Bucket.builder().addLimit(DEFAULT_BANDWIDTH).build()); + template.header("User-Agent", userAgent); } - private OAuth2Credential getOrCreateAuthToken() { - if (defaultAuthToken == null) - synchronized (this) { - if (defaultAuthToken == null) { - String clientId = twitchAPIBuilder.getClientId(); - OAuth2Credential token = twitchIdentityProvider.getAppAccessToken(); - token.getContext().put("client_id", clientId); - getOrInitializeBucket(clientId); - accessTokenCache.put(token.getAccessToken(), token); - this.defaultClientId = clientId; - return this.defaultAuthToken = token; - } - } - - return this.defaultAuthToken; - } } diff --git a/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixDecoder.java b/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixDecoder.java index fefc32b4b..6f1199f94 100644 --- a/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixDecoder.java +++ b/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixDecoder.java @@ -1,6 +1,7 @@ package com.github.twitch4j.helix.interceptor; import com.fasterxml.jackson.databind.ObjectMapper; +import feign.Request; import feign.Response; import feign.jackson.JacksonDecoder; @@ -15,11 +16,11 @@ public class TwitchHelixDecoder extends JacksonDecoder { public static final String REMAINING_HEADER = "Ratelimit-Remaining"; - private final TwitchHelixClientIdInterceptor interceptor; + private final TwitchHelixRateLimitTracker rateLimitTracker; - public TwitchHelixDecoder(ObjectMapper mapper, TwitchHelixClientIdInterceptor interceptor) { + public TwitchHelixDecoder(ObjectMapper mapper, TwitchHelixRateLimitTracker rateLimitTracker) { super(mapper); - this.interceptor = interceptor; + this.rateLimitTracker = rateLimitTracker; } @Override @@ -27,11 +28,24 @@ public Object decode(Response response, Type type) throws IOException { // track rate limit for token String token = singleFirst(response.request().headers().get(AUTH_HEADER)); if (token != null && token.startsWith(BEARER_PREFIX)) { - String remaining = singleFirst(response.headers().get(REMAINING_HEADER)); + // Parse remaining + String remainingStr = singleFirst(response.headers().get(REMAINING_HEADER)); + Integer remaining; + try { + remaining = Integer.parseInt(remainingStr); + } catch (NumberFormatException ignored) { + remaining = null; + } + + // Synchronize library buckets with twitch data if (remaining != null) { - try { - interceptor.updateRemaining(token.substring(BEARER_PREFIX.length()), Integer.parseInt(remaining)); - } catch (Exception ignored) { + String bearer = token.substring(BEARER_PREFIX.length()); + if (response.request().httpMethod() == Request.HttpMethod.POST && response.request().requestTemplate().path().endsWith("/clips")) { + // Create Clip has a separate rate limit to synchronize + rateLimitTracker.updateRemainingCreateClip(bearer, remaining); + } else { + // Normal/global helix rate limit synchronization + rateLimitTracker.updateRemaining(bearer, remaining); } } } diff --git a/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixHttpClient.java b/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixHttpClient.java index e342a1a89..283843ff1 100644 --- a/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixHttpClient.java +++ b/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixHttpClient.java @@ -1,6 +1,7 @@ package com.github.twitch4j.helix.interceptor; import com.github.philippheuer.credentialmanager.domain.OAuth2Credential; +import com.github.twitch4j.common.util.BucketUtils; import feign.Client; import feign.Request; import feign.Response; @@ -9,6 +10,8 @@ import lombok.extern.slf4j.Slf4j; import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -24,13 +27,15 @@ public class TwitchHelixHttpClient implements Client { private final Client client; private final ScheduledExecutorService executor; - private final TwitchHelixClientIdInterceptor interceptor; + private final TwitchHelixTokenManager tokenManager; + private final TwitchHelixRateLimitTracker rateLimitTracker; private final long timeout; - public TwitchHelixHttpClient(OkHttpClient client, ScheduledThreadPoolExecutor executor, TwitchHelixClientIdInterceptor interceptor, Integer timeout) { + public TwitchHelixHttpClient(OkHttpClient client, ScheduledThreadPoolExecutor executor, TwitchHelixTokenManager tokenManager, TwitchHelixRateLimitTracker rateLimitTracker, Integer timeout) { this.client = client; this.executor = executor; - this.interceptor = interceptor; + this.tokenManager = tokenManager; + this.rateLimitTracker = rateLimitTracker; this.timeout = timeout == null ? 60 * 1000 : timeout.longValue(); } @@ -39,35 +44,73 @@ public Response execute(Request request, Request.Options options) throws IOExcep // Check whether this request should be delayed to conform to rate limits String token = singleFirst(request.headers().get(AUTH_HEADER)); if (token != null && token.startsWith(BEARER_PREFIX)) { - OAuth2Credential credential = interceptor.getAccessTokenCache().getIfPresent(token.substring(BEARER_PREFIX.length())); + OAuth2Credential credential = tokenManager.getIfPresent(token.substring(BEARER_PREFIX.length())); if (credential != null) { - Bucket bucket = interceptor.getOrInitializeBucket(interceptor.getKey(credential)); - if (bucket.tryConsume(1)) { - // no delay needed - return client.execute(request, options); - } else { - try { - // effectively blocking, unfortunately - return bucket.asScheduler().consume(1, executor) - .thenApplyAsync(v -> { - try { - return client.execute(request, options); - } catch (IOException e) { - log.error("Helix API call execution failed", e); - return null; - } - }) - .get(timeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - log.error("Throttled Helix API call timed-out before completion", e); - return null; - } - } + // First consume from helix global rate limit (800/min by default) + Bucket bucket = rateLimitTracker.getOrInitializeBucket(rateLimitTracker.getPrimaryBucketKey(credential)); + return executeAgainstBucket(bucket, () -> delegatedExecute(request, options)); } } // Fallback: just run the http request + return delegatedExecute(request, options); + } + + /** + * After the helix rate limit has been evaluated, check for any other endpoint-specific limits before actually executing the request. + * + * @param request feign request + * @param options feign request options + * @return feign response + * @throws IOException on network errors + */ + private Response delegatedExecute(Request request, Request.Options options) throws IOException { + String templatePath = request.requestTemplate().path(); + + // Moderation API: banUser and unbanUser share a bucket per channel id + if (templatePath.endsWith("/moderation/bans")) { + // Obtain the channel id + String channelId = request.requestTemplate().queries().getOrDefault("broadcaster_id", Collections.emptyList()).iterator().next(); + + // Conform to endpoint-specific bucket + Bucket modBucket = rateLimitTracker.getModerationBucket(channelId); + return executeAgainstBucket(modBucket, () -> client.execute(request, options)); + } + + // Moderation API: addBlockedTerm and removeBlockedTerm share a bucket per channel id + if (templatePath.endsWith("/moderation/blocked_terms") && (request.httpMethod() == Request.HttpMethod.POST || request.httpMethod() == Request.HttpMethod.DELETE)) { + // Obtain the channel id + String channelId = request.requestTemplate().queries().getOrDefault("broadcaster_id", Collections.emptyList()).iterator().next(); + + // Conform to endpoint-specific bucket + Bucket termsBucket = rateLimitTracker.getTermsBucket(channelId); + return executeAgainstBucket(termsBucket, () -> client.execute(request, options)); + } + + // Clips API: createClip has a stricter bucket that applies per user id + if (request.httpMethod() == Request.HttpMethod.POST && templatePath.endsWith("/clips")) { + // Obtain user id + String token = request.headers().get(AUTH_HEADER).iterator().next().substring(BEARER_PREFIX.length()); + OAuth2Credential cred = tokenManager.getIfPresent(token); + String userId = cred != null ? cred.getUserId() : ""; + + // Conform to endpoint-specific bucket + Bucket clipBucket = rateLimitTracker.getClipBucket(userId != null ? userId : ""); + return executeAgainstBucket(clipBucket, () -> client.execute(request, options)); + } + + // no endpoint-specific rate limiting was needed; simply perform network request now return client.execute(request, options); } + private T executeAgainstBucket(Bucket bucket, Callable call) throws IOException { + try { + return BucketUtils.scheduleAgainstBucket(bucket, executor, call).get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + if (e.getCause() instanceof IOException) throw (IOException) e.getCause(); + log.error("Throttled Helix API call timed-out before completion", e); + return null; + } + } + } diff --git a/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixRateLimitTracker.java b/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixRateLimitTracker.java new file mode 100644 index 000000000..6ffd868fa --- /dev/null +++ b/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixRateLimitTracker.java @@ -0,0 +1,150 @@ +package com.github.twitch4j.helix.interceptor; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.philippheuer.credentialmanager.domain.OAuth2Credential; +import com.github.twitch4j.common.annotation.Unofficial; +import com.github.twitch4j.common.util.BucketUtils; +import io.github.bucket4j.Bandwidth; +import io.github.bucket4j.Bucket; +import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.jetbrains.annotations.NotNull; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +@RequiredArgsConstructor +@SuppressWarnings("ConstantConditions") +public final class TwitchHelixRateLimitTracker { + + /** + * Empirically determined rate limit on helix bans and unbans, per channel + */ + @Unofficial + private static final Bandwidth BANS_BANDWIDTH = Bandwidth.simple(100, Duration.ofSeconds(30)); + + /** + * Empirically determined rate limit on the helix create clip endpoint, per user + */ + @Unofficial + private static final Bandwidth CLIPS_BANDWIDTH = Bandwidth.simple(600, Duration.ofSeconds(60)); + + /** + * Empirically determined rate limit on helix add and remove block term, per channel + */ + @Unofficial + private static final Bandwidth TERMS_BANDWIDTH = Bandwidth.simple(60, Duration.ofSeconds(60)); + + /** + * Rate limit buckets by user/app + */ + private final Cache primaryBuckets = Caffeine.newBuilder() + .expireAfterAccess(1, TimeUnit.MINUTES) + .build(); + + /** + * Moderation API: ban and unban rate limit buckets per channel + */ + private final Cache bansByChannelId = Caffeine.newBuilder() + .expireAfterAccess(1, TimeUnit.MINUTES) + .build(); + + /** + * Create Clip API rate limit buckets per user + */ + private final Cache clipsByUserId = Caffeine.newBuilder() + .expireAfterAccess(1, TimeUnit.MINUTES) + .build(); + + /** + * Moderation API: add and remove blocked term rate limit buckets per channel + */ + private final Cache termsByChannelId = Caffeine.newBuilder() + .expireAfterAccess(1, TimeUnit.MINUTES) + .build(); + + /** + * The primary (global helix) rate limit bandwidth to use + */ + private final Bandwidth apiRateLimit; + + /** + * Twitch Helix Token Manager + */ + private final TwitchHelixTokenManager tokenManager; + + /* + * Primary (global helix) rate limit bucket finder + */ + + @NotNull + Bucket getOrInitializeBucket(@NotNull String key) { + return primaryBuckets.get(key, k -> BucketUtils.createBucket(this.apiRateLimit)); + } + + @NotNull + String getPrimaryBucketKey(@NotNull OAuth2Credential credential) { + // App access tokens share the same bucket for a given client id + // User access tokens share the same bucket for a given client id & user id pair + // For this method to work, credential must have been augmented with information from getAdditionalCredentialInformation (which is done by the interceptor) + // Thus, this logic yields the key that is associated with each primary helix bucket + String clientId = TwitchHelixTokenManager.extractClientId(credential); + return clientId == null ? "" : StringUtils.isEmpty(credential.getUserId()) ? clientId : clientId + "-" + credential.getUserId(); + } + + /* + * Secondary (endpoint-specific) rate limit buckets + */ + + @NotNull + @Unofficial + Bucket getModerationBucket(@NotNull String channelId) { + return bansByChannelId.get(channelId, k -> BucketUtils.createBucket(BANS_BANDWIDTH)); + } + + @NotNull + @Unofficial + Bucket getClipBucket(@NotNull String userId) { + return clipsByUserId.get(userId, k -> BucketUtils.createBucket(CLIPS_BANDWIDTH)); + } + + @NotNull + @Unofficial + Bucket getTermsBucket(@NotNull String channelId) { + return termsByChannelId.get(channelId, k -> BucketUtils.createBucket(TERMS_BANDWIDTH)); + } + + /* + * Methods to conservatively update remaining points in rate limit buckets, based on incoming twitch statistics + */ + + public void updateRemaining(@NotNull String token, int remaining) { + this.updateRemainingGeneric(token, remaining, this::getPrimaryBucketKey, this::getOrInitializeBucket); + } + + public void updateRemainingCreateClip(@NotNull String token, int remaining) { + this.updateRemainingGeneric(token, remaining, OAuth2Credential::getUserId, this::getClipBucket); + } + + @Unofficial + public void markDepletedBanBucket(@NotNull String channelId) { + // Called upon a 429 for banUser or unbanUser + Bucket modBucket = this.getModerationBucket(channelId); + modBucket.consumeIgnoringRateLimits(Math.max(modBucket.tryConsumeAsMuchAsPossible(), 1)); // intentionally go negative to induce a pause + } + + private void updateRemainingGeneric(String token, int remaining, Function credToKey, Function keyToBucket) { + OAuth2Credential credential = tokenManager.getIfPresent(token); + if (credential == null) return; + + String key = credToKey.apply(credential); + if (key == null) return; + + Bucket bucket = keyToBucket.apply(key); + long diff = bucket.getAvailableTokens() - remaining; + if (diff > 0) bucket.tryConsumeAsMuchAsPossible(diff); + } + +} diff --git a/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixTokenManager.java b/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixTokenManager.java new file mode 100644 index 000000000..c6302fcab --- /dev/null +++ b/rest-helix/src/main/java/com/github/twitch4j/helix/interceptor/TwitchHelixTokenManager.java @@ -0,0 +1,120 @@ +package com.github.twitch4j.helix.interceptor; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.philippheuer.credentialmanager.domain.OAuth2Credential; +import com.github.twitch4j.auth.providers.TwitchIdentityProvider; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.concurrent.TimeUnit; + +@Slf4j +public final class TwitchHelixTokenManager { + + private static final String CLIENT_ID_CONTEXT = "client_id"; + + /** + * Access token cache + */ + private final Cache accessTokenCache = Caffeine.newBuilder() + .expireAfterAccess(15, TimeUnit.MINUTES) + .maximumSize(10_000) + .build(); + + /** + * Reference to the twitch identity provider + */ + private final TwitchIdentityProvider twitchIdentityProvider; + + /** + * The client id provided in the helix builder + */ + private final String clientId; + + /** + * The client secret provided in the helix builder + */ + private final String clientSecret; + + /** + * The client id associated with defaultAuthToken + */ + @Getter(AccessLevel.PACKAGE) + private volatile String defaultClientId; + + /** + * The default token that is used if no access token was directly provided to a helix call + */ + private volatile OAuth2Credential defaultAuthToken; + + public TwitchHelixTokenManager(String clientId, String clientSecret, OAuth2Credential defaultAuthToken) { + this.clientId = clientId; + this.clientSecret = clientSecret; + this.twitchIdentityProvider = new TwitchIdentityProvider(clientId, clientSecret, null); + this.defaultClientId = clientId; + this.defaultAuthToken = defaultAuthToken; + + if (defaultAuthToken != null) { + twitchIdentityProvider.getAdditionalCredentialInformation(defaultAuthToken).ifPresent(oauth -> { + populateCache(oauth); + this.defaultClientId = extractClientId(oauth); + this.defaultAuthToken = oauth; + }); + } + } + + OAuth2Credential getDefaultAuthToken() { + if (defaultAuthToken == null) { + synchronized (this) { + if (defaultAuthToken == null) { + checkClientCredentialsParameters(); + OAuth2Credential token = twitchIdentityProvider.getAppAccessToken(); + populateCache(token); + this.defaultClientId = (String) token.getContext().computeIfAbsent(CLIENT_ID_CONTEXT, s -> clientId); + return this.defaultAuthToken = token; + } + } + } + + return defaultAuthToken; + } + + @Nullable + OAuth2Credential getIfPresent(@NotNull String accessToken) { + return accessTokenCache.getIfPresent(accessToken); + } + + @NotNull + OAuth2Credential getOrPopulateCache(@NotNull String accessToken) { + OAuth2Credential verifiedCredential = getIfPresent(accessToken); + + if (verifiedCredential == null) { + log.debug("Getting matching client-id for authorization token {}", accessToken.substring(0, 5)); + verifiedCredential = twitchIdentityProvider + .getAdditionalCredentialInformation(new OAuth2Credential(twitchIdentityProvider.getProviderName(), accessToken)) + .orElseThrow(() -> new RuntimeException("Failed to get the client_id for the provided authentication token, the authentication token may be invalid!")); + populateCache(verifiedCredential); + } + + return verifiedCredential; + } + + private void populateCache(@NotNull OAuth2Credential enrichedCredential) { + accessTokenCache.put(enrichedCredential.getAccessToken(), enrichedCredential); + } + + private void checkClientCredentialsParameters() { + if (StringUtils.isEmpty(defaultClientId) || StringUtils.isEmpty(clientSecret) || clientSecret.charAt(0) == '*') + throw new RuntimeException("Necessary OAuth token was missing from Helix call, without the means to generate one!"); + } + + static String extractClientId(@NotNull OAuth2Credential credential) { + return (String) credential.getContext().get(CLIENT_ID_CONTEXT); + } + +} diff --git a/rest-kraken/src/main/java/com/github/twitch4j/kraken/TwitchKrakenErrorDecoder.java b/rest-kraken/src/main/java/com/github/twitch4j/kraken/TwitchKrakenErrorDecoder.java index 73c2b1b31..7eb6a3c48 100644 --- a/rest-kraken/src/main/java/com/github/twitch4j/kraken/TwitchKrakenErrorDecoder.java +++ b/rest-kraken/src/main/java/com/github/twitch4j/kraken/TwitchKrakenErrorDecoder.java @@ -15,6 +15,7 @@ import org.apache.commons.lang3.exception.ContextedRuntimeException; import java.io.IOException; +import java.io.InputStream; import java.nio.charset.StandardCharsets; @Slf4j @@ -42,15 +43,15 @@ public TwitchKrakenErrorDecoder(Decoder decoder) { * Overwrite the Decode Method to handle custom error cases * * @param methodKey Method Key - * @param response Response + * @param response Response * @return Exception */ @Override public Exception decode(String methodKey, Response response) { - Exception ex = null; + Exception ex; - try { - String responseBody = response.body() == null ? "" : IOUtils.toString(response.body().asInputStream(), StandardCharsets.UTF_8.name()); + try (InputStream is = response.body() == null ? null : response.body().asInputStream()) { + String responseBody = is == null ? "" : IOUtils.toString(is, StandardCharsets.UTF_8); if (response.status() == 401) { ex = new UnauthorizedException() @@ -63,10 +64,10 @@ public Exception decode(String methodKey, Response response) { .addContextValue("requestMethod", response.request().httpMethod()) .addContextValue("responseBody", responseBody); } else if (response.status() == 429) { - ex = new ContextedRuntimeException("To many requests!") + ex = new ContextedRuntimeException("Too many requests!") .addContextValue("requestUrl", response.request().url()) .addContextValue("requestMethod", response.request().httpMethod()) - .addContextValue("responseBody", responseBody);; + .addContextValue("responseBody", responseBody); } else if (response.status() == 503) { // If you get an HTTP 503 (Service Unavailable) error, retry once. // If that retry also results in an HTTP 503, there probably is something wrong with the downstream service. @@ -80,7 +81,8 @@ public Exception decode(String methodKey, Response response) { .addContextValue("responseBody", responseBody) .addContextValue("errorType", error.getError()) .addContextValue("errorStatus", error.getStatus()) - .addContextValue("errorType", error.getMessage()); + .addContextValue("errorType", error.getMessage()) + .addContextValue("errorMessage", error.getMessage()); } } catch (IOException fallbackToDefault) { ex = defaultDecoder.decode(methodKey, response); diff --git a/rest-tmi/src/main/java/com/github/twitch4j/tmi/TwitchMessagingInterfaceErrorDecoder.java b/rest-tmi/src/main/java/com/github/twitch4j/tmi/TwitchMessagingInterfaceErrorDecoder.java index 94bd8ea6b..bfb7c0fd4 100644 --- a/rest-tmi/src/main/java/com/github/twitch4j/tmi/TwitchMessagingInterfaceErrorDecoder.java +++ b/rest-tmi/src/main/java/com/github/twitch4j/tmi/TwitchMessagingInterfaceErrorDecoder.java @@ -14,6 +14,7 @@ import org.apache.commons.lang3.exception.ContextedRuntimeException; import java.io.IOException; +import java.io.InputStream; import java.nio.charset.StandardCharsets; public class TwitchMessagingInterfaceErrorDecoder implements ErrorDecoder { @@ -40,13 +41,13 @@ public TwitchMessagingInterfaceErrorDecoder(Decoder decoder) { * Overwrite the Decode Method to handle custom error cases * * @param methodKey Method Key - * @param response Response + * @param response Response * @return Exception */ @Override public Exception decode(String methodKey, Response response) { - try { - String responseBody = IOUtils.toString(response.body().asInputStream(), StandardCharsets.UTF_8.name()); + try (InputStream is = response.body() == null ? null : response.body().asInputStream()) { + String responseBody = is == null ? "" : IOUtils.toString(is, StandardCharsets.UTF_8); if (response.status() == 401) { throw new UnauthorizedException() @@ -72,7 +73,8 @@ public Exception decode(String methodKey, Response response) { .addContextValue("responseBody", responseBody) .addContextValue("errorType", error.getError()) .addContextValue("errorStatus", error.getStatus()) - .addContextValue("errorType", error.getMessage()); + .addContextValue("errorType", error.getMessage()) + .addContextValue("errorMessage", error.getMessage()); } catch (IOException fallbackToDefault) { return defaultDecoder.decode(methodKey, response); }