Skip to content

Commit

Permalink
feat: Add Bidi write feature (#2343)
Browse files Browse the repository at this point in the history
* feat: Add Bidi write feature

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* add in bidi ssb workload

* WIP

* Add in bidi fields

* Add in DefaultBlobWriteSession test

* fix copyright

* remove bucket creation line

* cleanup object created by ssb

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix up comments

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: Sydney Munro <sydmunro@google.com>
  • Loading branch information
3 people committed Feb 21, 2024
1 parent 1d2064b commit 47fde85
Show file tree
Hide file tree
Showing 17 changed files with 1,226 additions and 28 deletions.
6 changes: 3 additions & 3 deletions README.md
Expand Up @@ -57,13 +57,13 @@ implementation 'com.google.cloud:google-cloud-storage'
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-storage:2.33.0'
implementation 'com.google.cloud:google-cloud-storage:2.34.0'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-storage" % "2.33.0"
libraryDependencies += "com.google.cloud" % "google-cloud-storage" % "2.34.0"
```
<!-- {x-version-update-end} -->

Expand Down Expand Up @@ -428,7 +428,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-storage/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-storage.svg
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-storage/2.33.0
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-storage/2.34.0
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
Expand Down
@@ -0,0 +1,180 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.storage.v2.BidiWriteObjectRequest;
import com.google.storage.v2.BidiWriteObjectResponse;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.time.Clock;

public class BidiBlobWriteSessionConfig extends BlobWriteSessionConfig
implements BlobWriteSessionConfig.GrpcCompatible {
private static final long serialVersionUID = -903533790705476197L;

private final int bufferSize;

@InternalApi
BidiBlobWriteSessionConfig(int bufferSize) {
this.bufferSize = bufferSize;
}

/**
* The number of bytes to hold in the buffer before each flush
*
* <p><i>Default:</i> {@code 16777216 (16 MiB)}
*
* @see #withBufferSize(int)
* @since 2.34.0 This new api is in preview and is subject to breaking changes.
*/
public int getBufferSize() {
return bufferSize;
}

@Override
WriterFactory createFactory(Clock clock) throws IOException {
return new Factory(ByteSizeConstants._16MiB);
}

@InternalApi
private static final class Factory implements WriterFactory {
private static final Conversions.Decoder<BidiWriteObjectResponse, BlobInfo>
WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER =
Conversions.grpc().blobInfo().compose(BidiWriteObjectResponse::getResource);

private final int bufferSize;

private Factory(int bufferSize) {
this.bufferSize = bufferSize;
}

@InternalApi
@Override
public WritableByteChannelSession<?, BlobInfo> writeSession(
StorageInternal s, BlobInfo info, UnifiedOpts.Opts<UnifiedOpts.ObjectTargetOpt> opts) {
if (s instanceof GrpcStorageImpl) {
return new DecoratedWritableByteChannelSession<>(
new LazySession<>(
new LazyWriteChannel<>(
() -> {
GrpcStorageImpl grpc = (GrpcStorageImpl) s;
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
BidiWriteObjectRequest req = grpc.getBidiWriteObjectRequest(info, opts);

ApiFuture<BidiResumableWrite> startResumableWrite =
grpc.startResumableWrite(grpcCallContext, req);
return ResumableMedia.gapic()
.write()
.bidiByteChannel(grpc.storageClient.bidiWriteObjectCallable())
.setHasher(Hasher.noop())
.setByteStringStrategy(ByteStringStrategy.copy())
.resumable()
.withRetryConfig(
grpc.getOptions(), grpc.retryAlgorithmManager.idempotent())
.buffered(BufferHandle.allocate(bufferSize))
.setStartAsync(startResumableWrite)
.build();
})),
WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER);
} else {
throw new IllegalStateException(
"Unknown Storage implementation: " + s.getClass().getName());
}
}
}

/**
* Create a new instance with the {@code bufferSize} set to the specified value.
*
* <p><i>Default:</i> {@code 16777216 (16 MiB)}
*
* @param bufferSize The number of bytes to hold in the buffer before each flush. Must be &gt;=
* {@code 262144 (256 KiB)}
* @return The new instance
* @see #getBufferSize()
* @since 2.34.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
public BidiBlobWriteSessionConfig withBufferSize(int bufferSize) {
Preconditions.checkArgument(
bufferSize >= ByteSizeConstants._256KiB,
"bufferSize must be >= %d",
ByteSizeConstants._256KiB);
return new BidiBlobWriteSessionConfig(bufferSize);
}

private static final class DecoratedWritableByteChannelSession<WBC extends WritableByteChannel, T>
implements WritableByteChannelSession<WBC, BlobInfo> {

private final WritableByteChannelSession<WBC, T> delegate;
private final Conversions.Decoder<T, BlobInfo> decoder;

private DecoratedWritableByteChannelSession(
WritableByteChannelSession<WBC, T> delegate, Conversions.Decoder<T, BlobInfo> decoder) {
this.delegate = delegate;
this.decoder = decoder;
}

@Override
public WBC open() {
try {
return WritableByteChannelSession.super.open();
} catch (Exception e) {
throw StorageException.coalesce(e);
}
}

@Override
public ApiFuture<WBC> openAsync() {
return delegate.openAsync();
}

@Override
public ApiFuture<BlobInfo> getResult() {
return ApiFutures.transform(
delegate.getResult(), decoder::decode, MoreExecutors.directExecutor());
}
}

private static final class LazySession<R>
implements WritableByteChannelSession<
BufferedWritableByteChannelSession.BufferedWritableByteChannel, R> {
private final LazyWriteChannel<R> lazy;

private LazySession(LazyWriteChannel<R> lazy) {
this.lazy = lazy;
}

@Override
public ApiFuture<BufferedWritableByteChannelSession.BufferedWritableByteChannel> openAsync() {
return lazy.getSession().openAsync();
}

@Override
public ApiFuture<R> getResult() {
return lazy.getSession().getResult();
}
}
}
@@ -0,0 +1,96 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import static com.google.cloud.storage.StorageV2ProtoUtils.fmtProto;

import com.google.cloud.storage.BidiWriteCtx.BidiWriteObjectRequestBuilderFactory;
import com.google.storage.v2.BidiWriteObjectRequest;
import com.google.storage.v2.StartResumableWriteRequest;
import com.google.storage.v2.StartResumableWriteResponse;
import java.util.Objects;
import java.util.function.Function;
import org.checkerframework.checker.nullness.qual.Nullable;

final class BidiResumableWrite implements BidiWriteObjectRequestBuilderFactory {

private final StartResumableWriteRequest req;
private final StartResumableWriteResponse res;

private final BidiWriteObjectRequest writeRequest;

public BidiResumableWrite(
StartResumableWriteRequest req,
StartResumableWriteResponse res,
Function<String, BidiWriteObjectRequest> f) {
this.req = req;
this.res = res;
this.writeRequest = f.apply(res.getUploadId());
}

public StartResumableWriteRequest getReq() {
return req;
}

public StartResumableWriteResponse getRes() {
return res;
}

@Override
public BidiWriteObjectRequest.Builder newBuilder() {
return writeRequest.toBuilder();
}

@Override
public @Nullable String bucketName() {
if (req.hasWriteObjectSpec() && req.getWriteObjectSpec().hasResource()) {
return req.getWriteObjectSpec().getResource().getBucket();
}
return null;
}

@Override
public String toString() {
return "BidiResumableWrite{" + "req=" + fmtProto(req) + ", res=" + fmtProto(res) + '}';
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ResumableWrite)) {
return false;
}
ResumableWrite resumableWrite = (ResumableWrite) o;
return Objects.equals(req, resumableWrite.getReq())
&& Objects.equals(res, resumableWrite.getRes());
}

@Override
public int hashCode() {
return Objects.hash(req, res);
}

/**
* Helper function which is more specific than {@link Function#identity()}. Constraining the input
* and output to be exactly {@link BidiResumableWrite}.
*/
static BidiResumableWrite identity(BidiResumableWrite w) {
return w;
}
}

0 comments on commit 47fde85

Please sign in to comment.