From 702ab2bb1ceb9f428296591adc6e09023b4a8484 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Wed, 12 Apr 2023 13:11:20 -0400 Subject: [PATCH] feat: make it possible to disable the buffer of ReadChannels returned from Storage.reader (#1974) For some scenarios, an external client needs the ability to manage buffer blocking itself. To support this, providing 0 to ReadChannel#setChunkSize will disable buffering allowing for client to avoid the need to alight multiple levels of buffer alignments. Because the buffering is disabled, that means reads can be much more variable in size and individually more impacted by small network latencies (with buffering these can be amortized by the buffer making followup read faster). This is considered advanced usage and will require more work from the client integrator. --- .../cloud/storage/BaseStorageReadChannel.java | 21 +++++---- .../cloud/storage/BlobReadChannelV2.java | 28 ++++++++---- .../cloud/storage/GrpcBlobReadChannel.java | 31 ++++++++----- .../google/cloud/storage/LazyReadChannel.java | 16 +++---- .../cloud/storage/LazyReadChannelTest.java | 16 ++++--- .../storage/it/ITStorageReadChannelTest.java | 43 +++++++++++++++++++ 6 files changed, 113 insertions(+), 42 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageReadChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageReadChannel.java index 60ab4990d..b6b0a76b7 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageReadChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageReadChannel.java @@ -23,12 +23,12 @@ import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; -import com.google.cloud.storage.BufferedReadableByteChannelSession.BufferedReadableByteChannel; import com.google.cloud.storage.Conversions.Decoder; import com.google.common.util.concurrent.MoreExecutors; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; +import java.nio.channels.ReadableByteChannel; import org.checkerframework.checker.nullness.qual.Nullable; abstract class BaseStorageReadChannel implements StorageReadChannel { @@ -40,7 +40,7 @@ abstract class BaseStorageReadChannel implements StorageReadChannel { private ByteRangeSpec byteRangeSpec; private int chunkSize = _2MiB; private BufferHandle bufferHandle; - private LazyReadChannel lazyReadChannel; + private LazyReadChannel lazyReadChannel; protected BaseStorageReadChannel(Decoder objectDecoder) { this.objectDecoder = objectDecoder; @@ -64,15 +64,18 @@ public final synchronized boolean isOpen() { public final synchronized void close() { open = false; if (internalGetLazyChannel().isOpen()) { - StorageException.wrapIOException(internalGetLazyChannel().getChannel()::close); + ReadableByteChannel channel = internalGetLazyChannel().getChannel(); + StorageException.wrapIOException(channel::close); } } @Override public final synchronized StorageReadChannel setByteRangeSpec(ByteRangeSpec byteRangeSpec) { requireNonNull(byteRangeSpec, "byteRangeSpec must be non null"); - StorageException.wrapIOException(() -> maybeResetChannel(false)); - this.byteRangeSpec = byteRangeSpec; + if (!this.byteRangeSpec.equals(byteRangeSpec)) { + StorageException.wrapIOException(() -> maybeResetChannel(false)); + this.byteRangeSpec = byteRangeSpec; + } return this; } @@ -95,7 +98,7 @@ public final synchronized int read(ByteBuffer dst) throws IOException { } try { // trap if the fact that tmp is already closed, and instead return -1 - BufferedReadableByteChannel tmp = internalGetLazyChannel().getChannel(); + ReadableByteChannel tmp = internalGetLazyChannel().getChannel(); if (!tmp.isOpen()) { return -1; } @@ -146,7 +149,7 @@ protected final T getResolvedObject() { } } - protected abstract LazyReadChannel newLazyReadChannel(); + protected abstract LazyReadChannel newLazyReadChannel(); private void maybeResetChannel(boolean freeBuffer) throws IOException { if (lazyReadChannel != null) { @@ -162,9 +165,9 @@ private void maybeResetChannel(boolean freeBuffer) throws IOException { } } - private LazyReadChannel internalGetLazyChannel() { + private LazyReadChannel internalGetLazyChannel() { if (lazyReadChannel == null) { - LazyReadChannel tmp = newLazyReadChannel(); + LazyReadChannel tmp = newLazyReadChannel(); ApiFuture future = tmp.getSession().getResult(); ApiFutures.addCallback( future, diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannelV2.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannelV2.java index 080cfbec5..1f3c5b52c 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannelV2.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannelV2.java @@ -21,6 +21,7 @@ import com.google.cloud.ReadChannel; import com.google.cloud.RestorableState; import com.google.cloud.storage.ApiaryUnbufferedReadableByteChannel.ApiaryReadRequest; +import com.google.cloud.storage.HttpDownloadSessionBuilder.ReadableByteChannelSessionBuilder; import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.common.base.MoreObjects; import java.io.Serializable; @@ -56,16 +57,25 @@ public synchronized RestorableState capture() { apiaryReadRequest, blobReadChannelContext.getStorageOptions(), getChunkSize()); } - protected LazyReadChannel newLazyReadChannel() { + protected LazyReadChannel newLazyReadChannel() { return new LazyReadChannel<>( - () -> - ResumableMedia.http() - .read() - .byteChannel(blobReadChannelContext) - .setAutoGzipDecompression(autoGzipDecompression) - .buffered(getBufferHandle()) - .setApiaryReadRequest(getApiaryReadRequest()) - .build()); + () -> { + ReadableByteChannelSessionBuilder b = + ResumableMedia.http() + .read() + .byteChannel(blobReadChannelContext) + .setAutoGzipDecompression(autoGzipDecompression); + BufferHandle bufferHandle = getBufferHandle(); + // because we're erasing the specific type of channel, we need to declare it here. + // If we don't, the compiler complains we're not returning a compliant type. + ReadableByteChannelSession session; + if (bufferHandle.capacity() > 0) { + session = b.buffered(bufferHandle).setApiaryReadRequest(getApiaryReadRequest()).build(); + } else { + session = b.unbuffered().setApiaryReadRequest(getApiaryReadRequest()).build(); + } + return session; + }); } private ApiaryReadRequest getApiaryReadRequest() { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java index d8a7c7c27..b58b9663f 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java @@ -19,6 +19,7 @@ import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.cloud.ReadChannel; import com.google.cloud.RestorableState; +import com.google.cloud.storage.GapicDownloadSessionBuilder.ReadableByteChannelSessionBuilder; import com.google.storage.v2.Object; import com.google.storage.v2.ReadObjectRequest; import com.google.storage.v2.ReadObjectResponse; @@ -46,17 +47,27 @@ public RestorableState capture() { } @Override - protected LazyReadChannel newLazyReadChannel() { + protected LazyReadChannel newLazyReadChannel() { return new LazyReadChannel<>( - () -> - ResumableMedia.gapic() - .read() - .byteChannel(read) - .setHasher(Hasher.noop()) - .setAutoGzipDecompression(autoGzipDecompression) - .buffered(getBufferHandle()) - .setReadObjectRequest(getReadObjectRequest()) - .build()); + () -> { + ReadableByteChannelSessionBuilder b = + ResumableMedia.gapic() + .read() + .byteChannel(read) + .setHasher(Hasher.noop()) + .setAutoGzipDecompression(autoGzipDecompression); + BufferHandle bufferHandle = getBufferHandle(); + // because we're erasing the specific type of channel, we need to declare it here. + // If we don't, the compiler complains we're not returning a compliant type. + ReadableByteChannelSession session; + if (bufferHandle.capacity() > 0) { + session = + b.buffered(getBufferHandle()).setReadObjectRequest(getReadObjectRequest()).build(); + } else { + session = b.unbuffered().setReadObjectRequest(getReadObjectRequest()).build(); + } + return session; + }); } @NonNull diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/LazyReadChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/LazyReadChannel.java index 5b8cba1e9..ba9e90a44 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/LazyReadChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/LazyReadChannel.java @@ -16,26 +16,26 @@ package com.google.cloud.storage; -import com.google.cloud.storage.BufferedReadableByteChannelSession.BufferedReadableByteChannel; +import java.nio.channels.ReadableByteChannel; import java.util.function.Supplier; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.NonNull; -final class LazyReadChannel { +final class LazyReadChannel { - private final Supplier> sessionSupplier; + private final Supplier> sessionSupplier; - @MonotonicNonNull private volatile BufferedReadableByteChannelSession session; - @MonotonicNonNull private volatile BufferedReadableByteChannel channel; + @MonotonicNonNull private volatile ReadableByteChannelSession session; + @MonotonicNonNull private volatile RBC channel; private boolean open = false; - LazyReadChannel(Supplier> sessionSupplier) { + LazyReadChannel(Supplier> sessionSupplier) { this.sessionSupplier = sessionSupplier; } @NonNull - BufferedReadableByteChannel getChannel() { + RBC getChannel() { if (channel != null) { return channel; } else { @@ -50,7 +50,7 @@ BufferedReadableByteChannel getChannel() { } @NonNull - BufferedReadableByteChannelSession getSession() { + ReadableByteChannelSession getSession() { if (session != null) { return session; } else { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/LazyReadChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/LazyReadChannelTest.java index 09ce3ecaf..ef637abae 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/LazyReadChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/LazyReadChannelTest.java @@ -34,16 +34,18 @@ public final class LazyReadChannelTest { @Test public void repeatedCallsOfGetSessionMustReturnTheSameInstance() { - LazyReadChannel lrc = new LazyReadChannel<>(this::newTestSession); + LazyReadChannel lrc = + new LazyReadChannel<>(this::newTestSession); - BufferedReadableByteChannelSession session1 = lrc.getSession(); - BufferedReadableByteChannelSession session2 = lrc.getSession(); + ReadableByteChannelSession session1 = lrc.getSession(); + ReadableByteChannelSession session2 = lrc.getSession(); assertThat(session1).isSameInstanceAs(session2); } @Test public void repeatedCallsOfGetChannelMustReturnTheSameInstance() { - LazyReadChannel lrc = new LazyReadChannel<>(this::newTestSession); + LazyReadChannel lrc = + new LazyReadChannel<>(this::newTestSession); BufferedReadableByteChannel channel1 = lrc.getChannel(); BufferedReadableByteChannel channel2 = lrc.getChannel(); @@ -52,7 +54,8 @@ public void repeatedCallsOfGetChannelMustReturnTheSameInstance() { @Test public void isNotOpenUntilGetChannelIsCalled() { - LazyReadChannel lrc = new LazyReadChannel<>(this::newTestSession); + LazyReadChannel lrc = + new LazyReadChannel<>(this::newTestSession); assertThat(lrc.isOpen()).isFalse(); BufferedReadableByteChannel channel = lrc.getChannel(); @@ -63,7 +66,8 @@ public void isNotOpenUntilGetChannelIsCalled() { @Test public void closingUnderlyingChannelClosesTheLazyReadChannel() throws IOException { - LazyReadChannel lrc = new LazyReadChannel<>(this::newTestSession); + LazyReadChannel lrc = + new LazyReadChannel<>(this::newTestSession); BufferedReadableByteChannel channel = lrc.getChannel(); assertThat(channel.isOpen()).isTrue(); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageReadChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageReadChannelTest.java index 06f0fa5ac..317974d47 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageReadChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageReadChannelTest.java @@ -42,6 +42,7 @@ import com.google.cloud.storage.it.runner.registry.Generator; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -107,6 +108,35 @@ public void storageReadChannel_getObject_returns() throws Exception { } } + @Test + // @CrossRun.Exclude(transports = Transport.GRPC) + public void storageReadChannel_shouldAllowDisablingBufferingBySettingChunkSize_lteq0() + throws IOException { + int _512KiB = 512 * 1024; + int _1MiB = 1024 * 1024; + + final BlobInfo info; + byte[] uncompressedBytes = DataGenerator.base64Characters().genBytes(_512KiB); + { + BlobInfo tmp = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(); + Blob gen1 = storage.create(tmp, uncompressedBytes, BlobTargetOption.doesNotExist()); + info = gen1.asBlobInfo(); + } + + try (ReadChannel c = storage.reader(info.getBlobId())) { + c.setChunkSize(0); + + ByteBuffer buf = ByteBuffer.allocate(_1MiB); + // Because this is unbuffered, the underlying channel will not necessarily fill up the buf + // in a single read call. Repeatedly read until full or EOF. + int read = fillFrom(buf, c); + assertThat(read).isEqualTo(_512KiB); + String actual = xxd(buf); + String expected = xxd(uncompressedBytes); + assertThat(actual).isEqualTo(expected); + } + } + @Test public void storageReadChannel_getObject_404() { BlobId id = BlobId.of(bucket.getName(), generator.randomObjectName()); @@ -129,4 +159,17 @@ private static void equalForField(T actual, T expected, Function f) F eF = f.apply(expected); assertThat(aF).isEqualTo(eF); } + + static int fillFrom(ByteBuffer buf, ReadableByteChannel c) throws IOException { + int total = 0; + while (buf.hasRemaining()) { + int read = c.read(buf); + if (read != -1) { + total += read; + } else { + break; + } + } + return total; + } }