Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add new JournalingBlobWriteSessionConfig usable with gRPC transport #2194

Merged
merged 2 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,115 @@
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;

/**
* Factory class to select and construct {@link BlobWriteSessionConfig}s.
*
* <p>There are several strategies which can be used to upload a {@link Blob} to Google Cloud
* Storage. This class provides factories which allow you to select the appropriate strategy for
* your workload.
*
* <table>
* <caption>Comparison of Strategies</caption>
* <tr>
* <th>Strategy</th>
* <th>Factory Method(s)</th>
* <th>Description</th>
* <th>Retry Support</th>
* <th>Transports Supported</th>
* <th>Cloud Storage API used</th>
* <th>Considerations</th>
* </tr>
* <tr>
* <td>Default (Chunk based upload)</td>
* <td>{@link #getDefault()}</td>
* <td>
* Buffer up to a configurable amount of bytes in memory, write to Cloud Storage when
* full or close. Buffer size is configurable via
* {@link DefaultBlobWriteSessionConfig#withChunkSize(int)}
* </td>
* <td>
* Each chunk is retried up to the limitations specified in
* {@link StorageOptions#getRetrySettings()}
* </td>
* <td>gRPC</td>
* <td><a href="https://cloud.google.com/storage/docs/resumable-uploads">Resumable Upload</a></td>
* <td>The network will only be used for the following operations:
* <ol>
* <li>Creating the Resumable Upload Session</li>
* <li>Transmitting zero or more incremental chunks</li>
* <li>Transmitting the final chunk and finalizing the Resumable Upload Session</li>
* <li>
* If any of the above are interrupted with a retryable error, the Resumable Upload Session
* will be queried to reconcile client side state with Cloud Storage
* </li>
* </ol>
* </td>
* </tr>
* <tr>
* <td>Buffer to disk then upload</td>
* <td>
* <ul>
* <li>{@link #bufferToDiskThenUpload(Path)}</li>
* <li>{@link #bufferToDiskThenUpload(Collection) bufferToDiskThenUpload(Collection&lt;Path>)}</li>
* <li>{@link #bufferToTempDirThenUpload()}</li>
* </ul>
* </td>
* <td>
* Buffer bytes to a temporary file on disk. On {@link WritableByteChannel#close() close()}
* upload the entire files contents to Cloud Storage. Delete the temporary file.
* </td>
* <td>
* Upload the file in the fewest number of RPC possible retrying within the limitations
* specified in {@link StorageOptions#getRetrySettings()}
* </td>
* <td>gRPC</td>
* <td><a href="https://cloud.google.com/storage/docs/resumable-uploads">Resumable Upload</a></td>
* <td>
* <ol>
* <li>A Resumable Upload Session will be used to upload the file on disk.</li>
* <li>
* If the upload is interrupted with a retryable error, the Resumable Upload Session will
* be queried to restart the upload from Cloud Storage's last received byte
* </li>
* </ol>
* </td>
* </tr>
* <tr>
* <td>Journal to disk while uploading</td>
* <td>{@link #journaling(Collection) journaling(Collection&lt;Path>)}</td>
* <td>
* Create a Resumable Upload Session, before transmitting bytes to Cloud Storage write
* to a recovery file on disk. If the stream to Cloud Storage is interrupted with a
* retryable error query the offset of the Resumable Upload Session, then open the recovery
* file from the offset and transmit the bytes to Cloud Storage.
* </td>
* <td>gRPC</td>
* <td><a href="https://cloud.google.com/storage/docs/resumable-uploads">Resumable Upload</a></td>
* <td>
* <ol>
* <li>
* The stream to Cloud Storage will be held open until a) the write is complete
* b) the stream is interrupted
* </li>
* <li>
* Because the bytes are journaled to disk, the upload to Cloud Storage can only
* be as fast as the disk.
* </li>
* <li>
* The use of <a href="https://cloud.google.com/compute/docs/disks/local-ssd#nvme">Compute
* Engine Local NVMe SSD</a> is strongly encouraged compared to Compute Engine Persistent
* Disk.
* </li>
* </ol>
* </td>
* </tr>
* </table>
*
* @see BlobWriteSessionConfig
* @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig)
* @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...)
Expand Down Expand Up @@ -56,11 +158,11 @@ public static DefaultBlobWriteSessionConfig getDefault() {
* Create a new {@link BlobWriteSessionConfig} which will first buffer the content of the object
* to a temporary file under {@code java.io.tmpdir}.
*
* <p>Once the file on disk is closed, the entire file will then be uploaded to Google Cloud
* Storage.
* <p>Once the file on disk is closed, the entire file will then be uploaded to Cloud Storage.
*
* @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...)
* @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig)
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
public static BlobWriteSessionConfig bufferToTempDirThenUpload() throws IOException {
Expand All @@ -72,11 +174,11 @@ public static BlobWriteSessionConfig bufferToTempDirThenUpload() throws IOExcept
* Create a new {@link BlobWriteSessionConfig} which will first buffer the content of the object
* to a temporary file under the specified {@code path}.
*
* <p>Once the file on disk is closed, the entire file will then be uploaded to Google Cloud
* Storage.
* <p>Once the file on disk is closed, the entire file will then be uploaded to Cloud Storage.
*
* @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...)
* @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig)
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
public static BufferToDiskThenUpload bufferToDiskThenUpload(Path path) throws IOException {
Expand All @@ -87,18 +189,34 @@ public static BufferToDiskThenUpload bufferToDiskThenUpload(Path path) throws IO
* Create a new {@link BlobWriteSessionConfig} which will first buffer the content of the object
* to a temporary file under one of the specified {@code paths}.
*
* <p>Once the file on disk is closed, the entire file will then be uploaded to Google Cloud
* Storage.
* <p>Once the file on disk is closed, the entire file will then be uploaded to Cloud Storage.
*
* <p>The specifics of how the work is spread across multiple paths is undefined and subject to
* change.
*
* @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...)
* @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig)
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
public static BufferToDiskThenUpload bufferToDiskThenUpload(Collection<Path> paths)
throws IOException {
return new BufferToDiskThenUpload(ImmutableList.copyOf(paths), false);
}

/**
* Create a new {@link BlobWriteSessionConfig} which will journal writes to a temporary file under
* one of the specified {@code paths} before transmitting the bytes to Cloud Storage.
*
* <p>The specifics of how the work is spread across multiple paths is undefined and subject to
* change.
*
* @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...)
* @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig)
* @since 2.27.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
public static JournalingBlobWriteSessionConfig journaling(Collection<Path> paths) {
return new JournalingBlobWriteSessionConfig(ImmutableList.copyOf(paths), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.storage.Conversions.Decoder;
import com.google.cloud.storage.RecoveryFileManager.RecoveryVolumeSinkFactory;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
Expand Down Expand Up @@ -96,8 +97,7 @@ WriterFactory createFactory(Clock clock) throws IOException {
return new Factory(recoveryFileManager, clock, gcs);
}

private RecoveryFileManager.RecoverVolumeSinkFactory getRecoverVolumeSinkFactory(
Clock clock, Duration window) {
private RecoveryVolumeSinkFactory getRecoverVolumeSinkFactory(Clock clock, Duration window) {
return path -> {
ThroughputSink windowed = ThroughputSink.windowed(ThroughputMovingWindow.of(window), clock);
if (includeLoggingSink) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package com.google.cloud.storage;

import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -137,4 +139,19 @@ static int alignSize(int size, int alignmentMultiple) {
} // else size is already aligned
return alignedSize;
}

static int fillFrom(ByteBuffer buf, ReadableByteChannel c) throws IOException {
int total = 0;
while (buf.hasRemaining()) {
int read = c.read(buf);
if (read != -1) {
total += read;
} else if (total == 0) {
return -1;
} else {
break;
}
}
return total;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ Hasher getHasher() {
return hasher;
}

ChunkSegment[] segmentBuffer(ByteBuffer bb) {
return segmentBuffers(new ByteBuffer[] {bb}, 0, 1);
BenWhitehead marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Given {@code bbs}, yield N segments, where each segment is at most {@code maxSegmentSize}
* bytes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public int getChunkSize() {
*
* <p><i>Default:</i> {@code 16777216 (16 MiB)}
*
* @param chunkSize The number of bytes each chunk should be. Must be >= {@code 262144 (256 KiB)}
* @param chunkSize The number of bytes each chunk should be. Must be &gt;= {@code 262144 (256
* KiB)}
* @return The new instance
* @see #getChunkSize()
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ public int write(ByteBuffer src) throws IOException {
ByteBuffer slice = src.slice();
Buffers.limit(slice, bufferRemaining);
int write = channel.write(slice);
Buffers.position(src, srcPosition + write);
int newPosition = srcPosition + write;
Buffers.position(src, newPosition);
bytesConsumed += write;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.rpc.ClientStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.storage.ChannelSession.BufferedWriteSession;
import com.google.cloud.storage.ChannelSession.UnbufferedWriteSession;
import com.google.cloud.storage.Retrying.RetryingDependencies;
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
import com.google.cloud.storage.WriteCtx.WriteObjectRequestBuilderFactory;
import com.google.cloud.storage.WriteFlushStrategy.FlusherFactory;
import com.google.storage.v2.QueryWriteStatusRequest;
import com.google.storage.v2.QueryWriteStatusResponse;
import com.google.storage.v2.ServiceConstants.Values;
import com.google.storage.v2.WriteObjectRequest;
import com.google.storage.v2.WriteObjectResponse;
Expand Down Expand Up @@ -106,6 +109,10 @@ ResumableUploadBuilder resumable() {
return new ResumableUploadBuilder();
}

JournalingResumableUploadBuilder journaling() {
return new JournalingResumableUploadBuilder();
}

/**
* When constructing any of our channel sessions, there is always a {@link
* GapicUnbufferedWritableByteChannel} at the bottom of it. This method creates a BiFunction which
Expand Down Expand Up @@ -332,4 +339,102 @@ BufferedWritableByteChannelSession<WriteObjectResponse> build() {
}
}
}

final class JournalingResumableUploadBuilder {

private RetryingDependencies deps;
private ResultRetryAlgorithm<?> alg;
private BufferHandle bufferHandle;
private BufferHandle recoveryBuffer;
private RecoveryFile recoveryFile;
private UnaryCallable<QueryWriteStatusRequest, QueryWriteStatusResponse> query;

JournalingResumableUploadBuilder() {
this.deps = RetryingDependencies.attemptOnce();
this.alg = Retrying.neverRetry();
}

JournalingResumableUploadBuilder withRetryConfig(
RetryingDependencies deps,
ResultRetryAlgorithm<?> alg,
UnaryCallable<QueryWriteStatusRequest, QueryWriteStatusResponse> query) {
this.deps = requireNonNull(deps, "deps must be non null");
this.alg = requireNonNull(alg, "alg must be non null");
this.query = requireNonNull(query, "query must be non null");
return this;
}

JournalingResumableUploadBuilder withBuffer(BufferHandle bufferHandle) {
this.bufferHandle = requireNonNull(bufferHandle, "bufferHandle must be non null");
return this;
}

JournalingResumableUploadBuilder withRecoveryBuffer(BufferHandle bufferHandle) {
this.recoveryBuffer = requireNonNull(bufferHandle, "bufferHandle must be non null");
return this;
}

JournalingResumableUploadBuilder withRecoveryFile(RecoveryFile recoveryFile) {
this.recoveryFile = requireNonNull(recoveryFile, "recoveryFile must be non null");
return this;
}

/**
* Set the Future which will contain the ResumableWrite information necessary to open the Write
* stream.
*/
BuildableJournalingResumableUploadBuilder setStartAsync(
ApiFuture<WriteCtx<ResumableWrite>> start) {
requireNonNull(start, "start must be non null");
return new BuildableJournalingResumableUploadBuilder(start);
}

final class BuildableJournalingResumableUploadBuilder {
private final ApiFuture<WriteCtx<ResumableWrite>> start;

private BuildableJournalingResumableUploadBuilder(ApiFuture<WriteCtx<ResumableWrite>> start) {
this.start = start;
}

BufferedWritableByteChannelSession<WriteObjectResponse> build() {
return new BufferedWriteSession<>(
requireNonNull(start, "start must be non null"),
bindFunction()
.andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c))
.andThen(StorageByteChannels.writable()::createSynchronized));
}

private BiFunction<
WriteCtx<ResumableWrite>,
SettableApiFuture<WriteObjectResponse>,
UnbufferedWritableByteChannel>
bindFunction() {
// it is theoretically possible that the setter methods for the following variables could
// be called again between when this method is invoked and the resulting function is
// invoked.
// To ensure we are using the specified values at the point in time they are bound to the
// function read them into local variables which will be closed over rather than the class
// fields.
RetryingDependencies deps = JournalingResumableUploadBuilder.this.deps;
ResultRetryAlgorithm<?> alg = JournalingResumableUploadBuilder.this.alg;
BufferHandle recoveryBuffer = JournalingResumableUploadBuilder.this.recoveryBuffer;
RecoveryFile recoveryFile = JournalingResumableUploadBuilder.this.recoveryFile;
UnaryCallable<QueryWriteStatusRequest, QueryWriteStatusResponse> query =
JournalingResumableUploadBuilder.this.query;
ByteStringStrategy boundStrategy = byteStringStrategy;
Hasher boundHasher = hasher;
return (writeCtx, resultFuture) ->
new SyncAndUploadUnbufferedWritableByteChannel(
write,
query,
resultFuture,
new ChunkSegmenter(boundHasher, boundStrategy, Values.MAX_WRITE_CHUNK_BYTES_VALUE),
deps,
alg,
writeCtx,
recoveryFile,
recoveryBuffer);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1768,7 +1768,7 @@ ReadObjectRequest getReadObjectRequest(BlobId blob, Opts<ObjectSourceOpt> opts)
return opts.readObjectRequest().apply(builder).build();
}

private WriteObjectRequest getWriteObjectRequest(BlobInfo info, Opts<ObjectTargetOpt> opts) {
WriteObjectRequest getWriteObjectRequest(BlobInfo info, Opts<ObjectTargetOpt> opts) {
Object object = codecs.blobInfo().encode(info);
Object.Builder objectBuilder =
object
Expand Down