Skip to content

Commit

Permalink
fix: update BlobWriteChannelV2 to properly carry forward offset after…
Browse files Browse the repository at this point in the history
… incremental flush (#2125)

The tests in ITObjectTest did not perform a flush before capturing the write channel and this flow was not previously validated.
  • Loading branch information
BenWhitehead committed Jul 18, 2023
1 parent c805051 commit c099a2f
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ final class ApiaryUnbufferedWritableByteChannel implements UnbufferedWritableByt
private final SettableApiFuture<StorageObject> result;
private final LongConsumer committedBytesCallback;

private boolean open = true;
private boolean open;
private long cumulativeByteCount;
private boolean finished = false;
private boolean finished;

ApiaryUnbufferedWritableByteChannel(
HttpClientContext httpClientContext,
Expand All @@ -50,6 +50,9 @@ final class ApiaryUnbufferedWritableByteChannel implements UnbufferedWritableByt
this.session = ResumableSession.json(httpClientContext, deps, alg, resumableWrite);
this.result = result;
this.committedBytesCallback = committedBytesCallback;
this.open = true;
this.cumulativeByteCount = resumableWrite.getBeginOffset();
this.finished = false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public WriteChannel restore() {
entity != null ? Conversions.apiary().blobInfo().encode(entity) : null;
return new BlobWriteChannelV2.BlobWriteChannelV2State(
(HttpStorageOptions) serviceOptions,
JsonResumableWrite.of(encode, ImmutableMap.of(), uploadId),
JsonResumableWrite.of(encode, ImmutableMap.of(), uploadId, position),
position,
isOpen,
chunkSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ static final class BlobWriteChannelV2State

@Override
public WriteChannel restore() {
JsonResumableWrite resumableWrite = this.resumableWrite;
if (position != null) {
resumableWrite = resumableWrite.withBeginOffset(position);
}
BlobWriteChannelV2 channel =
new BlobWriteChannelV2(BlobReadChannelContext.from(options), resumableWrite);
if (chunkSize != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void rewindTo(long offset) {
success = true;
//noinspection DataFlowIssue compareTo result will filter out actualSize == null
return ResumableOperationResult.complete(storageObject, actualSize.longValue());
} else if (compare < 0) {
} else if (compare > 0) {
StorageException se =
JsonResumableSessionFailureScenario.SCENARIO_4_1.toStorageException(
uploadId, response, null, toString(storageObject));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.storage;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.base.MoreObjects;
Expand All @@ -41,24 +43,40 @@ final class JsonResumableWrite implements Serializable {
@MonotonicNonNull private final String signedUrl;

@NonNull private final String uploadId;
private final long beginOffset;

private volatile String objectJson;

private JsonResumableWrite(
StorageObject object,
Map<StorageRpc.Option, ?> options,
String signedUrl,
@NonNull String uploadId) {
@NonNull String uploadId,
long beginOffset) {
this.object = object;
this.options = options;
this.signedUrl = signedUrl;
this.uploadId = uploadId;
this.beginOffset = beginOffset;
}

public @NonNull String getUploadId() {
return uploadId;
}

public long getBeginOffset() {
return beginOffset;
}

public JsonResumableWrite withBeginOffset(long newBeginOffset) {
checkArgument(
newBeginOffset >= beginOffset,
"New beginOffset must be >= existing beginOffset (%s >= %s)",
newBeginOffset,
beginOffset);
return new JsonResumableWrite(object, options, signedUrl, uploadId, newBeginOffset);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -68,15 +86,16 @@ public boolean equals(Object o) {
return false;
}
JsonResumableWrite that = (JsonResumableWrite) o;
return Objects.equals(object, that.object)
return beginOffset == that.beginOffset
&& Objects.equals(object, that.object)
&& Objects.equals(options, that.options)
&& Objects.equals(signedUrl, that.signedUrl)
&& uploadId.equals(that.uploadId);
&& Objects.equals(uploadId, that.uploadId);
}

@Override
public int hashCode() {
return Objects.hash(object, options, signedUrl, uploadId);
return Objects.hash(object, options, signedUrl, uploadId, beginOffset);
}

@Override
Expand All @@ -86,6 +105,7 @@ public String toString() {
.add("options", options)
.add("signedUrl", signedUrl)
.add("uploadId", uploadId)
.add("beginOffset", beginOffset)
.toString();
}

Expand All @@ -112,11 +132,11 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
}

static JsonResumableWrite of(
StorageObject req, Map<StorageRpc.Option, ?> options, String uploadId) {
return new JsonResumableWrite(req, options, null, uploadId);
StorageObject req, Map<StorageRpc.Option, ?> options, String uploadId, long beginOffset) {
return new JsonResumableWrite(req, options, null, uploadId, beginOffset);
}

static JsonResumableWrite of(String signedUrl, String uploadId) {
return new JsonResumableWrite(null, null, signedUrl, uploadId);
static JsonResumableWrite of(String signedUrl, String uploadId, long beginOffset) {
return new JsonResumableWrite(null, null, signedUrl, uploadId, beginOffset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOp
optionsMap,
retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap));
JsonResumableWrite jsonResumableWrite =
JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get());
JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0);

JsonResumableSession session =
ResumableSession.json(
Expand Down Expand Up @@ -671,7 +671,7 @@ public StorageWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options)
optionsMap,
retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap));
JsonResumableWrite jsonResumableWrite =
JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get());
JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0);
return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite);
}

Expand All @@ -688,7 +688,7 @@ public StorageWriteChannel writer(URL signedURL) {
ResumableMedia.startUploadForSignedUrl(
getOptions(), signedURL, forResumableUploadSessionCreate);
JsonResumableWrite jsonResumableWrite =
JsonResumableWrite.of(signedUrlString, uploadIdSupplier.get());
JsonResumableWrite.of(signedUrlString, uploadIdSupplier.get(), 0);
return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ public void rewindWillQueryStatusOnlyWhenDirty() throws Exception {
URI endpoint = fakeHttpServer.getEndpoint();
String uploadUrl = String.format("%s/upload/%s", endpoint.toString(), UUID.randomUUID());

JsonResumableWrite resumableWrite = JsonResumableWrite.of(null, ImmutableMap.of(), uploadUrl);
JsonResumableWrite resumableWrite =
JsonResumableWrite.of(null, ImmutableMap.of(), uploadUrl, 0);
JsonResumableSession session =
new JsonResumableSession(
httpClientContext, RETRYING_DEPENDENCIES, RETRY_ALGORITHM, resumableWrite);
Expand Down Expand Up @@ -167,7 +168,8 @@ public void retryAttemptWillReturnQueryResultIfPersistedSizeMatchesSpecifiedEndO
URI endpoint = fakeHttpServer.getEndpoint();
String uploadUrl = String.format("%s/upload/%s", endpoint.toString(), UUID.randomUUID());

JsonResumableWrite resumableWrite = JsonResumableWrite.of(null, ImmutableMap.of(), uploadUrl);
JsonResumableWrite resumableWrite =
JsonResumableWrite.of(null, ImmutableMap.of(), uploadUrl, 0);
JsonResumableSession session =
new JsonResumableSession(
httpClientContext, RETRYING_DEPENDENCIES, RETRY_ALGORITHM, resumableWrite);
Expand Down Expand Up @@ -234,7 +236,8 @@ public void rewindOfContentIsRelativeToItsBeginOffsetOfTheOverallObject() throws
URI endpoint = fakeHttpServer.getEndpoint();
String uploadUrl = String.format("%s/upload/%s", endpoint.toString(), UUID.randomUUID());

JsonResumableWrite resumableWrite = JsonResumableWrite.of(null, ImmutableMap.of(), uploadUrl);
JsonResumableWrite resumableWrite =
JsonResumableWrite.of(null, ImmutableMap.of(), uploadUrl, 0);
JsonResumableSession session =
new JsonResumableSession(
httpClientContext, RETRYING_DEPENDENCIES, RETRY_ALGORITHM, resumableWrite);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ protected Restorable<?>[] restorableObjects() {
JsonResumableWrite.of(
Conversions.apiary().blobInfo().encode(BlobInfo.newBuilder("b", "n").build()),
ImmutableMap.of(),
"upload-id"));
"upload-id",
0));
return new Restorable<?>[] {readerV2, writer};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.storage.it;

import static com.google.cloud.storage.TestUtils.xxd;
import static com.google.common.truth.Truth.assertThat;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertArrayEquals;
Expand All @@ -27,6 +28,7 @@
import com.google.api.client.json.JsonParser;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.cloud.NoCredentials;
import com.google.cloud.RestorableState;
import com.google.cloud.WriteChannel;
import com.google.cloud.conformance.storage.v1.InstructionList;
import com.google.cloud.conformance.storage.v1.Method;
Expand All @@ -53,6 +55,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Optional;
import java.util.logging.Logger;
import org.junit.Test;
Expand Down Expand Up @@ -153,6 +156,39 @@ public void changeChunkSizeAfterWrite() throws IOException {
}
}

@Test
public void restoreProperlyPlumbsBeginOffset() throws IOException {
BlobInfo info = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build();
int _256KiB = 256 * 1024;

byte[] bytes1 = DataGenerator.base64Characters().genBytes(_256KiB);
byte[] bytes2 = DataGenerator.base64Characters().genBytes(73);

int allLength = bytes1.length + bytes2.length;
byte[] expected = Arrays.copyOf(bytes1, allLength);
System.arraycopy(bytes2, 0, expected, bytes1.length, bytes2.length);
String xxdExpected = xxd(expected);

RestorableState<WriteChannel> capture;
{
WriteChannel writer = storage.writer(info, BlobWriteOption.doesNotExist());
writer.setChunkSize(_256KiB);
writer.write(ByteBuffer.wrap(bytes1));
// explicitly do not close writer, it will finalize the session
capture = writer.capture();
}

assertThat(capture).isNotNull();
WriteChannel restored = capture.restore();
restored.write(ByteBuffer.wrap(bytes2));
restored.close();

byte[] readAllBytes = storage.readAllBytes(info.getBlobId());
assertThat(readAllBytes).hasLength(expected.length);
String xxdActual = xxd(readAllBytes);
assertThat(xxdActual).isEqualTo(xxdExpected);
}

private void doJsonUnexpectedEOFTest(int contentSize, int cappedByteCount) throws IOException {
String blobPath = String.format("%s/%s/blob", generator.randomObjectName(), NOW_STRING);

Expand Down

0 comments on commit c099a2f

Please sign in to comment.