Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: update gRPC writeAndClose to only set finish_write on the last message #2163

Merged
merged 1 commit into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.List;
import org.checkerframework.checker.nullness.qual.NonNull;

final class GapicUnbufferedWritableByteChannel<
RequestFactoryT extends WriteObjectRequestBuilderFactory>
Expand Down Expand Up @@ -82,16 +83,7 @@ public boolean isOpen() {
@Override
public void close() throws IOException {
if (!finished) {
long offset = writeCtx.getTotalSentBytes().get();
Crc32cLengthKnown crc32cValue = writeCtx.getCumulativeCrc32c().get();

WriteObjectRequest.Builder b =
writeCtx.newRequestBuilder().setFinishWrite(true).setWriteOffset(offset);
if (crc32cValue != null) {
b.setObjectChecksums(
ObjectChecksums.newBuilder().setCrc32C(crc32cValue.getValue()).build());
}
WriteObjectRequest message = b.build();
WriteObjectRequest message = finishMessage();
try {
flusher.close(message);
finished = true;
Expand Down Expand Up @@ -139,7 +131,7 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
.newRequestBuilder()
.setWriteOffset(offset)
.setChecksummedData(checksummedData.build());
if (!datum.isOnlyFullBlocks() || finalize) {
if (!datum.isOnlyFullBlocks()) {
builder.setFinishWrite(true);
if (cumulative != null) {
builder.setObjectChecksums(
Expand All @@ -152,6 +144,10 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
messages.add(build);
bytesConsumed += contentSize;
}
if (finalize && !finished) {
messages.add(finishMessage());
finished = true;
}

try {
flusher.flush(messages);
Expand All @@ -162,4 +158,18 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo

return bytesConsumed;
}

@NonNull
private WriteObjectRequest finishMessage() {
long offset = writeCtx.getTotalSentBytes().get();
Crc32cLengthKnown crc32cValue = writeCtx.getCumulativeCrc32c().get();

WriteObjectRequest.Builder b =
writeCtx.newRequestBuilder().setFinishWrite(true).setWriteOffset(offset);
if (crc32cValue != null) {
b.setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(crc32cValue.getValue()).build());
}
WriteObjectRequest message = b.build();
return message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ private static WriteObjectRequest possiblyPairDownRequest(
}

if (message.getWriteOffset() > 0) {
b.clearWriteObjectSpec().clearObjectChecksums();
b.clearWriteObjectSpec();
}

if (message.getWriteOffset() > 0 && !message.getFinishWrite()) {
b.clearObjectChecksums();
}
return b.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.ByteString;
import com.google.storage.v2.Object;
import com.google.storage.v2.ObjectChecksums;
import com.google.storage.v2.StartResumableWriteRequest;
import com.google.storage.v2.StartResumableWriteResponse;
import com.google.storage.v2.StorageClient;
Expand Down Expand Up @@ -63,8 +64,9 @@ public final class ITGapicUnbufferedWritableByteChannelTest {
private static final Logger LOGGER =
Logger.getLogger(ITGapicUnbufferedWritableByteChannelTest.class.getName());

private static final Hasher HASHER = Hasher.enabled();
private static final ChunkSegmenter segmenter =
new ChunkSegmenter(Hasher.noop(), ByteStringStrategy.copy(), 10, 5);
new ChunkSegmenter(HASHER, ByteStringStrategy.copy(), 10, 5);

private static final String uploadId = "upload-id";

Expand All @@ -80,31 +82,35 @@ public final class ITGapicUnbufferedWritableByteChannelTest {
private static final WriteObjectRequest req1 =
WriteObjectRequest.newBuilder()
.setUploadId(uploadId)
.setChecksummedData(TestUtils.getChecksummedData(ByteString.copyFrom(bytes, 0, 10)))
.setChecksummedData(getChecksummedData(ByteString.copyFrom(bytes, 0, 10), HASHER))
.build();
private static final WriteObjectRequest req2 =
WriteObjectRequest.newBuilder()
.setUploadId(uploadId)
.setWriteOffset(10)
.setChecksummedData(TestUtils.getChecksummedData(ByteString.copyFrom(bytes, 10, 10)))
.setChecksummedData(getChecksummedData(ByteString.copyFrom(bytes, 10, 10), HASHER))
.build();
private static final WriteObjectRequest req3 =
WriteObjectRequest.newBuilder()
.setUploadId(uploadId)
.setWriteOffset(20)
.setChecksummedData(TestUtils.getChecksummedData(ByteString.copyFrom(bytes, 20, 10)))
.setChecksummedData(getChecksummedData(ByteString.copyFrom(bytes, 20, 10), HASHER))
.build();
private static final WriteObjectRequest req4 =
WriteObjectRequest.newBuilder()
.setUploadId(uploadId)
.setWriteOffset(30)
.setChecksummedData(TestUtils.getChecksummedData(ByteString.copyFrom(bytes, 30, 10)))
.setChecksummedData(getChecksummedData(ByteString.copyFrom(bytes, 30, 10), HASHER))
.build();
private static final WriteObjectRequest req5 =
WriteObjectRequest.newBuilder()
.setUploadId(uploadId)
.setWriteOffset(40)
.setFinishWrite(true)
.setObjectChecksums(
ObjectChecksums.newBuilder()
.setCrc32C(HASHER.hash(ByteBuffer.wrap(bytes)).getValue())
.build())
.build();

private static final WriteObjectResponse resp1 =
Expand All @@ -123,35 +129,24 @@ public final class ITGapicUnbufferedWritableByteChannelTest {

@Test
public void directUpload() throws IOException, InterruptedException, ExecutionException {
Object obj = Object.newBuilder().setBucket("buck").setName("obj").build();
WriteObjectSpec spec = WriteObjectSpec.newBuilder().setResource(obj).build();

byte[] bytes = DataGenerator.base64Characters().genBytes(40);
WriteObjectRequest req1 =
WriteObjectRequest.newBuilder()
ITGapicUnbufferedWritableByteChannelTest.req1
.toBuilder()
.clearUploadId()
.setWriteObjectSpec(spec)
.setChecksummedData(TestUtils.getChecksummedData(ByteString.copyFrom(bytes, 0, 10)))
.build();
WriteObjectRequest req2 =
WriteObjectRequest.newBuilder()
.setWriteOffset(10)
.setChecksummedData(TestUtils.getChecksummedData(ByteString.copyFrom(bytes, 10, 10)))
.build();
ITGapicUnbufferedWritableByteChannelTest.req2.toBuilder().clearUploadId().build();
WriteObjectRequest req3 =
WriteObjectRequest.newBuilder()
.setWriteOffset(20)
.setChecksummedData(TestUtils.getChecksummedData(ByteString.copyFrom(bytes, 20, 10)))
.build();
ITGapicUnbufferedWritableByteChannelTest.req3.toBuilder().clearUploadId().build();
WriteObjectRequest req4 =
WriteObjectRequest.newBuilder()
.setWriteOffset(30)
.setChecksummedData(TestUtils.getChecksummedData(ByteString.copyFrom(bytes, 30, 10)))
.build();
ITGapicUnbufferedWritableByteChannelTest.req4.toBuilder().clearUploadId().build();
WriteObjectRequest req5 =
WriteObjectRequest.newBuilder().setWriteOffset(40).setFinishWrite(true).build();
ITGapicUnbufferedWritableByteChannelTest.req5.toBuilder().clearUploadId().build();

WriteObjectResponse resp =
WriteObjectResponse.newBuilder().setResource(obj.toBuilder().setSize(40)).build();
WriteObjectResponse resp = resp5;

WriteObjectRequest base = WriteObjectRequest.newBuilder().setWriteObjectSpec(spec).build();
WriteObjectRequestBuilderFactory reqFactory = WriteObjectRequestBuilderFactory.simple(base);
Expand Down Expand Up @@ -314,10 +309,7 @@ public boolean shouldRetry(Throwable t, Object ignore) {

@Test
public void resumableUpload_finalizeWhenWriteAndCloseCalledEvenWhenQuantumAligned()
throws IOException, InterruptedException, ExecutionException {
int quantum = 10;
ChunkSegmenter segmenter =
new ChunkSegmenter(Hasher.noop(), ByteStringStrategy.copy(), 50, quantum);
throws IOException {
SettableApiFuture<WriteObjectResponse> result = SettableApiFuture.create();

AtomicReference<List<WriteObjectRequest>> actualFlush = new AtomicReference<>();
Expand All @@ -342,18 +334,10 @@ public void close(@Nullable WriteObjectRequest req) {
}
});

byte[] bytes = DataGenerator.base64Characters().genBytes(quantum);

long written = c.writeAndClose(ByteBuffer.wrap(bytes));
WriteObjectRequest expectedRequest =
WriteObjectRequest.newBuilder()
.setUploadId(uploadId)
.setChecksummedData(getChecksummedData(ByteString.copyFrom(bytes), Hasher.noop()))
.setFinishWrite(true)
.build();

assertThat(written).isEqualTo(10);
assertThat(actualFlush.get()).isEqualTo(ImmutableList.of(expectedRequest));
assertThat(written).isEqualTo(40);
assertThat(actualFlush.get()).isEqualTo(ImmutableList.of(req1, req2, req3, req4, req5));
// calling close is okay, as long as the provided request is null
assertThat(actualClose.get()).isAnyOf(closeRequestSentinel, null);
}
Expand Down