diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageReadChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageReadChannel.java index 60ab4990d..b6b0a76b7 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageReadChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageReadChannel.java @@ -23,12 +23,12 @@ import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; -import com.google.cloud.storage.BufferedReadableByteChannelSession.BufferedReadableByteChannel; import com.google.cloud.storage.Conversions.Decoder; import com.google.common.util.concurrent.MoreExecutors; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; +import java.nio.channels.ReadableByteChannel; import org.checkerframework.checker.nullness.qual.Nullable; abstract class BaseStorageReadChannel implements StorageReadChannel { @@ -40,7 +40,7 @@ abstract class BaseStorageReadChannel implements StorageReadChannel { private ByteRangeSpec byteRangeSpec; private int chunkSize = _2MiB; private BufferHandle bufferHandle; - private LazyReadChannel lazyReadChannel; + private LazyReadChannel lazyReadChannel; protected BaseStorageReadChannel(Decoder objectDecoder) { this.objectDecoder = objectDecoder; @@ -64,15 +64,18 @@ public final synchronized boolean isOpen() { public final synchronized void close() { open = false; if (internalGetLazyChannel().isOpen()) { - StorageException.wrapIOException(internalGetLazyChannel().getChannel()::close); + ReadableByteChannel channel = internalGetLazyChannel().getChannel(); + StorageException.wrapIOException(channel::close); } } @Override public final synchronized StorageReadChannel setByteRangeSpec(ByteRangeSpec byteRangeSpec) { requireNonNull(byteRangeSpec, "byteRangeSpec must be non null"); - StorageException.wrapIOException(() -> maybeResetChannel(false)); - this.byteRangeSpec = byteRangeSpec; + if (!this.byteRangeSpec.equals(byteRangeSpec)) { + StorageException.wrapIOException(() -> maybeResetChannel(false)); + this.byteRangeSpec = byteRangeSpec; + } return this; } @@ -95,7 +98,7 @@ public final synchronized int read(ByteBuffer dst) throws IOException { } try { // trap if the fact that tmp is already closed, and instead return -1 - BufferedReadableByteChannel tmp = internalGetLazyChannel().getChannel(); + ReadableByteChannel tmp = internalGetLazyChannel().getChannel(); if (!tmp.isOpen()) { return -1; } @@ -146,7 +149,7 @@ protected final T getResolvedObject() { } } - protected abstract LazyReadChannel newLazyReadChannel(); + protected abstract LazyReadChannel newLazyReadChannel(); private void maybeResetChannel(boolean freeBuffer) throws IOException { if (lazyReadChannel != null) { @@ -162,9 +165,9 @@ private void maybeResetChannel(boolean freeBuffer) throws IOException { } } - private LazyReadChannel internalGetLazyChannel() { + private LazyReadChannel internalGetLazyChannel() { if (lazyReadChannel == null) { - LazyReadChannel tmp = newLazyReadChannel(); + LazyReadChannel tmp = newLazyReadChannel(); ApiFuture future = tmp.getSession().getResult(); ApiFutures.addCallback( future, diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannelV2.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannelV2.java index 080cfbec5..1f3c5b52c 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannelV2.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannelV2.java @@ -21,6 +21,7 @@ import com.google.cloud.ReadChannel; import com.google.cloud.RestorableState; import com.google.cloud.storage.ApiaryUnbufferedReadableByteChannel.ApiaryReadRequest; +import com.google.cloud.storage.HttpDownloadSessionBuilder.ReadableByteChannelSessionBuilder; import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.common.base.MoreObjects; import java.io.Serializable; @@ -56,16 +57,25 @@ public synchronized RestorableState capture() { apiaryReadRequest, blobReadChannelContext.getStorageOptions(), getChunkSize()); } - protected LazyReadChannel newLazyReadChannel() { + protected LazyReadChannel newLazyReadChannel() { return new LazyReadChannel<>( - () -> - ResumableMedia.http() - .read() - .byteChannel(blobReadChannelContext) - .setAutoGzipDecompression(autoGzipDecompression) - .buffered(getBufferHandle()) - .setApiaryReadRequest(getApiaryReadRequest()) - .build()); + () -> { + ReadableByteChannelSessionBuilder b = + ResumableMedia.http() + .read() + .byteChannel(blobReadChannelContext) + .setAutoGzipDecompression(autoGzipDecompression); + BufferHandle bufferHandle = getBufferHandle(); + // because we're erasing the specific type of channel, we need to declare it here. + // If we don't, the compiler complains we're not returning a compliant type. + ReadableByteChannelSession session; + if (bufferHandle.capacity() > 0) { + session = b.buffered(bufferHandle).setApiaryReadRequest(getApiaryReadRequest()).build(); + } else { + session = b.unbuffered().setApiaryReadRequest(getApiaryReadRequest()).build(); + } + return session; + }); } private ApiaryReadRequest getApiaryReadRequest() { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java index d8a7c7c27..b58b9663f 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java @@ -19,6 +19,7 @@ import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.cloud.ReadChannel; import com.google.cloud.RestorableState; +import com.google.cloud.storage.GapicDownloadSessionBuilder.ReadableByteChannelSessionBuilder; import com.google.storage.v2.Object; import com.google.storage.v2.ReadObjectRequest; import com.google.storage.v2.ReadObjectResponse; @@ -46,17 +47,27 @@ public RestorableState capture() { } @Override - protected LazyReadChannel newLazyReadChannel() { + protected LazyReadChannel newLazyReadChannel() { return new LazyReadChannel<>( - () -> - ResumableMedia.gapic() - .read() - .byteChannel(read) - .setHasher(Hasher.noop()) - .setAutoGzipDecompression(autoGzipDecompression) - .buffered(getBufferHandle()) - .setReadObjectRequest(getReadObjectRequest()) - .build()); + () -> { + ReadableByteChannelSessionBuilder b = + ResumableMedia.gapic() + .read() + .byteChannel(read) + .setHasher(Hasher.noop()) + .setAutoGzipDecompression(autoGzipDecompression); + BufferHandle bufferHandle = getBufferHandle(); + // because we're erasing the specific type of channel, we need to declare it here. + // If we don't, the compiler complains we're not returning a compliant type. + ReadableByteChannelSession session; + if (bufferHandle.capacity() > 0) { + session = + b.buffered(getBufferHandle()).setReadObjectRequest(getReadObjectRequest()).build(); + } else { + session = b.unbuffered().setReadObjectRequest(getReadObjectRequest()).build(); + } + return session; + }); } @NonNull diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/LazyReadChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/LazyReadChannel.java index 5b8cba1e9..ba9e90a44 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/LazyReadChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/LazyReadChannel.java @@ -16,26 +16,26 @@ package com.google.cloud.storage; -import com.google.cloud.storage.BufferedReadableByteChannelSession.BufferedReadableByteChannel; +import java.nio.channels.ReadableByteChannel; import java.util.function.Supplier; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.NonNull; -final class LazyReadChannel { +final class LazyReadChannel { - private final Supplier> sessionSupplier; + private final Supplier> sessionSupplier; - @MonotonicNonNull private volatile BufferedReadableByteChannelSession session; - @MonotonicNonNull private volatile BufferedReadableByteChannel channel; + @MonotonicNonNull private volatile ReadableByteChannelSession session; + @MonotonicNonNull private volatile RBC channel; private boolean open = false; - LazyReadChannel(Supplier> sessionSupplier) { + LazyReadChannel(Supplier> sessionSupplier) { this.sessionSupplier = sessionSupplier; } @NonNull - BufferedReadableByteChannel getChannel() { + RBC getChannel() { if (channel != null) { return channel; } else { @@ -50,7 +50,7 @@ BufferedReadableByteChannel getChannel() { } @NonNull - BufferedReadableByteChannelSession getSession() { + ReadableByteChannelSession getSession() { if (session != null) { return session; } else { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/LazyReadChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/LazyReadChannelTest.java index 09ce3ecaf..ef637abae 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/LazyReadChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/LazyReadChannelTest.java @@ -34,16 +34,18 @@ public final class LazyReadChannelTest { @Test public void repeatedCallsOfGetSessionMustReturnTheSameInstance() { - LazyReadChannel lrc = new LazyReadChannel<>(this::newTestSession); + LazyReadChannel lrc = + new LazyReadChannel<>(this::newTestSession); - BufferedReadableByteChannelSession session1 = lrc.getSession(); - BufferedReadableByteChannelSession session2 = lrc.getSession(); + ReadableByteChannelSession session1 = lrc.getSession(); + ReadableByteChannelSession session2 = lrc.getSession(); assertThat(session1).isSameInstanceAs(session2); } @Test public void repeatedCallsOfGetChannelMustReturnTheSameInstance() { - LazyReadChannel lrc = new LazyReadChannel<>(this::newTestSession); + LazyReadChannel lrc = + new LazyReadChannel<>(this::newTestSession); BufferedReadableByteChannel channel1 = lrc.getChannel(); BufferedReadableByteChannel channel2 = lrc.getChannel(); @@ -52,7 +54,8 @@ public void repeatedCallsOfGetChannelMustReturnTheSameInstance() { @Test public void isNotOpenUntilGetChannelIsCalled() { - LazyReadChannel lrc = new LazyReadChannel<>(this::newTestSession); + LazyReadChannel lrc = + new LazyReadChannel<>(this::newTestSession); assertThat(lrc.isOpen()).isFalse(); BufferedReadableByteChannel channel = lrc.getChannel(); @@ -63,7 +66,8 @@ public void isNotOpenUntilGetChannelIsCalled() { @Test public void closingUnderlyingChannelClosesTheLazyReadChannel() throws IOException { - LazyReadChannel lrc = new LazyReadChannel<>(this::newTestSession); + LazyReadChannel lrc = + new LazyReadChannel<>(this::newTestSession); BufferedReadableByteChannel channel = lrc.getChannel(); assertThat(channel.isOpen()).isTrue(); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageReadChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageReadChannelTest.java index 06f0fa5ac..317974d47 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageReadChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageReadChannelTest.java @@ -42,6 +42,7 @@ import com.google.cloud.storage.it.runner.registry.Generator; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -107,6 +108,35 @@ public void storageReadChannel_getObject_returns() throws Exception { } } + @Test + // @CrossRun.Exclude(transports = Transport.GRPC) + public void storageReadChannel_shouldAllowDisablingBufferingBySettingChunkSize_lteq0() + throws IOException { + int _512KiB = 512 * 1024; + int _1MiB = 1024 * 1024; + + final BlobInfo info; + byte[] uncompressedBytes = DataGenerator.base64Characters().genBytes(_512KiB); + { + BlobInfo tmp = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(); + Blob gen1 = storage.create(tmp, uncompressedBytes, BlobTargetOption.doesNotExist()); + info = gen1.asBlobInfo(); + } + + try (ReadChannel c = storage.reader(info.getBlobId())) { + c.setChunkSize(0); + + ByteBuffer buf = ByteBuffer.allocate(_1MiB); + // Because this is unbuffered, the underlying channel will not necessarily fill up the buf + // in a single read call. Repeatedly read until full or EOF. + int read = fillFrom(buf, c); + assertThat(read).isEqualTo(_512KiB); + String actual = xxd(buf); + String expected = xxd(uncompressedBytes); + assertThat(actual).isEqualTo(expected); + } + } + @Test public void storageReadChannel_getObject_404() { BlobId id = BlobId.of(bucket.getName(), generator.randomObjectName()); @@ -129,4 +159,17 @@ private static void equalForField(T actual, T expected, Function f) F eF = f.apply(expected); assertThat(aF).isEqualTo(eF); } + + static int fillFrom(ByteBuffer buf, ReadableByteChannel c) throws IOException { + int total = 0; + while (buf.hasRemaining()) { + int read = c.read(buf); + if (read != -1) { + total += read; + } else { + break; + } + } + return total; + } }