Skip to content

Commit

Permalink
feat: add abort() to writer
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitry-fa committed Aug 10, 2020
1 parent f8d6e15 commit 257317d
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 0 deletions.
10 changes: 10 additions & 0 deletions google-cloud-storage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@
<method>com.google.cloud.storage.BucketInfo$Builder deleteLifecycleRules()</method>
<differenceType>7013</differenceType>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/storage/Storage</className>
<method>void abort(com.google.cloud.WriteChannel)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/storage/spi/v1/StorageRpc</className>
<method>void abort(java.lang.String)</method>
</difference>
<difference>
<className>com/google/cloud/storage/BucketInfo$Builder</className>
<method>com.google.cloud.storage.BucketInfo$Builder setUpdateTime(java.lang.Long)</method>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,24 @@ public void run() {
}
}

void cancelUpload() {
try {
runWithRetries(
callable(
new Runnable() {
@Override
public void run() {
getOptions().getStorageRpcV1().abort(getUploadId());
}
}),
getOptions().getRetrySettings(),
StorageImpl.EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelper.RetryHelperException e) {
throw StorageException.translateAndThrow(e);
}
}

protected StateImpl.Builder stateBuilder() {
return StateImpl.builder(getOptions(), getEntity(), getUploadId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2692,6 +2692,17 @@ Blob createFrom(
*/
WriteChannel writer(URL signedURL);

/**
* Aborts resumeable upload. Further attempts to write to or close the channel will result in
* {@code StorageException} with the code {@code 499}.
*
* @param channel an instance crated either by {@link #writer(BlobInfo, BlobWriteOption...)} or by
* {@link #writer(URL)} method.
* @throws IllegalArgumentException if inappropriate channel is given
* @throws StorageException upon failure
*/
void abort(WriteChannel channel);

/**
* Generates a signed URL for a blob. If you have a blob that you want to allow access to for a
* fixed amount of time, you can use this method to generate a URL that is only valid within a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,15 @@ public BlobWriteChannel writer(URL signedURL) {
return new BlobWriteChannel(getOptions(), signedURL);
}

@Override
public void abort(WriteChannel channel) {
if (!(channel instanceof BlobWriteChannel)) {
throw new IllegalArgumentException("channel is not created by Storage");
}
BlobWriteChannel blobWriteChannel = (BlobWriteChannel) channel;
blobWriteChannel.cancelUpload();
}

private BlobWriteChannel writer(BlobInfo blobInfo, BlobTargetOption... options) {
final Map<StorageRpc.Option, ?> optionsMap = optionMap(blobInfo, options);
return new BlobWriteChannel(getOptions(), blobInfo, optionsMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,48 @@ public String open(StorageObject object, Map<Option, ?> options) {
}
}

@Override
public void abort(String uploadId) {
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_ABORT);
Scope scope = tracer.withSpan(span);
try {
GenericUrl url = new GenericUrl(uploadId);
HttpRequest httpRequest = storage.getRequestFactory().buildDeleteRequest(url);
int code;
String message;
IOException exception = null;
HttpResponse response = null;
try {
response = httpRequest.execute();
code = response.getStatusCode();
message = response.getStatusMessage();
} catch (HttpResponseException ex) {
exception = ex;
code = ex.getStatusCode();
message = ex.getStatusMessage();
} finally {
if (response != null) {
response.disconnect();
}
}
if (code != 499) {
if (exception != null) {
throw exception;
}
GoogleJsonError error = new GoogleJsonError();
error.setCode(code);
error.setMessage(message);
throw translate(error);
}
} catch (IOException ex) {
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
throw translate(ex);
} finally {
scope.close();
span.end(HttpStorageRpcSpans.END_SPAN_OPTIONS);
}
}

@Override
public String open(String signedURL) {
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_OPEN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class HttpStorageRpcSpans {
static final String SPAN_NAME_OPEN_REWRITE = getTraceSpanName("openRewrite(RewriteRequest)");
static final String SPAN_NAME_CONTINUE_REWRITE =
getTraceSpanName("continueRewrite(RewriteResponse)");
static final String SPAN_NAME_ABORT = getTraceSpanName("abort(String)");
static final String SPAN_NAME_GET_BUCKET_ACL = getTraceSpanName("getAcl(String,String,Map)");
static final String SPAN_NAME_DELETE_BUCKET_ACL =
getTraceSpanName("deleteAcl(String,String,Map)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,13 @@ StorageObject writeWithResponse(
*/
RewriteResponse openRewrite(RewriteRequest rewriteRequest);

/**
* Aborts the given resumable upload.
*
* @throws StorageException upon failure
*/
void abort(String uploadId);

/**
* Continues rewriting on an already open rewrite channel.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ public RewriteResponse openRewrite(RewriteRequest rewriteRequest) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public void abort(String uploadId) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public RewriteResponse continueRewrite(RewriteResponse previousResponse) {
throw new UnsupportedOperationException("Not implemented yet");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.google.api.core.ApiClock;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.BaseWriteChannel;
import com.google.cloud.Identity;
import com.google.cloud.Policy;
import com.google.cloud.ReadChannel;
Expand Down Expand Up @@ -57,6 +58,7 @@
import java.security.spec.X509EncodedKeySpec;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.crypto.spec.SecretKeySpec;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -1312,4 +1314,40 @@ public void testWriterFailure() {
assertSame(STORAGE_FAILURE, e.getCause());
}
}

@Test
public void testAbort() {
final AtomicBoolean isAborted = new AtomicBoolean(false);
WriteChannel writer =
new BlobWriteChannel(options, "http://example.com") {
@Override
void cancelUpload() {
isAborted.set(true);
}
};
new StorageImpl(options).abort(writer);
assertTrue(isAborted.get());
}

@Test
public void testAbortNegative() {
WriteChannel writer =
new BaseWriteChannel(null, null, null) {
@Override
protected void flushBuffer(int i, boolean b) {
throw new IllegalStateException();
}

@Override
protected BaseState.Builder stateBuilder() {
throw new IllegalStateException();
}
};

try {
new StorageImpl(options).abort(writer);
} catch (IllegalArgumentException e) {
assertEquals("channel is not created by Storage", e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3490,6 +3490,31 @@ public void testAutoContentTypeWriter() throws IOException {
testAutoContentType("writer");
}

@Test
public void testAbort() throws IOException {
BlobId blobId = BlobId.of(BUCKET, "abortedObject");
int chunkSize = 256 * 1024;
WriteChannel writer = storage.writer(BlobInfo.newBuilder(blobId).build());
writer.setChunkSize(chunkSize);
writer.write(ByteBuffer.wrap(BLOB_BYTE_CONTENT));
storage.abort(writer);
try {
writer.write(ByteBuffer.wrap(new byte[chunkSize]));
fail();
} catch (StorageException e) {
assertEquals(499, e.getCode());
assertTrue(e.getMessage().startsWith("499 Client Closed Request"));
}
try {
writer.close();
fail();
} catch (StorageException e) {
assertEquals(499, e.getCode());
assertTrue(e.getMessage().startsWith("499 Client Closed Request"));
}
assertNull(storage.get(blobId));
}

@Test
public void testRemoveBucketCORS() throws ExecutionException, InterruptedException {
String bucketName = RemoteStorageHelper.generateBucketName();
Expand Down

0 comments on commit 257317d

Please sign in to comment.