Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add abort() to writer #460

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions google-cloud-storage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@
<method>com.google.cloud.storage.BucketInfo$Builder deleteLifecycleRules()</method>
<differenceType>7013</differenceType>
</difference>
<difference>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yet again, we're getting stuck because of excessive use of interfaces. Either there are multiple implementations of Storage and we shouldn't make this change or we should bite the bullet and combine Storage and StorageImpl once and for all. Ditto StorageRpc and HttpStorageRpc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frankyn what do you think about combining Storage and StorageImpl?

<differenceType>7012</differenceType>
<className>com/google/cloud/storage/Storage</className>
<method>void abort(com.google.cloud.WriteChannel)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/storage/spi/v1/StorageRpc</className>
<method>void abort(java.lang.String)</method>
</difference>
<difference>
<className>com/google/cloud/storage/BucketInfo$Builder</className>
<method>com.google.cloud.storage.BucketInfo$Builder setUpdateTime(java.lang.Long)</method>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,24 @@ public void run() {
}
}

void cancelUpload() {
try {
runWithRetries(
callable(
new Runnable() {
@Override
public void run() {
getOptions().getStorageRpcV1().abort(getUploadId());
}
}),
getOptions().getRetrySettings(),
StorageImpl.EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelper.RetryHelperException e) {
throw StorageException.translateAndThrow(e);
}
}

protected StateImpl.Builder stateBuilder() {
return StateImpl.builder(getOptions(), getEntity(), getUploadId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2692,6 +2692,17 @@ Blob createFrom(
*/
WriteChannel writer(URL signedURL);

/**
* Aborts resumeable upload. Further attempts to write to or close the channel will result in
* {@code StorageException} with the code {@code 499}.
*
* @param channel an instance crated either by {@link #writer(BlobInfo, BlobWriteOption...)} or by
* {@link #writer(URL)} method.
* @throws IllegalArgumentException if inappropriate channel is given
* @throws StorageException upon failure
*/
void abort(WriteChannel channel);

/**
* Generates a signed URL for a blob. If you have a blob that you want to allow access to for a
* fixed amount of time, you can use this method to generate a URL that is only valid within a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,15 @@ public BlobWriteChannel writer(URL signedURL) {
return new BlobWriteChannel(getOptions(), signedURL);
}

@Override
public void abort(WriteChannel channel) {
if (!(channel instanceof BlobWriteChannel)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not make the method argument a BlobWriteChannel then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@elharo BlobWriteChannel is a package-private class. If it were public we could invoke channel.abort() directly.

throw new IllegalArgumentException("channel is not created by Storage");
}
BlobWriteChannel blobWriteChannel = (BlobWriteChannel) channel;
blobWriteChannel.cancelUpload();
}

private BlobWriteChannel writer(BlobInfo blobInfo, BlobTargetOption... options) {
final Map<StorageRpc.Option, ?> optionsMap = optionMap(blobInfo, options);
return new BlobWriteChannel(getOptions(), blobInfo, optionsMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,48 @@ public String open(StorageObject object, Map<Option, ?> options) {
}
}

@Override
public void abort(String uploadId) {
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_ABORT);
Scope scope = tracer.withSpan(span);
try {
GenericUrl url = new GenericUrl(uploadId);
HttpRequest httpRequest = storage.getRequestFactory().buildDeleteRequest(url);
int code;
String message;
IOException exception = null;
HttpResponse response = null;
try {
response = httpRequest.execute();
code = response.getStatusCode();
message = response.getStatusMessage();
} catch (HttpResponseException ex) {
exception = ex;
code = ex.getStatusCode();
message = ex.getStatusMessage();
} finally {
if (response != null) {
response.disconnect();
}
}
if (code != 499) {
if (exception != null) {
throw exception;
}
GoogleJsonError error = new GoogleJsonError();
error.setCode(code);
error.setMessage(message);
throw translate(error);
}
} catch (IOException ex) {
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
throw translate(ex);
} finally {
scope.close();
span.end(HttpStorageRpcSpans.END_SPAN_OPTIONS);
}
}

@Override
public String open(String signedURL) {
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_OPEN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class HttpStorageRpcSpans {
static final String SPAN_NAME_OPEN_REWRITE = getTraceSpanName("openRewrite(RewriteRequest)");
static final String SPAN_NAME_CONTINUE_REWRITE =
getTraceSpanName("continueRewrite(RewriteResponse)");
static final String SPAN_NAME_ABORT = getTraceSpanName("abort(String)");
static final String SPAN_NAME_GET_BUCKET_ACL = getTraceSpanName("getAcl(String,String,Map)");
static final String SPAN_NAME_DELETE_BUCKET_ACL =
getTraceSpanName("deleteAcl(String,String,Map)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,13 @@ StorageObject writeWithResponse(
*/
RewriteResponse openRewrite(RewriteRequest rewriteRequest);

/**
* Aborts the given resumable upload.
*
* @throws StorageException upon failure
*/
void abort(String uploadId);

/**
* Continues rewriting on an already open rewrite channel.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ public RewriteResponse openRewrite(RewriteRequest rewriteRequest) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public void abort(String uploadId) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public RewriteResponse continueRewrite(RewriteResponse previousResponse) {
throw new UnsupportedOperationException("Not implemented yet");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.google.api.core.ApiClock;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.BaseWriteChannel;
import com.google.cloud.Identity;
import com.google.cloud.Policy;
import com.google.cloud.ReadChannel;
Expand Down Expand Up @@ -57,6 +58,7 @@
import java.security.spec.X509EncodedKeySpec;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.crypto.spec.SecretKeySpec;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -1312,4 +1314,40 @@ public void testWriterFailure() {
assertSame(STORAGE_FAILURE, e.getCause());
}
}

@Test
public void testAbort() {
final AtomicBoolean isAborted = new AtomicBoolean(false);
WriteChannel writer =
new BlobWriteChannel(options, "http://example.com") {
@Override
void cancelUpload() {
isAborted.set(true);
}
};
new StorageImpl(options).abort(writer);
assertTrue(isAborted.get());
}

@Test
public void testAbortNegative() {
WriteChannel writer =
new BaseWriteChannel(null, null, null) {
@Override
protected void flushBuffer(int i, boolean b) {
throw new IllegalStateException();
}

@Override
protected BaseState.Builder stateBuilder() {
throw new IllegalStateException();
}
};

try {
new StorageImpl(options).abort(writer);
} catch (IllegalArgumentException e) {
assertEquals("channel is not created by Storage", e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3490,6 +3490,31 @@ public void testAutoContentTypeWriter() throws IOException {
testAutoContentType("writer");
}

@Test
public void testAbort() throws IOException {
BlobId blobId = BlobId.of(BUCKET, "abortedObject");
int chunkSize = 256 * 1024;
WriteChannel writer = storage.writer(BlobInfo.newBuilder(blobId).build());
writer.setChunkSize(chunkSize);
writer.write(ByteBuffer.wrap(BLOB_BYTE_CONTENT));
storage.abort(writer);
try {
writer.write(ByteBuffer.wrap(new byte[chunkSize]));
fail();
} catch (StorageException e) {
assertEquals(499, e.getCode());
assertTrue(e.getMessage().startsWith("499 Client Closed Request"));
}
try {
writer.close();
fail();
} catch (StorageException e) {
assertEquals(499, e.getCode());
assertTrue(e.getMessage().startsWith("499 Client Closed Request"));
}
assertNull(storage.get(blobId));
}

@Test
public void testRemoveBucketCORS() throws ExecutionException, InterruptedException {
String bucketName = RemoteStorageHelper.generateBucketName();
Expand Down