From 0665c2473b5b1a18061d1e58382320ae55295520 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Tue, 3 Jan 2023 18:09:47 -0500 Subject: [PATCH] fix: update gRPC object list implementation to include synthetic directories (#1824) Rework ITObjectTest.testListBlobsCurrentDirectory to have a more descriptive name along with a more straight forward implementation and assertions. (Removed all the listing retry stuff, object list is strongly consistent in GCS.) * test: remove obsolete test which is validating field mask which is tested separately --- .../google/cloud/storage/GrpcStorageImpl.java | 185 +++++++++++----- .../google/cloud/storage/it/ITKmsTest.java | 47 ---- .../google/cloud/storage/it/ITObjectTest.java | 206 ++++++++---------- 3 files changed, 227 insertions(+), 211 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index dc8e475e1..97389f926 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -92,6 +92,7 @@ import com.google.storage.v2.ListBucketsRequest; import com.google.storage.v2.ListHmacKeysRequest; import com.google.storage.v2.ListObjectsRequest; +import com.google.storage.v2.ListObjectsResponse; import com.google.storage.v2.LockBucketRetentionPolicyRequest; import com.google.storage.v2.Object; import com.google.storage.v2.ObjectAccessControl; @@ -101,8 +102,6 @@ import com.google.storage.v2.RewriteObjectRequest; import com.google.storage.v2.RewriteResponse; import com.google.storage.v2.StorageClient; -import com.google.storage.v2.StorageClient.ListObjectsPage; -import com.google.storage.v2.StorageClient.ListObjectsPagedResponse; import com.google.storage.v2.UpdateBucketRequest; import com.google.storage.v2.UpdateHmacKeyRequest; import com.google.storage.v2.UpdateObjectRequest; @@ -127,6 +126,7 @@ import java.nio.file.StandardOpenOption; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -461,8 +461,6 @@ public Page list(BucketListOption... options) { @Override public Page list(String bucket, BlobListOption... options) { - UnaryCallable listObjectsCallable = - storageClient.listObjectsPagedCallable(); Opts opts = Opts.unwrap(options); GrpcCallContext grpcCallContext = opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); @@ -470,13 +468,11 @@ public Page list(String bucket, BlobListOption... options) { ListObjectsRequest.newBuilder().setParent(bucketNameCodec.encode(bucket)); ListObjectsRequest req = opts.listObjectsRequest().apply(builder).build(); try { - ListObjectsPagedResponse call = listObjectsCallable.call(req, grpcCallContext); - ListObjectsPage page = call.getPage(); - return new TransformingPageDecorator<>( - page, - syntaxDecoders.blob.andThen(opts.clearBlobFields()), + return Retrying.run( getOptions(), - retryAlgorithmManager.getFor(req)); + retryAlgorithmManager.getFor(req), + () -> storageClient.listObjectsCallable().call(req, grpcCallContext), + resp -> new ListObjectsWithSyntheticDirectoriesPage(grpcCallContext, req, resp)); } catch (Exception e) { throw StorageException.coalesce(e); } @@ -1440,6 +1436,95 @@ private final class SyntaxDecoders { b -> codecs.bucketInfo().decode(b).asBucket(GrpcStorageImpl.this); } + /** + * Today {@link com.google.cloud.storage.spi.v1.HttpStorageRpc#list(String, Map)} creates + * synthetic objects to represent {@code prefixes} ("directories") returned as part of a list + * objects response. Specifically, a StorageObject with an `isDirectory` attribute added. + * + *

This approach is not sound, and presents an otherwise ephemeral piece of metadata as an + * actual piece of data. (A {@code prefix} is not actually an object, and therefor can't be + * queried for other object metadata.) + * + *

In an effort to preserve compatibility with the current public API, this class attempts to + * encapsulate the process of producing these Synthetic Directory Objects and lifting them into + * the Page. + * + *

This behavior should NOT be carried forward to any possible new API for the storage client. + */ + private final class ListObjectsWithSyntheticDirectoriesPage implements Page { + + private final GrpcCallContext ctx; + private final ListObjectsRequest req; + private final ListObjectsResponse resp; + + private ListObjectsWithSyntheticDirectoriesPage( + GrpcCallContext ctx, ListObjectsRequest req, ListObjectsResponse resp) { + this.ctx = ctx; + this.req = req; + this.resp = resp; + } + + @Override + public boolean hasNextPage() { + return !resp.getNextPageToken().isEmpty(); + } + + @Override + public String getNextPageToken() { + return resp.getNextPageToken(); + } + + @Override + public Page getNextPage() { + ListObjectsRequest nextPageReq = + req.toBuilder().setPageToken(resp.getNextPageToken()).build(); + try { + ListObjectsResponse nextPageResp = + Retrying.run( + GrpcStorageImpl.this.getOptions(), + retryAlgorithmManager.getFor(nextPageReq), + () -> storageClient.listObjectsCallable().call(nextPageReq, ctx), + Decoder.identity()); + return new ListObjectsWithSyntheticDirectoriesPage(ctx, nextPageReq, nextPageResp); + } catch (Exception e) { + throw StorageException.coalesce(e); + } + } + + @Override + public Iterable iterateAll() { + // drop to our interface type to help type inference below with the stream. + Page curr = this; + Predicate> exhausted = p -> p != null && p.hasNextPage(); + // Create a stream which will attempt to call getNextPage repeatedly until we meet our + // condition of exhaustion. By doing this we are able to rely on the retry logic in + // getNextPage + return () -> + streamIterate(curr, exhausted, Page::getNextPage) + .filter(Objects::nonNull) + .flatMap(p -> StreamSupport.stream(p.getValues().spliterator(), false)) + .iterator(); + } + + @Override + public Iterable getValues() { + return () -> { + String bucketName = bucketNameCodec.decode(req.getParent()); + return Streams.concat( + resp.getObjectsList().stream().map(syntaxDecoders.blob::decode), + resp.getPrefixesList().stream() + .map( + prefix -> + BlobInfo.newBuilder(bucketName, prefix) + .setSize(0L) + .setIsDirectory(true) + .build()) + .map(info -> info.asBlob(GrpcStorageImpl.this))) + .iterator(); + }; + } + } + static final class TransformingPageDecorator< RequestT, ResponseT, @@ -1511,50 +1596,50 @@ public Iterable getValues() { .map(translator::decode) .iterator(); } + } - private static Stream streamIterate( - T seed, Predicate shouldComputeNext, UnaryOperator computeNext) { - requireNonNull(seed, "seed must be non null"); - requireNonNull(shouldComputeNext, "shouldComputeNext must be non null"); - requireNonNull(computeNext, "computeNext must be non null"); - Spliterator spliterator = - new AbstractSpliterator(Long.MAX_VALUE, 0) { - T prev; - boolean started = false; - boolean done = false; - - @Override - public boolean tryAdvance(Consumer action) { - // if we haven't started, emit our seed and return - if (!started) { - started = true; - action.accept(seed); - prev = seed; + private static Stream streamIterate( + T seed, Predicate shouldComputeNext, UnaryOperator computeNext) { + requireNonNull(seed, "seed must be non null"); + requireNonNull(shouldComputeNext, "shouldComputeNext must be non null"); + requireNonNull(computeNext, "computeNext must be non null"); + Spliterator spliterator = + new AbstractSpliterator(Long.MAX_VALUE, 0) { + T prev; + boolean started = false; + boolean done = false; + + @Override + public boolean tryAdvance(Consumer action) { + // if we haven't started, emit our seed and return + if (!started) { + started = true; + action.accept(seed); + prev = seed; + return true; + } + // if we've previously finished quickly return + if (done) { + return false; + } + // test whether we should try and compute the next value + if (shouldComputeNext.test(prev)) { + // compute the next value and figure out if we can use it + T next = computeNext.apply(prev); + if (next != null) { + action.accept(next); + prev = next; return true; } - // if we've previously finished quickly return - if (done) { - return false; - } - // test whether we should try and compute the next value - if (shouldComputeNext.test(prev)) { - // compute the next value and figure out if we can use it - T next = computeNext.apply(prev); - if (next != null) { - action.accept(next); - prev = next; - return true; - } - } - - // fallthrough, if we haven't taken an action by now consider the stream done and - // return - done = true; - return false; } - }; - return StreamSupport.stream(spliterator, false); - } + + // fallthrough, if we haven't taken an action by now consider the stream done and + // return + done = true; + return false; + } + }; + return StreamSupport.stream(spliterator, false); } static T throwHttpJsonOnly(String methodName) { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITKmsTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITKmsTest.java index 9d335ab74..e49acf880 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITKmsTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITKmsTest.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import com.google.api.gax.paging.Page; import com.google.cloud.WriteChannel; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; @@ -41,19 +40,14 @@ import com.google.cloud.storage.it.runner.annotations.Backend; import com.google.cloud.storage.it.runner.annotations.CrossRun; import com.google.cloud.storage.it.runner.annotations.Inject; -import com.google.cloud.storage.it.runner.annotations.ParallelFriendly; import com.google.cloud.storage.it.runner.registry.Generator; import com.google.cloud.storage.it.runner.registry.KmsFixture; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterators; import com.google.common.io.BaseEncoding; import java.io.IOException; import java.nio.ByteBuffer; import java.security.Key; -import java.util.Iterator; import java.util.Random; -import java.util.Set; import javax.crypto.spec.SecretKeySpec; import org.junit.Rule; import org.junit.Test; @@ -63,7 +57,6 @@ @CrossRun( transports = {Transport.HTTP, Transport.GRPC}, backends = Backend.PROD) -@ParallelFriendly public class ITKmsTest { private static final long seed = -7071346537822433445L; @@ -186,46 +179,6 @@ public void testGetBlobKmsKeyNameField() { assertNull(remoteBlob.getContentType()); } - @Test(timeout = 5000) - public void testListBlobsKmsKeySelectedFields() throws InterruptedException { - String[] blobNames = { - "test-list-blobs-selected-field-kms-key-name-blob1", - "test-list-blobs-selected-field-kms-key-name-blob2" - }; - BlobInfo blob1 = BlobInfo.newBuilder(bucket, blobNames[0]).setContentType(CONTENT_TYPE).build(); - BlobInfo blob2 = BlobInfo.newBuilder(bucket, blobNames[1]).setContentType(CONTENT_TYPE).build(); - Blob remoteBlob1 = - storage.create(blob1, Storage.BlobTargetOption.kmsKeyName(kms.getKey1().getName())); - Blob remoteBlob2 = - storage.create(blob2, Storage.BlobTargetOption.kmsKeyName(kms.getKey1().getName())); - assertNotNull(remoteBlob1); - assertNotNull(remoteBlob2); - Page page = - storage.list( - bucket.getName(), - Storage.BlobListOption.prefix("test-list-blobs-selected-field-kms-key-name-blob"), - Storage.BlobListOption.fields(BlobField.KMS_KEY_NAME)); - // Listing blobs is eventually consistent, we loop until the list is of the expected size. The - // test fails if timeout is reached. - while (Iterators.size(page.iterateAll().iterator()) != 2) { - Thread.sleep(500); - page = - storage.list( - bucket.getName(), - Storage.BlobListOption.prefix("test-list-blobs-selected-field-kms-key-name-blob"), - Storage.BlobListOption.fields(BlobField.KMS_KEY_NAME)); - } - Set blobSet = ImmutableSet.of(blobNames[0], blobNames[1]); - Iterator iterator = page.iterateAll().iterator(); - while (iterator.hasNext()) { - Blob remoteBlob = iterator.next(); - assertEquals(bucket.getName(), remoteBlob.getBucket()); - assertTrue(blobSet.contains(remoteBlob.getName())); - assertTrue(remoteBlob.getKmsKeyName().startsWith(kms.getKey1().getName())); - assertNull(remoteBlob.getContentType()); - } - } - @Test public void testRotateFromCustomerEncryptionToKmsKey() { String sourceBlobName = "test-copy-blob-encryption-key-source"; diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITObjectTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITObjectTest.java index 206382f24..56f671f98 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITObjectTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITObjectTest.java @@ -82,9 +82,12 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.stream.IntStream; +import java.util.stream.Stream; import java.util.stream.StreamSupport; import javax.crypto.spec.SecretKeySpec; import org.junit.Test; @@ -310,88 +313,36 @@ public void testGetBlobFailNonExistingGeneration() { } } - @Test(timeout = 5000) - public void testListBlobsSelectedFields() throws InterruptedException { + @Test + public void testListBlobsSelectedFields() { + String baseName = generator.randomObjectName(); + + String name1 = baseName + "1"; + String name2 = baseName + "2"; - String[] blobNames = { - "test-list-blobs-selected-fields-blob1", "test-list-blobs-selected-fields-blob2" - }; ImmutableMap metadata = ImmutableMap.of("k", "v"); - BlobInfo blob1 = - BlobInfo.newBuilder(bucket, blobNames[0]) - .setContentType(CONTENT_TYPE) - .setMetadata(metadata) - .build(); - BlobInfo blob2 = - BlobInfo.newBuilder(bucket, blobNames[1]) - .setContentType(CONTENT_TYPE) - .setMetadata(metadata) - .build(); + BlobInfo blob1 = BlobInfo.newBuilder(bucket, name1).setMetadata(metadata).build(); + BlobInfo blob2 = BlobInfo.newBuilder(bucket, name2).setMetadata(metadata).build(); Blob remoteBlob1 = storage.create(blob1); Blob remoteBlob2 = storage.create(blob2); - assertNotNull(remoteBlob1); - assertNotNull(remoteBlob2); - Page page = - storage.list( - bucket.getName(), - BlobListOption.prefix("test-list-blobs-selected-fields-blob"), - BlobListOption.fields(BlobField.METADATA)); - // Listing blobs is eventually consistent, we loop until the list is of the expected size. The - // test fails if timeout is reached. - while (Iterators.size(page.iterateAll().iterator()) != 2) { - Thread.sleep(500); - page = - storage.list( - bucket.getName(), - BlobListOption.prefix("test-list-blobs-selected-fields-blob"), - BlobListOption.fields(BlobField.METADATA)); - } - Set blobSet = ImmutableSet.of(blobNames[0], blobNames[1]); - Iterator iterator = page.iterateAll().iterator(); - while (iterator.hasNext()) { - Blob remoteBlob = iterator.next(); - assertEquals(bucket.getName(), remoteBlob.getBucket()); - assertTrue(blobSet.contains(remoteBlob.getName())); - assertEquals(metadata, remoteBlob.getMetadata()); - assertNull(remoteBlob.getContentType()); - } - } - @Test(timeout = 5000) - public void testListBlobsEmptySelectedFields() throws InterruptedException { + ImmutableSet> expected = + Stream.of(remoteBlob1, remoteBlob2) + .map(BlobInfo::getMetadata) + .collect(ImmutableSet.toImmutableSet()); - String[] blobNames = { - "test-list-blobs-empty-selected-fields-blob1", "test-list-blobs-empty-selected-fields-blob2" - }; - BlobInfo blob1 = BlobInfo.newBuilder(bucket, blobNames[0]).setContentType(CONTENT_TYPE).build(); - BlobInfo blob2 = BlobInfo.newBuilder(bucket, blobNames[1]).setContentType(CONTENT_TYPE).build(); - Blob remoteBlob1 = storage.create(blob1); - Blob remoteBlob2 = storage.create(blob2); - assertNotNull(remoteBlob1); - assertNotNull(remoteBlob2); Page page = storage.list( bucket.getName(), - BlobListOption.prefix("test-list-blobs-empty-selected-fields-blob"), - BlobListOption.fields()); - // Listing blobs is eventually consistent, we loop until the list is of the expected size. The - // test fails if timeout is reached. - while (Iterators.size(page.iterateAll().iterator()) != 2) { - Thread.sleep(500); - page = - storage.list( - bucket.getName(), - BlobListOption.prefix("test-list-blobs-empty-selected-fields-blob"), - BlobListOption.fields()); - } - Set blobSet = ImmutableSet.of(blobNames[0], blobNames[1]); - Iterator iterator = page.iterateAll().iterator(); - while (iterator.hasNext()) { - Blob remoteBlob = iterator.next(); - assertEquals(bucket.getName(), remoteBlob.getBucket()); - assertTrue(blobSet.contains(remoteBlob.getName())); - assertNull(remoteBlob.getContentType()); - } + Storage.BlobListOption.prefix(baseName), + Storage.BlobListOption.fields(BlobField.METADATA)); + + ImmutableSet blobs = ImmutableSet.copyOf(page.iterateAll()); + + ImmutableSet> actual = + blobs.stream().map(BlobInfo::getMetadata).collect(ImmutableSet.toImmutableSet()); + + assertThat(actual).isEqualTo(expected); } @Test @@ -545,49 +496,76 @@ public void testListBlobsWithOffset() throws ExecutionException, InterruptedExce } } - @Test(timeout = 5000) - // This test is currently timing out for GRPC - @Exclude(transports = Transport.GRPC) - public void testListBlobsCurrentDirectory() throws InterruptedException { - - String directoryName = "test-list-blobs-current-directory/"; - String subdirectoryName = "subdirectory/"; - String[] blobNames = {directoryName + subdirectoryName + "blob1", directoryName + "blob2"}; - BlobInfo blob1 = BlobInfo.newBuilder(bucket, blobNames[0]).setContentType(CONTENT_TYPE).build(); - BlobInfo blob2 = BlobInfo.newBuilder(bucket, blobNames[1]).setContentType(CONTENT_TYPE).build(); - Blob remoteBlob1 = storage.create(blob1, BLOB_BYTE_CONTENT); - Blob remoteBlob2 = storage.create(blob2, BLOB_BYTE_CONTENT); - assertNotNull(remoteBlob1); - assertNotNull(remoteBlob2); - Page page = + @Test + public void testListBlobsCurrentDirectoryIncludesBothObjectsAndSyntheticDirectories() { + String bucketName = bucket.getName(); + String directoryName = generator.randomObjectName(); + String subdirectoryName = "subdirectory"; + + String uriSubDir = String.format("gs://%s/%s/%s/", bucketName, directoryName, subdirectoryName); + String uri1 = String.format("gs://%s/%s/%s/blob1", bucketName, directoryName, subdirectoryName); + String uri2 = String.format("gs://%s/%s/blob2", bucketName, directoryName); + + BlobId id1 = BlobId.fromGsUtilUri(uri1); + BlobId id2 = BlobId.fromGsUtilUri(uri2); + BlobId idSubDir = BlobId.fromGsUtilUri(uriSubDir); + + BlobInfo blob1 = BlobInfo.newBuilder(id1).build(); + BlobInfo blob2 = BlobInfo.newBuilder(id2).build(); + BlobInfo obj1Gen1 = storage.create(blob1, BLOB_BYTE_CONTENT).asBlobInfo(); + BlobInfo obj2Gen1 = storage.create(blob2, BLOB_BYTE_CONTENT).asBlobInfo(); + + Page page1 = storage.list( - bucket.getName(), - BlobListOption.prefix("test-list-blobs-current-directory/"), + bucketName, + BlobListOption.prefix(directoryName + "/"), BlobListOption.currentDirectory()); - // Listing blobs is eventually consistent, we loop until the list is of the expected size. The - // test fails if timeout is reached. - while (Iterators.size(page.iterateAll().iterator()) != 2) { - Thread.sleep(500); - page = - storage.list( - bucket.getName(), - BlobListOption.prefix("test-list-blobs-current-directory/"), - BlobListOption.currentDirectory()); - } - Iterator iterator = page.iterateAll().iterator(); - while (iterator.hasNext()) { - Blob remoteBlob = iterator.next(); - assertEquals(bucket.getName(), remoteBlob.getBucket()); - if (remoteBlob.getName().equals(blobNames[1])) { - assertEquals(CONTENT_TYPE, remoteBlob.getContentType()); - assertEquals(BLOB_BYTE_CONTENT.length, (long) remoteBlob.getSize()); - assertFalse(remoteBlob.isDirectory()); - } else if (remoteBlob.getName().equals(directoryName + subdirectoryName)) { - assertEquals(0L, (long) remoteBlob.getSize()); - assertTrue(remoteBlob.isDirectory()); - } else { - fail("Unexpected blob with name " + remoteBlob.getName()); - } + + ImmutableList blobs = ImmutableList.copyOf(page1.iterateAll()); + + ImmutableSet actual = + blobs.stream() + .map(Blob::asBlobInfo) + .map(info -> PackagePrivateMethodWorkarounds.noAcl(info)) + .collect(ImmutableSet.toImmutableSet()); + + // obj1Gen1 is "in subdirectory" and we don't expect to receive it as a result when listing + // object in "the current directory" + assertThat(actual).doesNotContain(obj1Gen1); + + // make sure one of the results we received is the "subdirectory" blob1 is "in" + Optional first = actual.stream().filter(BlobInfo::isDirectory).findFirst(); + assertThat(first.isPresent()).isTrue(); + assertThat(first.get().getBlobId()).isEqualTo(idSubDir); + + assertThat(actual).contains(PackagePrivateMethodWorkarounds.noAcl(obj2Gen1)); + } + + @Test + public void testListBlobsMultiplePages() { + String basePath = generator.randomObjectName(); + + ImmutableList expected = + IntStream.rangeClosed(1, 10) + .mapToObj(i -> String.format("%s/%2d", basePath, i)) + .map(name -> BlobInfo.newBuilder(bucket, name).build()) + .map(info -> storage.create(info, BlobTargetOption.doesNotExist())) + .map(info1 -> PackagePrivateMethodWorkarounds.noAcl(info1)) + .collect(ImmutableList.toImmutableList()); + + Page page = + storage.list(bucket.getName(), BlobListOption.prefix(basePath), BlobListOption.pageSize(3)); + + ImmutableList actual = + ImmutableList.copyOf(page.iterateAll()).stream() + .map(info -> PackagePrivateMethodWorkarounds.noAcl(info)) + .collect(ImmutableList.toImmutableList()); + + try { + assertThat(actual).isEqualTo(expected); + } finally { + // delete all the objects we created + expected.stream().map(BlobInfo::getBlobId).forEach(storage::delete); } }