Skip to content

Commit

Permalink
feat: automatically handle helix rate limits (#306)
Browse files Browse the repository at this point in the history
  • Loading branch information
iProdigy committed Apr 27, 2021
1 parent 5cbd732 commit 68bfe14
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 16 deletions.
3 changes: 3 additions & 0 deletions rest-helix/build.gradle
Expand Up @@ -7,6 +7,9 @@ dependencies {
api group: 'io.github.openfeign', name: 'feign-hystrix'
api group: 'commons-configuration', name: 'commons-configuration'

// Rate Limiting
api group: 'com.github.vladimir-bukhtoyarov', name: 'bucket4j-core'

// Jackson (JSON)
api group: 'com.fasterxml.jackson.core', name: 'jackson-databind'

Expand Down
Expand Up @@ -4,8 +4,11 @@
import com.github.philippheuer.credentialmanager.domain.OAuth2Credential;
import com.github.twitch4j.common.config.ProxyConfig;
import com.github.twitch4j.common.config.Twitch4JGlobal;
import com.github.twitch4j.common.util.ThreadUtils;
import com.github.twitch4j.common.util.TypeConvert;
import com.github.twitch4j.helix.interceptor.TwitchHelixClientIdInterceptor;
import com.github.twitch4j.helix.interceptor.TwitchHelixDecoder;
import com.github.twitch4j.helix.interceptor.TwitchHelixHttpClient;
import com.netflix.config.ConfigurationManager;
import feign.Logger;
import feign.Request;
Expand All @@ -17,7 +20,9 @@
import feign.slf4j.Slf4jLogger;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -82,6 +87,12 @@ public class TwitchHelixBuilder {
@With
private ProxyConfig proxyConfig = null;

/**
* Scheduler Thread Pool Executor
*/
@With
private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = null;

/**
* Initialize the builder
*
Expand Down Expand Up @@ -113,19 +124,22 @@ public TwitchHelix build() {
if (proxyConfig != null)
proxyConfig.apply(clientBuilder);

// Executor for rate limiting
if (scheduledThreadPoolExecutor == null)
scheduledThreadPoolExecutor = ThreadUtils.getDefaultScheduledThreadPoolExecutor("twitch4j-" + RandomStringUtils.random(4, true, true), 1);

// Feign
TwitchHelix client = HystrixFeign.builder()
.client(new OkHttpClient(clientBuilder.build()))
TwitchHelixClientIdInterceptor interceptor = new TwitchHelixClientIdInterceptor(this);
return HystrixFeign.builder()
.client(new TwitchHelixHttpClient(new OkHttpClient(clientBuilder.build()), scheduledThreadPoolExecutor, interceptor, timeout))
.encoder(new JacksonEncoder(mapper))
.decoder(new JacksonDecoder(mapper))
.decoder(new TwitchHelixDecoder(mapper, interceptor))
.logger(new Slf4jLogger())
.logLevel(logLevel)
.errorDecoder(new TwitchHelixErrorDecoder(new JacksonDecoder()))
.requestInterceptor(new TwitchHelixClientIdInterceptor(this))
.requestInterceptor(interceptor)
.options(new Request.Options(timeout / 3, TimeUnit.MILLISECONDS, timeout, TimeUnit.MILLISECONDS, true))
.retryer(new Retryer.Default(500, timeout, 2))
.target(TwitchHelix.class, baseUrl);

return client;
}
}
Expand Up @@ -7,10 +7,16 @@
import com.github.twitch4j.helix.TwitchHelixBuilder;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.Bucket4j;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

Expand All @@ -20,6 +26,14 @@
@Slf4j
public class TwitchHelixClientIdInterceptor implements RequestInterceptor {

public static final String AUTH_HEADER = "Authorization";
public static final String BEARER_PREFIX = "Bearer ";

/**
* @see <a href="https://dev.twitch.tv/docs/api/guide#rate-limits">Helix Rate Limit Reference</a>
*/
private static final Bandwidth DEFAULT_BANDWIDTH = Bandwidth.simple(800, Duration.ofMinutes(1));

/**
* Reference to the Client Builder
*/
Expand All @@ -34,11 +48,19 @@ public class TwitchHelixClientIdInterceptor implements RequestInterceptor {
/**
* Access token cache
*/
@Getter(value = AccessLevel.PROTECTED)
private final Cache<String, OAuth2Credential> accessTokenCache = Caffeine.newBuilder()
.expireAfterWrite(15, TimeUnit.MINUTES)
.expireAfterAccess(15, TimeUnit.MINUTES)
.maximumSize(10_000)
.build();

/**
* Rate limit buckets by user/app
*/
private final Cache<String, Bucket> buckets = Caffeine.newBuilder()
.expireAfterAccess(1, TimeUnit.MINUTES)
.build();

/**
* The default app access token that is used if no oauth was passed by the user
*/
Expand All @@ -47,7 +69,7 @@ public class TwitchHelixClientIdInterceptor implements RequestInterceptor {
/**
* The default client id, typically associated with {@link TwitchHelixClientIdInterceptor#defaultAuthToken}
*/
private String defaultClientId;
private volatile String defaultClientId;

/**
* Constructor
Expand All @@ -60,8 +82,10 @@ public TwitchHelixClientIdInterceptor(TwitchHelixBuilder twitchHelixBuilder) {
this.defaultClientId = twitchAPIBuilder.getClientId();
this.defaultAuthToken = twitchHelixBuilder.getDefaultAuthToken();
if (defaultAuthToken != null)
twitchIdentityProvider.getAdditionalCredentialInformation(defaultAuthToken)
.ifPresent(oauth -> this.defaultClientId = (String) oauth.getContext().get("client_id"));
twitchIdentityProvider.getAdditionalCredentialInformation(defaultAuthToken).ifPresent(oauth -> {
this.defaultClientId = (String) oauth.getContext().get("client_id");
accessTokenCache.put(oauth.getAccessToken(), oauth);
});
}

/**
Expand All @@ -74,8 +98,8 @@ public void apply(RequestTemplate template) {
String clientId = this.defaultClientId;

// if a oauth token is passed is has to match that client id, default to global client id otherwise (for ie. token verification)
if (template.headers().containsKey("Authorization")) {
String oauthToken = template.headers().get("Authorization").iterator().next().substring("Bearer ".length());
if (template.headers().containsKey(AUTH_HEADER)) {
String oauthToken = template.headers().get(AUTH_HEADER).iterator().next().substring(BEARER_PREFIX.length());

if (oauthToken.isEmpty()) {
String clientSecret = twitchAPIBuilder.getClientSecret();
Expand All @@ -88,8 +112,8 @@ public void apply(RequestTemplate template) {
throw new RuntimeException("Failed to generate an app access token as no oauth token was passed to this Helix call", e);
}

template.removeHeader("Authorization");
template.header("Authorization", "Bearer " + oauthToken);
template.removeHeader(AUTH_HEADER);
template.header(AUTH_HEADER, BEARER_PREFIX + oauthToken);
} else {
OAuth2Credential verifiedCredential = accessTokenCache.getIfPresent(oauthToken);
if (verifiedCredential == null) {
Expand All @@ -115,11 +139,43 @@ public void apply(RequestTemplate template) {
template.header("User-Agent", twitchAPIBuilder.getUserAgent());
}

public void updateRemaining(String token, int remaining) {
OAuth2Credential credential = accessTokenCache.getIfPresent(token);
if (credential == null) return;

String key = getKey(credential);
if (key == null) return;

Bucket bucket = getOrInitializeBucket(key);
long diff = bucket.getAvailableTokens() - remaining;
if (diff > 0) bucket.tryConsumeAsMuchAsPossible(diff);
}

public void clearDefaultToken() {
this.defaultAuthToken = null;
}

protected String getKey(OAuth2Credential credential) {
String clientId = (String) credential.getContext().get("client_id");
return clientId == null ? null : credential.getUserId() == null ? clientId : clientId + "-" + credential.getUserId();
}

protected Bucket getOrInitializeBucket(String key) {
return buckets.get(key, k -> Bucket4j.builder().addLimit(DEFAULT_BANDWIDTH).build());
}

private OAuth2Credential getOrCreateAuthToken() {
if (defaultAuthToken == null)
synchronized (this) {
if (defaultAuthToken == null)
return (this.defaultAuthToken = twitchIdentityProvider.getAppAccessToken());
if (defaultAuthToken == null) {
String clientId = twitchAPIBuilder.getClientId();
OAuth2Credential token = twitchIdentityProvider.getAppAccessToken();
token.getContext().put("client_id", clientId);
getOrInitializeBucket(clientId);
accessTokenCache.put(token.getAccessToken(), token);
this.defaultClientId = clientId;
return this.defaultAuthToken = token;
}
}

return this.defaultAuthToken;
Expand Down
@@ -0,0 +1,48 @@
package com.github.twitch4j.helix.interceptor;

import com.fasterxml.jackson.databind.ObjectMapper;
import feign.Response;
import feign.jackson.JacksonDecoder;

import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Collection;

import static com.github.twitch4j.helix.interceptor.TwitchHelixClientIdInterceptor.AUTH_HEADER;
import static com.github.twitch4j.helix.interceptor.TwitchHelixClientIdInterceptor.BEARER_PREFIX;

public class TwitchHelixDecoder extends JacksonDecoder {

public static final String REMAINING_HEADER = "Ratelimit-Remaining";

private final TwitchHelixClientIdInterceptor interceptor;

public TwitchHelixDecoder(ObjectMapper mapper, TwitchHelixClientIdInterceptor interceptor) {
super(mapper);
this.interceptor = interceptor;
}

@Override
public Object decode(Response response, Type type) throws IOException {
// track rate limit for token
String token = singleFirst(response.request().headers().get(AUTH_HEADER));
if (token != null && token.startsWith(BEARER_PREFIX)) {
String remaining = singleFirst(response.headers().get(REMAINING_HEADER));
if (remaining != null) {
try {
interceptor.updateRemaining(token.substring(BEARER_PREFIX.length()), Integer.parseInt(remaining));
} catch (Exception ignored) {
}
}
}

// delegate to JacksonDecoder
return super.decode(response, type);
}

static String singleFirst(Collection<String> collection) {
if (collection == null || collection.size() != 1) return null;
return collection.toArray(new String[1])[0];
}

}
@@ -0,0 +1,73 @@
package com.github.twitch4j.helix.interceptor;

import com.github.philippheuer.credentialmanager.domain.OAuth2Credential;
import feign.Client;
import feign.Request;
import feign.Response;
import feign.okhttp.OkHttpClient;
import io.github.bucket4j.Bucket;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static com.github.twitch4j.helix.interceptor.TwitchHelixClientIdInterceptor.AUTH_HEADER;
import static com.github.twitch4j.helix.interceptor.TwitchHelixClientIdInterceptor.BEARER_PREFIX;
import static com.github.twitch4j.helix.interceptor.TwitchHelixDecoder.singleFirst;

@Slf4j
public class TwitchHelixHttpClient implements Client {

private final Client client;
private final ScheduledExecutorService executor;
private final TwitchHelixClientIdInterceptor interceptor;
private final long timeout;

public TwitchHelixHttpClient(OkHttpClient client, ScheduledThreadPoolExecutor executor, TwitchHelixClientIdInterceptor interceptor, Integer timeout) {
this.client = client;
this.executor = executor;
this.interceptor = interceptor;
this.timeout = timeout == null ? 60 * 1000 : timeout.longValue();
}

@Override
public Response execute(Request request, Request.Options options) throws IOException {
// Check whether this request should be delayed to conform to rate limits
String token = singleFirst(request.headers().get(AUTH_HEADER));
if (token != null && token.startsWith(BEARER_PREFIX)) {
OAuth2Credential credential = interceptor.getAccessTokenCache().getIfPresent(token.substring(BEARER_PREFIX.length()));
if (credential != null) {
Bucket bucket = interceptor.getOrInitializeBucket(interceptor.getKey(credential));
if (bucket.tryConsume(1)) {
// no delay needed
return client.execute(request, options);
} else {
try {
// effectively blocking, unfortunately
return bucket.asAsyncScheduler().consume(1, executor)
.thenApplyAsync(v -> {
try {
return client.execute(request, options);
} catch (IOException e) {
log.error("Helix API call execution failed", e);
return null;
}
})
.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Throttled Helix API call timed-out before completion", e);
return null;
}
}
}
}

// Fallback: just run the http request
return client.execute(request, options);
}

}
Expand Up @@ -315,6 +315,7 @@ public TwitchClient build() {
.withUserAgent(userAgent)
.withDefaultAuthToken(defaultAuthToken)
.withRequestQueueSize(requestQueueSize)
.withScheduledThreadPoolExecutor(scheduledThreadPoolExecutor)
.withTimeout(timeout)
.withProxyConfig(proxyConfig)
.withLogLevel(feignLogLevel)
Expand Down

0 comments on commit 68bfe14

Please sign in to comment.