Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

feat: introduce HttpJsonClientCall, Listeners infrastructure and ServerStreaming support in REST transport #1599

Merged
merged 8 commits into from
Jan 21, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void testServerStreaming() throws Exception {

streamingCallable.call(DEFAULT_REQUEST, moneyObserver);

latch.await(20, TimeUnit.SECONDS);
Truth.assertThat(latch.await(20, TimeUnit.SECONDS)).isTrue();
Truth.assertThat(moneyObserver.error).isNull();
Truth.assertThat(moneyObserver.response).isEqualTo(DEFAULT_RESPONSE);
}
Expand All @@ -157,13 +157,13 @@ public void testManualFlowControl() throws Exception {

streamingCallable.call(DEFAULT_REQUEST, moneyObserver);

latch.await(500, TimeUnit.MILLISECONDS);
Truth.assertThat(latch.await(500, TimeUnit.MILLISECONDS)).isFalse();
Truth.assertWithMessage("Received response before requesting it")
.that(moneyObserver.response)
.isNull();

moneyObserver.controller.request(1);
latch.await(500, TimeUnit.MILLISECONDS);
Truth.assertThat(latch.await(500, TimeUnit.MILLISECONDS)).isTrue();

Truth.assertThat(moneyObserver.response).isEqualTo(DEFAULT_RESPONSE);
Truth.assertThat(moneyObserver.completed).isTrue();
Expand All @@ -178,7 +178,7 @@ public void testCancelClientCall() throws Exception {

moneyObserver.controller.cancel();
moneyObserver.controller.request(1);
latch.await(500, TimeUnit.MILLISECONDS);
Truth.assertThat(latch.await(500, TimeUnit.MILLISECONDS)).isTrue();

Truth.assertThat(moneyObserver.error).isInstanceOf(CancellationException.class);
Truth.assertThat(moneyObserver.error).hasMessageThat().isEqualTo("User cancelled stream");
Expand All @@ -190,7 +190,7 @@ public void testOnResponseError() throws Throwable {
MoneyObserver moneyObserver = new MoneyObserver(true, latch);

streamingCallable.call(ERROR_REQUEST, moneyObserver);
latch.await(500, TimeUnit.MILLISECONDS);
Truth.assertThat(latch.await(500, TimeUnit.MILLISECONDS)).isTrue();

Truth.assertThat(moneyObserver.error).isInstanceOf(ApiException.class);
Truth.assertThat(((ApiException) moneyObserver.error).getStatusCode().getCode())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.google.protobuf.TypeRegistry;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;

Expand Down Expand Up @@ -91,25 +92,28 @@ ApiMessageHttpResponseParser.Builder<ResponseT> newBuilder() {

@Override
public ResponseT parse(InputStream httpResponseBody) {
return parse(httpResponseBody, null);
}

@Override
public ResponseT parse(InputStream httpResponseBody, TypeRegistry registry) {
return parse(new InputStreamReader(httpResponseBody, StandardCharsets.UTF_8), registry);
}

@Override
public ResponseT parse(Reader httpResponseBody, TypeRegistry registry) {
if (getResponseInstance() == null) {
return null;
} else {
Type responseType = getResponseInstance().getClass();
try {
return getResponseMarshaller()
.fromJson(
new InputStreamReader(httpResponseBody, StandardCharsets.UTF_8), responseType);
return getResponseMarshaller().fromJson(httpResponseBody, responseType);
} catch (JsonIOException | JsonSyntaxException e) {
throw new RestSerializationException(e);
}
}
}

@Override
public ResponseT parse(InputStream httpResponseBody, TypeRegistry registry) {
return parse(httpResponseBody);
}

@Override
public String serialize(ResponseT response) {
return getResponseMarshaller().toJson(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,18 @@
@AutoValue
/* Method descriptor for messages to be transmitted over HTTP. */
public abstract class ApiMethodDescriptor<RequestT, ResponseT> {
public enum MethodType {
UNARY,
CLIENT_STREAMING,
SERVER_STREAMING,
BIDI_STREAMING,
UNKNOWN;
}

public abstract String getFullMethodName();

public abstract HttpRequestFormatter<RequestT> getRequestFormatter();

@Nullable
public abstract HttpResponseParser<ResponseT> getResponseParser();

/** Return the HTTP method for this request message type. */
Expand All @@ -55,8 +61,11 @@ public abstract class ApiMethodDescriptor<RequestT, ResponseT> {
@Nullable
public abstract PollingRequestFactory<RequestT> getPollingRequestFactory();

public abstract MethodType getType();

public static <RequestT, ResponseT> Builder<RequestT, ResponseT> newBuilder() {
return new AutoValue_ApiMethodDescriptor.Builder<RequestT, ResponseT>();
return new AutoValue_ApiMethodDescriptor.Builder<RequestT, ResponseT>()
.setType(MethodType.UNARY);
}

@AutoValue.Builder
Expand All @@ -78,6 +87,8 @@ public abstract Builder<RequestT, ResponseT> setOperationSnapshotFactory(
public abstract Builder<RequestT, ResponseT> setPollingRequestFactory(
PollingRequestFactory<RequestT> pollingRequestFactory);

public abstract Builder<RequestT, ResponseT> setType(MethodType type);

public abstract ApiMethodDescriptor<RequestT, ResponseT> build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2022 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.httpjson;

import com.google.api.client.http.HttpResponseException;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptionFactory;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.common.collect.ImmutableSet;
import java.util.Set;
import java.util.concurrent.CancellationException;

class HttpJsonApiExceptionFactory {
private final Set<Code> retryableCodes;

HttpJsonApiExceptionFactory(Set<Code> retryableCodes) {
this.retryableCodes = ImmutableSet.copyOf(retryableCodes);
}

ApiException create(Throwable throwable) {
if (throwable instanceof HttpResponseException) {
HttpResponseException e = (HttpResponseException) throwable;
StatusCode statusCode = HttpJsonStatusCode.of(e.getStatusCode());
boolean canRetry = retryableCodes.contains(statusCode.getCode());
String message = e.getStatusMessage();
return createApiException(throwable, statusCode, message, canRetry);
} else if (throwable instanceof StatusRuntimeException) {
StatusRuntimeException e = (StatusRuntimeException) throwable;
StatusCode statusCode = HttpJsonStatusCode.of(e.getStatusCode());
chanseokoh marked this conversation as resolved.
Show resolved Hide resolved
return createApiException(
throwable,
HttpJsonStatusCode.of(e.getStatusCode()),
chanseokoh marked this conversation as resolved.
Show resolved Hide resolved
e.getMessage(),
retryableCodes.contains(statusCode.getCode()));
} else if (throwable instanceof CancellationException) {
chanseokoh marked this conversation as resolved.
Show resolved Hide resolved
return ApiExceptionFactory.createException(
throwable, HttpJsonStatusCode.of(Code.CANCELLED), false);
} else if (throwable instanceof ApiException) {
return (ApiException) throwable;
} else {
// Do not retry on unknown throwable, even when UNKNOWN is in retryableCodes
return ApiExceptionFactory.createException(
chanseokoh marked this conversation as resolved.
Show resolved Hide resolved
throwable, HttpJsonStatusCode.of(StatusCode.Code.UNKNOWN), false);
}
}

private ApiException createApiException(
Throwable throwable, StatusCode statusCode, String message, boolean canRetry) {
return message == null
? ApiExceptionFactory.createException(throwable, statusCode, canRetry)
: ApiExceptionFactory.createException(message, throwable, statusCode, canRetry);
}
}