Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: properly implement GrpcBlobReadChannel#isOpen #1733

Merged
merged 2 commits into from Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -49,19 +49,18 @@ final class GrpcBlobReadChannel implements ReadChannel {
boolean autoGzipDecompression) {
this.lazyReadChannel =
new LazyReadChannel(
Suppliers.memoize(
() -> {
ReadObjectRequest req =
seekReadObjectRequest(request, position, sub(limit, position));
return ResumableMedia.gapic()
.read()
.byteChannel(read)
.setHasher(Hasher.noop())
.setAutoGzipDecompression(autoGzipDecompression)
.buffered(BufferHandle.allocate(chunkSize))
.setReadObjectRequest(req)
.build();
}));
() -> {
ReadObjectRequest req =
seekReadObjectRequest(request, position, sub(limit, position));
return ResumableMedia.gapic()
.read()
.byteChannel(read)
.setHasher(Hasher.noop())
.setAutoGzipDecompression(autoGzipDecompression)
.buffered(BufferHandle.allocate(chunkSize))
.setReadObjectRequest(req)
.build();
});
}

@Override
Expand All @@ -72,7 +71,7 @@ public void setChunkSize(int chunkSize) {

@Override
public boolean isOpen() {
return false;
return lazyReadChannel.isOpen() && lazyReadChannel.getChannel().isOpen();
}

@Override
Expand All @@ -81,8 +80,8 @@ public void close() {
try {
lazyReadChannel.getChannel().close();
} catch (IOException e) {
// TODO: why does ReadChannel remove IOException?!
throw new RuntimeException(e);
// why does ReadChannel remove IOException?!
throw StorageException.coalesce(e);
}
}
}
Expand Down Expand Up @@ -149,13 +148,24 @@ private static final class LazyReadChannel {
private final Supplier<BufferedReadableByteChannelSession<Object>> session;
private final Supplier<BufferedReadableByteChannel> channel;

private boolean open = false;

public LazyReadChannel(Supplier<BufferedReadableByteChannelSession<Object>> session) {
this.session = session;
this.channel = Suppliers.memoize(() -> session.get().open());
this.channel =
Suppliers.memoize(
() -> {
open = true;
return session.get().open();
});
}

public BufferedReadableByteChannel getChannel() {
return channel.get();
}

public boolean isOpen() {
return open;
}
}
}
Expand Up @@ -183,7 +183,8 @@ public static Iterable<Args<BucketField, BucketInfo>> parameters() {
BucketField.RPO,
(jsonT, grpcT) -> {
assertThat(jsonT.getRpo()).isEqualTo(Rpo.DEFAULT);
assertThat(grpcT.getRpo()).isNull();
// TODO: cleanup allowed null value in mid nov
assertThat(grpcT.getRpo()).isAnyOf(Rpo.DEFAULT, null);
}),
new Args<>(
BucketField.SELF_LINK,
Expand Down