Skip to content

Commit

Permalink
feat: port DefaultBlobWriteSessionConfig to work with HttpStorageOpti…
Browse files Browse the repository at this point in the history
…ons (#2472)
  • Loading branch information
BenWhitehead committed Mar 28, 2024
1 parent 7c86ad0 commit e5772a4
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 8 deletions.
Expand Up @@ -53,6 +53,15 @@ WritableByteChannelSession<?, BlobInfo> writeSession(
StorageInternal s, BlobInfo info, Opts<ObjectTargetOpt> opts);
}

/**
* Internal marker interface to signify an implementation of {@link BlobWriteSessionConfig} is
* compatible with {@link com.google.cloud.storage.TransportCompatibility.Transport#HTTP}
*
* <p>We could evaluate the annotations, but the code for that is more complicated and probably
* not worth the effort.
*/
interface HttpCompatible {}

/**
* Internal marker interface to signify an implementation of {@link BlobWriteSessionConfig} is
* compatible with {@link com.google.cloud.storage.TransportCompatibility.Transport#GRPC}
Expand Down
Expand Up @@ -55,7 +55,7 @@
* full or close. Buffer size is configurable via
* {@link DefaultBlobWriteSessionConfig#withChunkSize(int)}
* </td>
* <td>gRPC</td>
* <td>gRPC, HTTP</td>
* <td>The network will only be used for the following operations:
* <ol>
* <li>Creating the Resumable Upload Session</li>
Expand Down Expand Up @@ -241,7 +241,7 @@ private BlobWriteSessionConfigs() {}
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
@TransportCompatibility({Transport.GRPC})
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
public static DefaultBlobWriteSessionConfig getDefault() {
return new DefaultBlobWriteSessionConfig(ByteSizeConstants._16MiB);
}
Expand Down
Expand Up @@ -21,17 +21,21 @@
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
import com.google.cloud.storage.Conversions.Decoder;
import com.google.cloud.storage.TransportCompatibility.Transport;
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.storage.v2.WriteObjectRequest;
import com.google.storage.v2.WriteObjectResponse;
import java.nio.channels.WritableByteChannel;
import java.time.Clock;
import java.util.Map;
import java.util.function.Supplier;
import javax.annotation.concurrent.Immutable;

/**
Expand All @@ -55,9 +59,9 @@
*/
@Immutable
@BetaApi
@TransportCompatibility({Transport.GRPC})
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
public final class DefaultBlobWriteSessionConfig extends BlobWriteSessionConfig
implements BlobWriteSessionConfig.GrpcCompatible {
implements BlobWriteSessionConfig.HttpCompatible, BlobWriteSessionConfig.GrpcCompatible {
private static final long serialVersionUID = -6873740918589930633L;

private final int chunkSize;
Expand Down Expand Up @@ -146,6 +150,39 @@ public WritableByteChannelSession<?, BlobInfo> writeSession(
.build();
})),
WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER);
} else if (s instanceof StorageImpl) {
StorageImpl json = (StorageImpl) s;

return new DecoratedWritableByteChannelSession<>(
new LazySession<>(
new LazyWriteChannel<>(
() -> {
final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
BlobInfo.Builder builder = info.toBuilder().setMd5(null).setCrc32c(null);
BlobInfo updated = opts.blobInfoMapper().apply(builder).build();

StorageObject encode = Conversions.json().blobInfo().encode(updated);
Supplier<String> uploadIdSupplier =
ResumableMedia.startUploadForBlobInfo(
json.getOptions(),
updated,
optionsMap,
json.retryAlgorithmManager.getForResumableUploadSessionCreate(
optionsMap));
ApiFuture<JsonResumableWrite> startAsync =
ApiFutures.immediateFuture(
JsonResumableWrite.of(
encode, optionsMap, uploadIdSupplier.get(), 0L));

return ResumableMedia.http()
.write()
.byteChannel(HttpClientContext.from(json.storageRpc))
.resumable()
.buffered(BufferHandle.allocate(chunkSize))
.setStartAsync(startAsync)
.build();
})),
Conversions.json().blobInfo());
} else {
throw new IllegalStateException(
"Unknown Storage implementation: " + s.getClass().getName());
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.storage;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import com.google.api.core.ApiClock;
Expand All @@ -30,6 +31,7 @@
import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.spi.ServiceRpcFactory;
import com.google.cloud.storage.Retrying.RetryingDependencies;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.TransportCompatibility.Transport;
import com.google.cloud.storage.spi.StorageRpcFactory;
import com.google.cloud.storage.spi.v1.HttpStorageRpc;
Expand All @@ -39,7 +41,9 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.time.Clock;
import java.util.Set;
import org.checkerframework.checker.nullness.qual.NonNull;

/** @since 2.14.0 This new api is in preview and is subject to breaking changes. */
@BetaApi
Expand All @@ -55,6 +59,7 @@ public class HttpStorageOptions extends StorageOptions {

private final HttpRetryAlgorithmManager retryAlgorithmManager;
private transient RetryDependenciesAdapter retryDepsAdapter;
private final BlobWriteSessionConfig blobWriteSessionConfig;

private HttpStorageOptions(Builder builder, StorageDefaults serviceDefaults) {
super(builder, serviceDefaults);
Expand All @@ -63,6 +68,7 @@ private HttpStorageOptions(Builder builder, StorageDefaults serviceDefaults) {
MoreObjects.firstNonNull(
builder.storageRetryStrategy, defaults().getStorageRetryStrategy()));
retryDepsAdapter = new RetryDependenciesAdapter();
blobWriteSessionConfig = builder.blobWriteSessionConfig;
}

@Override
Expand Down Expand Up @@ -120,6 +126,8 @@ RetryingDependencies asRetryDependencies() {
public static class Builder extends StorageOptions.Builder {

private StorageRetryStrategy storageRetryStrategy;
private BlobWriteSessionConfig blobWriteSessionConfig =
HttpStorageDefaults.INSTANCE.getDefaultStorageWriterConfig();

Builder() {}

Expand Down Expand Up @@ -218,6 +226,24 @@ public HttpStorageOptions.Builder setQuotaProjectId(String quotaProjectId) {
return this;
}

/**
* @see BlobWriteSessionConfig
* @see BlobWriteSessionConfigs
* @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...)
* @see HttpStorageDefaults#getDefaultStorageWriterConfig()
* @since 2.29.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
public HttpStorageOptions.Builder setBlobWriteSessionConfig(
@NonNull BlobWriteSessionConfig blobWriteSessionConfig) {
requireNonNull(blobWriteSessionConfig, "blobWriteSessionConfig must be non null");
checkArgument(
blobWriteSessionConfig instanceof BlobWriteSessionConfig.HttpCompatible,
"The provided instance of BlobWriteSessionConfig is not compatible with this HTTP transport.");
this.blobWriteSessionConfig = blobWriteSessionConfig;
return this;
}

@Override
public HttpStorageOptions build() {
return new HttpStorageOptions(this, defaults());
Expand Down Expand Up @@ -249,6 +275,12 @@ public HttpTransportOptions getDefaultTransportOptions() {
public StorageRetryStrategy getStorageRetryStrategy() {
return StorageRetryStrategy.getDefaultStorageRetryStrategy();
}

/** @since 2.29.0 This new api is in preview and is subject to breaking changes. */
@BetaApi
public BlobWriteSessionConfig getDefaultStorageWriterConfig() {
return BlobWriteSessionConfigs.getDefault();
}
}

/**
Expand Down Expand Up @@ -287,7 +319,14 @@ public HttpStorageFactory() {}
public Storage create(StorageOptions options) {
if (options instanceof HttpStorageOptions) {
HttpStorageOptions httpStorageOptions = (HttpStorageOptions) options;
return new StorageImpl(httpStorageOptions);
Clock clock = Clock.systemUTC();
try {
return new StorageImpl(
httpStorageOptions, httpStorageOptions.blobWriteSessionConfig.createFactory(clock));
} catch (IOException e) {
throw new IllegalStateException(
"Unable to instantiate HTTP com.google.cloud.storage.Storage client.", e);
}
} else {
throw new IllegalArgumentException("Only HttpStorageOptions supported");
}
Expand Down
Expand Up @@ -38,6 +38,7 @@
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Acl.Entity;
import com.google.cloud.storage.BlobReadChannelV2.BlobReadChannelContext;
import com.google.cloud.storage.BlobWriteSessionConfig.WriterFactory;
import com.google.cloud.storage.HmacKey.HmacKeyMetadata;
import com.google.cloud.storage.PostPolicyV4.ConditionV4Type;
import com.google.cloud.storage.PostPolicyV4.PostConditionsV4;
Expand Down Expand Up @@ -92,7 +93,7 @@
import java.util.function.Supplier;
import org.checkerframework.checker.nullness.qual.Nullable;

final class StorageImpl extends BaseService<StorageOptions> implements Storage {
final class StorageImpl extends BaseService<StorageOptions> implements Storage, StorageInternal {

private static final byte[] EMPTY_BYTE_ARRAY = {};
private static final String EMPTY_BYTE_ARRAY_MD5 = "1B2M2Y8AsgTpgAmY7PhCfg==";
Expand All @@ -115,11 +116,13 @@ final class StorageImpl extends BaseService<StorageOptions> implements Storage {

final HttpRetryAlgorithmManager retryAlgorithmManager;
final StorageRpc storageRpc;
final WriterFactory writerFactory;

StorageImpl(HttpStorageOptions options) {
StorageImpl(HttpStorageOptions options, WriterFactory writerFactory) {
super(options);
this.retryAlgorithmManager = options.getRetryAlgorithmManager();
this.storageRpc = options.getStorageRpcV1();
this.writerFactory = writerFactory;
}

@Override
Expand Down Expand Up @@ -1635,4 +1638,13 @@ private Bucket internalBucketGet(String bucket, Map<StorageRpc.Option, ?> option
() -> storageRpc.get(bucketPb, optionsMap),
(b) -> Conversions.json().bucketInfo().decode(b).asBucket(this));
}

@Override
public BlobWriteSession blobWriteSession(BlobInfo blobInfo, BlobWriteOption... options) {
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo);

WritableByteChannelSession<?, BlobInfo> writableByteChannelSession =
writerFactory.writeSession(this, blobInfo, opts);
return BlobWriteSessions.of(writableByteChannelSession);
}
}
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.DataGenerator;
import com.google.cloud.storage.GrpcStorageOptions;
import com.google.cloud.storage.HttpStorageOptions;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.StorageException;
Expand All @@ -49,7 +50,7 @@

@RunWith(StorageITRunner.class)
@CrossRun(
transports = {Transport.GRPC},
transports = {Transport.HTTP, Transport.GRPC},
backends = {Backend.PROD})
public final class ITBlobWriteSessionTest {

Expand All @@ -68,6 +69,7 @@ public void allDefaults() throws Exception {
}

@Test
@CrossRun.Exclude(transports = Transport.HTTP)
public void bufferToTempDirThenUpload() throws Exception {
StorageOptions options = null;
if (transport == Transport.GRPC) {
Expand All @@ -94,6 +96,13 @@ public void overrideDefaultBufferSize() throws Exception {
.setBlobWriteSessionConfig(
BlobWriteSessionConfigs.getDefault().withChunkSize(256 * 1024))
.build();
} else if (transport == Transport.HTTP) {
options =
((HttpStorageOptions) storage.getOptions())
.toBuilder()
.setBlobWriteSessionConfig(
BlobWriteSessionConfigs.getDefault().withChunkSize(256 * 1024))
.build();
}
assertWithMessage("unable to resolve options").that(options).isNotNull();
//noinspection DataFlowIssue
Expand All @@ -103,6 +112,7 @@ public void overrideDefaultBufferSize() throws Exception {
}

@Test
@CrossRun.Exclude(transports = Transport.HTTP)
public void bidiTest() throws Exception {
StorageOptions options = null;
if (transport == Transport.GRPC) {
Expand Down

0 comments on commit e5772a4

Please sign in to comment.