Skip to content

Commit

Permalink
Add retry feature to async feign (#1757)
Browse files Browse the repository at this point in the history
* Define MethodHandler.Factory interface

* Extract AsynchronousMethodHandler from SynchronousMethodHandler

* Genericize AsynchronousMethodHandler for receive requestContext

* Pass requestContext to AsynchronousMethodHandler

* Add retry feature to AsyncFeign

* Remove ReflectiveAsyncFeign

Co-authored-by: Marvin Froeder <velo@users.noreply.github.com>
  • Loading branch information
wplong11 and velo committed Oct 9, 2022
1 parent 58d49da commit 368818a
Show file tree
Hide file tree
Showing 10 changed files with 480 additions and 409 deletions.
160 changes: 60 additions & 100 deletions core/src/main/java/feign/AsyncFeign.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,21 @@
*/
package feign;

import feign.InvocationHandlerFactory.MethodHandler;
import feign.ReflectiveFeign.ParseHandlersByName;
import feign.Logger.Level;
import feign.Request.Options;
import feign.Target.HardCodedTarget;
import feign.codec.Decoder;
import feign.codec.Encoder;
import feign.codec.ErrorDecoder;
import java.io.IOException;
import java.util.Optional;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.WildcardType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
Expand All @@ -45,7 +46,7 @@
* be done (for example, creating and submitting a task to an {@link ExecutorService}).
*/
@Experimental
public abstract class AsyncFeign<C> {
public final class AsyncFeign<C> {
public static <C> AsyncBuilder<C> builder() {
return new AsyncBuilder<>();
}
Expand Down Expand Up @@ -191,7 +192,6 @@ public AsyncBuilder<C> invocationHandlerFactory(InvocationHandlerFactory invocat

public AsyncFeign<C> build() {
super.enrich();
ThreadLocal<AsyncInvocation<C>> activeContextHolder = new ThreadLocal<>();

AsyncResponseHandler responseHandler =
(AsyncResponseHandler) Capability.enrich(
Expand All @@ -205,102 +205,25 @@ public AsyncFeign<C> build() {
AsyncResponseHandler.class,
capabilities);

final SynchronousMethodHandler.Factory synchronousMethodHandlerFactory =
new SynchronousMethodHandler.Factory(stageExecution(activeContextHolder, client), retryer,
requestInterceptors,
responseInterceptor, logger, logLevel, dismiss404, closeAfterDecode,
propagationPolicy, true);
final ParseHandlersByName handlersByName =
new ParseHandlersByName(contract, options, encoder,
stageDecode(activeContextHolder, logger, logLevel, responseHandler), queryMapEncoder,
errorDecoder, synchronousMethodHandlerFactory);
final ReflectiveFeign feign =
new ReflectiveFeign(handlersByName, invocationHandlerFactory, queryMapEncoder);
return new ReflectiveAsyncFeign<>(feign, defaultContextSupplier, activeContextHolder,
methodInfoResolver);
}

private Client stageExecution(
ThreadLocal<AsyncInvocation<C>> activeContext,
AsyncClient<C> client) {
return (request, options) -> {
final Response result = Response.builder().status(200).request(request).build();

final AsyncInvocation<C> invocationContext = activeContext.get();

invocationContext.setResponseFuture(
client.execute(request, options, Optional.ofNullable(invocationContext.context())));

return result;
};
}

// from SynchronousMethodHandler
long elapsedTime(long start) {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
}

private Decoder stageDecode(
ThreadLocal<AsyncInvocation<C>> activeContext,
Logger logger,
Level logLevel,
AsyncResponseHandler responseHandler) {
return (response, type) -> {
final AsyncInvocation<C> invocationContext = activeContext.get();

final CompletableFuture<Object> result = new CompletableFuture<>();

invocationContext
.responseFuture()
.whenComplete(
(r, t) -> {
final long elapsedTime = elapsedTime(invocationContext.startNanos());

if (t != null) {
if (logLevel != Logger.Level.NONE && t instanceof IOException) {
final IOException e = (IOException) t;
logger.logIOException(invocationContext.configKey(), logLevel, e,
elapsedTime);
}
result.completeExceptionally(t);
} else {
responseHandler.handleResponse(
result,
invocationContext.configKey(),
r,
invocationContext.underlyingType(),
elapsedTime);
}
});

result.whenComplete(
(r, t) -> {
if (result.isCancelled()) {
invocationContext.responseFuture().cancel(true);
}
});

if (invocationContext.isAsyncReturnType()) {
return result;
}
try {
return result.join();
} catch (final CompletionException e) {
final Response r = invocationContext.responseFuture().join();
Throwable cause = e.getCause();
if (cause == null) {
cause = e;
}
throw new AsyncJoinException(r.status(), cause.getMessage(), r.request(), cause);
}
};
final MethodHandler.Factory<C> methodHandlerFactory =
new AsynchronousMethodHandler.Factory<>(
client, retryer, requestInterceptors,
responseHandler, logger, logLevel,
propagationPolicy, methodInfoResolver);
final ParseHandlersByName<C> handlersByName =
new ParseHandlersByName<>(contract, options, encoder,
decoder, queryMapEncoder,
errorDecoder, methodHandlerFactory);
final ReflectiveFeign<C> feign =
new ReflectiveFeign<>(handlersByName, invocationHandlerFactory, queryMapEncoder);
return new AsyncFeign<>(feign, defaultContextSupplier);
}
}

private final Feign feign;
private AsyncContextSupplier<C> defaultContextSupplier;
private final ReflectiveFeign<C> feign;
private final AsyncContextSupplier<C> defaultContextSupplier;

protected AsyncFeign(Feign feign, AsyncContextSupplier<C> defaultContextSupplier) {
private AsyncFeign(ReflectiveFeign<C> feign, AsyncContextSupplier<C> defaultContextSupplier) {
this.feign = feign;
this.defaultContextSupplier = defaultContextSupplier;
}
Expand All @@ -310,8 +233,45 @@ public <T> T newInstance(Target<T> target) {
}

public <T> T newInstance(Target<T> target, C context) {
return wrap(target.type(), feign.newInstance(target), context);
verifyTargetSpecfication(target);
return feign.newInstance(target, context);
}

protected abstract <T> T wrap(Class<T> type, T instance, C context);
private <T> void verifyTargetSpecfication(Target<T> target) {
Class<T> type = target.type();
if (!type.isInterface()) {
throw new IllegalArgumentException("Type must be an interface: " + type);
}

for (final Method m : type.getMethods()) {
final Class<?> retType = m.getReturnType();

if (!CompletableFuture.class.isAssignableFrom(retType)) {
continue; // synchronous case
}

if (retType != CompletableFuture.class) {
throw new IllegalArgumentException("Method return type is not CompleteableFuture: "
+ getFullMethodName(type, retType, m));
}

final Type genRetType = m.getGenericReturnType();

if (!ParameterizedType.class.isInstance(genRetType)) {
throw new IllegalArgumentException("Method return type is not parameterized: "
+ getFullMethodName(type, genRetType, m));
}

if (WildcardType.class
.isInstance(ParameterizedType.class.cast(genRetType).getActualTypeArguments()[0])) {
throw new IllegalArgumentException(
"Wildcards are not supported for return-type parameters: "
+ getFullMethodName(type, genRetType, m));
}
}
}

private String getFullMethodName(Class<?> type, Type retType, Method m) {
return retType.getTypeName() + " " + type.toGenericString() + "." + m.getName();
}
}
64 changes: 0 additions & 64 deletions core/src/main/java/feign/AsyncInvocation.java

This file was deleted.

35 changes: 0 additions & 35 deletions core/src/main/java/feign/AsyncJoinException.java

This file was deleted.

0 comments on commit 368818a

Please sign in to comment.