Skip to content

Commit

Permalink
fix: update gRPC object list implementation to include synthetic dire…
Browse files Browse the repository at this point in the history
…ctories (#1824)

Rework ITObjectTest.testListBlobsCurrentDirectory to have a more descriptive name
along with a more straight forward implementation and assertions. (Removed all the
listing retry stuff, object list is strongly consistent in GCS.)

* test: remove obsolete test which is validating field mask which is tested separately
  • Loading branch information
BenWhitehead committed Jan 3, 2023
1 parent a3b46cf commit 0665c24
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 211 deletions.
Expand Up @@ -92,6 +92,7 @@
import com.google.storage.v2.ListBucketsRequest;
import com.google.storage.v2.ListHmacKeysRequest;
import com.google.storage.v2.ListObjectsRequest;
import com.google.storage.v2.ListObjectsResponse;
import com.google.storage.v2.LockBucketRetentionPolicyRequest;
import com.google.storage.v2.Object;
import com.google.storage.v2.ObjectAccessControl;
Expand All @@ -101,8 +102,6 @@
import com.google.storage.v2.RewriteObjectRequest;
import com.google.storage.v2.RewriteResponse;
import com.google.storage.v2.StorageClient;
import com.google.storage.v2.StorageClient.ListObjectsPage;
import com.google.storage.v2.StorageClient.ListObjectsPagedResponse;
import com.google.storage.v2.UpdateBucketRequest;
import com.google.storage.v2.UpdateHmacKeyRequest;
import com.google.storage.v2.UpdateObjectRequest;
Expand All @@ -127,6 +126,7 @@
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -461,22 +461,18 @@ public Page<Bucket> list(BucketListOption... options) {

@Override
public Page<Blob> list(String bucket, BlobListOption... options) {
UnaryCallable<ListObjectsRequest, ListObjectsPagedResponse> listObjectsCallable =
storageClient.listObjectsPagedCallable();
Opts<ObjectListOpt> opts = Opts.unwrap(options);
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
ListObjectsRequest.Builder builder =
ListObjectsRequest.newBuilder().setParent(bucketNameCodec.encode(bucket));
ListObjectsRequest req = opts.listObjectsRequest().apply(builder).build();
try {
ListObjectsPagedResponse call = listObjectsCallable.call(req, grpcCallContext);
ListObjectsPage page = call.getPage();
return new TransformingPageDecorator<>(
page,
syntaxDecoders.blob.andThen(opts.clearBlobFields()),
return Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req));
retryAlgorithmManager.getFor(req),
() -> storageClient.listObjectsCallable().call(req, grpcCallContext),
resp -> new ListObjectsWithSyntheticDirectoriesPage(grpcCallContext, req, resp));
} catch (Exception e) {
throw StorageException.coalesce(e);
}
Expand Down Expand Up @@ -1440,6 +1436,95 @@ private final class SyntaxDecoders {
b -> codecs.bucketInfo().decode(b).asBucket(GrpcStorageImpl.this);
}

/**
* Today {@link com.google.cloud.storage.spi.v1.HttpStorageRpc#list(String, Map)} creates
* synthetic objects to represent {@code prefixes} ("directories") returned as part of a list
* objects response. Specifically, a StorageObject with an `isDirectory` attribute added.
*
* <p>This approach is not sound, and presents an otherwise ephemeral piece of metadata as an
* actual piece of data. (A {@code prefix} is not actually an object, and therefor can't be
* queried for other object metadata.)
*
* <p>In an effort to preserve compatibility with the current public API, this class attempts to
* encapsulate the process of producing these Synthetic Directory Objects and lifting them into
* the Page.
*
* <p>This behavior should NOT be carried forward to any possible new API for the storage client.
*/
private final class ListObjectsWithSyntheticDirectoriesPage implements Page<Blob> {

private final GrpcCallContext ctx;
private final ListObjectsRequest req;
private final ListObjectsResponse resp;

private ListObjectsWithSyntheticDirectoriesPage(
GrpcCallContext ctx, ListObjectsRequest req, ListObjectsResponse resp) {
this.ctx = ctx;
this.req = req;
this.resp = resp;
}

@Override
public boolean hasNextPage() {
return !resp.getNextPageToken().isEmpty();
}

@Override
public String getNextPageToken() {
return resp.getNextPageToken();
}

@Override
public Page<Blob> getNextPage() {
ListObjectsRequest nextPageReq =
req.toBuilder().setPageToken(resp.getNextPageToken()).build();
try {
ListObjectsResponse nextPageResp =
Retrying.run(
GrpcStorageImpl.this.getOptions(),
retryAlgorithmManager.getFor(nextPageReq),
() -> storageClient.listObjectsCallable().call(nextPageReq, ctx),
Decoder.identity());
return new ListObjectsWithSyntheticDirectoriesPage(ctx, nextPageReq, nextPageResp);
} catch (Exception e) {
throw StorageException.coalesce(e);
}
}

@Override
public Iterable<Blob> iterateAll() {
// drop to our interface type to help type inference below with the stream.
Page<Blob> curr = this;
Predicate<Page<Blob>> exhausted = p -> p != null && p.hasNextPage();
// Create a stream which will attempt to call getNextPage repeatedly until we meet our
// condition of exhaustion. By doing this we are able to rely on the retry logic in
// getNextPage
return () ->
streamIterate(curr, exhausted, Page::getNextPage)
.filter(Objects::nonNull)
.flatMap(p -> StreamSupport.stream(p.getValues().spliterator(), false))
.iterator();
}

@Override
public Iterable<Blob> getValues() {
return () -> {
String bucketName = bucketNameCodec.decode(req.getParent());
return Streams.concat(
resp.getObjectsList().stream().map(syntaxDecoders.blob::decode),
resp.getPrefixesList().stream()
.map(
prefix ->
BlobInfo.newBuilder(bucketName, prefix)
.setSize(0L)
.setIsDirectory(true)
.build())
.map(info -> info.asBlob(GrpcStorageImpl.this)))
.iterator();
};
}
}

static final class TransformingPageDecorator<
RequestT,
ResponseT,
Expand Down Expand Up @@ -1511,50 +1596,50 @@ public Iterable<ModelT> getValues() {
.map(translator::decode)
.iterator();
}
}

private static <T> Stream<T> streamIterate(
T seed, Predicate<? super T> shouldComputeNext, UnaryOperator<T> computeNext) {
requireNonNull(seed, "seed must be non null");
requireNonNull(shouldComputeNext, "shouldComputeNext must be non null");
requireNonNull(computeNext, "computeNext must be non null");
Spliterator<T> spliterator =
new AbstractSpliterator<T>(Long.MAX_VALUE, 0) {
T prev;
boolean started = false;
boolean done = false;

@Override
public boolean tryAdvance(Consumer<? super T> action) {
// if we haven't started, emit our seed and return
if (!started) {
started = true;
action.accept(seed);
prev = seed;
private static <T> Stream<T> streamIterate(
T seed, Predicate<? super T> shouldComputeNext, UnaryOperator<T> computeNext) {
requireNonNull(seed, "seed must be non null");
requireNonNull(shouldComputeNext, "shouldComputeNext must be non null");
requireNonNull(computeNext, "computeNext must be non null");
Spliterator<T> spliterator =
new AbstractSpliterator<T>(Long.MAX_VALUE, 0) {
T prev;
boolean started = false;
boolean done = false;

@Override
public boolean tryAdvance(Consumer<? super T> action) {
// if we haven't started, emit our seed and return
if (!started) {
started = true;
action.accept(seed);
prev = seed;
return true;
}
// if we've previously finished quickly return
if (done) {
return false;
}
// test whether we should try and compute the next value
if (shouldComputeNext.test(prev)) {
// compute the next value and figure out if we can use it
T next = computeNext.apply(prev);
if (next != null) {
action.accept(next);
prev = next;
return true;
}
// if we've previously finished quickly return
if (done) {
return false;
}
// test whether we should try and compute the next value
if (shouldComputeNext.test(prev)) {
// compute the next value and figure out if we can use it
T next = computeNext.apply(prev);
if (next != null) {
action.accept(next);
prev = next;
return true;
}
}

// fallthrough, if we haven't taken an action by now consider the stream done and
// return
done = true;
return false;
}
};
return StreamSupport.stream(spliterator, false);
}

// fallthrough, if we haven't taken an action by now consider the stream done and
// return
done = true;
return false;
}
};
return StreamSupport.stream(spliterator, false);
}

static <T> T throwHttpJsonOnly(String methodName) {
Expand Down
Expand Up @@ -24,7 +24,6 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import com.google.api.gax.paging.Page;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
Expand All @@ -41,19 +40,14 @@
import com.google.cloud.storage.it.runner.annotations.Backend;
import com.google.cloud.storage.it.runner.annotations.CrossRun;
import com.google.cloud.storage.it.runner.annotations.Inject;
import com.google.cloud.storage.it.runner.annotations.ParallelFriendly;
import com.google.cloud.storage.it.runner.registry.Generator;
import com.google.cloud.storage.it.runner.registry.KmsFixture;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.io.BaseEncoding;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.Key;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import javax.crypto.spec.SecretKeySpec;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -63,7 +57,6 @@
@CrossRun(
transports = {Transport.HTTP, Transport.GRPC},
backends = Backend.PROD)
@ParallelFriendly
public class ITKmsTest {

private static final long seed = -7071346537822433445L;
Expand Down Expand Up @@ -186,46 +179,6 @@ public void testGetBlobKmsKeyNameField() {
assertNull(remoteBlob.getContentType());
}

@Test(timeout = 5000)
public void testListBlobsKmsKeySelectedFields() throws InterruptedException {
String[] blobNames = {
"test-list-blobs-selected-field-kms-key-name-blob1",
"test-list-blobs-selected-field-kms-key-name-blob2"
};
BlobInfo blob1 = BlobInfo.newBuilder(bucket, blobNames[0]).setContentType(CONTENT_TYPE).build();
BlobInfo blob2 = BlobInfo.newBuilder(bucket, blobNames[1]).setContentType(CONTENT_TYPE).build();
Blob remoteBlob1 =
storage.create(blob1, Storage.BlobTargetOption.kmsKeyName(kms.getKey1().getName()));
Blob remoteBlob2 =
storage.create(blob2, Storage.BlobTargetOption.kmsKeyName(kms.getKey1().getName()));
assertNotNull(remoteBlob1);
assertNotNull(remoteBlob2);
Page<Blob> page =
storage.list(
bucket.getName(),
Storage.BlobListOption.prefix("test-list-blobs-selected-field-kms-key-name-blob"),
Storage.BlobListOption.fields(BlobField.KMS_KEY_NAME));
// Listing blobs is eventually consistent, we loop until the list is of the expected size. The
// test fails if timeout is reached.
while (Iterators.size(page.iterateAll().iterator()) != 2) {
Thread.sleep(500);
page =
storage.list(
bucket.getName(),
Storage.BlobListOption.prefix("test-list-blobs-selected-field-kms-key-name-blob"),
Storage.BlobListOption.fields(BlobField.KMS_KEY_NAME));
}
Set<String> blobSet = ImmutableSet.of(blobNames[0], blobNames[1]);
Iterator<Blob> iterator = page.iterateAll().iterator();
while (iterator.hasNext()) {
Blob remoteBlob = iterator.next();
assertEquals(bucket.getName(), remoteBlob.getBucket());
assertTrue(blobSet.contains(remoteBlob.getName()));
assertTrue(remoteBlob.getKmsKeyName().startsWith(kms.getKey1().getName()));
assertNull(remoteBlob.getContentType());
}
}

@Test
public void testRotateFromCustomerEncryptionToKmsKey() {
String sourceBlobName = "test-copy-blob-encryption-key-source";
Expand Down

0 comments on commit 0665c24

Please sign in to comment.