Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry feature to async feign #1757

Merged
merged 11 commits into from
Oct 9, 2022
160 changes: 60 additions & 100 deletions core/src/main/java/feign/AsyncFeign.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,21 @@
*/
package feign;

import feign.InvocationHandlerFactory.MethodHandler;
import feign.ReflectiveFeign.ParseHandlersByName;
import feign.Logger.Level;
import feign.Request.Options;
import feign.Target.HardCodedTarget;
import feign.codec.Decoder;
import feign.codec.Encoder;
import feign.codec.ErrorDecoder;
import java.io.IOException;
import java.util.Optional;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.WildcardType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
Expand All @@ -45,7 +46,7 @@
* be done (for example, creating and submitting a task to an {@link ExecutorService}).
*/
@Experimental
public abstract class AsyncFeign<C> {
public final class AsyncFeign<C> {
public static <C> AsyncBuilder<C> builder() {
return new AsyncBuilder<>();
}
Expand Down Expand Up @@ -191,7 +192,6 @@ public AsyncBuilder<C> invocationHandlerFactory(InvocationHandlerFactory invocat

public AsyncFeign<C> build() {
super.enrich();
ThreadLocal<AsyncInvocation<C>> activeContextHolder = new ThreadLocal<>();

AsyncResponseHandler responseHandler =
(AsyncResponseHandler) Capability.enrich(
Expand All @@ -205,102 +205,25 @@ public AsyncFeign<C> build() {
AsyncResponseHandler.class,
capabilities);

final SynchronousMethodHandler.Factory synchronousMethodHandlerFactory =
new SynchronousMethodHandler.Factory(stageExecution(activeContextHolder, client), retryer,
requestInterceptors,
responseInterceptor, logger, logLevel, dismiss404, closeAfterDecode,
propagationPolicy, true);
final ParseHandlersByName handlersByName =
new ParseHandlersByName(contract, options, encoder,
stageDecode(activeContextHolder, logger, logLevel, responseHandler), queryMapEncoder,
errorDecoder, synchronousMethodHandlerFactory);
final ReflectiveFeign feign =
new ReflectiveFeign(handlersByName, invocationHandlerFactory, queryMapEncoder);
return new ReflectiveAsyncFeign<>(feign, defaultContextSupplier, activeContextHolder,
methodInfoResolver);
}

private Client stageExecution(
ThreadLocal<AsyncInvocation<C>> activeContext,
AsyncClient<C> client) {
return (request, options) -> {
final Response result = Response.builder().status(200).request(request).build();

final AsyncInvocation<C> invocationContext = activeContext.get();

invocationContext.setResponseFuture(
client.execute(request, options, Optional.ofNullable(invocationContext.context())));

return result;
};
}

// from SynchronousMethodHandler
long elapsedTime(long start) {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
}

private Decoder stageDecode(
ThreadLocal<AsyncInvocation<C>> activeContext,
Logger logger,
Level logLevel,
AsyncResponseHandler responseHandler) {
return (response, type) -> {
final AsyncInvocation<C> invocationContext = activeContext.get();

final CompletableFuture<Object> result = new CompletableFuture<>();

invocationContext
.responseFuture()
.whenComplete(
(r, t) -> {
final long elapsedTime = elapsedTime(invocationContext.startNanos());

if (t != null) {
if (logLevel != Logger.Level.NONE && t instanceof IOException) {
final IOException e = (IOException) t;
logger.logIOException(invocationContext.configKey(), logLevel, e,
elapsedTime);
}
result.completeExceptionally(t);
} else {
responseHandler.handleResponse(
result,
invocationContext.configKey(),
r,
invocationContext.underlyingType(),
elapsedTime);
}
});

result.whenComplete(
(r, t) -> {
if (result.isCancelled()) {
invocationContext.responseFuture().cancel(true);
}
});

if (invocationContext.isAsyncReturnType()) {
return result;
}
try {
return result.join();
} catch (final CompletionException e) {
final Response r = invocationContext.responseFuture().join();
Throwable cause = e.getCause();
if (cause == null) {
cause = e;
}
throw new AsyncJoinException(r.status(), cause.getMessage(), r.request(), cause);
}
};
final MethodHandler.Factory<C> methodHandlerFactory =
new AsynchronousMethodHandler.Factory<>(
client, retryer, requestInterceptors,
responseHandler, logger, logLevel,
propagationPolicy, methodInfoResolver);
final ParseHandlersByName<C> handlersByName =
new ParseHandlersByName<>(contract, options, encoder,
decoder, queryMapEncoder,
errorDecoder, methodHandlerFactory);
final ReflectiveFeign<C> feign =
new ReflectiveFeign<>(handlersByName, invocationHandlerFactory, queryMapEncoder);
return new AsyncFeign<>(feign, defaultContextSupplier);
}
}

private final Feign feign;
private AsyncContextSupplier<C> defaultContextSupplier;
private final ReflectiveFeign<C> feign;
private final AsyncContextSupplier<C> defaultContextSupplier;

protected AsyncFeign(Feign feign, AsyncContextSupplier<C> defaultContextSupplier) {
private AsyncFeign(ReflectiveFeign<C> feign, AsyncContextSupplier<C> defaultContextSupplier) {
this.feign = feign;
this.defaultContextSupplier = defaultContextSupplier;
}
Expand All @@ -310,8 +233,45 @@ public <T> T newInstance(Target<T> target) {
}

public <T> T newInstance(Target<T> target, C context) {
return wrap(target.type(), feign.newInstance(target), context);
verifyTargetSpecfication(target);
return feign.newInstance(target, context);
}

protected abstract <T> T wrap(Class<T> type, T instance, C context);
private <T> void verifyTargetSpecfication(Target<T> target) {
Class<T> type = target.type();
if (!type.isInterface()) {
throw new IllegalArgumentException("Type must be an interface: " + type);
}

for (final Method m : type.getMethods()) {
final Class<?> retType = m.getReturnType();

if (!CompletableFuture.class.isAssignableFrom(retType)) {
continue; // synchronous case
}

if (retType != CompletableFuture.class) {
throw new IllegalArgumentException("Method return type is not CompleteableFuture: "
+ getFullMethodName(type, retType, m));
}

final Type genRetType = m.getGenericReturnType();

if (!ParameterizedType.class.isInstance(genRetType)) {
throw new IllegalArgumentException("Method return type is not parameterized: "
+ getFullMethodName(type, genRetType, m));
}

if (WildcardType.class
.isInstance(ParameterizedType.class.cast(genRetType).getActualTypeArguments()[0])) {
throw new IllegalArgumentException(
"Wildcards are not supported for return-type parameters: "
+ getFullMethodName(type, genRetType, m));
}
}
}

private String getFullMethodName(Class<?> type, Type retType, Method m) {
return retType.getTypeName() + " " + type.toGenericString() + "." + m.getName();
}
}
64 changes: 0 additions & 64 deletions core/src/main/java/feign/AsyncInvocation.java

This file was deleted.

35 changes: 0 additions & 35 deletions core/src/main/java/feign/AsyncJoinException.java

This file was deleted.