From 3bf60264f47aad8101bb4b4cff9cc0449cf1c4f3 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Thu, 28 Mar 2024 12:02:27 -0400 Subject: [PATCH] feat: port ParallelCompositeUploadBlobWriteSessionConfig to work with HttpStorageOptions (#2474) --- .../storage/BlobWriteSessionConfigs.java | 4 +- ...CompositeUploadBlobWriteSessionConfig.java | 4 +- ...lelCompositeUploadWritableByteChannel.java | 10 +- .../storage/RewindableContentInputStream.java | 91 ++++++++++++++++++ .../com/google/cloud/storage/StorageImpl.java | 68 +++++++++++++ .../cloud/storage/spi/v1/HttpStorageRpc.java | 17 ++-- .../RewindableContentInputStreamTest.java | 96 +++++++++++++++++++ ...ositeUploadBlobWriteSessionConfigTest.java | 9 +- 8 files changed, 286 insertions(+), 13 deletions(-) create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableContentInputStream.java create mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableContentInputStreamTest.java diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java index 43da35cc6..9e9479e14 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java @@ -142,7 +142,7 @@ * Break the stream of bytes into smaller part objects uploading each part in parallel. Then * composing the parts together to make the ultimate object. * - * gRPC + * gRPC, HTTP * *
    *
  1. @@ -342,7 +342,7 @@ public static JournalingBlobWriteSessionConfig journaling(Collection paths * @since 2.28.0 This new api is in preview and is subject to breaking changes. */ @BetaApi - @TransportCompatibility({Transport.GRPC}) + @TransportCompatibility({Transport.GRPC, Transport.HTTP}) public static ParallelCompositeUploadBlobWriteSessionConfig parallelCompositeUpload() { return ParallelCompositeUploadBlobWriteSessionConfig.withDefaults(); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadBlobWriteSessionConfig.java index 07551abe4..41f52c095 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadBlobWriteSessionConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadBlobWriteSessionConfig.java @@ -118,9 +118,9 @@ */ @Immutable @BetaApi -@TransportCompatibility({Transport.GRPC}) +@TransportCompatibility({Transport.GRPC, Transport.HTTP}) public final class ParallelCompositeUploadBlobWriteSessionConfig extends BlobWriteSessionConfig - implements BlobWriteSessionConfig.GrpcCompatible { + implements BlobWriteSessionConfig.HttpCompatible, BlobWriteSessionConfig.GrpcCompatible { private static final int MAX_PARTS_PER_COMPOSE = 32; private final int maxPartsPerCompose; diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java index 639802cde..a0dce2cfe 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java @@ -431,6 +431,10 @@ private BlobInfo definePart(BlobInfo ultimateObject, PartRange partRange, long o PART_INDEX.appendTo(partRange, builder); OBJECT_OFFSET.appendTo(offset, builder); b.setMetadata(builder.build()); + // the value of a kms key name will contain the exact version when read from gcs + // however, gcs will not accept that version resource identifier when creating a new object + // strip it out, so it can be included as a query string parameter instead + b.setKmsKeyName(null); b = partMetadataFieldDecorator.apply(b); return b.build(); } @@ -507,7 +511,11 @@ private ApiFuture deleteAsync(BlobId id) { @VisibleForTesting @NonNull static Opts getPartOpts(Opts opts) { - return opts.filter(TO_EXCLUDE_FROM_PARTS).prepend(DOES_NOT_EXIST); + return opts.filter(TO_EXCLUDE_FROM_PARTS) + .prepend(DOES_NOT_EXIST) + // disable gzip transfer encoding for HTTP, it causes a significant bottleneck uploading + // the parts + .prepend(Opts.from(UnifiedOpts.disableGzipContent())); } @VisibleForTesting diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableContentInputStream.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableContentInputStream.java new file mode 100644 index 000000000..0d2ff144b --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableContentInputStream.java @@ -0,0 +1,91 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.GatheringByteChannel; + +/** + * Facade which makes an instance of {@link RewindableContent} appear as an input stream. + * + *

    It does this by calling {@link RewindableContent#writeTo(GatheringByteChannel)} on an + * anonymous channel which closes over the read destination. + */ +final class RewindableContentInputStream extends InputStream { + + private final RewindableContent content; + + RewindableContentInputStream(RewindableContent content) { + this.content = content; + } + + @Override + public int read() throws IOException { + byte[] tmp = new byte[1]; + int read = read(tmp); + if (read == -1) { + return -1; + } else { + return tmp[0] & 0xFF; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + // define a byte buffer as the destination for our write + ByteBuffer dst = ByteBuffer.wrap(b, off, len); + int remaining = dst.remaining(); + if (remaining == 0) { + return 0; + } + long written = + content.writeTo( + new AnonWritableByteChannel() { + @Override + public long write(ByteBuffer[] srcs, int offset, int length) { + // srcs here is the bytes of content + long total = 0; + for (int i = offset; i < length; i++) { + ByteBuffer src = srcs[i]; + // copy what we can from our src to the dst buffer + long written = Buffers.copy(src, dst); + total += written; + } + return total; + } + }); + // if the dst has space, but we didn't write anything means we didn't have anything to write + if (written == 0) { + return -1; + } + return Math.toIntExact(written); + } + + private abstract static class AnonWritableByteChannel implements UnbufferedWritableByteChannel { + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void close() {} + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java index ea9e62f07..5884a51e5 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java @@ -1694,4 +1694,72 @@ public BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts opts, ByteBuffer buf) { + + BlobInfo.Builder builder = + info.toBuilder() + .setMd5( + BaseEncoding.base64().encode(Hashing.md5().hashBytes(buf.duplicate()).asBytes())) + .setCrc32c( + BaseEncoding.base64() + .encode(Ints.toByteArray(Hashing.crc32c().hashBytes(buf.duplicate()).asInt()))); + final Map optionsMap = opts.getRpcOptions(); + + BlobInfo updated = opts.blobInfoMapper().apply(builder).build(); + final StorageObject encoded = codecs.blobInfo().encode(updated); + ResultRetryAlgorithm algorithm = + retryAlgorithmManager.getForObjectsCreate(encoded, optionsMap); + RewindableContent content = RewindableContent.of(buf); + return run( + algorithm, + () -> { + content.rewindTo(0); + return storageRpc.create(encoded, new RewindableContentInputStream(content), optionsMap); + }, + Conversions.json().blobInfo()::decode); + } + + /** + * Behavioral difference compared to {@link #delete(BlobId, BlobSourceOption...)} instead of + * returning false when an object does not exist, we throw an exception. + */ + @Override + public Void internalObjectDelete(BlobId id, Opts opts) { + final StorageObject storageObject = codecs.blobId().encode(id); + ImmutableMap optionsMap = opts.getRpcOptions(); + ResultRetryAlgorithm algorithm = + retryAlgorithmManager.getForObjectsDelete(storageObject, optionsMap); + return run( + algorithm, + () -> { + boolean deleted = storageRpc.delete(storageObject, optionsMap); + // HttpStorageRpc turns a 404 into false, our code needs to know 404 + if (!deleted) { + throw new StorageException(404, "NOT_FOUND", null, null); + } + return null; + }, + Function.identity()); + } + + @Override + public BlobInfo internalObjectGet(BlobId blobId, Opts opts) { + StorageObject storedObject = codecs.blobId().encode(blobId); + ImmutableMap optionsMap = opts.getRpcOptions(); + ResultRetryAlgorithm algorithm = + retryAlgorithmManager.getForObjectsGet(storedObject, optionsMap); + return run( + algorithm, + () -> { + StorageObject storageObject = storageRpc.get(storedObject, optionsMap); + // HttpStorageRpc turns a 404 into null, our code needs to know 404 + if (storageObject == null) { + throw new StorageException(404, "NOT_FOUND", null, null); + } + return storageObject; + }, + codecs.blobInfo()::decode); + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java index 8a9734271..363d9afdd 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java @@ -43,6 +43,7 @@ import com.google.api.client.json.jackson2.JacksonFactory; import com.google.api.client.util.Data; import com.google.api.services.storage.Storage; +import com.google.api.services.storage.Storage.Objects.Compose; import com.google.api.services.storage.Storage.Objects.Get; import com.google.api.services.storage.Storage.Objects.Insert; import com.google.api.services.storage.model.Bucket; @@ -755,13 +756,15 @@ public StorageObject compose( Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_COMPOSE); Scope scope = tracer.withSpan(span); try { - return storage - .objects() - .compose(target.getBucket(), target.getName(), request) - .setIfMetagenerationMatch(Option.IF_METAGENERATION_MATCH.getLong(targetOptions)) - .setIfGenerationMatch(Option.IF_GENERATION_MATCH.getLong(targetOptions)) - .setUserProject(Option.USER_PROJECT.getString(targetOptions)) - .execute(); + Compose compose = + storage + .objects() + .compose(target.getBucket(), target.getName(), request) + .setIfMetagenerationMatch(Option.IF_METAGENERATION_MATCH.getLong(targetOptions)) + .setIfGenerationMatch(Option.IF_GENERATION_MATCH.getLong(targetOptions)) + .setUserProject(Option.USER_PROJECT.getString(targetOptions)); + setEncryptionHeaders(compose.getRequestHeaders(), ENCRYPTION_KEY_PREFIX, targetOptions); + return compose.execute(); } catch (IOException ex) { span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage())); throw translate(ex); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableContentInputStreamTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableContentInputStreamTest.java new file mode 100644 index 000000000..b8d7bd5c5 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableContentInputStreamTest.java @@ -0,0 +1,96 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.cloud.storage.ByteSizeConstants._256KiB; +import static com.google.cloud.storage.TestUtils.xxd; +import static com.google.common.truth.Truth.assertThat; + +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.junit.Test; + +public final class RewindableContentInputStreamTest { + + @Test + public void read_empty() throws IOException { + RewindableContent content = RewindableContent.empty(); + try (RewindableContentInputStream in = new RewindableContentInputStream(content)) { + int read = in.read(); + assertThat(read).isEqualTo(-1); + } + } + + @Test + public void readB_emptySrc() throws IOException { + RewindableContent content = RewindableContent.empty(); + try (RewindableContentInputStream in = new RewindableContentInputStream(content)) { + int read = in.read(new byte[1]); + assertThat(read).isEqualTo(-1); + } + } + + @Test + public void readB_emptyDst() throws IOException { + byte[] bytes = DataGenerator.base64Characters().genBytes(1); + RewindableContent content = RewindableContent.of(ByteBuffer.wrap(bytes)); + try (RewindableContentInputStream in = new RewindableContentInputStream(content)) { + byte[] tmp = new byte[0]; + int read = in.read(tmp); + assertThat(read).isEqualTo(0); + } + } + + @Test + public void readB_singleByte() throws IOException { + byte[] bytes = DataGenerator.base64Characters().genBytes(1); + RewindableContent content = RewindableContent.of(ByteBuffer.wrap(bytes)); + try (RewindableContentInputStream in = new RewindableContentInputStream(content)) { + byte[] tmp = new byte[_256KiB]; + int read = in.read(tmp); + assertThat(read).isEqualTo(1); + assertThat(tmp[0]).isEqualTo(bytes[0]); + } + } + + @Test + public void read_singleByte() throws IOException { + byte[] bytes = DataGenerator.base64Characters().genBytes(1); + RewindableContent content = RewindableContent.of(ByteBuffer.wrap(bytes)); + try (RewindableContentInputStream in = new RewindableContentInputStream(content)) { + int read = in.read(); + assertThat(read).isEqualTo(bytes[0]); + } + } + + @Test + public void readB_multiContent() throws IOException { + byte[] bytes = DataGenerator.base64Characters().genBytes(30); + RewindableContent content = + RewindableContent.of( + ByteBuffer.wrap(bytes, 0, 10), + ByteBuffer.wrap(bytes, 10, 10), + ByteBuffer.wrap(bytes, 20, 10)); + try (RewindableContentInputStream in = new RewindableContentInputStream(content)) { + byte[] tmp = new byte[_256KiB]; + int read = in.read(tmp); + assertThat(read).isEqualTo(30); + assertThat(xxd(ByteString.copyFrom(tmp, 0, read))).isEqualTo(xxd(bytes)); + } + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITParallelCompositeUploadBlobWriteSessionConfigTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITParallelCompositeUploadBlobWriteSessionConfigTest.java index 304cdd69f..3352a92ad 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITParallelCompositeUploadBlobWriteSessionConfigTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITParallelCompositeUploadBlobWriteSessionConfigTest.java @@ -31,6 +31,7 @@ import com.google.cloud.storage.BucketInfo; import com.google.cloud.storage.DataGenerator; import com.google.cloud.storage.GrpcStorageOptions; +import com.google.cloud.storage.HttpStorageOptions; import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig; import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.BufferAllocationStrategy; import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.ExecutorSupplier; @@ -73,7 +74,7 @@ @RunWith(StorageITRunner.class) @CrossRun( - transports = {Transport.GRPC}, + transports = {Transport.HTTP, Transport.GRPC}, backends = {Backend.PROD}) public final class ITParallelCompositeUploadBlobWriteSessionConfigTest { @@ -125,6 +126,12 @@ public void setUp() throws Exception { .toBuilder() .setBlobWriteSessionConfig(pcu) .build(); + } else if (transport == Transport.HTTP) { + storageOptions = + ((HttpStorageOptions) injectedStorage.getOptions()) + .toBuilder() + .setBlobWriteSessionConfig(pcu) + .build(); } assertWithMessage("unable to resolve options").that(storageOptions).isNotNull(); //noinspection DataFlowIssue