Skip to content

Commit

Permalink
fix: update request method of HttpStorageRpc to properly configure of…
Browse files Browse the repository at this point in the history
…fset on requests (#1434)

* fix: update request method of HttpStorageRpc to properly configure offset on requests

When invoking downloadTo(..., OutputStream) if a retry was attempted the proper byte
offset was not being sent in the retried request. Update logic of HttpStorageRpc.read
to manually set the range header rather than trying to rely on MediaDownloader to do
it along with not automatically decompressing the byte stream.

Update ITRetryConformanceTest to run Scenario 8 test cases, which cover resuming a
download which could have caught this error sooner.

Update StorageException.translate(IOException) to classify `IOException: Premature EOF`
as the existing retryable reason `connectionClosedPrematurely`. Add case to
DefaultRetryHandlingBehaviorTest to ensure conformance to this categorization.

Break downloadTo integration test out into their own class, and separate
the multiple scenarios being tested in the same method.

Related to #1425
  • Loading branch information
BenWhitehead committed Jun 8, 2022
1 parent eac03a8 commit 72dc0df
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 72 deletions.
Expand Up @@ -108,9 +108,11 @@ static BaseServiceException coalesce(Throwable t) {
* @returns {@code StorageException}
*/
public static StorageException translate(IOException exception) {
if (exception.getMessage().contains("Connection closed prematurely")) {
return new StorageException(
0, exception.getMessage(), CONNECTION_CLOSED_PREMATURELY, exception);
String message = exception.getMessage();
if (message != null
&& (message.contains("Connection closed prematurely")
|| message.contains("Premature EOF"))) {
return new StorageException(0, message, CONNECTION_CLOSED_PREMATURELY, exception);
} else {
// default
return new StorageException(exception);
Expand Down
Expand Up @@ -24,6 +24,7 @@
import com.google.api.client.googleapis.batch.BatchRequest;
import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.googleapis.media.MediaHttpDownloader;
import com.google.api.client.http.ByteArrayContent;
import com.google.api.client.http.EmptyContent;
import com.google.api.client.http.GenericUrl;
Expand Down Expand Up @@ -283,7 +284,7 @@ public void onFailure(GoogleJsonError googleJsonError, HttpHeaders httpHeaders)
}

private static StorageException translate(IOException exception) {
return new StorageException(exception);
return StorageException.translate(exception);
}

private static StorageException translate(GoogleJsonError exception) {
Expand Down Expand Up @@ -750,10 +751,14 @@ public long read(
} else {
req.setReturnRawInputStream(false);
}
req.getMediaHttpDownloader().setBytesDownloaded(position);
req.getMediaHttpDownloader().setDirectDownloadEnabled(true);

if (position > 0) {
req.getRequestHeaders().setRange(String.format("bytes=%d-", position));
}
MediaHttpDownloader mediaHttpDownloader = req.getMediaHttpDownloader();
mediaHttpDownloader.setDirectDownloadEnabled(true);
req.executeMedia().download(outputStream);
return req.getMediaHttpDownloader().getNumBytesDownloaded();
return mediaHttpDownloader.getNumBytesDownloaded();
} catch (IOException ex) {
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
StorageException serviceException = translate(ex);
Expand Down
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.truth.Truth.assertWithMessage;
import static org.junit.Assert.fail;

import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.io.JsonEOFException;
Expand Down Expand Up @@ -96,6 +97,7 @@ public void validateBehavior() {
} else if (shouldRetry && !defaultShouldRetryResult && !legacyShouldRetryResult) {
actualBehavior = Behavior.SAME;
message = "both are rejecting when we want a retry";
fail(message);
} else if (shouldRetry && defaultShouldRetryResult && legacyShouldRetryResult) {
actualBehavior = Behavior.SAME;
message = "both are allowing";
Expand All @@ -111,6 +113,7 @@ public void validateBehavior() {
} else if (!shouldRetry && defaultShouldRetryResult && legacyShouldRetryResult) {
actualBehavior = Behavior.SAME;
message = "both are too permissive";
fail(message);
} else if (!shouldRetry && defaultShouldRetryResult && !legacyShouldRetryResult) {
actualBehavior = Behavior.DEFAULT_MORE_PERMISSIBLE;
message = "default is too permissive";
Expand Down Expand Up @@ -298,7 +301,7 @@ enum ThrowableCategory {
STORAGE_EXCEPTION_GOOGLE_JSON_ERROR_503(new StorageException(C.JSON_503)),
STORAGE_EXCEPTION_GOOGLE_JSON_ERROR_504(new StorageException(C.JSON_504)),
STORAGE_EXCEPTION_SOCKET_TIMEOUT_EXCEPTION(new StorageException(C.SOCKET_TIMEOUT_EXCEPTION)),
STORAGE_EXCEPTION_SOCKET_EXCEPTION(new StorageException(C.SOCKET_EXCEPTION)),
STORAGE_EXCEPTION_SOCKET_EXCEPTION(StorageException.translate(C.SOCKET_EXCEPTION)),
STORAGE_EXCEPTION_SSL_EXCEPTION(new StorageException(C.SSL_EXCEPTION)),
STORAGE_EXCEPTION_SSL_EXCEPTION_CONNECTION_SHUTDOWN(
new StorageException(C.SSL_EXCEPTION_CONNECTION_SHUTDOWN)),
Expand All @@ -322,6 +325,9 @@ enum ThrowableCategory {
"connectionClosedPrematurely",
"connectionClosedPrematurely",
C.CONNECTION_CLOSED_PREMATURELY)),
STORAGE_EXCEPTION_0_CONNECTION_CLOSED_PREMATURELY_IO_CAUSE_NO_REASON(
StorageException.translate(C.CONNECTION_CLOSED_PREMATURELY)),
STORAGE_EXCEPTION_0_IO_PREMATURE_EOF(StorageException.translate(C.IO_PREMATURE_EOF)),
EMPTY_JSON_PARSE_ERROR(new IllegalArgumentException("no JSON input found")),
JACKSON_EOF_EXCEPTION(C.JACKSON_EOF_EXCEPTION),
STORAGE_EXCEPTION_0_JACKSON_EOF_EXCEPTION(
Expand Down Expand Up @@ -400,6 +406,7 @@ private static final class C {
new JsonEOFException(null, JsonToken.VALUE_STRING, "parse-exception");
private static final MalformedJsonException GSON_MALFORMED_EXCEPTION =
new MalformedJsonException("parse-exception");
private static final IOException IO_PREMATURE_EOF = new IOException("Premature EOF");

private static HttpResponseException newHttpResponseException(
int httpStatusCode, String name) {
Expand Down Expand Up @@ -919,6 +926,28 @@ private static ImmutableList<Case> getAllCases() {
HandlerCategory.NONIDEMPOTENT,
ExpectRetry.NO,
Behavior.DEFAULT_MORE_STRICT),
new Case(
ThrowableCategory
.STORAGE_EXCEPTION_0_CONNECTION_CLOSED_PREMATURELY_IO_CAUSE_NO_REASON,
HandlerCategory.IDEMPOTENT,
ExpectRetry.YES,
Behavior.SAME),
new Case(
ThrowableCategory
.STORAGE_EXCEPTION_0_CONNECTION_CLOSED_PREMATURELY_IO_CAUSE_NO_REASON,
HandlerCategory.NONIDEMPOTENT,
ExpectRetry.NO,
Behavior.DEFAULT_MORE_STRICT),
new Case(
ThrowableCategory.STORAGE_EXCEPTION_0_IO_PREMATURE_EOF,
HandlerCategory.IDEMPOTENT,
ExpectRetry.YES,
Behavior.SAME),
new Case(
ThrowableCategory.STORAGE_EXCEPTION_0_IO_PREMATURE_EOF,
HandlerCategory.NONIDEMPOTENT,
ExpectRetry.NO,
Behavior.DEFAULT_MORE_STRICT),
new Case(
ThrowableCategory.STORAGE_EXCEPTION_0_INTERNAL_ERROR,
HandlerCategory.IDEMPOTENT,
Expand Down
Expand Up @@ -154,7 +154,7 @@ public static Collection<Object[]> testCases() throws IOException {
.and(
(m, trc) ->
trc.getScenarioId()
< 7) // Temporarily exclude resumable media scenarios
!= 7) // Temporarily exclude resumable upload scenarios
)
.build();

Expand Down
Expand Up @@ -18,6 +18,7 @@

import static com.google.cloud.storage.conformance.retry.CtxFunctions.ResourceSetup.defaultSetup;
import static com.google.cloud.storage.conformance.retry.CtxFunctions.ResourceSetup.serviceAccount;
import static com.google.common.base.Predicates.and;
import static com.google.common.base.Predicates.not;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -66,6 +67,7 @@
import com.google.cloud.storage.conformance.retry.RpcMethodMappings.Mappings.ObjectAcl;
import com.google.cloud.storage.conformance.retry.RpcMethodMappings.Mappings.Objects;
import com.google.cloud.storage.conformance.retry.RpcMethodMappings.Mappings.ServiceAccount;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ListMultimap;
Expand Down Expand Up @@ -108,6 +110,11 @@
final class RpcMethodMappings {
private static final Logger LOGGER = Logger.getLogger(RpcMethodMappings.class.getName());

private static final Predicate<TestRetryConformance> groupIsDownload =
methodGroupIs("storage.objects.download");
private static final Predicate<TestRetryConformance> groupIsResumableUpload =
methodGroupIs("storage.resumable.upload");

static final int _2MiB = 2 * 1024 * 1024;
final Multimap<RpcMethod, RpcMethodMapping> funcMap;

Expand Down Expand Up @@ -1079,7 +1086,8 @@ private static void delete(ArrayList<RpcMethodMapping> a) {
private static void get(ArrayList<RpcMethodMapping> a) {
a.add(
RpcMethodMapping.newBuilder(39, objects.get)
.withApplicable(not(TestRetryConformance::isPreconditionsProvided))
.withApplicable(
and(not(TestRetryConformance::isPreconditionsProvided), not(groupIsDownload)))
.withSetup(defaultSetup.andThen(Local.blobInfoWithoutGeneration))
.withTest(
(ctx, c) ->
Expand All @@ -1088,13 +1096,15 @@ private static void get(ArrayList<RpcMethodMapping> a) {
.build());
a.add(
RpcMethodMapping.newBuilder(239, objects.get)
.withApplicable(TestRetryConformance::isPreconditionsProvided)
.withApplicable(
and(TestRetryConformance::isPreconditionsProvided, not(groupIsDownload)))
.withTest(
(ctx, c) ->
ctx.peek(state -> ctx.getStorage().get(state.getBlob().getBlobId())))
.build());
a.add(
RpcMethodMapping.newBuilder(40, objects.get)
.withApplicable(not(groupIsDownload))
.withTest(
(ctx, c) ->
ctx.map(
Expand All @@ -1108,6 +1118,7 @@ private static void get(ArrayList<RpcMethodMapping> a) {
.build());
a.add(
RpcMethodMapping.newBuilder(41, objects.get)
.withApplicable(not(groupIsDownload))
.withTest(
(ctx, c) ->
ctx.map(
Expand Down Expand Up @@ -1196,7 +1207,8 @@ private static void get(ArrayList<RpcMethodMapping> a) {
.build());
a.add(
RpcMethodMapping.newBuilder(60, objects.get)
.withApplicable(not(TestRetryConformance::isPreconditionsProvided))
.withApplicable(
and(not(TestRetryConformance::isPreconditionsProvided), not(groupIsDownload)))
.withTest((ctx, c) -> ctx.peek(state -> assertTrue(state.getBlob().exists())))
.build());
a.add(
Expand Down Expand Up @@ -1297,10 +1309,12 @@ private static void get(ArrayList<RpcMethodMapping> a) {
.build());
a.add(
RpcMethodMapping.newBuilder(75, objects.get)
.withApplicable(not(groupIsDownload))
.withTest((ctx, c) -> ctx.peek(state -> state.getBlob().reload()))
.build());
a.add(
RpcMethodMapping.newBuilder(76, objects.get)
.withApplicable(not(groupIsDownload))
.withTest(
(ctx, c) ->
ctx.peek(
Expand All @@ -1311,7 +1325,8 @@ private static void get(ArrayList<RpcMethodMapping> a) {
.build());
a.add(
RpcMethodMapping.newBuilder(107, objects.get)
.withApplicable(not(TestRetryConformance::isPreconditionsProvided))
.withApplicable(
and(not(TestRetryConformance::isPreconditionsProvided), not(groupIsDownload)))
.withTest(
(ctx, c) ->
ctx.map(state -> state.with(state.getBucket().get(c.getObjectName()))))
Expand All @@ -1321,7 +1336,8 @@ private static void get(ArrayList<RpcMethodMapping> a) {
private static void insert(ArrayList<RpcMethodMapping> a) {
a.add(
RpcMethodMapping.newBuilder(46, objects.insert)
.withApplicable(TestRetryConformance::isPreconditionsProvided)
.withApplicable(
and(TestRetryConformance::isPreconditionsProvided, not(groupIsResumableUpload)))
.withSetup(defaultSetup.andThen(Local.blobInfoWithGenerationZero))
.withTest(
(ctx, c) ->
Expand All @@ -1336,7 +1352,8 @@ private static void insert(ArrayList<RpcMethodMapping> a) {
.build());
a.add(
RpcMethodMapping.newBuilder(47, objects.insert)
.withApplicable(TestRetryConformance::isPreconditionsProvided)
.withApplicable(
and(TestRetryConformance::isPreconditionsProvided, not(groupIsResumableUpload)))
.withSetup(defaultSetup.andThen(Local.blobInfoWithGenerationZero))
.withTest(
(ctx, c) ->
Expand Down Expand Up @@ -1932,4 +1949,8 @@ private static void get(ArrayList<RpcMethodMapping> a) {
private static void put(ArrayList<RpcMethodMapping> a) {}
}
}

private static Predicate<TestRetryConformance> methodGroupIs(String s) {
return (c) -> s.equals(c.getMethod().getGroup());
}
}
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.conformance.storage.v1.InstructionList;
import com.google.cloud.conformance.storage.v1.Method;
import com.google.common.base.Joiner;
import com.google.common.base.Suppliers;
import com.google.errorprone.annotations.Immutable;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -37,7 +38,9 @@
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* An individual resolved test case correlating config from {@link
Expand All @@ -58,13 +61,16 @@ final class TestRetryConformance {
BASE_ID = formatter.format(now).replaceAll("[:]", "").substring(0, 6);
}

private static final int _512KiB = 512 * 1024;
private static final int _8MiB = 8 * 1024 * 1024;

private final String projectId;
private final String bucketName;
private final String bucketName2;
private final String userProject;
private final String objectName;

private final byte[] helloWorldUtf8Bytes = "Hello, World!!!".getBytes(StandardCharsets.UTF_8);
private final Supplier<byte[]> lazyHelloWorldUtf8Bytes;
private final Path helloWorldFilePath = resolvePathForResource();
private final ServiceAccountCredentials serviceAccountCredentials =
resolveServiceAccountCredentials();
Expand Down Expand Up @@ -126,6 +132,33 @@ final class TestRetryConformance {
String.format(
"%s_s%03d-%s-m%03d_obj1",
BASE_ID, scenarioId, instructionsString.toLowerCase(), mappingId);
lazyHelloWorldUtf8Bytes =
Suppliers.memoize(
() -> {
// define a lazy supplier for bytes.
// Not all tests need data for an object, though some tests - resumable upload - needs
// more than 8MiB.
// We want to avoid allocating 8.1MiB for each test unnecessarily, especially since we
// instantiate all permuted test cases. ~1000 * 8.1MiB ~~ > 8GiB.
String helloWorld = "Hello, World!";
int baseDataSize;
switch (method.getName()) {
case "storage.objects.insert":
baseDataSize = _8MiB + 1;
break;
case "storage.objects.get":
baseDataSize = _512KiB;
break;
default:
baseDataSize = helloWorld.length();
break;
}
int endInclusive = (baseDataSize / helloWorld.length());
return IntStream.rangeClosed(1, endInclusive)
.mapToObj(i -> helloWorld)
.collect(Collectors.joining())
.getBytes(StandardCharsets.UTF_8);
});
}

public String getProjectId() {
Expand Down Expand Up @@ -153,7 +186,7 @@ public String getObjectName() {
}

public byte[] getHelloWorldUtf8Bytes() {
return helloWorldUtf8Bytes;
return lazyHelloWorldUtf8Bytes.get();
}

public Path getHelloWorldFilePath() {
Expand Down

0 comments on commit 72dc0df

Please sign in to comment.