Skip to content

Commit

Permalink
feat: make it possible to disable the buffer of ReadChannels returned…
Browse files Browse the repository at this point in the history
… from Storage.reader (#1974)

For some scenarios, an external client needs the ability to manage buffer blocking itself. To support this, providing 0 to ReadChannel#setChunkSize will disable buffering, allowing the client to avoid the need to align multiple levels of buffering.

Because buffering is disabled, reads can be much more variable in size and individually more affected by small network latencies (with buffering, these latencies are amortized because the buffer makes follow-up reads faster). This is considered advanced usage and will require more work from the client integrator.
  • Loading branch information
BenWhitehead committed Apr 12, 2023
1 parent f1b9493 commit 702ab2b
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 42 deletions.
Expand Up @@ -23,12 +23,12 @@
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.storage.BufferedReadableByteChannelSession.BufferedReadableByteChannel;
import com.google.cloud.storage.Conversions.Decoder;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import org.checkerframework.checker.nullness.qual.Nullable;

abstract class BaseStorageReadChannel<T> implements StorageReadChannel {
Expand All @@ -40,7 +40,7 @@ abstract class BaseStorageReadChannel<T> implements StorageReadChannel {
private ByteRangeSpec byteRangeSpec;
private int chunkSize = _2MiB;
private BufferHandle bufferHandle;
private LazyReadChannel<T> lazyReadChannel;
private LazyReadChannel<?, T> lazyReadChannel;

protected BaseStorageReadChannel(Decoder<T, BlobInfo> objectDecoder) {
this.objectDecoder = objectDecoder;
Expand All @@ -64,15 +64,18 @@ public final synchronized boolean isOpen() {
public final synchronized void close() {
open = false;
if (internalGetLazyChannel().isOpen()) {
StorageException.wrapIOException(internalGetLazyChannel().getChannel()::close);
ReadableByteChannel channel = internalGetLazyChannel().getChannel();
StorageException.wrapIOException(channel::close);
}
}

@Override
public final synchronized StorageReadChannel setByteRangeSpec(ByteRangeSpec byteRangeSpec) {
requireNonNull(byteRangeSpec, "byteRangeSpec must be non null");
StorageException.wrapIOException(() -> maybeResetChannel(false));
this.byteRangeSpec = byteRangeSpec;
if (!this.byteRangeSpec.equals(byteRangeSpec)) {
StorageException.wrapIOException(() -> maybeResetChannel(false));
this.byteRangeSpec = byteRangeSpec;
}
return this;
}

Expand All @@ -95,7 +98,7 @@ public final synchronized int read(ByteBuffer dst) throws IOException {
}
try {
// trap if the fact that tmp is already closed, and instead return -1
BufferedReadableByteChannel tmp = internalGetLazyChannel().getChannel();
ReadableByteChannel tmp = internalGetLazyChannel().getChannel();
if (!tmp.isOpen()) {
return -1;
}
Expand Down Expand Up @@ -146,7 +149,7 @@ protected final T getResolvedObject() {
}
}

protected abstract LazyReadChannel<T> newLazyReadChannel();
protected abstract LazyReadChannel<?, T> newLazyReadChannel();

private void maybeResetChannel(boolean freeBuffer) throws IOException {
if (lazyReadChannel != null) {
Expand All @@ -162,9 +165,9 @@ private void maybeResetChannel(boolean freeBuffer) throws IOException {
}
}

private LazyReadChannel<T> internalGetLazyChannel() {
private LazyReadChannel<?, T> internalGetLazyChannel() {
if (lazyReadChannel == null) {
LazyReadChannel<T> tmp = newLazyReadChannel();
LazyReadChannel<?, T> tmp = newLazyReadChannel();
ApiFuture<T> future = tmp.getSession().getResult();
ApiFutures.addCallback(
future,
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.google.cloud.ReadChannel;
import com.google.cloud.RestorableState;
import com.google.cloud.storage.ApiaryUnbufferedReadableByteChannel.ApiaryReadRequest;
import com.google.cloud.storage.HttpDownloadSessionBuilder.ReadableByteChannelSessionBuilder;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.base.MoreObjects;
import java.io.Serializable;
Expand Down Expand Up @@ -56,16 +57,25 @@ public synchronized RestorableState<ReadChannel> capture() {
apiaryReadRequest, blobReadChannelContext.getStorageOptions(), getChunkSize());
}

protected LazyReadChannel<StorageObject> newLazyReadChannel() {
protected LazyReadChannel<?, StorageObject> newLazyReadChannel() {
return new LazyReadChannel<>(
() ->
ResumableMedia.http()
.read()
.byteChannel(blobReadChannelContext)
.setAutoGzipDecompression(autoGzipDecompression)
.buffered(getBufferHandle())
.setApiaryReadRequest(getApiaryReadRequest())
.build());
() -> {
ReadableByteChannelSessionBuilder b =
ResumableMedia.http()
.read()
.byteChannel(blobReadChannelContext)
.setAutoGzipDecompression(autoGzipDecompression);
BufferHandle bufferHandle = getBufferHandle();
// because we're erasing the specific type of channel, we need to declare it here.
// If we don't, the compiler complains we're not returning a compliant type.
ReadableByteChannelSession<?, StorageObject> session;
if (bufferHandle.capacity() > 0) {
session = b.buffered(bufferHandle).setApiaryReadRequest(getApiaryReadRequest()).build();
} else {
session = b.unbuffered().setApiaryReadRequest(getApiaryReadRequest()).build();
}
return session;
});
}

private ApiaryReadRequest getApiaryReadRequest() {
Expand Down
Expand Up @@ -19,6 +19,7 @@
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.cloud.ReadChannel;
import com.google.cloud.RestorableState;
import com.google.cloud.storage.GapicDownloadSessionBuilder.ReadableByteChannelSessionBuilder;
import com.google.storage.v2.Object;
import com.google.storage.v2.ReadObjectRequest;
import com.google.storage.v2.ReadObjectResponse;
Expand Down Expand Up @@ -46,17 +47,27 @@ public RestorableState<ReadChannel> capture() {
}

@Override
protected LazyReadChannel<Object> newLazyReadChannel() {
protected LazyReadChannel<?, Object> newLazyReadChannel() {
return new LazyReadChannel<>(
() ->
ResumableMedia.gapic()
.read()
.byteChannel(read)
.setHasher(Hasher.noop())
.setAutoGzipDecompression(autoGzipDecompression)
.buffered(getBufferHandle())
.setReadObjectRequest(getReadObjectRequest())
.build());
() -> {
ReadableByteChannelSessionBuilder b =
ResumableMedia.gapic()
.read()
.byteChannel(read)
.setHasher(Hasher.noop())
.setAutoGzipDecompression(autoGzipDecompression);
BufferHandle bufferHandle = getBufferHandle();
// because we're erasing the specific type of channel, we need to declare it here.
// If we don't, the compiler complains we're not returning a compliant type.
ReadableByteChannelSession<?, Object> session;
if (bufferHandle.capacity() > 0) {
session =
b.buffered(getBufferHandle()).setReadObjectRequest(getReadObjectRequest()).build();
} else {
session = b.unbuffered().setReadObjectRequest(getReadObjectRequest()).build();
}
return session;
});
}

@NonNull
Expand Down
Expand Up @@ -16,26 +16,26 @@

package com.google.cloud.storage;

import com.google.cloud.storage.BufferedReadableByteChannelSession.BufferedReadableByteChannel;
import java.nio.channels.ReadableByteChannel;
import java.util.function.Supplier;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.NonNull;

final class LazyReadChannel<T> {
final class LazyReadChannel<RBC extends ReadableByteChannel, T> {

private final Supplier<BufferedReadableByteChannelSession<T>> sessionSupplier;
private final Supplier<ReadableByteChannelSession<RBC, T>> sessionSupplier;

@MonotonicNonNull private volatile BufferedReadableByteChannelSession<T> session;
@MonotonicNonNull private volatile BufferedReadableByteChannel channel;
@MonotonicNonNull private volatile ReadableByteChannelSession<RBC, T> session;
@MonotonicNonNull private volatile RBC channel;

private boolean open = false;

LazyReadChannel(Supplier<BufferedReadableByteChannelSession<T>> sessionSupplier) {
LazyReadChannel(Supplier<ReadableByteChannelSession<RBC, T>> sessionSupplier) {
this.sessionSupplier = sessionSupplier;
}

@NonNull
BufferedReadableByteChannel getChannel() {
RBC getChannel() {
if (channel != null) {
return channel;
} else {
Expand All @@ -50,7 +50,7 @@ BufferedReadableByteChannel getChannel() {
}

@NonNull
BufferedReadableByteChannelSession<T> getSession() {
ReadableByteChannelSession<RBC, T> getSession() {
if (session != null) {
return session;
} else {
Expand Down
Expand Up @@ -34,16 +34,18 @@ public final class LazyReadChannelTest {

@Test
public void repeatedCallsOfGetSessionMustReturnTheSameInstance() {
LazyReadChannel<String> lrc = new LazyReadChannel<>(this::newTestSession);
LazyReadChannel<BufferedReadableByteChannel, String> lrc =
new LazyReadChannel<>(this::newTestSession);

BufferedReadableByteChannelSession<String> session1 = lrc.getSession();
BufferedReadableByteChannelSession<String> session2 = lrc.getSession();
ReadableByteChannelSession<BufferedReadableByteChannel, String> session1 = lrc.getSession();
ReadableByteChannelSession<BufferedReadableByteChannel, String> session2 = lrc.getSession();
assertThat(session1).isSameInstanceAs(session2);
}

@Test
public void repeatedCallsOfGetChannelMustReturnTheSameInstance() {
LazyReadChannel<String> lrc = new LazyReadChannel<>(this::newTestSession);
LazyReadChannel<BufferedReadableByteChannel, String> lrc =
new LazyReadChannel<>(this::newTestSession);

BufferedReadableByteChannel channel1 = lrc.getChannel();
BufferedReadableByteChannel channel2 = lrc.getChannel();
Expand All @@ -52,7 +54,8 @@ public void repeatedCallsOfGetChannelMustReturnTheSameInstance() {

@Test
public void isNotOpenUntilGetChannelIsCalled() {
LazyReadChannel<String> lrc = new LazyReadChannel<>(this::newTestSession);
LazyReadChannel<BufferedReadableByteChannel, String> lrc =
new LazyReadChannel<>(this::newTestSession);

assertThat(lrc.isOpen()).isFalse();
BufferedReadableByteChannel channel = lrc.getChannel();
Expand All @@ -63,7 +66,8 @@ public void isNotOpenUntilGetChannelIsCalled() {

@Test
public void closingUnderlyingChannelClosesTheLazyReadChannel() throws IOException {
LazyReadChannel<String> lrc = new LazyReadChannel<>(this::newTestSession);
LazyReadChannel<BufferedReadableByteChannel, String> lrc =
new LazyReadChannel<>(this::newTestSession);

BufferedReadableByteChannel channel = lrc.getChannel();
assertThat(channel.isOpen()).isTrue();
Expand Down
Expand Up @@ -42,6 +42,7 @@
import com.google.cloud.storage.it.runner.registry.Generator;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand Down Expand Up @@ -107,6 +108,35 @@ public void storageReadChannel_getObject_returns() throws Exception {
}
}

@Test
// @CrossRun.Exclude(transports = Transport.GRPC)
public void storageReadChannel_shouldAllowDisablingBufferingBySettingChunkSize_lteq0()
    throws IOException {
  final int objectSize = 512 * 1024;
  final int bufferCapacity = 1024 * 1024;

  // Upload a brand new object with known content so the download can be compared against it.
  byte[] expectedBytes = DataGenerator.base64Characters().genBytes(objectSize);
  BlobInfo toCreate = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build();
  Blob created = storage.create(toCreate, expectedBytes, BlobTargetOption.doesNotExist());
  final BlobInfo info = created.asBlobInfo();

  try (ReadChannel reader = storage.reader(info.getBlobId())) {
    // A chunk size of 0 is the value under test: it requests a channel without buffering.
    reader.setChunkSize(0);

    // The destination is deliberately larger than the object, so EOF ends the fill loop.
    ByteBuffer sink = ByteBuffer.allocate(bufferCapacity);
    // An unbuffered channel is not guaranteed to fill the destination in one read call,
    // so keep reading until the buffer is full or EOF is reached.
    int bytesRead = fillFrom(sink, reader);
    assertThat(bytesRead).isEqualTo(objectSize);
    assertThat(xxd(sink)).isEqualTo(xxd(expectedBytes));
  }
}

@Test
public void storageReadChannel_getObject_404() {
BlobId id = BlobId.of(bucket.getName(), generator.randomObjectName());
Expand All @@ -129,4 +159,17 @@ private static <T, F> void equalForField(T actual, T expected, Function<T, F> f)
F eF = f.apply(expected);
assertThat(aF).isEqualTo(eF);
}

/**
 * Repeatedly reads from {@code c} into {@code buf} until either the buffer has no remaining
 * space or the channel signals end-of-stream by returning {@code -1}.
 *
 * @param buf destination buffer; its position advances by the number of bytes read
 * @param c channel to read from
 * @return the total number of bytes transferred into {@code buf} (the {@code -1} EOF marker is
 *     not counted)
 * @throws IOException if any individual read on {@code c} fails
 */
static int fillFrom(ByteBuffer buf, ReadableByteChannel c) throws IOException {
  int bytesCopied = 0;
  // The loop condition performs the read; it stops on a full buffer or on EOF (-1).
  for (int n; buf.hasRemaining() && (n = c.read(buf)) != -1; ) {
    bytesCopied += n;
  }
  return bytesCopied;
}
}

0 comments on commit 702ab2b

Please sign in to comment.