Skip to content

Commit

Permalink
fix: properly implement GrpcBlobReadChannel#isOpen (#1733)
Browse files Browse the repository at this point in the history
* fix: fix GrpcBlobReadChannel#isOpen

Previously a subbed method was still being used, now it will check the lazy channel.

Removed duplicate memoization of channel factory

* chore: update assertion for possible RPO values via gRPC
  • Loading branch information
BenWhitehead committed Oct 24, 2022
1 parent d5e937f commit 04e5166
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 18 deletions.
Expand Up @@ -49,19 +49,18 @@ final class GrpcBlobReadChannel implements ReadChannel {
boolean autoGzipDecompression) {
this.lazyReadChannel =
new LazyReadChannel(
Suppliers.memoize(
() -> {
ReadObjectRequest req =
seekReadObjectRequest(request, position, sub(limit, position));
return ResumableMedia.gapic()
.read()
.byteChannel(read)
.setHasher(Hasher.noop())
.setAutoGzipDecompression(autoGzipDecompression)
.buffered(BufferHandle.allocate(chunkSize))
.setReadObjectRequest(req)
.build();
}));
() -> {
ReadObjectRequest req =
seekReadObjectRequest(request, position, sub(limit, position));
return ResumableMedia.gapic()
.read()
.byteChannel(read)
.setHasher(Hasher.noop())
.setAutoGzipDecompression(autoGzipDecompression)
.buffered(BufferHandle.allocate(chunkSize))
.setReadObjectRequest(req)
.build();
});
}

@Override
Expand All @@ -72,7 +71,7 @@ public void setChunkSize(int chunkSize) {

@Override
public boolean isOpen() {
return false;
return lazyReadChannel.isOpen() && lazyReadChannel.getChannel().isOpen();
}

@Override
Expand All @@ -81,8 +80,8 @@ public void close() {
try {
lazyReadChannel.getChannel().close();
} catch (IOException e) {
// TODO: why does ReadChannel remove IOException?!
throw new RuntimeException(e);
// why does ReadChannel remove IOException?!
throw StorageException.coalesce(e);
}
}
}
Expand Down Expand Up @@ -149,13 +148,24 @@ private static final class LazyReadChannel {
private final Supplier<BufferedReadableByteChannelSession<Object>> session;
private final Supplier<BufferedReadableByteChannel> channel;

private boolean open = false;

public LazyReadChannel(Supplier<BufferedReadableByteChannelSession<Object>> session) {
this.session = session;
this.channel = Suppliers.memoize(() -> session.get().open());
this.channel =
Suppliers.memoize(
() -> {
open = true;
return session.get().open();
});
}

public BufferedReadableByteChannel getChannel() {
return channel.get();
}

public boolean isOpen() {
return open;
}
}
}
Expand Up @@ -183,7 +183,8 @@ public static Iterable<Args<BucketField, BucketInfo>> parameters() {
BucketField.RPO,
(jsonT, grpcT) -> {
assertThat(jsonT.getRpo()).isEqualTo(Rpo.DEFAULT);
assertThat(grpcT.getRpo()).isNull();
// TODO: cleanup allowed null value in mid nov
assertThat(grpcT.getRpo()).isAnyOf(Rpo.DEFAULT, null);
}),
new Args<>(
BucketField.SELF_LINK,
Expand Down

0 comments on commit 04e5166

Please sign in to comment.