Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: comply with undocumented helix rate limits #561

Merged
merged 11 commits into from May 4, 2022
Expand Up @@ -20,10 +20,12 @@
import feign.jackson.JacksonEncoder;
import feign.okhttp.OkHttpClient;
import feign.slf4j.Slf4jLogger;
import io.github.bucket4j.Bandwidth;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;

import java.time.Duration;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand All @@ -48,6 +50,11 @@ public class TwitchHelixBuilder {
*/
public static final String MOCK_BASE_URL = "http://localhost:8080/mock";

/**
* @see <a href="https://dev.twitch.tv/docs/api/guide#rate-limits">Helix Rate Limit Reference</a>
*/
public static final Bandwidth DEFAULT_BANDWIDTH = Bandwidth.simple(800, Duration.ofMinutes(1));

/**
* Client Id
*/
Expand Down Expand Up @@ -108,6 +115,12 @@ public class TwitchHelixBuilder {
@With
private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = null;

/**
* Custom Rate Limit to use for Helix calls
*/
@With
private Bandwidth apiRateLimit = DEFAULT_BANDWIDTH;

/**
* Initialize the builder
*
Expand Down Expand Up @@ -149,6 +162,10 @@ public TwitchHelix build() {
if (scheduledThreadPoolExecutor == null)
scheduledThreadPoolExecutor = ThreadUtils.getDefaultScheduledThreadPoolExecutor("twitch4j-" + RandomStringUtils.random(4, true, true), 1);

// Enforce non-null rate limit bandwidth
if (apiRateLimit == null)
apiRateLimit = DEFAULT_BANDWIDTH;

// Feign
TwitchHelixClientIdInterceptor interceptor = new TwitchHelixClientIdInterceptor(this);
return HystrixFeign.builder()
Expand All @@ -157,7 +174,7 @@ public TwitchHelix build() {
.decoder(new TwitchHelixDecoder(mapper, interceptor))
.logger(new Slf4jLogger())
.logLevel(logLevel)
.errorDecoder(new TwitchHelixErrorDecoder(new JacksonDecoder()))
.errorDecoder(new TwitchHelixErrorDecoder(new JacksonDecoder(), interceptor))
.requestInterceptor(interceptor)
.options(new Request.Options(timeout / 3, TimeUnit.MILLISECONDS, timeout, TimeUnit.MILLISECONDS, true))
.retryer(new Retryer.Default(500, timeout, 2))
Expand Down
Expand Up @@ -5,11 +5,14 @@
import com.github.twitch4j.common.exception.UnauthorizedException;
import com.github.twitch4j.common.util.TypeConvert;
import com.github.twitch4j.helix.domain.TwitchHelixError;
import com.github.twitch4j.helix.interceptor.TwitchHelixClientIdInterceptor;
import feign.Request;
import feign.RequestTemplate;
import feign.Response;
import feign.RetryableException;
import feign.codec.Decoder;
import feign.codec.ErrorDecoder;
import io.github.bucket4j.Bucket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.exception.ContextedRuntimeException;
Expand All @@ -23,6 +26,9 @@ public class TwitchHelixErrorDecoder implements ErrorDecoder {
// Decoder
final Decoder decoder;

// Interceptor
final TwitchHelixClientIdInterceptor interceptor;

// Error Decoder
final ErrorDecoder defaultDecoder = new ErrorDecoder.Default();

Expand All @@ -32,17 +38,19 @@ public class TwitchHelixErrorDecoder implements ErrorDecoder {
/**
* Constructor
*
* @param decoder Feign Decoder
* @param decoder Feign Decoder
* @param interceptor Helix Interceptor
*/
public TwitchHelixErrorDecoder(Decoder decoder) {
public TwitchHelixErrorDecoder(Decoder decoder, TwitchHelixClientIdInterceptor interceptor) {
this.decoder = decoder;
this.interceptor = interceptor;
}

/**
* Overwrite the Decode Method to handle custom error cases
*
* @param methodKey Method Key
* @param response Response
* @param response Response
* @return Exception
*/
@Override
Expand All @@ -63,10 +71,18 @@ public Exception decode(String methodKey, Response response) {
.addContextValue("requestMethod", response.request().httpMethod())
.addContextValue("responseBody", responseBody);
} else if (response.status() == 429) {
ex = new ContextedRuntimeException("To many requests!")
ex = new ContextedRuntimeException("Too many requests!")
.addContextValue("requestUrl", response.request().url())
.addContextValue("requestMethod", response.request().httpMethod())
.addContextValue("responseBody", responseBody);;
.addContextValue("responseBody", responseBody);

// Deplete ban bucket on 429 (to be safe)
RequestTemplate template = response.request().requestTemplate();
if (template.path().endsWith("/moderation/bans")) {
String channelId = template.queries().get("broadcaster_id").iterator().next();
Bucket modBucket = interceptor.getModerationBucket(channelId);
modBucket.consumeIgnoringRateLimits(Math.max(modBucket.tryConsumeAsMuchAsPossible(), 1)); // intentionally go negative to induce a pause
}
} else if (response.status() == 503) {
// If you get an HTTP 503 (Service Unavailable) error, retry once.
// If that retry also results in an HTTP 503, there probably is something wrong with the downstream service.
Expand All @@ -80,7 +96,8 @@ public Exception decode(String methodKey, Response response) {
.addContextValue("responseBody", responseBody)
.addContextValue("errorType", error.getError())
.addContextValue("errorStatus", error.getStatus())
.addContextValue("errorType", error.getMessage());
.addContextValue("errorType", error.getMessage())
.addContextValue("errorMessage", error.getMessage());
}
} catch (IOException fallbackToDefault) {
ex = defaultDecoder.decode(methodKey, response);
Expand Down
Expand Up @@ -4,12 +4,12 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.philippheuer.credentialmanager.domain.OAuth2Credential;
import com.github.twitch4j.auth.providers.TwitchIdentityProvider;
import com.github.twitch4j.common.annotation.Unofficial;
import com.github.twitch4j.helix.TwitchHelixBuilder;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.Bucket4j;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -19,6 +19,7 @@
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/**
* Injects ClientId Header, the User Agent and other common headers into each API Request
Expand All @@ -30,15 +31,33 @@ public class TwitchHelixClientIdInterceptor implements RequestInterceptor {
public static final String BEARER_PREFIX = "Bearer ";

/**
* @see <a href="https://dev.twitch.tv/docs/api/guide#rate-limits">Helix Rate Limit Reference</a>
* Empirically determined rate limit on helix bans and unbans, per channel
*/
private static final Bandwidth DEFAULT_BANDWIDTH = Bandwidth.simple(800, Duration.ofMinutes(1));
@Unofficial
private static final Bandwidth BANS_BANDWIDTH = Bandwidth.simple(100, Duration.ofSeconds(30));

/**
* Empirically determined rate limit on the helix create clip endpoint, per user
*/
@Unofficial
private static final Bandwidth CLIPS_BANDWIDTH = Bandwidth.simple(600, Duration.ofSeconds(60));

/**
* Empirically determined rate limit on helix add and remove block term, per channel
*/
@Unofficial
private static final Bandwidth TERMS_BANDWIDTH = Bandwidth.simple(60, Duration.ofSeconds(60));

/**
* Reference to the Client Builder
*/
private final TwitchHelixBuilder twitchAPIBuilder;

/**
* Helix Rate Limit
*/
private final Bandwidth apiRateLimit;

/**
* Reference to the twitch identity provider
*/
Expand All @@ -61,6 +80,27 @@ public class TwitchHelixClientIdInterceptor implements RequestInterceptor {
.expireAfterAccess(1, TimeUnit.MINUTES)
.build();

/**
* Moderation API: ban and unban rate limit buckets per channel
*/
private final Cache<String, Bucket> bansByChannelId = Caffeine.newBuilder()
.expireAfterAccess(1, TimeUnit.MINUTES)
.build();

/**
* Create Clip API rate limit buckets per user
*/
private final Cache<String, Bucket> clipsByUserId = Caffeine.newBuilder()
.expireAfterAccess(1, TimeUnit.MINUTES)
.build();

/**
* Moderation API: add and remove blocked term rate limit buckets per channel
*/
private final Cache<String, Bucket> termsByChannelId = Caffeine.newBuilder()
.expireAfterAccess(1, TimeUnit.MINUTES)
.build();

/**
* The default app access token that is used if no oauth was passed by the user
*/
Expand All @@ -80,6 +120,7 @@ public TwitchHelixClientIdInterceptor(TwitchHelixBuilder twitchHelixBuilder) {
this.twitchAPIBuilder = twitchHelixBuilder;
twitchIdentityProvider = new TwitchIdentityProvider(twitchHelixBuilder.getClientId(), twitchHelixBuilder.getClientSecret(), null);
this.defaultClientId = twitchAPIBuilder.getClientId();
this.apiRateLimit = twitchAPIBuilder.getApiRateLimit();
this.defaultAuthToken = twitchHelixBuilder.getDefaultAuthToken();
if (defaultAuthToken != null)
twitchIdentityProvider.getAdditionalCredentialInformation(defaultAuthToken).ifPresent(oauth -> {
Expand Down Expand Up @@ -141,15 +182,11 @@ public void apply(RequestTemplate template) {
}

public void updateRemaining(String token, int remaining) {
OAuth2Credential credential = accessTokenCache.getIfPresent(token);
if (credential == null) return;

String key = getKey(credential);
if (key == null) return;
this.updateRemainingGeneric(token, remaining, this::getKey, this::getOrInitializeBucket);
}

Bucket bucket = getOrInitializeBucket(key);
long diff = bucket.getAvailableTokens() - remaining;
if (diff > 0) bucket.tryConsumeAsMuchAsPossible(diff);
public void updateRemainingCreateClip(String token, int remaining) {
this.updateRemainingGeneric(token, remaining, OAuth2Credential::getUserId, this::getClipBucket);
}

public void clearDefaultToken() {
Expand All @@ -162,7 +199,19 @@ protected String getKey(OAuth2Credential credential) {
}

protected Bucket getOrInitializeBucket(String key) {
return buckets.get(key, k -> Bucket.builder().addLimit(DEFAULT_BANDWIDTH).build());
return buckets.get(key, k -> Bucket.builder().addLimit(this.apiRateLimit).build());
}

public Bucket getModerationBucket(String channelId) {
return bansByChannelId.get(channelId, k -> Bucket.builder().addLimit(BANS_BANDWIDTH).build());
}

protected Bucket getClipBucket(String userId) {
return clipsByUserId.get(userId, k -> Bucket.builder().addLimit(CLIPS_BANDWIDTH).build());
}

protected Bucket getTermsBucket(String channelId) {
return termsByChannelId.get(channelId, k -> Bucket.builder().addLimit(TERMS_BANDWIDTH).build());
}

private OAuth2Credential getOrCreateAuthToken() {
Expand All @@ -181,4 +230,17 @@ private OAuth2Credential getOrCreateAuthToken() {

return this.defaultAuthToken;
}

private void updateRemainingGeneric(String token, int remaining, Function<OAuth2Credential, String> credToKey, Function<String, Bucket> keyToBucket) {
OAuth2Credential credential = accessTokenCache.getIfPresent(token);
if (credential == null) return;

String key = credToKey.apply(credential);
if (key == null) return;

Bucket bucket = keyToBucket.apply(key);
long diff = bucket.getAvailableTokens() - remaining;
if (diff > 0) bucket.tryConsumeAsMuchAsPossible(diff);
}

}
@@ -1,6 +1,7 @@
package com.github.twitch4j.helix.interceptor;

import com.fasterxml.jackson.databind.ObjectMapper;
import feign.Request;
import feign.Response;
import feign.jackson.JacksonDecoder;

Expand All @@ -27,11 +28,24 @@ public Object decode(Response response, Type type) throws IOException {
// track rate limit for token
String token = singleFirst(response.request().headers().get(AUTH_HEADER));
if (token != null && token.startsWith(BEARER_PREFIX)) {
String remaining = singleFirst(response.headers().get(REMAINING_HEADER));
// Parse remaining
String remainingStr = singleFirst(response.headers().get(REMAINING_HEADER));
Integer remaining;
try {
remaining = Integer.parseInt(remainingStr);
} catch (NumberFormatException ignored) {
remaining = null;
}

// Synchronize library buckets with twitch data
if (remaining != null) {
try {
interceptor.updateRemaining(token.substring(BEARER_PREFIX.length()), Integer.parseInt(remaining));
} catch (Exception ignored) {
String bearer = token.substring(BEARER_PREFIX.length());
if (response.request().httpMethod() == Request.HttpMethod.POST && response.request().requestTemplate().path().endsWith("/clips")) {
// Create Clip has a separate rate limit to synchronize
interceptor.updateRemainingCreateClip(bearer, remaining);
iProdigy marked this conversation as resolved.
Show resolved Hide resolved
} else {
// Normal/global helix rate limit synchronization
interceptor.updateRemaining(bearer, remaining);
}
}
}
Expand Down