diff --git a/gax-grpc/src/test/java/com/google/api/gax/grpc/GrpcDirectServerStreamingCallableTest.java b/gax-grpc/src/test/java/com/google/api/gax/grpc/GrpcDirectServerStreamingCallableTest.java index 335a70966..e5084b753 100644 --- a/gax-grpc/src/test/java/com/google/api/gax/grpc/GrpcDirectServerStreamingCallableTest.java +++ b/gax-grpc/src/test/java/com/google/api/gax/grpc/GrpcDirectServerStreamingCallableTest.java @@ -145,7 +145,7 @@ public void testServerStreaming() throws Exception { streamingCallable.call(DEFAULT_REQUEST, moneyObserver); - latch.await(20, TimeUnit.SECONDS); + Truth.assertThat(latch.await(20, TimeUnit.SECONDS)).isTrue(); Truth.assertThat(moneyObserver.error).isNull(); Truth.assertThat(moneyObserver.response).isEqualTo(DEFAULT_RESPONSE); } @@ -157,13 +157,13 @@ public void testManualFlowControl() throws Exception { streamingCallable.call(DEFAULT_REQUEST, moneyObserver); - latch.await(500, TimeUnit.MILLISECONDS); + Truth.assertThat(latch.await(500, TimeUnit.MILLISECONDS)).isFalse(); Truth.assertWithMessage("Received response before requesting it") .that(moneyObserver.response) .isNull(); moneyObserver.controller.request(1); - latch.await(500, TimeUnit.MILLISECONDS); + Truth.assertThat(latch.await(500, TimeUnit.MILLISECONDS)).isTrue(); Truth.assertThat(moneyObserver.response).isEqualTo(DEFAULT_RESPONSE); Truth.assertThat(moneyObserver.completed).isTrue(); @@ -178,7 +178,7 @@ public void testCancelClientCall() throws Exception { moneyObserver.controller.cancel(); moneyObserver.controller.request(1); - latch.await(500, TimeUnit.MILLISECONDS); + Truth.assertThat(latch.await(500, TimeUnit.MILLISECONDS)).isTrue(); Truth.assertThat(moneyObserver.error).isInstanceOf(CancellationException.class); Truth.assertThat(moneyObserver.error).hasMessageThat().isEqualTo("User cancelled stream"); @@ -190,7 +190,7 @@ public void testOnResponseError() throws Throwable { MoneyObserver moneyObserver = new MoneyObserver(true, latch); streamingCallable.call(ERROR_REQUEST, moneyObserver); - latch.await(500, TimeUnit.MILLISECONDS); + Truth.assertThat(latch.await(500, TimeUnit.MILLISECONDS)).isTrue(); Truth.assertThat(moneyObserver.error).isInstanceOf(ApiException.class); Truth.assertThat(((ApiException) moneyObserver.error).getStatusCode().getCode()) diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ApiMessageHttpResponseParser.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ApiMessageHttpResponseParser.java index 8c5a5c806..643f1fbda 100644 --- a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ApiMessageHttpResponseParser.java +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ApiMessageHttpResponseParser.java @@ -40,6 +40,7 @@ import com.google.protobuf.TypeRegistry; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.Reader; import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; @@ -91,25 +92,28 @@ ApiMessageHttpResponseParser.Builder newBuilder() { @Override public ResponseT parse(InputStream httpResponseBody) { + return parse(httpResponseBody, null); + } + + @Override + public ResponseT parse(InputStream httpResponseBody, TypeRegistry registry) { + return parse(new InputStreamReader(httpResponseBody, StandardCharsets.UTF_8), registry); + } + + @Override + public ResponseT parse(Reader httpResponseBody, TypeRegistry registry) { if (getResponseInstance() == null) { return null; } else { Type responseType = getResponseInstance().getClass(); try { - return getResponseMarshaller() - .fromJson( - new InputStreamReader(httpResponseBody, StandardCharsets.UTF_8), responseType); + return getResponseMarshaller().fromJson(httpResponseBody, responseType); } catch (JsonIOException | JsonSyntaxException e) { throw new RestSerializationException(e); } } } - @Override - public ResponseT parse(InputStream httpResponseBody, TypeRegistry registry) { - return parse(httpResponseBody); - } - @Override public String serialize(ResponseT response) { return getResponseMarshaller().toJson(response); diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ApiMethodDescriptor.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ApiMethodDescriptor.java index 665aea66f..fcd3c6b33 100644 --- a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ApiMethodDescriptor.java +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ApiMethodDescriptor.java @@ -37,12 +37,18 @@ @AutoValue /* Method descriptor for messages to be transmitted over HTTP. */ public abstract class ApiMethodDescriptor { + public enum MethodType { + UNARY, + CLIENT_STREAMING, + SERVER_STREAMING, + BIDI_STREAMING, + UNKNOWN; + } public abstract String getFullMethodName(); public abstract HttpRequestFormatter getRequestFormatter(); - @Nullable public abstract HttpResponseParser getResponseParser(); /** Return the HTTP method for this request message type. */ @@ -55,8 +61,11 @@ public abstract class ApiMethodDescriptor { @Nullable public abstract PollingRequestFactory getPollingRequestFactory(); + public abstract MethodType getType(); + public static Builder newBuilder() { - return new AutoValue_ApiMethodDescriptor.Builder(); + return new AutoValue_ApiMethodDescriptor.Builder() + .setType(MethodType.UNARY); } @AutoValue.Builder @@ -78,6 +87,8 @@ public abstract Builder setOperationSnapshotFactory( public abstract Builder setPollingRequestFactory( PollingRequestFactory pollingRequestFactory); + public abstract Builder setType(MethodType type); + public abstract ApiMethodDescriptor build(); } } diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonApiExceptionFactory.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonApiExceptionFactory.java new file mode 100644 index 000000000..a48f0b5a2 --- /dev/null +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonApiExceptionFactory.java @@ -0,0 +1,78 @@ +/* + * Copyright 2022 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.httpjson; + +import com.google.api.client.http.HttpResponseException; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.ApiExceptionFactory; +import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.StatusCode.Code; +import com.google.common.collect.ImmutableSet; +import java.util.Set; +import java.util.concurrent.CancellationException; + +class HttpJsonApiExceptionFactory { + private final Set retryableCodes; + + HttpJsonApiExceptionFactory(Set retryableCodes) { + this.retryableCodes = ImmutableSet.copyOf(retryableCodes); + } + + ApiException create(Throwable throwable) { + if (throwable instanceof HttpResponseException) { + HttpResponseException e = (HttpResponseException) throwable; + StatusCode statusCode = HttpJsonStatusCode.of(e.getStatusCode()); + boolean canRetry = retryableCodes.contains(statusCode.getCode()); + String message = e.getStatusMessage(); + return createApiException(throwable, statusCode, message, canRetry); + } else if (throwable instanceof HttpJsonStatusRuntimeException) { + HttpJsonStatusRuntimeException e = (HttpJsonStatusRuntimeException) throwable; + StatusCode statusCode = HttpJsonStatusCode.of(e.getStatusCode()); + return createApiException( + throwable, statusCode, e.getMessage(), retryableCodes.contains(statusCode.getCode())); + } else if (throwable instanceof CancellationException) { + return ApiExceptionFactory.createException( + throwable, HttpJsonStatusCode.of(Code.CANCELLED), false); + } else if (throwable instanceof ApiException) { + return (ApiException) throwable; + } else { + // Do not retry on unknown throwable, even when UNKNOWN is in retryableCodes + return ApiExceptionFactory.createException( + throwable, HttpJsonStatusCode.of(StatusCode.Code.UNKNOWN), false); + } + } + + private ApiException createApiException( + Throwable throwable, StatusCode statusCode, String message, boolean canRetry) { + return message == null + ? ApiExceptionFactory.createException(throwable, statusCode, canRetry) + : ApiExceptionFactory.createException(message, throwable, statusCode, canRetry); + } +} diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallContext.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallContext.java index 0d3b00898..9433d1c00 100644 --- a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallContext.java +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallContext.java @@ -62,23 +62,39 @@ @BetaApi public final class HttpJsonCallContext implements ApiCallContext { private final HttpJsonChannel channel; - private final Duration timeout; - private final Instant deadline; - private final Credentials credentials; + private final HttpJsonCallOptions callOptions; + @Nullable private final Duration timeout; + @Nullable private final Duration streamWaitTimeout; + @Nullable private final Duration streamIdleTimeout; private final ImmutableMap> extraHeaders; private final ApiCallContextOptions options; private final ApiTracer tracer; - private final RetrySettings retrySettings; - private final ImmutableSet retryableCodes; + @Nullable private final RetrySettings retrySettings; + @Nullable private final ImmutableSet retryableCodes; /** Returns an empty instance. */ public static HttpJsonCallContext createDefault() { return new HttpJsonCallContext( null, + HttpJsonCallOptions.newBuilder().build(), null, null, null, - ImmutableMap.>of(), + ImmutableMap.of(), + ApiCallContextOptions.getDefaultOptions(), + null, + null, + null); + } + + public static HttpJsonCallContext of(HttpJsonChannel channel, HttpJsonCallOptions options) { + return new HttpJsonCallContext( + channel, + options, + null, + null, + null, + ImmutableMap.of(), ApiCallContextOptions.getDefaultOptions(), null, null, @@ -87,18 +103,20 @@ public static HttpJsonCallContext createDefault() { private HttpJsonCallContext( HttpJsonChannel channel, + HttpJsonCallOptions callOptions, Duration timeout, - Instant deadline, - Credentials credentials, + Duration streamWaitTimeout, + Duration streamIdleTimeout, ImmutableMap> extraHeaders, ApiCallContextOptions options, ApiTracer tracer, RetrySettings defaultRetrySettings, Set defaultRetryableCodes) { this.channel = channel; + this.callOptions = callOptions; this.timeout = timeout; - this.deadline = deadline; - this.credentials = credentials; + this.streamWaitTimeout = streamWaitTimeout; + this.streamIdleTimeout = streamIdleTimeout; this.extraHeaders = extraHeaders; this.options = options; this.tracer = tracer; @@ -146,19 +164,22 @@ public HttpJsonCallContext merge(ApiCallContext inputCallContext) { newChannel = this.channel; } + // Do deep merge of callOptions + HttpJsonCallOptions newCallOptions = callOptions.merge(httpJsonCallContext.callOptions); + Duration newTimeout = httpJsonCallContext.timeout; if (newTimeout == null) { newTimeout = this.timeout; } - Instant newDeadline = httpJsonCallContext.deadline; - if (newDeadline == null) { - newDeadline = this.deadline; + Duration newStreamWaitTimeout = httpJsonCallContext.streamWaitTimeout; + if (newStreamWaitTimeout == null) { + newStreamWaitTimeout = streamWaitTimeout; } - Credentials newCredentials = httpJsonCallContext.credentials; - if (newCredentials == null) { - newCredentials = this.credentials; + Duration newStreamIdleTimeout = httpJsonCallContext.streamIdleTimeout; + if (newStreamIdleTimeout == null) { + newStreamIdleTimeout = streamIdleTimeout; } ImmutableMap> newExtraHeaders = @@ -183,9 +204,10 @@ public HttpJsonCallContext merge(ApiCallContext inputCallContext) { return new HttpJsonCallContext( newChannel, + newCallOptions, newTimeout, - newDeadline, - newCredentials, + newStreamWaitTimeout, + newStreamIdleTimeout, newExtraHeaders, newOptions, newTracer, @@ -195,16 +217,9 @@ public HttpJsonCallContext merge(ApiCallContext inputCallContext) { @Override public HttpJsonCallContext withCredentials(Credentials newCredentials) { - return new HttpJsonCallContext( - this.channel, - this.timeout, - this.deadline, - newCredentials, - this.extraHeaders, - this.options, - this.tracer, - this.retrySettings, - this.retryableCodes); + HttpJsonCallOptions.Builder builder = + callOptions != null ? callOptions.toBuilder() : HttpJsonCallOptions.newBuilder(); + return withCallOptions(builder.setCredentials(newCredentials).build()); } @Override @@ -232,9 +247,10 @@ public HttpJsonCallContext withTimeout(Duration timeout) { return new HttpJsonCallContext( this.channel, + this.callOptions, timeout, - this.deadline, - this.credentials, + this.streamWaitTimeout, + this.streamIdleTimeout, this.extraHeaders, this.options, this.tracer, @@ -249,25 +265,65 @@ public Duration getTimeout() { } @Override - public ApiCallContext withStreamWaitTimeout(@Nonnull Duration streamWaitTimeout) { - throw new UnsupportedOperationException("Http/json transport does not support streaming"); + public HttpJsonCallContext withStreamWaitTimeout(@Nullable Duration streamWaitTimeout) { + if (streamWaitTimeout != null) { + Preconditions.checkArgument( + streamWaitTimeout.compareTo(Duration.ZERO) >= 0, "Invalid timeout: < 0 s"); + } + + return new HttpJsonCallContext( + this.channel, + this.callOptions, + this.timeout, + streamWaitTimeout, + this.streamIdleTimeout, + this.extraHeaders, + this.options, + this.tracer, + this.retrySettings, + this.retryableCodes); } - @Nullable + /** + * The stream wait timeout set for this context. + * + * @see ApiCallContext#withStreamWaitTimeout(Duration) + */ @Override + @Nullable public Duration getStreamWaitTimeout() { - throw new UnsupportedOperationException("Http/json transport does not support streaming"); + return streamWaitTimeout; } @Override - public ApiCallContext withStreamIdleTimeout(@Nonnull Duration streamIdleTimeout) { - throw new UnsupportedOperationException("Http/json transport does not support streaming"); + public HttpJsonCallContext withStreamIdleTimeout(@Nullable Duration streamIdleTimeout) { + if (streamIdleTimeout != null) { + Preconditions.checkArgument( + streamIdleTimeout.compareTo(Duration.ZERO) >= 0, "Invalid timeout: < 0 s"); + } + + return new HttpJsonCallContext( + this.channel, + this.callOptions, + this.timeout, + this.streamWaitTimeout, + streamIdleTimeout, + this.extraHeaders, + this.options, + this.tracer, + this.retrySettings, + this.retryableCodes); } - @Nullable + /** + * The stream idle timeout set for this context. + * + * @see ApiCallContext#withStreamIdleTimeout(Duration) + */ @Override + @Nullable public Duration getStreamIdleTimeout() { - throw new UnsupportedOperationException("Http/json transport does not support streaming"); + return streamIdleTimeout; } @BetaApi("The surface for extra headers is not stable yet and may change in the future.") @@ -278,9 +334,10 @@ public ApiCallContext withExtraHeaders(Map> extraHeaders) { Headers.mergeHeaders(this.extraHeaders, extraHeaders); return new HttpJsonCallContext( this.channel, + this.callOptions, this.timeout, - this.deadline, - this.credentials, + this.streamWaitTimeout, + this.streamIdleTimeout, newExtraHeaders, this.options, this.tracer, @@ -300,9 +357,10 @@ public ApiCallContext withOption(Key key, T value) { ApiCallContextOptions newOptions = options.withOption(key, value); return new HttpJsonCallContext( this.channel, + this.callOptions, this.timeout, - this.deadline, - this.credentials, + this.streamWaitTimeout, + this.streamIdleTimeout, this.extraHeaders, newOptions, this.tracer, @@ -320,12 +378,20 @@ public HttpJsonChannel getChannel() { return channel; } + public HttpJsonCallOptions getCallOptions() { + return callOptions; + } + + @Deprecated + @Nullable public Instant getDeadline() { - return deadline; + return getCallOptions() != null ? getCallOptions().getDeadline() : null; } + @Deprecated + @Nullable public Credentials getCredentials() { - return credentials; + return getCallOptions() != null ? getCallOptions().getCredentials() : null; } @Override @@ -337,9 +403,10 @@ public RetrySettings getRetrySettings() { public HttpJsonCallContext withRetrySettings(RetrySettings retrySettings) { return new HttpJsonCallContext( this.channel, + this.callOptions, this.timeout, - this.deadline, - this.credentials, + this.streamWaitTimeout, + this.streamIdleTimeout, this.extraHeaders, this.options, this.tracer, @@ -356,9 +423,10 @@ public Set getRetryableCodes() { public HttpJsonCallContext withRetryableCodes(Set retryableCodes) { return new HttpJsonCallContext( this.channel, + this.callOptions, this.timeout, - this.deadline, - this.credentials, + this.streamWaitTimeout, + this.streamIdleTimeout, this.extraHeaders, this.options, this.tracer, @@ -369,9 +437,10 @@ public HttpJsonCallContext withRetryableCodes(Set retryableCode public HttpJsonCallContext withChannel(HttpJsonChannel newChannel) { return new HttpJsonCallContext( newChannel, + this.callOptions, this.timeout, - this.deadline, - this.credentials, + this.streamWaitTimeout, + this.streamIdleTimeout, this.extraHeaders, this.options, this.tracer, @@ -379,12 +448,13 @@ public HttpJsonCallContext withChannel(HttpJsonChannel newChannel) { this.retryableCodes); } - public HttpJsonCallContext withDeadline(Instant newDeadline) { + public HttpJsonCallContext withCallOptions(HttpJsonCallOptions newCallOptions) { return new HttpJsonCallContext( this.channel, + newCallOptions, this.timeout, - newDeadline, - this.credentials, + this.streamWaitTimeout, + this.streamIdleTimeout, this.extraHeaders, this.options, this.tracer, @@ -392,6 +462,13 @@ public HttpJsonCallContext withDeadline(Instant newDeadline) { this.retryableCodes); } + @Deprecated + public HttpJsonCallContext withDeadline(Instant newDeadline) { + HttpJsonCallOptions.Builder builder = + callOptions != null ? callOptions.toBuilder() : HttpJsonCallOptions.newBuilder(); + return withCallOptions(builder.setDeadline(newDeadline).build()); + } + @Nonnull @Override public ApiTracer getTracer() { @@ -408,9 +485,10 @@ public HttpJsonCallContext withTracer(@Nonnull ApiTracer newTracer) { return new HttpJsonCallContext( this.channel, + this.callOptions, this.timeout, - this.deadline, - this.credentials, + this.streamWaitTimeout, + this.streamIdleTimeout, this.extraHeaders, this.options, newTracer, @@ -428,9 +506,8 @@ public boolean equals(Object o) { } HttpJsonCallContext that = (HttpJsonCallContext) o; return Objects.equals(this.channel, that.channel) + && Objects.equals(this.callOptions, that.callOptions) && Objects.equals(this.timeout, that.timeout) - && Objects.equals(this.deadline, that.deadline) - && Objects.equals(this.credentials, that.credentials) && Objects.equals(this.extraHeaders, that.extraHeaders) && Objects.equals(this.options, that.options) && Objects.equals(this.tracer, that.tracer) @@ -442,9 +519,8 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash( channel, + callOptions, timeout, - deadline, - credentials, extraHeaders, options, tracer, diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallOptions.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallOptions.java index beb5ff98b..dbb3cb625 100644 --- a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallOptions.java +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallOptions.java @@ -40,6 +40,8 @@ @BetaApi @AutoValue public abstract class HttpJsonCallOptions { + public static final HttpJsonCallOptions DEFAULT = newBuilder().build(); + @Nullable public abstract Instant getDeadline(); @@ -49,10 +51,37 @@ public abstract class HttpJsonCallOptions { @Nullable public abstract TypeRegistry getTypeRegistry(); + public abstract Builder toBuilder(); + public static Builder newBuilder() { return new AutoValue_HttpJsonCallOptions.Builder(); } + public HttpJsonCallOptions merge(HttpJsonCallOptions inputOptions) { + if (inputOptions == null) { + return this; + } + + Builder builder = this.toBuilder(); + + Instant newDeadline = inputOptions.getDeadline(); + if (newDeadline != null) { + builder.setDeadline(newDeadline); + } + + Credentials newCredentials = inputOptions.getCredentials(); + if (newCredentials != null) { + builder.setCredentials(newCredentials); + } + + TypeRegistry newTypeRegistry = inputOptions.getTypeRegistry(); + if (newTypeRegistry != null) { + builder.setTypeRegistry(newTypeRegistry); + } + + return builder.build(); + } + @AutoValue.Builder public abstract static class Builder { public abstract Builder setDeadline(Instant value); diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallableFactory.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallableFactory.java index c6b4c5763..d951a90a5 100644 --- a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallableFactory.java +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallableFactory.java @@ -39,6 +39,8 @@ import com.google.api.gax.rpc.OperationCallSettings; import com.google.api.gax.rpc.OperationCallable; import com.google.api.gax.rpc.PagedCallSettings; +import com.google.api.gax.rpc.ServerStreamingCallSettings; +import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.api.gax.rpc.UnaryCallable; import com.google.api.gax.tracing.SpanName; @@ -173,6 +175,28 @@ OperationCallable createOperationCallable( return operationCallable.withDefaultCallContext(clientContext.getDefaultCallContext()); } + @BetaApi("The surface for streaming is not stable yet and may change in the future.") + public static + ServerStreamingCallable createServerStreamingCallable( + HttpJsonCallSettings httpJsoncallSettings, + ServerStreamingCallSettings streamingCallSettings, + ClientContext clientContext) { + + ServerStreamingCallable callable = + new HttpJsonDirectServerStreamingCallable<>(httpJsoncallSettings.getMethodDescriptor()); + + callable = + new HttpJsonExceptionServerStreamingCallable<>( + callable, streamingCallSettings.getRetryableCodes()); + + if (clientContext.getStreamWatchdog() != null) { + callable = Callables.watched(callable, streamingCallSettings, clientContext); + } + + callable = Callables.retrying(callable, streamingCallSettings, clientContext); + return callable.withDefaultCallContext(clientContext.getDefaultCallContext()); + } + @InternalApi("Visible for testing") static SpanName getSpanName(@Nonnull ApiMethodDescriptor methodDescriptor) { Matcher matcher = FULL_METHOD_NAME_REGEX.matcher(methodDescriptor.getFullMethodName()); diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonChannel.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonChannel.java index 01cd47cdd..558816c4d 100644 --- a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonChannel.java +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonChannel.java @@ -35,6 +35,10 @@ /** HttpJsonChannel contains the functionality to issue http-json calls. */ @BetaApi public interface HttpJsonChannel { + HttpJsonClientCall newCall( + ApiMethodDescriptor methodDescriptor, HttpJsonCallOptions callOptions); + + @Deprecated ApiFuture issueFutureUnaryCall( HttpJsonCallOptions callOptions, RequestT request, diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCall.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCall.java new file mode 100644 index 000000000..16ecd6795 --- /dev/null +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCall.java @@ -0,0 +1,158 @@ +/* + * Copyright 2022 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.httpjson; + +import com.google.api.core.BetaApi; +import javax.annotation.Nullable; + +// This class mimics the structure and behavior of the corresponding ClientCall from gRPC package as +// closely as possible. +/** + * An instance of a call to a remote method. A call will send zero or more request messages to the + * server and receive zero or more response messages back. + * + *

Instances are created by a {@link HttpJsonChannel} and used by stubs to invoke their remote + * behavior. + * + *

{@link #start} must be called prior to calling any other methods, with the exception of {@link + * #cancel}. Whereas {@link #cancel} must not be followed by any other methods, but can be called + * more than once, while only the first one has effect. + * + *

Methods are potentially blocking but are designed to execute quickly. The implementations of + * this class are expected to be thread-safe. + * + *

There is a race between {@link #cancel} and the completion/failure of the RPC in other ways. + * If {@link #cancel} won the race, {@link HttpJsonClientCall.Listener#onClose Listener.onClose()} + * is called with {@code statusCode} corresponding to {@link + * com.google.api.gax.rpc.StatusCode.Code#CANCELLED CANCELLED}. Otherwise, {@link + * HttpJsonClientCall.Listener#onClose Listener.onClose()} is called with whatever status the RPC + * was finished. We ensure that at most one is called. + * + * @param type of message sent to the server + * @param type of message received one or more times from the server + */ +@BetaApi +public abstract class HttpJsonClientCall { + /** + * Callbacks for receiving metadata, response messages and completion status from the server. + * + *

Implementations are discouraged to block for extended periods of time. Implementations are + * not required to be thread-safe, but they must not be thread-hostile. The caller is free to call + * an instance from multiple threads, but only one call simultaneously. + */ + @BetaApi + public abstract static class Listener { + /** + * The response headers have been received. Headers always precede messages. + * + * @param responseHeaders containing metadata sent by the server at the start of the response + */ + public void onHeaders(HttpJsonMetadata responseHeaders) {} + + /** + * A response message has been received. May be called zero or more times depending on whether + * the call response is empty, a single message or a stream of messages. + * + * @param message returned by the server + */ + public void onMessage(T message) {} + + /** + * The ClientCall has been closed. Any additional calls to the {@code ClientCall} will not be + * processed by the server. No further receiving will occur and no further notifications will be + * made. + * + *

This method should not throw. If this method throws, there is no way to be notified of the + * exception. Implementations should therefore be careful of exceptions which can accidentally + * leak resources. + * + * @param statusCode the HTTP status code representing the result of the remote call + * @param trailers metadata provided at call completion + */ + public void onClose(int statusCode, HttpJsonMetadata trailers) {} + } + + /** + * Start a call, using {@code responseListener} for processing response messages. + * + *

It must be called prior to any other method on this class, except for {@link #cancel} which + * may be called at any time. + * + * @param responseListener receives response messages + * @param requestHeaders which can contain extra call metadata, e.g. authentication credentials. + */ + public abstract void start(Listener responseListener, HttpJsonMetadata requestHeaders); + + /** + * Requests up to the given number of messages from the call to be delivered to {@link + * HttpJsonClientCall.Listener#onMessage(Object)}. No additional messages will be delivered. + * + *

Message delivery is guaranteed to be sequential in the order received. In addition, the + * listener methods will not be accessed concurrently. While it is not guaranteed that the same + * thread will always be used, it is guaranteed that only a single thread will access the listener + * at a time. + * + *

If called multiple times, the number of messages able to delivered will be the sum of the + * calls. + * + *

This method is safe to call from multiple threads without external synchronization. + * + * @param numMessages the requested number of messages to be delivered to the listener. Must be + * non-negative. + */ + public abstract void request(int numMessages); + + /** + * Prevent any further processing for this {@code HttpJsonClientCall}. No further messages may be + * sent or will be received. The server is not informed of cancellations. Cancellation is + * permitted even if previously {@link #halfClose}d. Cancelling an already {@code cancel()}ed + * {@code ClientCall} has no effect. + * + *

No other methods on this class can be called after this method has been called. + * + * @param message if not {@code null}, will appear as the description of the CANCELLED status + * @param cause if not {@code null}, will appear as the cause of the CANCELLED status + */ + public abstract void cancel(@Nullable String message, @Nullable Throwable cause); + + /** + * Close the call for request message sending. Incoming response messages are unaffected. This + * should be called when no more messages will be sent from the client. + */ + public abstract void halfClose(); + + /** + * Send a request message to the server. May be called zero or more times but for unary and server + * streaming calls it must be called not more than once. + * + * @param message message to be sent to the server. + */ + public abstract void sendMessage(RequestT message); +} diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCallImpl.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCallImpl.java new file mode 100644 index 000000000..42be4d28c --- /dev/null +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCallImpl.java @@ -0,0 +1,503 @@ +/* + * Copyright 2022 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.httpjson; + +import com.google.api.client.http.HttpTransport; +import com.google.api.gax.httpjson.ApiMethodDescriptor.MethodType; +import com.google.api.gax.httpjson.HttpRequestRunnable.ResultListener; +import com.google.api.gax.httpjson.HttpRequestRunnable.RunnableResult; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayDeque; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.Executor; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +/** + * This class servers as main implementation of {@link HttpJsonClientCall} for rest transport and is + * expected to be used for every REST call. It currently supports unary and server-streaming + * workflows. The overall behavior and surface of the class mimics as close as possible behavior of + * the corresponding ClientCall implementation in gRPC transport. + * + *

This class is thread-safe. + * + * @param call request type + * @param call response type + */ +final class HttpJsonClientCallImpl + extends HttpJsonClientCall implements ResultListener { + // + // A lock to guard the state of this call (and the response stream). + // + private final Object lock = new Object(); + + // An active delivery loop marker. + @GuardedBy("lock") + private boolean inDelivery = false; + + // A queue to keep "scheduled" calls to HttpJsonClientCall.Listener in a form of tasks. + // It may seem like an overkill, but it exists to implement the following listeners contract: + // - onHeaders() must be called before any onMessage(); + // - onClose() must be the last call made, no onMessage() or onHeaders() are allowed after that; + // - while methods on the same listener may be called from different threads they must never be + // called simultaneously; + // - listeners should not be called under the internal lock of the client call to reduce risk of + // deadlocking and minimize time spent under lock; + // - a specialized notifications' dispatcher thread may be used in the future to send + // notifications (not the case right now). + @GuardedBy("lock") + private final Queue> pendingNotifications = new ArrayDeque<>(); + + // + // Immutable API method-specific data. + // + private final HttpJsonCallOptions callOptions; + private final String endpoint; + private final ApiMethodDescriptor methodDescriptor; + private final HttpTransport httpTransport; + private final Executor executor; + private final HttpJsonMetadata defaultHeaders; + + // + // Request-specific data (provided by client code) before we get a response. + // + @GuardedBy("lock") + private HttpJsonMetadata requestHeaders; + + @GuardedBy("lock") + private Listener listener; + + @GuardedBy("lock") + private int pendingNumMessages; + + // + // Response-specific data (received from server). + // + @GuardedBy("lock") + private HttpRequestRunnable requestRunnable; + + @GuardedBy("lock") + private RunnableResult runnableResult; + + @GuardedBy("lock") + private ProtoMessageJsonStreamIterator responseStreamIterator; + + @GuardedBy("lock") + private boolean closed; + + HttpJsonClientCallImpl( + ApiMethodDescriptor methodDescriptor, + String endpoint, + HttpJsonCallOptions callOptions, + HttpTransport httpTransport, + Executor executor, + HttpJsonMetadata defaultHeaders) { + this.methodDescriptor = methodDescriptor; + this.endpoint = endpoint; + this.callOptions = callOptions; + this.httpTransport = httpTransport; + this.executor = executor; + this.closed = false; + this.defaultHeaders = defaultHeaders; + } + + @Override + public void setResult(RunnableResult runnableResult) { + Preconditions.checkNotNull(runnableResult); + synchronized (lock) { + if (closed) { + return; + } + Preconditions.checkState(this.runnableResult == null, "The call result is already set"); + this.runnableResult = runnableResult; + if (runnableResult.getResponseHeaders() != null) { + pendingNotifications.offer( + new OnHeadersNotificationTask<>(listener, runnableResult.getResponseHeaders())); + } + } + + // trigger delivery loop if not already running + deliver(); + } + + @Override + public void start(Listener responseListener, HttpJsonMetadata requestHeaders) { + Preconditions.checkNotNull(responseListener); + Preconditions.checkNotNull(requestHeaders); + synchronized (lock) { + if (closed) { + return; + } + Preconditions.checkState(this.listener == null, "The call is already started"); + this.listener = responseListener; + Map mergedHeaders = + ImmutableMap.builder() + .putAll(defaultHeaders.getHeaders()) + .putAll(requestHeaders.getHeaders()) + .build(); + this.requestHeaders = requestHeaders.toBuilder().setHeaders(mergedHeaders).build(); + } + } + + @Override + public void request(int numMessages) { + if (numMessages < 0) { + throw new IllegalArgumentException("numMessages must be non-negative"); + } + synchronized (lock) { + if (closed) { + return; + } + pendingNumMessages += numMessages; + } + + // trigger delivery loop if not already running + deliver(); + } + + @Override + public void cancel(@Nullable String message, @Nullable Throwable cause) { + Throwable actualCause = cause; + if (actualCause == null) { + actualCause = new CancellationException(message); + } + + synchronized (lock) { + close(499, message, actualCause, true); + } + + // trigger delivery loop if not already running + deliver(); + } + + @Override + public void sendMessage(RequestT message) { + Preconditions.checkNotNull(message); + HttpRequestRunnable localRunnable; + synchronized (lock) { + if (closed) { + return; + } + Preconditions.checkState(listener != null, "The call hasn't been started"); + Preconditions.checkState( + requestRunnable == null, + "The message has already been sent. Bidirectional streaming calls are not supported"); + + requestRunnable = + new HttpRequestRunnable<>( + message, + methodDescriptor, + endpoint, + callOptions, + httpTransport, + requestHeaders, + this); + localRunnable = requestRunnable; + } + executor.execute(localRunnable); + } + + @Override + public void halfClose() { + // no-op for now, as halfClose makes sense only for bidirectional streams. + } + + private void deliver() { + // A flag stored in method stack space to detect when we enter a delivery loop (regardless if + // it is a concurrent thread or a recursive call execution of delivery() method within the same + // thread). + boolean newActiveDeliveryLoop = true; + boolean allMessagesConsumed = false; + while (true) { + // The try block around listener notification logic. We need to keep this + // block inside the loop to make sure that in case onMessage() call throws, we close the + // request properly and call onClose() method on listener once eventually (because the + // listener can be called only inside this loop). + try { + // Check if there is only one delivery loop active. Exit if a competing delivery loop + // detected (either in a concurrent thread or in a previous recursive call to this method in + // the same thread). The last-standing delivery loop will do all the job. Even if something + // in this loop throws, the code will first go through this block before exiting the loop to + // make sure that the activeDeliveryLoops counter stays correct. + // + // Note, we must enter the loop before doing the check. + synchronized (lock) { + if (inDelivery && newActiveDeliveryLoop) { + // EXIT delivery loop because another active delivery loop has been detected. + break; + } + newActiveDeliveryLoop = false; + inDelivery = true; + } + + if (Thread.interrupted()) { + // The catch block below will properly cancel the call. Note Thread.interrupted() clears + // the interruption flag on this thread, so we don't throw forever. + throw new InterruptedException("Message delivery has been interrupted"); + } + + // All listeners must be called under delivery loop (but outside the lock) to ensure that no + // two notifications come simultaneously from two different threads and that we do not go + // indefinitely deep in the stack if delivery logic is called recursively via listeners. + notifyListeners(); + + // The synchronized block around message reading and cancellation notification processing + // logic + synchronized (lock) { + if (allMessagesConsumed) { + // allMessagesProcessed was set to true on previous loop iteration. We do it this + // way to make sure that notifyListeners() is called in between consuming the last + // message in a stream and closing the call. + // This is to make sure that onMessage() for the last message in a stream is called + // before closing this call, because that last onMessage() listener execution may change + // how the call has to be closed (normally or cancelled). + + // Close the call normally. + // once close() is called we will never ever enter this again, because `close` flag + // will be set to true by the close() method. If the call is already closed, close() + // will have no effect. + allMessagesConsumed = false; + close( + runnableResult.getStatusCode(), + runnableResult.getTrailers().getStatusMessage(), + runnableResult.getTrailers().getException(), + false); + } + + // Attempt to terminate the delivery loop if: + // `runnableResult == null` => there is no response from the server yet; + // `pendingNumMessages <= 0` => we have already delivered as much as has been asked; + // `closed` => this call has been closed already; + if (runnableResult == null || pendingNumMessages <= 0 || closed) { + // The loop terminates only when there are no pending notifications left. The check + // happens under the lock, so no other thread may add a listener notification task in + // the middle of this logic. + if (pendingNotifications.isEmpty()) { + // EXIT delivery loop because there is no more work left to do. This is expected to be + // the only active delivery loop. + inDelivery = false; + break; + } else { + // We still have some stuff in notiticationTasksQueue so continue the loop, most + // likely we will finally terminate on the next cycle. + continue; + } + } + pendingNumMessages--; + allMessagesConsumed = consumeMessageFromStream(); + } + } catch (Throwable e) { + // Exceptions in message delivery result into cancellation of the call to stay consistent + // with other transport implementations. + HttpJsonStatusRuntimeException ex = + new HttpJsonStatusRuntimeException(499, "Exception in message delivery", e); + // If we are already closed the exception will be swallowed, which is the best thing we + // can do in such an unlikely situation (otherwise we would stay forever in the delivery + // loop). + synchronized (lock) { + // Close the call immediately marking it cancelled. If already closed close() will have no + // effect. + close(ex.getStatusCode(), ex.getMessage(), ex, true); + } + } + } + } + + private void notifyListeners() { + while (true) { + NotificationTask notification; + synchronized (lock) { + if (pendingNotifications.isEmpty()) { + return; + } + notification = pendingNotifications.poll(); + } + notification.call(); + } + } + + @GuardedBy("lock") + private boolean consumeMessageFromStream() throws IOException { + if (runnableResult.getTrailers().getException() != null + || runnableResult.getResponseContent() == null) { + // Server returned an error, no messages to process. This will result into closing a call with + // an error. + return true; + } + + boolean allMessagesConsumed; + Reader responseReader; + if (methodDescriptor.getType() == MethodType.SERVER_STREAMING) { + // Lazily initialize responseStreamIterator in case if it is a server steraming response + if (responseStreamIterator == null) { + responseStreamIterator = + new ProtoMessageJsonStreamIterator( + new InputStreamReader(runnableResult.getResponseContent(), StandardCharsets.UTF_8)); + } + if (responseStreamIterator.hasNext()) { + responseReader = responseStreamIterator.next(); + } else { + return true; + } + // To make sure that the call will be closed immediately once we read the last + // message from the response (otherwise we would need to wait for another request(1) + // from the client to check if there is anything else left in the stream). + allMessagesConsumed = !responseStreamIterator.hasNext(); + } else { + responseReader = + new InputStreamReader(runnableResult.getResponseContent(), StandardCharsets.UTF_8); + // Unary calls have only one message in their response, so we should be ready to close + // immediately after delivering a single response message. + allMessagesConsumed = true; + } + + ResponseT message = + methodDescriptor.getResponseParser().parse(responseReader, callOptions.getTypeRegistry()); + pendingNotifications.offer(new OnMessageNotificationTask<>(listener, message)); + + return allMessagesConsumed; + } + + @GuardedBy("lock") + private void close( + int statusCode, String message, Throwable cause, boolean terminateImmediatelly) { + try { + if (closed) { + return; + } + closed = true; + // Best effort task cancellation (to not be confused with task's thread interruption). + // If the task is in blocking I/O waiting for the server response, it will keep waiting for + // the response from the server, but once response is received the task will exit silently. + // If the task has already completed, this call has no effect. + if (requestRunnable != null) { + requestRunnable.cancel(); + requestRunnable = null; + } + + HttpJsonMetadata.Builder meatadaBuilder = HttpJsonMetadata.newBuilder(); + if (runnableResult != null && runnableResult.getTrailers() != null) { + meatadaBuilder = runnableResult.getTrailers().toBuilder(); + } + meatadaBuilder.setException(cause); + meatadaBuilder.setStatusMessage(message); + if (responseStreamIterator != null) { + responseStreamIterator.close(); + } + if (runnableResult != null && runnableResult.getResponseContent() != null) { + runnableResult.getResponseContent().close(); + } + + // onClose() suppresses all other pending notifications. + // there should be no place in the code which inserts something in this queue before checking + // the `closed` flag under the lock and refusing to insert anything if `closed == true`. + if (terminateImmediatelly) { + // This usually means we are cancelling the call before processing the response in full. + // It may happen if a user explicitly cancels the call or in response to an unexpected + // exception either from server or a call listener execution. + pendingNotifications.clear(); + } + + pendingNotifications.offer( + new OnCloseNotificationTask<>(listener, statusCode, meatadaBuilder.build())); + + } catch (Throwable e) { + // suppress stream closing exceptions in favor of the actual call closing cause. This method + // should not throw, otherwise we may stuck in an infinite loop of exception processing. + } + } + + // + // Listener notification tasks. Each class simply calls only one specific method on the Listener + // interface, and to do so it also stores tha parameters needed to make the all. + // + private abstract static class NotificationTask { + private final Listener listener; + + NotificationTask(Listener listener) { + this.listener = listener; + } + + protected Listener getListener() { + return listener; + } + + abstract void call(); + } + + private static class OnHeadersNotificationTask extends NotificationTask { + private final HttpJsonMetadata responseHeaders; + + OnHeadersNotificationTask(Listener listener, HttpJsonMetadata responseHeaders) { + super(listener); + this.responseHeaders = responseHeaders; + } + + public void call() { + getListener().onHeaders(responseHeaders); + } + } + + private static class OnMessageNotificationTask extends NotificationTask { + private final ResponseT message; + + OnMessageNotificationTask(Listener listener, ResponseT message) { + super(listener); + this.message = message; + } + + public void call() { + getListener().onMessage(message); + } + } + + private static class OnCloseNotificationTask extends NotificationTask { + private final int statusCode; + private final HttpJsonMetadata trailers; + + OnCloseNotificationTask( + Listener listener, int statusCode, HttpJsonMetadata trailers) { + super(listener); + this.statusCode = statusCode; + this.trailers = trailers; + } + + public void call() { + getListener().onClose(statusCode, trailers); + } + } +} diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCalls.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCalls.java new file mode 100644 index 000000000..ad8320eba --- /dev/null +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCalls.java @@ -0,0 +1,140 @@ +/* + * Copyright 2022 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.httpjson; + +import com.google.api.core.AbstractApiFuture; +import com.google.api.core.ApiFuture; +import com.google.api.gax.rpc.ApiCallContext; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nonnull; +import org.threeten.bp.Instant; + +/** + * {@code HttpJsonClientCalls} creates a new {@code HttpJsonClientCAll} from the given call context. + * + *

Package-private for internal use. + */ +class HttpJsonClientCalls { + private static final Logger LOGGER = Logger.getLogger(HttpJsonClientCalls.class.getName()); + + public static HttpJsonClientCall newCall( + ApiMethodDescriptor methodDescriptor, ApiCallContext context) { + + HttpJsonCallContext httpJsonContext = HttpJsonCallContext.createDefault().nullToSelf(context); + + // Try to convert the timeout into a deadline and use it if it occurs before the actual deadline + if (httpJsonContext.getTimeout() != null) { + @Nonnull Instant newDeadline = Instant.now().plus(httpJsonContext.getTimeout()); + HttpJsonCallOptions callOptions = httpJsonContext.getCallOptions(); + if (callOptions.getDeadline() == null || newDeadline.isBefore(callOptions.getDeadline())) { + callOptions = callOptions.toBuilder().setDeadline(newDeadline).build(); + httpJsonContext = httpJsonContext.withCallOptions(callOptions); + } + } + + // TODO: add headers interceptor logic + return httpJsonContext.getChannel().newCall(methodDescriptor, httpJsonContext.getCallOptions()); + } + + static ApiFuture eagerFutureUnaryCall( + HttpJsonClientCall clientCall, RequestT request) { + // Start the call + HttpJsonFuture future = new HttpJsonFuture<>(clientCall); + clientCall.start(new FutureListener<>(future), HttpJsonMetadata.newBuilder().build()); + + // Send the request + try { + clientCall.sendMessage(request); + clientCall.halfClose(); + // Request an extra message to detect misconfigured servers + clientCall.request(2); + } catch (Throwable sendError) { + // Cancel if anything goes wrong + try { + clientCall.cancel(null, sendError); + } catch (Throwable cancelError) { + LOGGER.log(Level.SEVERE, "Error encountered while closing it", sendError); + } + + throw sendError; + } + + return future; + } + + private static class HttpJsonFuture extends AbstractApiFuture { + private final HttpJsonClientCall call; + + private HttpJsonFuture(HttpJsonClientCall call) { + this.call = call; + } + + @Override + protected void interruptTask() { + call.cancel("HttpJsonFuture was cancelled", null); + } + + @Override + public boolean set(T value) { + return super.set(value); + } + + @Override + public boolean setException(Throwable throwable) { + return super.setException(throwable); + } + } + + private static class FutureListener extends HttpJsonClientCall.Listener { + private final HttpJsonFuture future; + + private FutureListener(HttpJsonFuture future) { + this.future = future; + } + + @Override + public void onMessage(T message) { + if (!future.set(message)) { + throw new IllegalStateException("More than one value received for unary call"); + } + } + + @Override + public void onClose(int statusCode, HttpJsonMetadata trailers) { + if (!future.isDone()) { + future.setException(trailers.getException()); + } else if (statusCode < 200 || statusCode >= 400) { + LOGGER.log( + Level.WARNING, "Received error for unary call after receiving a successful response"); + } + } + } +} diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonDirectCallable.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonDirectCallable.java index 55278c22f..dd0826dd6 100644 --- a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonDirectCallable.java +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonDirectCallable.java @@ -34,9 +34,6 @@ import com.google.api.gax.rpc.UnaryCallable; import com.google.common.base.Preconditions; import com.google.protobuf.TypeRegistry; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import org.threeten.bp.Instant; /** * {@code HttpJsonDirectCallable} creates HTTP calls. @@ -62,23 +59,13 @@ public ApiFuture futureCall(RequestT request, ApiCallContext inputCon Preconditions.checkNotNull(request); HttpJsonCallContext context = HttpJsonCallContext.createDefault().nullToSelf(inputContext); - @Nullable Instant deadline = context.getDeadline(); - // Try to convert the timeout into a deadline and use it if it occurs before the actual deadline - if (context.getTimeout() != null) { - @Nonnull Instant newDeadline = Instant.now().plus(context.getTimeout()); + context = + context.withCallOptions( + context.getCallOptions().toBuilder().setTypeRegistry(typeRegistry).build()); - if (deadline == null || newDeadline.isBefore(deadline)) { - deadline = newDeadline; - } - } - - HttpJsonCallOptions callOptions = - HttpJsonCallOptions.newBuilder() - .setDeadline(deadline) - .setCredentials(context.getCredentials()) - .setTypeRegistry(typeRegistry) - .build(); - return context.getChannel().issueFutureUnaryCall(callOptions, request, descriptor); + HttpJsonClientCall clientCall = + HttpJsonClientCalls.newCall(descriptor, context); + return HttpJsonClientCalls.eagerFutureUnaryCall(clientCall, request); } @Override diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonDirectServerStreamingCallable.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonDirectServerStreamingCallable.java new file mode 100644 index 000000000..ed3bebde3 --- /dev/null +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonDirectServerStreamingCallable.java @@ -0,0 +1,69 @@ +/* + * Copyright 2022 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.httpjson; + +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StreamController; +import com.google.common.base.Preconditions; + +/** + * {@code HttpJsonDirectServerStreamingCallable} creates server-streaming REST calls. + * + *

In a chain of {@link ServerStreamingCallable}s this is the innermost callable. It wraps a + * {@link HttpJsonClientCall} in a {@link StreamController} and the downstream {@link + * ResponseObserver} in a {@link HttpJsonClientCall.Listener}. This class is implemented to look and + * behave as similarly as possible to gRPC variant of it. + * + *

Package-private for internal use. + */ +class HttpJsonDirectServerStreamingCallable + extends ServerStreamingCallable { + + private final ApiMethodDescriptor descriptor; + + HttpJsonDirectServerStreamingCallable(ApiMethodDescriptor descriptor) { + this.descriptor = descriptor; + } + + @Override + public void call( + RequestT request, ResponseObserver responseObserver, ApiCallContext context) { + + Preconditions.checkNotNull(request); + Preconditions.checkNotNull(responseObserver); + + HttpJsonClientCall call = HttpJsonClientCalls.newCall(descriptor, context); + HttpJsonDirectStreamController controller = + new HttpJsonDirectStreamController<>(call, responseObserver); + controller.start(request); + } +} diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonDirectStreamController.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonDirectStreamController.java new file mode 100644 index 000000000..5f56390f0 --- /dev/null +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonDirectStreamController.java @@ -0,0 +1,126 @@ +/* + * Copyright 2022 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.httpjson; + +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.StreamController; +import com.google.common.base.Preconditions; +import java.util.concurrent.CancellationException; + +/** + * Wraps a HttpJsonClientCall in a {@link StreamController}. It feeds events to a {@link + * ResponseObserver} and allows for back pressure. + * + *

Package-private for internal use. + */ +class HttpJsonDirectStreamController implements StreamController { + private final HttpJsonClientCall clientCall; + private final ResponseObserver responseObserver; + private volatile boolean hasStarted; + private volatile boolean autoflowControl = true; + private volatile int numRequested; + private volatile CancellationException cancellationException; + + HttpJsonDirectStreamController( + HttpJsonClientCall clientCall, + ResponseObserver responseObserver) { + this.clientCall = clientCall; + this.responseObserver = responseObserver; + } + + @Override + public void cancel() { + cancellationException = new CancellationException("User cancelled stream"); + clientCall.cancel(null, cancellationException); + } + + @Override + public void disableAutoInboundFlowControl() { + Preconditions.checkState( + !hasStarted, "Can't disable automatic flow control after the stream has started."); + autoflowControl = false; + } + + @Override + public void request(int count) { + Preconditions.checkState(!autoflowControl, "Autoflow control is enabled."); + + // Buffer the requested count in case the consumer requested responses in the onStart() + if (!hasStarted) { + numRequested += count; + } else { + clientCall.request(count); + } + } + + void start(RequestT request) { + responseObserver.onStart(this); + this.hasStarted = true; + clientCall.start(new ResponseObserverAdapter(), HttpJsonMetadata.newBuilder().build()); + + if (autoflowControl) { + clientCall.request(1); + } else if (numRequested > 0) { + clientCall.request(numRequested); + } + + clientCall.sendMessage(request); + } + + private class ResponseObserverAdapter extends HttpJsonClientCall.Listener { + /** + * Notifies the outerObserver of the new message and if automatic flow control is enabled, + * requests the next message. Any errors raised by the outerObserver will be bubbled up to GRPC, + * which cancel the ClientCall and close this listener. + * + * @param message The new message. + */ + @Override + public void onMessage(ResponseT message) { + responseObserver.onResponse(message); + + if (autoflowControl) { + clientCall.request(1); + } + } + + @Override + public void onClose(int statusCode, HttpJsonMetadata trailers) { + if (statusCode >= 200 && statusCode < 300) { + responseObserver.onComplete(); + } else if (cancellationException != null) { + // Intercept cancellations and replace with the top level cause + responseObserver.onError(cancellationException); + } else { + responseObserver.onError(trailers.getException()); + } + } + } +} diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonExceptionCallable.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonExceptionCallable.java index 14be14332..ede9ce343 100644 --- a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonExceptionCallable.java +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonExceptionCallable.java @@ -31,18 +31,15 @@ import static com.google.common.util.concurrent.MoreExecutors.directExecutor; -import com.google.api.client.http.HttpResponseException; import com.google.api.core.AbstractApiFuture; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ApiException; -import com.google.api.gax.rpc.ApiExceptionFactory; import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.UnaryCallable; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; import java.util.Set; import java.util.concurrent.CancellationException; @@ -53,12 +50,12 @@ */ class HttpJsonExceptionCallable extends UnaryCallable { private final UnaryCallable callable; - private final ImmutableSet retryableCodes; + private final HttpJsonApiExceptionFactory exceptionFactory; HttpJsonExceptionCallable( UnaryCallable callable, Set retryableCodes) { this.callable = Preconditions.checkNotNull(callable); - this.retryableCodes = ImmutableSet.copyOf(retryableCodes); + this.exceptionFactory = new HttpJsonApiExceptionFactory(retryableCodes); } @Override @@ -73,7 +70,7 @@ public ApiFuture futureCall(RequestT request, ApiCallContext inputCon private class ExceptionTransformingFuture extends AbstractApiFuture implements ApiFutureCallback { - private ApiFuture innerCallFuture; + private final ApiFuture innerCallFuture; private volatile boolean cancelled = false; public ExceptionTransformingFuture(ApiFuture innerCallFuture) { @@ -93,27 +90,11 @@ public void onSuccess(ResponseT r) { @Override public void onFailure(Throwable throwable) { - if (throwable instanceof HttpResponseException) { - HttpResponseException e = (HttpResponseException) throwable; - StatusCode statusCode = HttpJsonStatusCode.of(e.getStatusCode()); - boolean canRetry = retryableCodes.contains(statusCode.getCode()); - String message = e.getStatusMessage(); - ApiException newException = - message == null - ? ApiExceptionFactory.createException(throwable, statusCode, canRetry) - : ApiExceptionFactory.createException(message, throwable, statusCode, canRetry); - super.setException(newException); - } else if (throwable instanceof CancellationException && cancelled) { + if (throwable instanceof CancellationException && cancelled) { // this just circled around, so ignore. return; - } else if (throwable instanceof ApiException) { - super.setException(throwable); - } else { - // Do not retry on unknown throwable, even when UNKNOWN is in retryableCodes - setException( - ApiExceptionFactory.createException( - throwable, HttpJsonStatusCode.of(StatusCode.Code.UNKNOWN), false)); } + setException(exceptionFactory.create(throwable)); } } } diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonExceptionResponseObserver.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonExceptionResponseObserver.java new file mode 100644 index 000000000..0264a33f0 --- /dev/null +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonExceptionResponseObserver.java @@ -0,0 +1,91 @@ +/* + * Copyright 2022 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.httpjson; + +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.StateCheckingResponseObserver; +import com.google.api.gax.rpc.StreamController; +import java.util.concurrent.CancellationException; + +/** Package-private for internal use. */ +class HttpJsonExceptionResponseObserver + extends StateCheckingResponseObserver { + private final ResponseObserver innerObserver; + private volatile CancellationException cancellationException; + private final HttpJsonApiExceptionFactory exceptionFactory; + + public HttpJsonExceptionResponseObserver( + ResponseObserver innerObserver, HttpJsonApiExceptionFactory exceptionFactory) { + this.innerObserver = innerObserver; + this.exceptionFactory = exceptionFactory; + } + + @Override + protected void onStartImpl(final StreamController controller) { + innerObserver.onStart( + new StreamController() { + @Override + public void cancel() { + cancellationException = new CancellationException("User cancelled stream"); + controller.cancel(); + } + + @Override + public void disableAutoInboundFlowControl() { + controller.disableAutoInboundFlowControl(); + } + + @Override + public void request(int count) { + controller.request(count); + } + }); + } + + @Override + protected void onResponseImpl(ResponseT response) { + innerObserver.onResponse(response); + } + + @Override + protected void onErrorImpl(Throwable t) { + if (cancellationException != null) { + t = cancellationException; + } else { + t = exceptionFactory.create(t); + } + innerObserver.onError(t); + } + + @Override + protected void onCompleteImpl() { + innerObserver.onComplete(); + } +} diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonExceptionServerStreamingCallable.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonExceptionServerStreamingCallable.java new file mode 100644 index 000000000..7c82b2fb1 --- /dev/null +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonExceptionServerStreamingCallable.java @@ -0,0 +1,65 @@ +/* + * Copyright 2022 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.google.api.gax.httpjson; + +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StatusCode.Code; +import java.util.Set; + +/** + * Transforms all {@code Throwable}s thrown during a rest call into an instance of {@link + * ApiException}. + * + *

Package-private for internal use. + */ +class HttpJsonExceptionServerStreamingCallable + extends ServerStreamingCallable { + private final ServerStreamingCallable inner; + private final HttpJsonApiExceptionFactory exceptionFactory; + + public HttpJsonExceptionServerStreamingCallable( + ServerStreamingCallable inner, Set retryableCodes) { + this.inner = inner; + this.exceptionFactory = new HttpJsonApiExceptionFactory(retryableCodes); + } + + @Override + public void call( + RequestT request, ResponseObserver responseObserver, ApiCallContext context) { + inner.call( + request, + new HttpJsonExceptionResponseObserver<>(responseObserver, exceptionFactory), + context); + } +} diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonMetadata.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonMetadata.java new file mode 100644 index 000000000..9fef6db5c --- /dev/null +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonMetadata.java @@ -0,0 +1,67 @@ +/* + * Copyright 2022 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.httpjson; + +import com.google.api.core.BetaApi; +import com.google.api.core.InternalExtensionOnly; +import com.google.auto.value.AutoValue; +import java.util.Collections; +import java.util.Map; +import javax.annotation.Nullable; + +@AutoValue +@BetaApi +@InternalExtensionOnly +public abstract class HttpJsonMetadata { + public abstract Map getHeaders(); + + @Nullable + public abstract String getStatusMessage(); + + @Nullable + public abstract Throwable getException(); + + public abstract Builder toBuilder(); + + public static HttpJsonMetadata.Builder newBuilder() { + return new AutoValue_HttpJsonMetadata.Builder().setHeaders(Collections.emptyMap()); + } + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setHeaders(Map headers); + + public abstract Builder setStatusMessage(String value); + + public abstract Builder setException(Throwable value); + + abstract HttpJsonMetadata build(); + } +} diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonStatusRuntimeException.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonStatusRuntimeException.java new file mode 100644 index 000000000..a1b9b1c1b --- /dev/null +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonStatusRuntimeException.java @@ -0,0 +1,50 @@ +/* + * Copyright 2022 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.google.api.gax.httpjson; + +/** + * HTTP status code in RuntimeException form, for propagating status code information via + * exceptions. + */ +public class HttpJsonStatusRuntimeException extends RuntimeException { + private static final long serialVersionUID = -5390915748330242256L; + + private final int statusCode; + + public HttpJsonStatusRuntimeException(int statusCode, String message, Throwable cause) { + super(message, cause); + this.statusCode = statusCode; + } + + public int getStatusCode() { + return statusCode; + } +} diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonStubCallableFactory.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonStubCallableFactory.java index 89992d9f8..59b0ceaed 100644 --- a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonStubCallableFactory.java +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonStubCallableFactory.java @@ -36,6 +36,9 @@ import com.google.api.gax.rpc.OperationCallSettings; import com.google.api.gax.rpc.OperationCallable; import com.google.api.gax.rpc.PagedCallSettings; +import com.google.api.gax.rpc.ServerStreamingCallSettings; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StreamingCallSettings; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.api.gax.rpc.UnaryCallable; @@ -48,9 +51,9 @@ public interface HttpJsonStubCallableFactory< * code. * * @param httpJsonCallSettings the http/json call settings - * @param callSettings {@link UnaryCallSettings} to configure the method-level settings with. - * @param clientContext {@link ClientContext} to use to connect to the service. - * @return {@link UnaryCallable} callable object. + * @param callSettings {@link UnaryCallSettings} to configure the method-level settings with + * @param clientContext {@link ClientContext} to use to connect to the service + * @return {@link UnaryCallable} callable object */ UnaryCallable createUnaryCallable( HttpJsonCallSettings httpJsonCallSettings, @@ -62,9 +65,9 @@ UnaryCallable createUnaryCallable( * generated code. * * @param httpJsonCallSettings the http/json call settings - * @param pagedCallSettings {@link PagedCallSettings} to configure the paged settings with. - * @param clientContext {@link ClientContext} to use to connect to the service. - * @return {@link UnaryCallable} callable object. + * @param pagedCallSettings {@link PagedCallSettings} to configure the paged settings with + * @param clientContext {@link ClientContext} to use to connect to the service + * @return {@link UnaryCallable} callable object */ UnaryCallable createPagedCallable( @@ -78,19 +81,46 @@ UnaryCallable createPagedCallable( * * @param httpJsonCallSettings the http/json call settings * @param batchingCallSettings {@link BatchingCallSettings} to configure the batching related - * settings with. - * @param clientContext {@link ClientContext} to use to connect to the service. - * @return {@link UnaryCallable} callable object. + * settings with + * @param clientContext {@link ClientContext} to use to connect to the service + * @return {@link UnaryCallable} callable object */ UnaryCallable createBatchingCallable( HttpJsonCallSettings httpJsonCallSettings, BatchingCallSettings batchingCallSettings, ClientContext clientContext); + /** + * Creates a callable object that represents a long-running operation. Designed for use by + * generated code. + * + * @param httpJsonCallSettings the http/json call settings + * @param operationCallSettings {@link OperationCallSettings} to configure the method-level + * settings with + * @param clientContext {@link ClientContext} to use to connect to the service + * @param operationsStub opertation stub to use to poll for updates on the Operation + * @return {@link OperationCallable} callable object + */ OperationCallable createOperationCallable( HttpJsonCallSettings httpJsonCallSettings, OperationCallSettings operationCallSettings, ClientContext clientContext, OperationsStub operationsStub); + + /** + * Create a server-streaming callable with. Designed for use by generated code. + * + * @param httpJsonCallSettings the gRPC call settings + * @param callSettings {@link StreamingCallSettings} to configure the method-level settings with. + * @param clientContext {@link ClientContext} to use to connect to the service. + */ + default + ServerStreamingCallable createServerStreamingCallable( + HttpJsonCallSettings httpJsonCallSettings, + ServerStreamingCallSettings callSettings, + ClientContext clientContext) { + return HttpJsonCallableFactory.createServerStreamingCallable( + httpJsonCallSettings, callSettings, clientContext); + } } diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonTransportChannel.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonTransportChannel.java index e992f85a8..337f7b5a0 100644 --- a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonTransportChannel.java +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonTransportChannel.java @@ -98,6 +98,10 @@ public static Builder newBuilder() { return new AutoValue_HttpJsonTransportChannel.Builder(); } + public static HttpJsonTransportChannel create(ManagedHttpJsonChannel channel) { + return newBuilder().setManagedChannel(channel).build(); + } + @AutoValue.Builder public abstract static class Builder { public abstract Builder setManagedChannel(ManagedHttpJsonChannel value); diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpRequestRunnable.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpRequestRunnable.java index f3a7ffd6c..621335f89 100644 --- a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpRequestRunnable.java +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpRequestRunnable.java @@ -37,66 +37,139 @@ import com.google.api.client.http.HttpRequest; import com.google.api.client.http.HttpRequestFactory; import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.HttpResponseException; import com.google.api.client.http.HttpTransport; import com.google.api.client.http.json.JsonHttpContent; import com.google.api.client.json.JsonFactory; import com.google.api.client.json.JsonObjectParser; +import com.google.api.client.json.gson.GsonFactory; import com.google.api.client.util.GenericData; -import com.google.api.core.SettableApiFuture; import com.google.auth.Credentials; import com.google.auth.http.HttpCredentialsAdapter; import com.google.auto.value.AutoValue; import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; +import java.io.ByteArrayInputStream; import java.io.IOException; -import java.util.LinkedList; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import javax.annotation.Nullable; import org.threeten.bp.Duration; import org.threeten.bp.Instant; /** A runnable object that creates and executes an HTTP request. */ -@AutoValue -abstract class HttpRequestRunnable implements Runnable { - abstract HttpJsonCallOptions getHttpJsonCallOptions(); - - abstract RequestT getRequest(); - - abstract ApiMethodDescriptor getApiMethodDescriptor(); - - abstract HttpTransport getHttpTransport(); +class HttpRequestRunnable implements Runnable { + private final RequestT request; + private final ApiMethodDescriptor methodDescriptor; + private final String endpoint; + private final HttpJsonCallOptions httpJsonCallOptions; + private final HttpTransport httpTransport; + private final HttpJsonMetadata headers; + private final ResultListener resultListener; + + private volatile boolean cancelled = false; + + HttpRequestRunnable( + RequestT request, + ApiMethodDescriptor methodDescriptor, + String endpoint, + HttpJsonCallOptions httpJsonCallOptions, + HttpTransport httpTransport, + HttpJsonMetadata headers, + ResultListener resultListener) { + this.request = request; + this.methodDescriptor = methodDescriptor; + this.endpoint = endpoint; + this.httpJsonCallOptions = httpJsonCallOptions; + this.httpTransport = httpTransport; + this.headers = headers; + this.resultListener = resultListener; + } - abstract String getEndpoint(); + // Best effort cancellation without guarantees. + // It will check if the task cancelled before each three sequential potentially time-consuming + // operations: + // - request construction; + // - request execution (the most time consuming, taking); + // - response construction. + void cancel() { + cancelled = true; + } - abstract JsonFactory getJsonFactory(); + @Override + public void run() { + HttpResponse httpResponse = null; + RunnableResult.Builder result = RunnableResult.builder(); + HttpJsonMetadata.Builder trailers = HttpJsonMetadata.newBuilder(); + HttpRequest httpRequest = null; + try { + // Check if already cancelled before even creating a request + if (cancelled) { + return; + } + httpRequest = createHttpRequest(); + // Check if already cancelled before sending the request; + if (cancelled) { + return; + } - abstract ImmutableList getHeaderEnhancers(); + httpResponse = httpRequest.execute(); - abstract SettableApiFuture getResponseFuture(); + // Check if already cancelled before sending the request; + if (cancelled) { + httpResponse.disconnect(); + return; + } + result.setResponseHeaders( + HttpJsonMetadata.newBuilder().setHeaders(httpResponse.getHeaders()).build()); + result.setStatusCode(httpResponse.getStatusCode()); + result.setResponseContent(httpResponse.getContent()); + trailers.setStatusMessage(httpResponse.getStatusMessage()); + } catch (HttpResponseException e) { + result.setStatusCode(e.getStatusCode()); + result.setResponseHeaders(HttpJsonMetadata.newBuilder().setHeaders(e.getHeaders()).build()); + result.setResponseContent( + new ByteArrayInputStream(e.getContent().getBytes(StandardCharsets.UTF_8))); + trailers.setStatusMessage(e.getStatusMessage()); + trailers.setException(e); + } catch (Throwable e) { + if (httpResponse != null) { + trailers.setStatusMessage(httpResponse.getStatusMessage()); + result.setStatusCode(httpResponse.getStatusCode()); + } else { + result.setStatusCode(400); + } + trailers.setException(e); + } finally { + if (!cancelled) { + resultListener.setResult(result.setTrailers(trailers.build()).build()); + } + } + } HttpRequest createHttpRequest() throws IOException { GenericData tokenRequest = new GenericData(); - HttpRequestFormatter requestFormatter = - getApiMethodDescriptor().getRequestFormatter(); + HttpRequestFormatter requestFormatter = methodDescriptor.getRequestFormatter(); HttpRequestFactory requestFactory; - Credentials credentials = getHttpJsonCallOptions().getCredentials(); + Credentials credentials = httpJsonCallOptions.getCredentials(); if (credentials != null) { - requestFactory = - getHttpTransport().createRequestFactory(new HttpCredentialsAdapter(credentials)); + requestFactory = httpTransport.createRequestFactory(new HttpCredentialsAdapter(credentials)); } else { - requestFactory = getHttpTransport().createRequestFactory(); + requestFactory = httpTransport.createRequestFactory(); } + JsonFactory jsonFactory = GsonFactory.getDefaultInstance(); // Create HTTP request body. - String requestBody = requestFormatter.getRequestBody(getRequest()); + String requestBody = requestFormatter.getRequestBody(request); HttpContent jsonHttpContent; if (!Strings.isNullOrEmpty(requestBody)) { - getJsonFactory().createJsonParser(requestBody).parse(tokenRequest); + jsonFactory.createJsonParser(requestBody).parse(tokenRequest); jsonHttpContent = - new JsonHttpContent(getJsonFactory(), tokenRequest) + new JsonHttpContent(jsonFactory, tokenRequest) .setMediaType((new HttpMediaType("application/json"))); } else { // Force underlying HTTP lib to set Content-Length header to avoid 411s. @@ -105,9 +178,9 @@ HttpRequest createHttpRequest() throws IOException { } // Populate URL path and query parameters. - String endpoint = normalizeEndpoint(getEndpoint()); - GenericUrl url = new GenericUrl(endpoint + requestFormatter.getPath(getRequest())); - Map> queryParams = requestFormatter.getQueryParamNames(getRequest()); + String normalizedEndpoint = normalizeEndpoint(endpoint); + GenericUrl url = new GenericUrl(normalizedEndpoint + requestFormatter.getPath(request)); + Map> queryParams = requestFormatter.getQueryParamNames(request); for (Entry> queryParam : queryParams.entrySet()) { if (queryParam.getValue() != null) { url.set(queryParam.getKey(), queryParam.getValue()); @@ -116,7 +189,7 @@ HttpRequest createHttpRequest() throws IOException { HttpRequest httpRequest = buildRequest(requestFactory, url, jsonHttpContent); - Instant deadline = getHttpJsonCallOptions().getDeadline(); + Instant deadline = httpJsonCallOptions.getDeadline(); if (deadline != null) { long readTimeout = Duration.between(Instant.now(), deadline).toMillis(); if (httpRequest.getReadTimeout() > 0 @@ -126,10 +199,13 @@ HttpRequest createHttpRequest() throws IOException { } } - for (HttpJsonHeaderEnhancer enhancer : getHeaderEnhancers()) { - enhancer.enhance(httpRequest.getHeaders()); + for (Map.Entry entry : headers.getHeaders().entrySet()) { + HttpHeadersUtils.setHeader( + httpRequest.getHeaders(), entry.getKey(), (String) entry.getValue()); } - httpRequest.setParser(new JsonObjectParser(getJsonFactory())); + + httpRequest.setParser(new JsonObjectParser(jsonFactory)); + return httpRequest; } @@ -155,7 +231,7 @@ private HttpRequest buildRequest( // gax-httpjson is), writing own implementation of HttpUrlConnection (fragile and a lot of // work), depending on v2.ApacheHttpTransport (it has many extra dependencies, does not support // mtls etc). - String actualHttpMethod = getApiMethodDescriptor().getHttpMethod(); + String actualHttpMethod = methodDescriptor.getHttpMethod(); String originalHttpMethod = actualHttpMethod; if (HttpMethods.PATCH.equals(actualHttpMethod)) { actualHttpMethod = HttpMethods.POST; @@ -169,8 +245,8 @@ private HttpRequest buildRequest( } // This will be frequently executed, so avoiding using regexps if not necessary. - private String normalizeEndpoint(String endpoint) { - String normalized = endpoint; + private String normalizeEndpoint(String rawEndpoint) { + String normalized = rawEndpoint; // Set protocol as https by default if not set explicitly if (!normalized.contains("://")) { normalized = "https://" + normalized; @@ -183,53 +259,39 @@ private String normalizeEndpoint(String endpoint) { return normalized; } - @Override - public void run() { - try { - HttpRequest httpRequest = createHttpRequest(); - HttpResponse httpResponse = httpRequest.execute(); - - if (getApiMethodDescriptor().getResponseParser() != null) { - ResponseT response = - getApiMethodDescriptor() - .getResponseParser() - .parse(httpResponse.getContent(), getHttpJsonCallOptions().getTypeRegistry()); - - getResponseFuture().set(response); - } else { - getResponseFuture().set(null); - } - } catch (Exception e) { - getResponseFuture().setException(e); - } + @FunctionalInterface + interface ResultListener { + void setResult(RunnableResult result); } - static Builder newBuilder() { - return new AutoValue_HttpRequestRunnable.Builder() - .setHeaderEnhancers(new LinkedList<>()); - } + @AutoValue + abstract static class RunnableResult { + @Nullable + abstract HttpJsonMetadata getResponseHeaders(); - @AutoValue.Builder - abstract static class Builder { - abstract Builder setHttpJsonCallOptions(HttpJsonCallOptions callOptions); + abstract int getStatusCode(); - abstract Builder setRequest(RequestT request); + @Nullable + abstract InputStream getResponseContent(); - abstract Builder setApiMethodDescriptor( - ApiMethodDescriptor methodDescriptor); + abstract HttpJsonMetadata getTrailers(); - abstract Builder setHttpTransport(HttpTransport httpTransport); + public static Builder builder() { + return new AutoValue_HttpRequestRunnable_RunnableResult.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { - abstract Builder setEndpoint(String endpoint); + public abstract Builder setResponseHeaders(HttpJsonMetadata newResponseHeaders); - abstract Builder setJsonFactory(JsonFactory jsonFactory); + public abstract Builder setStatusCode(int newStatusCode); - abstract Builder setHeaderEnhancers( - List headerEnhancers); + public abstract Builder setResponseContent(InputStream newResponseContent); - abstract Builder setResponseFuture( - SettableApiFuture responseFuture); + public abstract Builder setTrailers(HttpJsonMetadata newTrailers); - abstract HttpRequestRunnable build(); + public abstract RunnableResult build(); + } } } diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpResponseParser.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpResponseParser.java index 78aacf2dd..b2c3d5fc8 100644 --- a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpResponseParser.java +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpResponseParser.java @@ -33,6 +33,7 @@ import com.google.api.core.InternalExtensionOnly; import com.google.protobuf.TypeRegistry; import java.io.InputStream; +import java.io.Reader; /** Interface for classes that parse parts of HTTP responses into the parameterized message type. */ @InternalExtensionOnly @@ -50,13 +51,23 @@ public interface HttpResponseParser { /** * Parse the http body content JSON stream into the MessageFormatT. * - * @param httpContent the body of an HTTP response + * @param httpContent the body of an HTTP response, represented as an {@link InputStream} * @param registry type registry with Any fields descriptors * @throws RestSerializationException if failed to parse the {@code httpContent} to a valid {@code * MessageFormatT} */ MessageFormatT parse(InputStream httpContent, TypeRegistry registry); + /** + * Parse the http body content JSON reader into the MessageFormatT. + * + * @param httpContent the body of an HTTP response, represented as a {@link Reader} + * @param registry type registry with Any fields descriptors + * @throws RestSerializationException if failed to parse the {@code httpContent} to a valid {@code + * MessageFormatT} + */ + MessageFormatT parse(Reader httpContent, TypeRegistry registry); + /** * Serialize an object into an HTTP body, which is written out to output. * diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java index 2e4ff935b..ca92d0fbe 100644 --- a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/InstantiatingHttpJsonChannelProvider.java @@ -41,11 +41,10 @@ import com.google.api.gax.rpc.mtls.MtlsProvider; import com.google.auth.Credentials; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; import java.io.IOException; import java.security.GeneralSecurityException; import java.security.KeyStore; -import java.util.List; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -187,12 +186,7 @@ HttpTransport createHttpTransport() throws IOException, GeneralSecurityException } private TransportChannel createChannel() throws IOException, GeneralSecurityException { - Map headers = headerProvider.getHeaders(); - - List headerEnhancers = Lists.newArrayList(); - for (Map.Entry header : headers.entrySet()) { - headerEnhancers.add(HttpJsonHeaderEnhancers.create(header.getKey(), header.getValue())); - } + Map headers = new HashMap<>(headerProvider.getHeaders()); HttpTransport httpTransportToUse = httpTransport; if (httpTransportToUse == null) { @@ -202,7 +196,7 @@ private TransportChannel createChannel() throws IOException, GeneralSecurityExce ManagedHttpJsonChannel channel = ManagedHttpJsonChannel.newBuilder() .setEndpoint(endpoint) - .setHeaderEnhancers(headerEnhancers) + .setDefaultHeaders(HttpJsonMetadata.newBuilder().setHeaders(headers).build()) .setExecutor(executor) .setHttpTransport(httpTransportToUse) .build(); diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java index 760f3f0b1..d75152230 100644 --- a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java @@ -31,18 +31,12 @@ import com.google.api.client.http.HttpTransport; import com.google.api.client.http.javanet.NetHttpTransport; -import com.google.api.client.json.JsonFactory; -import com.google.api.client.json.gson.GsonFactory; import com.google.api.core.ApiFuture; import com.google.api.core.BetaApi; -import com.google.api.core.SettableApiFuture; import com.google.api.gax.core.BackgroundResource; import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import java.io.IOException; -import java.util.LinkedList; -import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -51,14 +45,12 @@ /** Implementation of HttpJsonChannel which can issue http-json calls. */ @BetaApi public class ManagedHttpJsonChannel implements HttpJsonChannel, BackgroundResource { - private static final JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance(); private static final ExecutorService DEFAULT_EXECUTOR = InstantiatingExecutorProvider.newBuilder().build().getExecutor(); private final Executor executor; private final String endpoint; - private final JsonFactory jsonFactory; - private final ImmutableList headerEnhancers; + private final HttpJsonMetadata defaultHeaders; private final HttpTransport httpTransport; private boolean isTransportShutdown; @@ -66,38 +58,31 @@ public class ManagedHttpJsonChannel implements HttpJsonChannel, BackgroundResour private ManagedHttpJsonChannel( Executor executor, String endpoint, - JsonFactory jsonFactory, - List headerEnhancers, - @Nullable HttpTransport httpTransport) { + @Nullable HttpTransport httpTransport, + HttpJsonMetadata defaultHeaders) { this.executor = executor; this.endpoint = endpoint; - this.jsonFactory = jsonFactory; - this.headerEnhancers = ImmutableList.copyOf(headerEnhancers); this.httpTransport = httpTransport == null ? new NetHttpTransport() : httpTransport; + this.defaultHeaders = defaultHeaders; } @Override + public HttpJsonClientCall newCall( + ApiMethodDescriptor methodDescriptor, HttpJsonCallOptions callOptions) { + + return new HttpJsonClientCallImpl<>( + methodDescriptor, endpoint, callOptions, httpTransport, executor, defaultHeaders); + } + + @Override + @Deprecated public ApiFuture issueFutureUnaryCall( HttpJsonCallOptions callOptions, RequestT request, ApiMethodDescriptor methodDescriptor) { - final SettableApiFuture responseFuture = SettableApiFuture.create(); - - HttpRequestRunnable runnable = - HttpRequestRunnable.newBuilder() - .setResponseFuture(responseFuture) - .setApiMethodDescriptor(methodDescriptor) - .setHeaderEnhancers(headerEnhancers) - .setHttpJsonCallOptions(callOptions) - .setHttpTransport(httpTransport) - .setJsonFactory(jsonFactory) - .setRequest(request) - .setEndpoint(endpoint) - .build(); - - executor.execute(runnable); - - return responseFuture; + + return HttpJsonClientCalls.eagerFutureUnaryCall( + newCall(methodDescriptor, callOptions), request); } @Override @@ -139,15 +124,14 @@ public void close() {} public static Builder newBuilder() { return new Builder() - .setHeaderEnhancers(new LinkedList()) + .setDefaultHeaders(HttpJsonMetadata.newBuilder().build()) .setExecutor(DEFAULT_EXECUTOR); } public static class Builder { private Executor executor; private String endpoint; - private JsonFactory jsonFactory = JSON_FACTORY; - private List headerEnhancers; + private HttpJsonMetadata defaultHeaders; private HttpTransport httpTransport; private Builder() {} @@ -162,8 +146,8 @@ public Builder setEndpoint(String endpoint) { return this; } - public Builder setHeaderEnhancers(List headerEnhancers) { - this.headerEnhancers = headerEnhancers; + public Builder setDefaultHeaders(HttpJsonMetadata defaultHeaders) { + this.defaultHeaders = defaultHeaders; return this; } @@ -174,8 +158,7 @@ public Builder setHttpTransport(HttpTransport httpTransport) { public ManagedHttpJsonChannel build() { Preconditions.checkNotNull(endpoint); - return new ManagedHttpJsonChannel( - executor, endpoint, jsonFactory, headerEnhancers, httpTransport); + return new ManagedHttpJsonChannel(executor, endpoint, httpTransport, defaultHeaders); } } } diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ProtoMessageJsonStreamIterator.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ProtoMessageJsonStreamIterator.java new file mode 100644 index 000000000..84167de5e --- /dev/null +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ProtoMessageJsonStreamIterator.java @@ -0,0 +1,134 @@ +/* + * Copyright 2022 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.httpjson; + +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonToken; +import com.google.gson.stream.JsonWriter; +import java.io.Closeable; +import java.io.IOException; +import java.io.PipedReader; +import java.io.PipedWriter; +import java.io.Reader; +import java.util.Iterator; + +/** This class is not thread-safe and is expected to be used under external synchronization. */ +class ProtoMessageJsonStreamIterator implements Closeable, Iterator { + private volatile boolean arrayStarted; + private final JsonReader jsonReader; + private final PipedReader reader; + private final PipedWriter writer; + + ProtoMessageJsonStreamIterator(Reader rawReader) throws IOException { + this.arrayStarted = false; + this.jsonReader = new JsonReader(rawReader); + this.reader = new PipedReader(0x40000); // 256K + this.writer = new PipedWriter(); + writer.connect(reader); + } + + @Override + public void close() throws IOException { + reader.close(); + writer.close(); + jsonReader.close(); + } + + public boolean hasNext() { + try { + if (!arrayStarted) { + jsonReader.beginArray(); + arrayStarted = true; + } + return jsonReader.hasNext(); + } catch (IOException e) { + throw new RestSerializationException(e); + } + } + + @Override + public Reader next() { + try { + int nestedObjectCount = 0; + JsonWriter jsonWriter = new JsonWriter(writer); + do { + JsonToken token = jsonReader.peek(); + switch (token) { + case BEGIN_ARRAY: + jsonReader.beginArray(); + jsonWriter.beginArray(); + break; + case END_ARRAY: + jsonReader.endArray(); + jsonWriter.endArray(); + break; + case BEGIN_OBJECT: + nestedObjectCount++; + jsonReader.beginObject(); + jsonWriter.beginObject(); + break; + case END_OBJECT: + jsonReader.endObject(); + jsonWriter.endObject(); + nestedObjectCount--; + break; + case NAME: + String name = jsonReader.nextName(); + jsonWriter.name(name); + break; + case STRING: + String s = jsonReader.nextString(); + jsonWriter.value(s); + break; + case NUMBER: + String n = jsonReader.nextString(); + jsonWriter.value(n); + break; + case BOOLEAN: + boolean b = jsonReader.nextBoolean(); + jsonWriter.value(b); + break; + case NULL: + jsonReader.nextNull(); + jsonWriter.nullValue(); + break; + case END_DOCUMENT: + nestedObjectCount--; + } + } while (nestedObjectCount > 0); + + jsonWriter.flush(); + + return reader; + } catch (IOException e) { + throw new RestSerializationException(e); + } + } +} diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ProtoMessageResponseParser.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ProtoMessageResponseParser.java index fabf77ce7..2820b2c56 100644 --- a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ProtoMessageResponseParser.java +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ProtoMessageResponseParser.java @@ -32,7 +32,10 @@ import com.google.api.core.BetaApi; import com.google.protobuf.Message; import com.google.protobuf.TypeRegistry; +import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; import java.nio.charset.StandardCharsets; /** The implementation of {@link HttpResponseParser} which works with protobuf messages. */ @@ -48,23 +51,31 @@ private ProtoMessageResponseParser(ResponseT defaultInstance, TypeRegistry defau this.defaultRegistry = defaultRegistry; } - public static - ProtoMessageResponseParser.Builder newBuilder() { - return new ProtoMessageResponseParser.Builder() + public static + ProtoMessageResponseParser.Builder newBuilder() { + return new ProtoMessageResponseParser.Builder() .setDefaultTypeRegistry(TypeRegistry.getEmptyTypeRegistry()); } /* {@inheritDoc} */ @Override public ResponseT parse(InputStream httpContent) { - return ProtoRestSerializer.create(defaultRegistry) - .fromJson(httpContent, StandardCharsets.UTF_8, defaultInstance.newBuilderForType()); + return parse(httpContent, defaultRegistry); } @Override public ResponseT parse(InputStream httpContent, TypeRegistry registry) { + try (Reader json = new InputStreamReader(httpContent, StandardCharsets.UTF_8)) { + return parse(json, registry); + } catch (IOException e) { + throw new RestSerializationException("Failed to parse response message", e); + } + } + + @Override + public ResponseT parse(Reader httpContent, TypeRegistry registry) { return ProtoRestSerializer.create(registry) - .fromJson(httpContent, StandardCharsets.UTF_8, defaultInstance.newBuilderForType()); + .fromJson(httpContent, defaultInstance.newBuilderForType()); } /* {@inheritDoc} */ diff --git a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ProtoRestSerializer.java b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ProtoRestSerializer.java index 39f352910..9c75be0b0 100644 --- a/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ProtoRestSerializer.java +++ b/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ProtoRestSerializer.java @@ -36,10 +36,7 @@ import com.google.protobuf.TypeRegistry; import com.google.protobuf.util.JsonFormat; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.io.Reader; -import java.nio.charset.Charset; import java.util.List; import java.util.Map; @@ -86,15 +83,14 @@ String toJson(RequestT message) { /** * Deserializes a {@code message} from an input stream to a protobuf message. * - * @param message the input stream with a JSON-encoded message in it - * @param messageCharset the message charset + * @param json the input reader with a JSON-encoded message in it * @param builder an empty builder for the specific {@code RequestT} message to serialize * @throws RestSerializationException if failed to deserialize a protobuf message from the JSON * stream */ @SuppressWarnings("unchecked") - RequestT fromJson(InputStream message, Charset messageCharset, Message.Builder builder) { - try (Reader json = new InputStreamReader(message, messageCharset)) { + RequestT fromJson(Reader json, Message.Builder builder) { + try { JsonFormat.parser().usingTypeRegistry(registry).ignoringUnknownFields().merge(json, builder); return (RequestT) builder.build(); } catch (IOException e) { diff --git a/gax-httpjson/src/test/java/com/google/api/gax/httpjson/ApiMessageHttpRequestTest.java b/gax-httpjson/src/test/java/com/google/api/gax/httpjson/ApiMessageHttpRequestTest.java index a5ac52638..174e5ac90 100644 --- a/gax-httpjson/src/test/java/com/google/api/gax/httpjson/ApiMessageHttpRequestTest.java +++ b/gax-httpjson/src/test/java/com/google/api/gax/httpjson/ApiMessageHttpRequestTest.java @@ -30,17 +30,13 @@ package com.google.api.gax.httpjson; import com.google.api.client.http.HttpRequest; -import com.google.api.client.json.gson.GsonFactory; import com.google.api.client.testing.http.MockHttpTransport; -import com.google.api.core.SettableApiFuture; import com.google.api.pathtemplate.PathTemplate; import com.google.api.resourcenames.ResourceName; import com.google.api.resourcenames.ResourceNameFactory; -import com.google.auth.Credentials; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.truth.Truth; -import com.google.protobuf.TypeRegistry; import java.io.IOException; import java.io.OutputStream; import java.util.HashMap; @@ -48,29 +44,15 @@ import java.util.Map; import javax.annotation.Nullable; import org.junit.Test; -import org.threeten.bp.Instant; +import org.mockito.Mockito; public class ApiMessageHttpRequestTest { private static final String ENDPOINT = "https://www.googleapis.com/animals/v1/projects/"; - private static PathTemplate nameTemplate = PathTemplate.create("name/{name}"); - - private static HttpJsonCallOptions fakeCallOptions = - new HttpJsonCallOptions() { - @Override - public Instant getDeadline() { - return null; - } - - @Override - public Credentials getCredentials() { - return null; - } - - @Override - public TypeRegistry getTypeRegistry() { - return null; - } - }; + private static final PathTemplate nameTemplate = PathTemplate.create("name/{name}"); + + @SuppressWarnings("unchecked") + private static final HttpResponseParser responseParser = + Mockito.mock(HttpResponseParser.class); @Test public void testFieldMask() throws IOException { @@ -149,18 +131,18 @@ public String getFieldValue(String s) { .setFullMethodName("house.details.get") .setHttpMethod(null) .setRequestFormatter(frogFormatter) + .setResponseParser(responseParser) .build(); - HttpRequestRunnable httpRequestRunnable = - HttpRequestRunnable.newBuilder() - .setHttpJsonCallOptions(fakeCallOptions) - .setEndpoint(ENDPOINT) - .setRequest(insertFrogRequest) - .setApiMethodDescriptor(apiMethodDescriptor) - .setHttpTransport(new MockHttpTransport()) - .setJsonFactory(new GsonFactory()) - .setResponseFuture(SettableApiFuture.create()) - .build(); + HttpRequestRunnable httpRequestRunnable = + new HttpRequestRunnable<>( + insertFrogRequest, + apiMethodDescriptor, + ENDPOINT, + HttpJsonCallOptions.newBuilder().build(), + new MockHttpTransport(), + HttpJsonMetadata.newBuilder().build(), + (result) -> {}); HttpRequest httpRequest = httpRequestRunnable.createHttpRequest(); String expectedUrl = ENDPOINT + "name/tree_frog" + "?requestId=request57"; diff --git a/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonDirectCallableTest.java b/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonDirectCallableTest.java index 4a2d59136..394f7df32 100644 --- a/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonDirectCallableTest.java +++ b/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonDirectCallableTest.java @@ -31,177 +31,158 @@ import static com.google.common.truth.Truth.assertThat; -import com.google.api.core.SettableApiFuture; -import com.google.api.pathtemplate.PathTemplate; -import com.google.common.collect.ImmutableMap; -import com.google.protobuf.TypeRegistry; -import java.io.InputStream; +import com.google.api.client.http.HttpResponseException; +import com.google.api.gax.httpjson.testing.MockHttpService; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.ApiExceptionFactory; +import com.google.api.gax.rpc.StatusCode.Code; +import com.google.api.gax.rpc.testing.FakeStatusCode; +import com.google.protobuf.Field; +import com.google.protobuf.Field.Cardinality; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; import org.threeten.bp.Duration; -import org.threeten.bp.Instant; public class HttpJsonDirectCallableTest { - private final ApiMethodDescriptor API_DESCRIPTOR = - ApiMethodDescriptor.newBuilder() - .setFullMethodName("fakeMethod") - .setHttpMethod("GET") - .setRequestFormatter(new FakeRequestFormatter()) - .setResponseParser(new FakeResponseParser()) + private static final ApiMethodDescriptor FAKE_METHOD_DESCRIPTOR = + ApiMethodDescriptor.newBuilder() + .setFullMethodName("google.cloud.v1.Fake/FakeMethod") + .setHttpMethod("POST") + .setRequestFormatter( + ProtoMessageRequestFormatter.newBuilder() + .setPath( + "/fake/v1/name/{name}", + request -> { + Map fields = new HashMap<>(); + ProtoRestSerializer serializer = ProtoRestSerializer.create(); + serializer.putPathParam(fields, "name", request.getName()); + return fields; + }) + .setQueryParamsExtractor( + request -> { + Map> fields = new HashMap<>(); + ProtoRestSerializer serializer = ProtoRestSerializer.create(); + serializer.putQueryParam(fields, "number", request.getNumber()); + return fields; + }) + .setRequestBodyExtractor( + request -> + ProtoRestSerializer.create() + .toBody("*", request.toBuilder().clearName().build())) + .build()) + .setResponseParser( + ProtoMessageResponseParser.newBuilder() + .setDefaultInstance(Field.getDefaultInstance()) + .build()) .build(); - @SuppressWarnings("unchecked") - @Test - public void testTimeout() { - HttpJsonChannel mockChannel = Mockito.mock(HttpJsonChannel.class); - - String expectedRequest = "fake"; - - HttpJsonDirectCallable callable = new HttpJsonDirectCallable<>(API_DESCRIPTOR); - - // Mock the channel that captures the call options - ArgumentCaptor capturedCallOptions = - ArgumentCaptor.forClass(HttpJsonCallOptions.class); - - Mockito.when( - mockChannel.issueFutureUnaryCall( - capturedCallOptions.capture(), - Mockito.anyString(), - Mockito.any(ApiMethodDescriptor.class))) - .thenReturn(SettableApiFuture.create()); - - // Compose the call context - Duration timeout = Duration.ofSeconds(10); - Instant minExpectedDeadline = Instant.now().plus(timeout); - - HttpJsonCallContext callContext = - HttpJsonCallContext.createDefault().withChannel(mockChannel).withTimeout(timeout); - - callable.futureCall(expectedRequest, callContext); - - Instant maxExpectedDeadline = Instant.now().plus(timeout); + private static final MockHttpService MOCK_SERVICE = + new MockHttpService(Collections.singletonList(FAKE_METHOD_DESCRIPTOR), "google.com:443"); + + private final ManagedHttpJsonChannel channel = + ManagedHttpJsonChannel.newBuilder() + .setEndpoint("google.com:443") + .setDefaultHeaders( + HttpJsonMetadata.newBuilder() + .setHeaders(Collections.singletonMap("header-key", "headerValue")) + .build()) + .setExecutor(executorService) + .setHttpTransport(MOCK_SERVICE) + .build(); - // Verify that the timeout was converted into a deadline - assertThat(capturedCallOptions.getValue().getDeadline()).isAtLeast(minExpectedDeadline); - assertThat(capturedCallOptions.getValue().getDeadline()).isAtMost(maxExpectedDeadline); + private static ExecutorService executorService; + + @BeforeClass + public static void initialize() { + executorService = + Executors.newFixedThreadPool( + 2, + r -> { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + return t; + }); } - @SuppressWarnings("unchecked") - @Test - public void testTimeoutAfterDeadline() { - HttpJsonChannel mockChannel = Mockito.mock(HttpJsonChannel.class); - - String expectedRequest = "fake"; - - HttpJsonDirectCallable callable = new HttpJsonDirectCallable<>(API_DESCRIPTOR); - - // Mock the channel that captures the call options - ArgumentCaptor capturedCallOptions = - ArgumentCaptor.forClass(HttpJsonCallOptions.class); - - Mockito.when( - mockChannel.issueFutureUnaryCall( - capturedCallOptions.capture(), - Mockito.anyString(), - Mockito.any(ApiMethodDescriptor.class))) - .thenReturn(SettableApiFuture.create()); - - // Compose the call context - Instant priorDeadline = Instant.now().plusSeconds(5); - Duration timeout = Duration.ofSeconds(10); - - HttpJsonCallContext callContext = - HttpJsonCallContext.createDefault() - .withChannel(mockChannel) - .withDeadline(priorDeadline) - .withTimeout(timeout); - - callable.futureCall(expectedRequest, callContext); + @AfterClass + public static void destroy() { + executorService.shutdownNow(); + } - // Verify that the timeout was ignored - assertThat(capturedCallOptions.getValue().getDeadline()).isEqualTo(priorDeadline); + @After + public void tearDown() { + MOCK_SERVICE.reset(); } - @SuppressWarnings("unchecked") @Test - public void testTimeoutBeforeDeadline() { - HttpJsonChannel mockChannel = Mockito.mock(HttpJsonChannel.class); - - String expectedRequest = "fake"; - - HttpJsonDirectCallable callable = new HttpJsonDirectCallable<>(API_DESCRIPTOR); - - // Mock the channel that captures the call options - ArgumentCaptor capturedCallOptions = - ArgumentCaptor.forClass(HttpJsonCallOptions.class); - - Mockito.when( - mockChannel.issueFutureUnaryCall( - capturedCallOptions.capture(), - Mockito.anyString(), - Mockito.any(ApiMethodDescriptor.class))) - .thenReturn(SettableApiFuture.create()); - - // Compose the call context - Duration timeout = Duration.ofSeconds(10); - Instant subsequentDeadline = Instant.now().plusSeconds(15); - - Instant minExpectedDeadline = Instant.now().plus(timeout); + public void testSuccessfulUnaryResponse() throws ExecutionException, InterruptedException { + HttpJsonDirectCallable callable = + new HttpJsonDirectCallable<>(FAKE_METHOD_DESCRIPTOR); HttpJsonCallContext callContext = HttpJsonCallContext.createDefault() - .withChannel(mockChannel) - .withDeadline(subsequentDeadline) - .withTimeout(timeout); - - callable.futureCall(expectedRequest, callContext); - - Instant maxExpectedDeadline = Instant.now().plus(timeout); - - // Verify that the timeout was converted into a deadline - assertThat(capturedCallOptions.getValue().getDeadline()).isAtLeast(minExpectedDeadline); - assertThat(capturedCallOptions.getValue().getDeadline()).isAtMost(maxExpectedDeadline); - } - - private static final class FakeRequestFormatter implements HttpRequestFormatter { - @Override - public Map> getQueryParamNames(String apiMessage) { - return ImmutableMap.of(); - } - - @Override - public String getRequestBody(String apiMessage) { - return "fake"; - } - - @Override - public String getPath(String apiMessage) { - return "/fake/path"; - } - - @Override - public PathTemplate getPathTemplate() { - return PathTemplate.create("/fake/path"); - } + .withChannel(channel) + .withTimeout(Duration.ofSeconds(30)); + + Field request; + Field expectedResponse; + request = + expectedResponse = + Field.newBuilder() // "echo" service + .setName("imTheBestField") + .setNumber(2) + .setCardinality(Cardinality.CARDINALITY_OPTIONAL) + .setDefaultValue("blah") + .build(); + + MOCK_SERVICE.addResponse(expectedResponse); + + Field actualResponse = callable.futureCall(request, callContext).get(); + + assertThat(actualResponse).isEqualTo(expectedResponse); + assertThat(MOCK_SERVICE.getRequestPaths().size()).isEqualTo(1); + String headerValue = MOCK_SERVICE.getRequestHeaders().get("header-key").iterator().next(); + assertThat(headerValue).isEqualTo("headerValue"); } - private static final class FakeResponseParser implements HttpResponseParser { - @Override - public String parse(InputStream httpContent) { - return "fake"; - } - - @Override - public String parse(InputStream httpContent, TypeRegistry registry) { - return parse(httpContent); - } - - @Override - public String serialize(String response) { - return response; + @Test + public void testErrorUnaryResponse() throws InterruptedException { + HttpJsonDirectCallable callable = + new HttpJsonDirectCallable<>(FAKE_METHOD_DESCRIPTOR); + + HttpJsonCallContext callContext = HttpJsonCallContext.createDefault().withChannel(channel); + + Field request; + request = + Field.newBuilder() // "echo" service + .setName("imTheBestField") + .setNumber(2) + .setCardinality(Cardinality.CARDINALITY_OPTIONAL) + .setDefaultValue("blah") + .build(); + + ApiException exception = + ApiExceptionFactory.createException( + new Exception(), FakeStatusCode.of(Code.NOT_FOUND), false); + MOCK_SERVICE.addException(exception); + + try { + callable.futureCall(request, callContext).get(); + Assert.fail("No exception raised"); + } catch (ExecutionException e) { + HttpResponseException respExp = (HttpResponseException) e.getCause(); + assertThat(respExp.getStatusCode()).isEqualTo(400); + assertThat(respExp.getContent()).isEqualTo(exception.toString()); } } } diff --git a/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonDirectServerStreamingCallableTest.java b/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonDirectServerStreamingCallableTest.java new file mode 100644 index 000000000..094b09e49 --- /dev/null +++ b/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonDirectServerStreamingCallableTest.java @@ -0,0 +1,367 @@ +/* + * Copyright 2022 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.httpjson; + +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.httpjson.ApiMethodDescriptor.MethodType; +import com.google.api.gax.httpjson.testing.MockHttpService; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStream; +import com.google.api.gax.rpc.ServerStreamingCallSettings; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StateCheckingResponseObserver; +import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.StatusCode.Code; +import com.google.api.gax.rpc.StreamController; +import com.google.api.gax.rpc.testing.FakeCallContext; +import com.google.common.collect.Lists; +import com.google.common.truth.Truth; +import com.google.protobuf.Field; +import com.google.type.Color; +import com.google.type.Money; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class HttpJsonDirectServerStreamingCallableTest { + private static final ApiMethodDescriptor METHOD_SERVER_STREAMING_RECOGNIZE = + ApiMethodDescriptor.newBuilder() + .setFullMethodName("google.cloud.v1.Fake/ServerStreamingRecognize") + .setHttpMethod("POST") + .setRequestFormatter( + ProtoMessageRequestFormatter.newBuilder() + .setPath( + "/fake/v1/recognize/{blue}", + request -> { + Map fields = new HashMap<>(); + ProtoRestSerializer serializer = ProtoRestSerializer.create(); + serializer.putPathParam(fields, "blue", request.getBlue()); + return fields; + }) + .setQueryParamsExtractor( + request -> { + Map> fields = new HashMap<>(); + ProtoRestSerializer serializer = ProtoRestSerializer.create(); + serializer.putQueryParam(fields, "red", request.getRed()); + return fields; + }) + .setRequestBodyExtractor( + request -> + ProtoRestSerializer.create() + .toBody("*", request.toBuilder().clearBlue().clearRed().build())) + .build()) + .setResponseParser( + ProtoMessageResponseParser.newBuilder() + .setDefaultInstance(Money.getDefaultInstance()) + .build()) + .setType(MethodType.SERVER_STREAMING) + .build(); + + private static final MockHttpService MOCK_SERVICE = + new MockHttpService( + Collections.singletonList(METHOD_SERVER_STREAMING_RECOGNIZE), "google.com:443"); + + private static final Color DEFAULT_REQUEST = Color.newBuilder().setRed(0.5f).build(); + private static final Color ASYNC_REQUEST = DEFAULT_REQUEST.toBuilder().setGreen(1000).build(); + private static final Color ERROR_REQUEST = Color.newBuilder().setRed(-1).build(); + private static final Money DEFAULT_RESPONSE = + Money.newBuilder().setCurrencyCode("USD").setUnits(127).build(); + private static final Money DEFAULTER_RESPONSE = + Money.newBuilder().setCurrencyCode("UAH").setUnits(255).build(); + + private ClientContext clientContext; + private ServerStreamingCallSettings streamingCallSettings; + private ServerStreamingCallable streamingCallable; + + private static ExecutorService executorService; + + @BeforeClass + public static void initialize() { + executorService = Executors.newFixedThreadPool(2); + } + + @AfterClass + public static void destroy() { + executorService.shutdownNow(); + } + + @Before + public void setUp() throws InstantiationException, IllegalAccessException, IOException { + ManagedHttpJsonChannel channel = + ManagedHttpJsonChannel.newBuilder() + .setEndpoint("google.com:443") + .setDefaultHeaders( + HttpJsonMetadata.newBuilder() + .setHeaders(Collections.singletonMap("header-key", "headerValue")) + .build()) + .setExecutor(executorService) + .setHttpTransport(MOCK_SERVICE) + .build(); + + clientContext = + ClientContext.newBuilder() + .setTransportChannel(HttpJsonTransportChannel.create(channel)) + .setDefaultCallContext(HttpJsonCallContext.of(channel, HttpJsonCallOptions.DEFAULT)) + .build(); + streamingCallSettings = ServerStreamingCallSettings.newBuilder().build(); + streamingCallable = + HttpJsonCallableFactory.createServerStreamingCallable( + HttpJsonCallSettings.create(METHOD_SERVER_STREAMING_RECOGNIZE), + streamingCallSettings, + clientContext); + } + + @After + public void tearDown() { + MOCK_SERVICE.reset(); + } + + @Test + public void testBadContext() { + MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE}); + streamingCallable = + HttpJsonCallableFactory.createServerStreamingCallable( + HttpJsonCallSettings.create(METHOD_SERVER_STREAMING_RECOGNIZE), + streamingCallSettings, + clientContext + .toBuilder() + .setDefaultCallContext(FakeCallContext.createDefault()) + .build()); + + CountDownLatch latch = new CountDownLatch(1); + + MoneyObserver observer = new MoneyObserver(true, latch); + try { + streamingCallable.call(DEFAULT_REQUEST, observer); + Assert.fail("Callable should have thrown an exception"); + } catch (IllegalArgumentException expected) { + Truth.assertThat(expected) + .hasMessageThat() + .contains("context must be an instance of HttpJsonCallContext"); + } + } + + @Test + public void testServerStreamingStart() throws InterruptedException { + MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE}); + CountDownLatch latch = new CountDownLatch(1); + MoneyObserver moneyObserver = new MoneyObserver(true, latch); + + streamingCallable.call(DEFAULT_REQUEST, moneyObserver); + + Truth.assertThat(moneyObserver.controller).isNotNull(); + // wait for the task to complete, otherwise it may interfere with other tests, since they share + // the same MockService and unfinished request in this tes may start readind messages designated + // for other tests. + Truth.assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue(); + } + + @Test + public void testServerStreaming() throws InterruptedException { + + MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE, DEFAULTER_RESPONSE}); + CountDownLatch latch = new CountDownLatch(3); + MoneyObserver moneyObserver = new MoneyObserver(true, latch); + + streamingCallable.call(DEFAULT_REQUEST, moneyObserver); + + Truth.assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + Truth.assertThat(latch.getCount()).isEqualTo(0); + Truth.assertThat(moneyObserver.error).isNull(); + Truth.assertThat(moneyObserver.response).isEqualTo(DEFAULTER_RESPONSE); + } + + @Test + public void testManualFlowControl() throws Exception { + MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE}); + CountDownLatch latch = new CountDownLatch(2); + MoneyObserver moneyObserver = new MoneyObserver(false, latch); + + streamingCallable.call(DEFAULT_REQUEST, moneyObserver); + + Truth.assertThat(latch.await(1000, TimeUnit.MILLISECONDS)).isFalse(); + Truth.assertWithMessage("Received response before requesting it") + .that(moneyObserver.response) + .isNull(); + + moneyObserver.controller.request(1); + Truth.assertThat(latch.await(1000, TimeUnit.MILLISECONDS)).isTrue(); + + Truth.assertThat(moneyObserver.response).isEqualTo(DEFAULT_RESPONSE); + Truth.assertThat(moneyObserver.completed).isTrue(); + } + + @Test + public void testCancelClientCall() throws Exception { + MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE}); + CountDownLatch latch = new CountDownLatch(1); + MoneyObserver moneyObserver = new MoneyObserver(false, latch); + + streamingCallable.call(ASYNC_REQUEST, moneyObserver); + + moneyObserver.controller.cancel(); + moneyObserver.controller.request(1); + Truth.assertThat(latch.await(500, TimeUnit.MILLISECONDS)).isTrue(); + + Truth.assertThat(moneyObserver.error).isInstanceOf(CancellationException.class); + Truth.assertThat(moneyObserver.error).hasMessageThat().isEqualTo("User cancelled stream"); + } + + @Test + public void testOnResponseError() throws Throwable { + MOCK_SERVICE.addException(404, new RuntimeException("some error")); + + CountDownLatch latch = new CountDownLatch(1); + MoneyObserver moneyObserver = new MoneyObserver(true, latch); + + streamingCallable.call(ERROR_REQUEST, moneyObserver); + Truth.assertThat(latch.await(1000, TimeUnit.MILLISECONDS)).isTrue(); + + Truth.assertThat(moneyObserver.error).isInstanceOf(ApiException.class); + Truth.assertThat(((ApiException) moneyObserver.error).getStatusCode().getCode()) + .isEqualTo(Code.NOT_FOUND); + Truth.assertThat(moneyObserver.error) + .hasMessageThat() + .isEqualTo( + "com.google.api.client.http.HttpResponseException: 404\n" + + "POST https://google.com:443/fake/v1/recognize/0.0?red=-1.0\n" + + "java.lang.RuntimeException: some error"); + } + + @Test + public void testObserverErrorCancelsCall() throws Throwable { + MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE}); + final RuntimeException expectedCause = new RuntimeException("some error"); + final SettableApiFuture actualErrorF = SettableApiFuture.create(); + + ResponseObserver moneyObserver = + new StateCheckingResponseObserver() { + @Override + protected void onStartImpl(StreamController controller) {} + + @Override + protected void onResponseImpl(Money response) { + throw expectedCause; + } + + @Override + protected void onErrorImpl(Throwable t) { + actualErrorF.set(t); + } + + @Override + protected void onCompleteImpl() { + actualErrorF.set(null); + } + }; + + streamingCallable.call(DEFAULT_REQUEST, moneyObserver); + Throwable actualError = actualErrorF.get(11500, TimeUnit.MILLISECONDS); + + Truth.assertThat(actualError).isInstanceOf(ApiException.class); + Truth.assertThat(((ApiException) actualError).getStatusCode().getCode()) + .isEqualTo(StatusCode.Code.CANCELLED); + + // gax httpjson transport layer is responsible for the immediate cancellation + Truth.assertThat(actualError.getCause()).isInstanceOf(HttpJsonStatusRuntimeException.class); + // and the client error is cause for httpjson transport layer to cancel it + Truth.assertThat(actualError.getCause().getCause()).isSameInstanceAs(expectedCause); + } + + @Test + public void testBlockingServerStreaming() { + MOCK_SERVICE.addResponse(new Money[] {DEFAULT_RESPONSE}); + Color request = Color.newBuilder().setRed(0.5f).build(); + ServerStream response = streamingCallable.call(request); + List responseData = Lists.newArrayList(response); + + Money expected = Money.newBuilder().setCurrencyCode("USD").setUnits(127).build(); + Truth.assertThat(responseData).containsExactly(expected); + } + + static class MoneyObserver extends StateCheckingResponseObserver { + private final boolean autoFlowControl; + private final CountDownLatch latch; + + volatile StreamController controller; + volatile Money response; + volatile Throwable error; + volatile boolean completed; + + MoneyObserver(boolean autoFlowControl, CountDownLatch latch) { + this.autoFlowControl = autoFlowControl; + this.latch = latch; + } + + @Override + protected void onStartImpl(StreamController controller) { + this.controller = controller; + if (!autoFlowControl) { + controller.disableAutoInboundFlowControl(); + } + } + + @Override + protected void onResponseImpl(Money value) { + response = value; + latch.countDown(); + } + + @Override + protected void onErrorImpl(Throwable t) { + error = t; + latch.countDown(); + } + + @Override + protected void onCompleteImpl() { + completed = true; + latch.countDown(); + } + } +} diff --git a/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpRequestRunnableTest.java b/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpRequestRunnableTest.java index 44672b28b..f2846f6a0 100644 --- a/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpRequestRunnableTest.java +++ b/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpRequestRunnableTest.java @@ -31,19 +31,14 @@ import com.google.api.client.http.EmptyContent; import com.google.api.client.http.HttpRequest; -import com.google.api.client.json.gson.GsonFactory; import com.google.api.client.testing.http.MockHttpTransport; -import com.google.api.core.SettableApiFuture; import com.google.api.gax.httpjson.testing.FakeApiMessage; import com.google.api.pathtemplate.PathTemplate; -import com.google.auth.Credentials; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.truth.Truth; -import com.google.protobuf.TypeRegistry; import java.io.IOException; -import java.io.InputStream; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -51,39 +46,20 @@ import java.util.TreeMap; import org.junit.BeforeClass; import org.junit.Test; -import org.threeten.bp.Instant; +import org.mockito.Mockito; public class HttpRequestRunnableTest { - private static HttpJsonCallOptions fakeCallOptions; private static CatMessage catMessage; private static final String ENDPOINT = "https://www.googleapis.com/animals/v1/projects/"; private static HttpRequestFormatter catFormatter; private static HttpResponseParser catParser; - private static PathTemplate nameTemplate = PathTemplate.create("name/{name}"); - private static Set queryParams = + private static final PathTemplate nameTemplate = PathTemplate.create("name/{name}"); + private static final Set queryParams = Sets.newTreeSet(Lists.newArrayList("food", "size", "gibberish")); @SuppressWarnings("unchecked") @BeforeClass public static void setUp() { - fakeCallOptions = - new HttpJsonCallOptions() { - @Override - public Instant getDeadline() { - return null; - } - - @Override - public Credentials getCredentials() { - return null; - } - - @Override - public TypeRegistry getTypeRegistry() { - return null; - } - }; - catMessage = new CatMessage( ImmutableMap.of( @@ -131,23 +107,7 @@ public PathTemplate getPathTemplate() { } }; - catParser = - new HttpResponseParser() { - @Override - public EmptyMessage parse(InputStream httpContent) { - return null; - } - - @Override - public EmptyMessage parse(InputStream httpContent, TypeRegistry registry) { - return null; - } - - @Override - public String serialize(EmptyMessage response) { - return null; - } - }; + catParser = Mockito.mock(HttpResponseParser.class); } @Test @@ -161,15 +121,14 @@ public void testRequestUrl() throws IOException { .build(); HttpRequestRunnable httpRequestRunnable = - HttpRequestRunnable.newBuilder() - .setHttpJsonCallOptions(fakeCallOptions) - .setEndpoint(ENDPOINT) - .setRequest(catMessage) - .setApiMethodDescriptor(methodDescriptor) - .setHttpTransport(new MockHttpTransport()) - .setJsonFactory(new GsonFactory()) - .setResponseFuture(SettableApiFuture.create()) - .build(); + new HttpRequestRunnable<>( + catMessage, + methodDescriptor, + ENDPOINT, + HttpJsonCallOptions.newBuilder().build(), + new MockHttpTransport(), + HttpJsonMetadata.newBuilder().build(), + (result) -> {}); HttpRequest httpRequest = httpRequestRunnable.createHttpRequest(); Truth.assertThat(httpRequest.getContent()).isInstanceOf(EmptyContent.class); @@ -188,15 +147,15 @@ public void testRequestUrlUnnormalized() throws IOException { .build(); HttpRequestRunnable httpRequestRunnable = - HttpRequestRunnable.newBuilder() - .setHttpJsonCallOptions(fakeCallOptions) - .setEndpoint("www.googleapis.com/animals/v1/projects") - .setRequest(catMessage) - .setApiMethodDescriptor(methodDescriptor) - .setHttpTransport(new MockHttpTransport()) - .setJsonFactory(new GsonFactory()) - .setResponseFuture(SettableApiFuture.create()) - .build(); + new HttpRequestRunnable<>( + catMessage, + methodDescriptor, + "www.googleapis.com/animals/v1/projects", + HttpJsonCallOptions.newBuilder().build(), + new MockHttpTransport(), + HttpJsonMetadata.newBuilder().build(), + (result) -> {}); + HttpRequest httpRequest = httpRequestRunnable.createHttpRequest(); Truth.assertThat(httpRequest.getContent()).isInstanceOf(EmptyContent.class); String expectedUrl = @@ -217,15 +176,15 @@ public void testRequestUrlUnnormalizedPatch() throws IOException { .build(); HttpRequestRunnable httpRequestRunnable = - HttpRequestRunnable.newBuilder() - .setHttpJsonCallOptions(fakeCallOptions) - .setEndpoint("www.googleapis.com/animals/v1/projects") - .setRequest(catMessage) - .setApiMethodDescriptor(methodDescriptor) - .setHttpTransport(new MockHttpTransport()) - .setJsonFactory(new GsonFactory()) - .setResponseFuture(SettableApiFuture.create()) - .build(); + new HttpRequestRunnable<>( + catMessage, + methodDescriptor, + "www.googleapis.com/animals/v1/projects", + HttpJsonCallOptions.newBuilder().build(), + new MockHttpTransport(), + HttpJsonMetadata.newBuilder().build(), + (result) -> {}); + HttpRequest httpRequest = httpRequestRunnable.createHttpRequest(); Truth.assertThat(httpRequest.getContent()).isInstanceOf(EmptyContent.class); String expectedUrl = diff --git a/gax-httpjson/src/test/java/com/google/api/gax/httpjson/MockHttpServiceTest.java b/gax-httpjson/src/test/java/com/google/api/gax/httpjson/MockHttpServiceTest.java index 3571275e9..d41b974ae 100644 --- a/gax-httpjson/src/test/java/com/google/api/gax/httpjson/MockHttpServiceTest.java +++ b/gax-httpjson/src/test/java/com/google/api/gax/httpjson/MockHttpServiceTest.java @@ -55,6 +55,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.Reader; import java.util.List; import java.util.Map; import org.junit.Before; @@ -104,6 +105,11 @@ public PetMessage parse(InputStream httpContent, TypeRegistry registry) { return parse(httpContent); } + @Override + public PetMessage parse(Reader httpContent, TypeRegistry registry) { + return null; + } + @Override public String serialize(PetMessage response) { return ((List) response.getFieldValue("type")).get(0); diff --git a/gax-httpjson/src/test/java/com/google/api/gax/httpjson/ProtoMessageJsonStreamIteratorTest.java b/gax-httpjson/src/test/java/com/google/api/gax/httpjson/ProtoMessageJsonStreamIteratorTest.java new file mode 100644 index 000000000..c9836db9a --- /dev/null +++ b/gax-httpjson/src/test/java/com/google/api/gax/httpjson/ProtoMessageJsonStreamIteratorTest.java @@ -0,0 +1,238 @@ +/* + * Copyright 2022 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.google.api.gax.httpjson; + +import com.google.common.truth.Truth; +import com.google.protobuf.Field; +import com.google.protobuf.Int64Value; +import com.google.protobuf.Option; +import com.google.protobuf.util.JsonFormat; +import java.io.IOException; +import java.io.StringReader; +import org.junit.Test; + +public class ProtoMessageJsonStreamIteratorTest { + + @Test + public void testEmpty() throws IOException { + String jsonData = "[]"; + ProtoMessageJsonStreamIterator streamIter = + new ProtoMessageJsonStreamIterator(new StringReader(jsonData)); + + Truth.assertThat(streamIter.hasNext()).isFalse(); + streamIter.close(); + } + + @Test + public void testSingleElement() throws IOException { + Field[] expectedData = + new Field[] { + Field.newBuilder() + .setName("cat") + .addOptions(Option.newBuilder().setName("haha").build()) + .addOptions(Option.newBuilder().setName("hoho").build()) + .setNumber(1) + .setDefaultValue("mew") + .build() + }; + + String jsonData = + "[{\n" + + " \"number\": 1,\n" + + " \"name\": \"cat\",\n" + + " \"options\": [{\n" + + " \"name\": \"haha\"\n" + + " }, {\n" + + " \"name\": \"hoho\"\n" + + " }],\n" + + " \"defaultValue\": \"mew\"\n" + + "}]"; + + ProtoMessageJsonStreamIterator streamIter = + new ProtoMessageJsonStreamIterator(new StringReader(jsonData)); + + Truth.assertThat(streamIter.hasNext()).isTrue(); + Field.Builder builder = Field.newBuilder(); + JsonFormat.parser().merge(streamIter.next(), builder); + Truth.assertThat(builder.build()).isEqualTo(expectedData[0]); + + Truth.assertThat(streamIter.hasNext()).isFalse(); + + streamIter.close(); + // closing a closed iterator should be no-op. + streamIter.close(); + } + + @Test + public void testProtobufWrapperObjects() throws IOException { + Int64Value[] expectedData = + new Int64Value[] { + Int64Value.newBuilder().setValue(1234567889999977L).build(), + Int64Value.newBuilder().setValue(2234567889999977L).build(), + Int64Value.newBuilder().setValue(3234567889999977L).build() + }; + + String jsonData = "[\"1234567889999977\", \t \"2234567889999977\",\n\"3234567889999977\"]"; + + ProtoMessageJsonStreamIterator streamIter = + new ProtoMessageJsonStreamIterator(new StringReader(jsonData)); + + Truth.assertThat(streamIter.hasNext()).isTrue(); + Int64Value.Builder builder = Int64Value.newBuilder(); + JsonFormat.parser().merge(streamIter.next(), builder); + Truth.assertThat(builder.build()).isEqualTo(expectedData[0]); + + Truth.assertThat(streamIter.hasNext()).isTrue(); + builder = Int64Value.newBuilder(); + JsonFormat.parser().merge(streamIter.next(), builder); + Truth.assertThat(builder.build()).isEqualTo(expectedData[1]); + + Truth.assertThat(streamIter.hasNext()).isTrue(); + builder = Int64Value.newBuilder(); + JsonFormat.parser().merge(streamIter.next(), builder); + Truth.assertThat(builder.build()).isEqualTo(expectedData[2]); + + Truth.assertThat(streamIter.hasNext()).isFalse(); + + streamIter.close(); + } + + @Test + public void testMultipleElements() throws IOException { + Field[] expectedData = + new Field[] { + Field.newBuilder() + .setName("cat") + .addOptions(Option.newBuilder().setName("haha").build()) + .addOptions(Option.newBuilder().setName("hoho").build()) + .setNumber(1) + .setDefaultValue("mew") + .build(), + Field.newBuilder() + .setName("dog") + .addOptions(Option.newBuilder().setName("muu").build()) + .setNumber(2) + .setDefaultValue("woof") + .build(), + Field.newBuilder() + .setName("cow") + .addOptions(Option.newBuilder().setName("bee").build()) + .setNumber(3) + .setDefaultValue("muu") + .build() + }; + + String jsonData = + "[{\n" + + " \"number\": 1,\n" + + " \"name\": \"cat\",\n" + + " \"options\": [{\n" + + " \"name\": \"haha\"\n" + + " }, {\n" + + " \"name\": \"hoho\"\n" + + " }],\n" + + " \"defaultValue\": \"mew\"\n" + + "},\n" + + "{\n" + + " \"number\": 2,\n" + + " \"name\": \"dog\",\n" + + " \"options\": [{\n" + + " \"name\": \"muu\"\n" + + " }],\n" + + " \"defaultValue\": \"woof\"\n" + + "},\n" + + "{\n" + + " \"number\": 3,\n" + + " \"name\": \"cow\",\n" + + " \"options\": [{\n" + + " \"name\": \"bee\"\n" + + " }],\n" + + " \"defaultValue\": \"muu\"\n" + + "}]"; + + ProtoMessageJsonStreamIterator streamIter = + new ProtoMessageJsonStreamIterator(new StringReader(jsonData)); + + Truth.assertThat(streamIter.hasNext()).isTrue(); + Field.Builder builder = Field.newBuilder(); + JsonFormat.parser().merge(streamIter.next(), builder); + Truth.assertThat(builder.build()).isEqualTo(expectedData[0]); + + Truth.assertThat(streamIter.hasNext()).isTrue(); + builder = Field.newBuilder(); + JsonFormat.parser().merge(streamIter.next(), builder); + Truth.assertThat(builder.build()).isEqualTo(expectedData[1]); + + Truth.assertThat(streamIter.hasNext()).isTrue(); + builder = Field.newBuilder(); + JsonFormat.parser().merge(streamIter.next(), builder); + Truth.assertThat(builder.build()).isEqualTo(expectedData[2]); + + Truth.assertThat(streamIter.hasNext()).isFalse(); + + streamIter.close(); + } + + @Test + public void testEscapedString() throws IOException { + Field expectedData = + Field.newBuilder() + .setName( + "[{\n" + + "\"fInt32\": 23,\n" + + "\"fInt64\": \"1234567889999977\",\n" + + "\"fDouble\": 1234.343232226,\n" + + "\"fKingdom\": \"ARCHAEBACTERIA\"\n" + + "}]") + .build(); + + String jsonData = + "[{\n" + + " \"name\": \"[{\\n" + + "\\\"fInt32\\\": 23,\\n" + + "\\\"fInt64\\\": \\\"1234567889999977\\\",\\n" + + "\\\"fDouble\\\": 1234.343232226,\\n" + + "\\\"fKingdom\\\": \\\"ARCHAEBACTERIA\\\"\\n" + + "}]\"\n" + + "}]"; + + ProtoMessageJsonStreamIterator streamIter = + new ProtoMessageJsonStreamIterator(new StringReader(jsonData)); + + Truth.assertThat(streamIter.hasNext()).isTrue(); + Field.Builder builder = Field.newBuilder(); + JsonFormat.parser().merge(streamIter.next(), builder); + Truth.assertThat(builder.build()).isEqualTo(expectedData); + Truth.assertThat(streamIter.hasNext()).isFalse(); + + streamIter.close(); + } +} diff --git a/gax-httpjson/src/test/java/com/google/api/gax/httpjson/ProtoRestSerializerTest.java b/gax-httpjson/src/test/java/com/google/api/gax/httpjson/ProtoRestSerializerTest.java index 16199dd40..29d648965 100644 --- a/gax-httpjson/src/test/java/com/google/api/gax/httpjson/ProtoRestSerializerTest.java +++ b/gax-httpjson/src/test/java/com/google/api/gax/httpjson/ProtoRestSerializerTest.java @@ -34,9 +34,8 @@ import com.google.protobuf.Field; import com.google.protobuf.Field.Cardinality; import com.google.protobuf.Option; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.nio.charset.StandardCharsets; +import java.io.StringReader; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -84,21 +83,14 @@ public void toJson() { @Test public void fromJson() { Field fieldFromJson = - requestSerializer.fromJson( - new ByteArrayInputStream(fieldJson.getBytes(StandardCharsets.UTF_8)), - StandardCharsets.UTF_8, - Field.newBuilder()); - + requestSerializer.fromJson(new StringReader(fieldJson), Field.newBuilder()); Truth.assertThat(fieldFromJson).isEqualTo(field); } @Test public void fromJsonInvalidJson() { try { - requestSerializer.fromJson( - new ByteArrayInputStream("heh".getBytes(StandardCharsets.UTF_8)), - StandardCharsets.UTF_8, - Field.newBuilder()); + requestSerializer.fromJson(new StringReader("heh"), Field.newBuilder()); Assert.fail(); } catch (RestSerializationException e) { Truth.assertThat(e.getCause()).isInstanceOf(IOException.class); diff --git a/gax-httpjson/src/test/java/com/google/api/gax/httpjson/testing/MockHttpService.java b/gax-httpjson/src/test/java/com/google/api/gax/httpjson/testing/MockHttpService.java index e6fb4d586..a682088ed 100644 --- a/gax-httpjson/src/test/java/com/google/api/gax/httpjson/testing/MockHttpService.java +++ b/gax-httpjson/src/test/java/com/google/api/gax/httpjson/testing/MockHttpService.java @@ -36,6 +36,7 @@ import com.google.api.client.testing.http.MockLowLevelHttpRequest; import com.google.api.client.testing.http.MockLowLevelHttpResponse; import com.google.api.gax.httpjson.ApiMethodDescriptor; +import com.google.api.gax.httpjson.ApiMethodDescriptor.MethodType; import com.google.api.pathtemplate.PathTemplate; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -49,14 +50,18 @@ /** * Mocks an HTTPTransport. Expected responses and exceptions can be added to a queue from which this * mock HttpTransport polls when it relays a response. + * + *

As required by {@link MockHttpTransport} this implementation is thread-safe, but it is not + * idempotent (as a typical service would be) and must be used with extra caution. Mocked responses + * are returned in FIFO order and if multiple threads read from the same MockHttpService + * simultaneously, they may be getting responses intended for other consumers. */ public final class MockHttpService extends MockHttpTransport { - private final Multimap requestHeaders = LinkedListMultimap.create(); private final List requestPaths = new LinkedList<>(); private final Queue responseHandlers = new LinkedList<>(); - private List serviceMethodDescriptors; - private String endpoint; + private final List serviceMethodDescriptors; + private final String endpoint; /** * Create a MockHttpService. @@ -68,133 +73,182 @@ public final class MockHttpService extends MockHttpTransport { */ public MockHttpService(List serviceMethodDescriptors, String pathPrefix) { this.serviceMethodDescriptors = ImmutableList.copyOf(serviceMethodDescriptors); - endpoint = pathPrefix; + this.endpoint = pathPrefix; } @Override - public LowLevelHttpRequest buildRequest(final String method, final String url) { + public synchronized LowLevelHttpRequest buildRequest(String method, String url) { requestPaths.add(url); - return new MockLowLevelHttpRequest() { - @Override - public void addHeader(String name, String value) { - requestHeaders.put(name, value); - } - - @Override - public LowLevelHttpResponse execute() { - return getHttpResponse(method, url); - } - }; + return new MockHttpRequest(this, method, url); } /** Add an ApiMessage to the response queue. */ - public void addResponse(final Object response) { - responseHandlers.add( - new MockHttpService.HttpResponseFactory() { - @Override - public MockLowLevelHttpResponse getHttpResponse(String httpMethod, String fullTargetUrl) { - MockLowLevelHttpResponse httpResponse = new MockLowLevelHttpResponse(); - Preconditions.checkArgument( - serviceMethodDescriptors != null, - "MockHttpService has null serviceMethodDescriptors."); - - String relativePath = getRelativePath(fullTargetUrl); - - for (ApiMethodDescriptor methodDescriptor : serviceMethodDescriptors) { - // Check the comment in com.google.api.gax.httpjson.HttpRequestRunnable.buildRequest() - // method for details why it is needed. - String descriptorHttpMethod = methodDescriptor.getHttpMethod(); - if (!httpMethod.equals(descriptorHttpMethod)) { - if (!(HttpMethods.PATCH.equals(descriptorHttpMethod) - && HttpMethods.POST.equals(httpMethod))) { - continue; - } - } - - PathTemplate pathTemplate = methodDescriptor.getRequestFormatter().getPathTemplate(); - // Server figures out which RPC method is called based on the endpoint path pattern. - if (!pathTemplate.matches(relativePath)) { - continue; - } - - // Emulate the server's creation of an HttpResponse from the response message - // instance. - String httpContent = methodDescriptor.getResponseParser().serialize(response); - - httpResponse.setContent(httpContent.getBytes()); - httpResponse.setStatusCode(200); - return httpResponse; - } - - // Return 404 when none of this server's endpoint templates match the given URL. - httpResponse.setContent( - String.format("Method not found for path '%s'", relativePath).getBytes()); - httpResponse.setStatusCode(404); - return httpResponse; - } - }); + public synchronized void addResponse(Object response) { + responseHandlers.add(new MessageResponseFactory(endpoint, serviceMethodDescriptors, response)); } /** Add an expected null response (empty HTTP response body). */ - public void addNullResponse() { + public synchronized void addNullResponse() { responseHandlers.add( - new MockHttpService.HttpResponseFactory() { - @Override - public MockLowLevelHttpResponse getHttpResponse(String httpMethod, String targetUrl) { - return new MockLowLevelHttpResponse().setStatusCode(200); - } - }); + (httpMethod, targetUrl) -> new MockLowLevelHttpResponse().setStatusCode(200)); } /** Add an Exception to the response queue. */ - public void addException(final Exception exception) { - responseHandlers.add( - new MockHttpService.HttpResponseFactory() { - @Override - public MockLowLevelHttpResponse getHttpResponse(String httpMethod, String targetUrl) { - MockLowLevelHttpResponse httpResponse = new MockLowLevelHttpResponse(); - httpResponse.setStatusCode(400); - httpResponse.setContent(exception.toString().getBytes()); - httpResponse.setContentEncoding("text/plain"); - return httpResponse; - } - }); + public synchronized void addException(Exception exception) { + addException(400, exception); + } + + public synchronized void addException(int statusCode, Exception exception) { + responseHandlers.add(new ExceptionResponseFactory(statusCode, exception)); } /** Get the FIFO list of URL paths to which requests were sent. */ - public List getRequestPaths() { + public synchronized List getRequestPaths() { return requestPaths; } /** Get the FIFO list of request headers sent. */ - public Multimap getRequestHeaders() { + public synchronized Multimap getRequestHeaders() { return ImmutableListMultimap.copyOf(requestHeaders); } + private synchronized void putRequestHeader(String name, String value) { + requestHeaders.put(name, value); + } + + private synchronized MockLowLevelHttpResponse getHttpResponse(String method, String url) { + Preconditions.checkArgument(!responseHandlers.isEmpty()); + return responseHandlers.poll().getHttpResponse(method, url); + } + /* Reset the expected response queue, the method descriptor, and the logged request paths list. */ - public void reset() { + public synchronized void reset() { responseHandlers.clear(); requestPaths.clear(); requestHeaders.clear(); } - private String getRelativePath(String fullTargetUrl) { - // relativePath will be repeatedly truncated until it contains only - // the path template substring of the endpoint URL. - String relativePath = fullTargetUrl.replaceFirst(endpoint, ""); - int queryParamIndex = relativePath.indexOf("?"); - queryParamIndex = queryParamIndex < 0 ? relativePath.length() : queryParamIndex; - relativePath = relativePath.substring(0, queryParamIndex); + private interface HttpResponseFactory { + MockLowLevelHttpResponse getHttpResponse(String httpMethod, String targetUrl); + } - return relativePath; + private static class MockHttpRequest extends MockLowLevelHttpRequest { + private final MockHttpService service; + private final String method; + private final String url; + + public MockHttpRequest(MockHttpService service, String method, String url) { + this.service = service; + this.method = method; + this.url = url; + } + + @Override + public void addHeader(String name, String value) { + service.putRequestHeader(name, value); + } + + @Override + public LowLevelHttpResponse execute() { + return service.getHttpResponse(method, url); + } } - private MockLowLevelHttpResponse getHttpResponse(String httpMethod, String targetUrl) { - Preconditions.checkArgument(!responseHandlers.isEmpty()); - return responseHandlers.poll().getHttpResponse(httpMethod, targetUrl); + private static class ExceptionResponseFactory implements HttpResponseFactory { + private final int statusCode; + private final Exception exception; + + public ExceptionResponseFactory(int statusCode, Exception exception) { + this.statusCode = statusCode; + this.exception = exception; + } + + @Override + public MockLowLevelHttpResponse getHttpResponse(String httpMethod, String targetUrl) { + MockLowLevelHttpResponse httpResponse = new MockLowLevelHttpResponse(); + httpResponse.setStatusCode(statusCode); + httpResponse.setContent(exception.toString().getBytes()); + httpResponse.setContentEncoding("text/plain"); + return httpResponse; + } } - private interface HttpResponseFactory { - MockLowLevelHttpResponse getHttpResponse(String httpMethod, String targetUrl); + private static class MessageResponseFactory implements HttpResponseFactory { + private final List serviceMethodDescriptors; + private final Object response; + private final String endpoint; + + public MessageResponseFactory( + String endpoint, List serviceMethodDescriptors, Object response) { + this.endpoint = endpoint; + this.serviceMethodDescriptors = ImmutableList.copyOf(serviceMethodDescriptors); + this.response = response; + } + + @Override + public MockLowLevelHttpResponse getHttpResponse(String httpMethod, String fullTargetUrl) { + MockLowLevelHttpResponse httpResponse = new MockLowLevelHttpResponse(); + + String relativePath = getRelativePath(fullTargetUrl); + + for (ApiMethodDescriptor methodDescriptor : serviceMethodDescriptors) { + // Check the comment in com.google.api.gax.httpjson.HttpRequestRunnable.buildRequest() + // method for details why it is needed. + String descriptorHttpMethod = methodDescriptor.getHttpMethod(); + if (!httpMethod.equals(descriptorHttpMethod)) { + if (!(HttpMethods.PATCH.equals(descriptorHttpMethod) + && HttpMethods.POST.equals(httpMethod))) { + continue; + } + } + + PathTemplate pathTemplate = methodDescriptor.getRequestFormatter().getPathTemplate(); + // Server figures out which RPC method is called based on the endpoint path pattern. + if (!pathTemplate.matches(relativePath)) { + continue; + } + + // Emulate the server's creation of an HttpResponse from the response message + // instance. + String httpContent; + if (methodDescriptor.getType() == MethodType.SERVER_STREAMING) { + // Quick and dirty json array construction. Good enough for + Object[] responseArray = (Object[]) response; + StringBuilder sb = new StringBuilder(); + sb.append('['); + for (Object responseElement : responseArray) { + if (sb.length() > 1) { + sb.append(','); + } + sb.append(methodDescriptor.getResponseParser().serialize(responseElement)); + } + sb.append(']'); + httpContent = sb.toString(); + } else { + httpContent = methodDescriptor.getResponseParser().serialize(response); + } + + httpResponse.setContent(httpContent.getBytes()); + httpResponse.setStatusCode(200); + return httpResponse; + } + + // Return 404 when none of this server's endpoint templates match the given URL. + httpResponse.setContent( + String.format("Method not found for path '%s'", relativePath).getBytes()); + httpResponse.setStatusCode(404); + return httpResponse; + } + + private String getRelativePath(String fullTargetUrl) { + // relativePath will be repeatedly truncated until it contains only + // the path template substring of the endpoint URL. + String relativePath = fullTargetUrl.replaceFirst(endpoint, ""); + int queryParamIndex = relativePath.indexOf("?"); + queryParamIndex = queryParamIndex < 0 ? relativePath.length() : queryParamIndex; + relativePath = relativePath.substring(0, queryParamIndex); + + return relativePath; + } } }