Skip to content

Commit

Permalink
fix: make Blob and Bucket update diff aware (#1994)
Browse files Browse the repository at this point in the history
If a blob or bucket update does not actually contain a diff, refresh the metadata instead of sending an empty update request.

Add integration tests to verify the behavior of updating an object or bucket with no modifications.
  • Loading branch information
BenWhitehead committed Apr 28, 2023
1 parent 6652ad8 commit 0adeb14
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.checkerframework.checker.nullness.qual.Nullable;

@BetaApi
final class GrpcStorageImpl extends BaseService<StorageOptions> implements Storage {
Expand Down Expand Up @@ -381,17 +382,8 @@ public Blob createFrom(

@Override
public Bucket get(String bucket, BucketGetOption... options) {
Opts<BucketSourceOpt> opts = Opts.unwrap(options).prepend(defaultOpts);
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
GetBucketRequest.Builder builder =
GetBucketRequest.newBuilder().setName(bucketNameCodec.encode(bucket));
GetBucketRequest req = opts.getBucketsRequest().apply(builder).build();
return Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
() -> storageClient.getBucketCallable().call(req, grpcCallContext),
syntaxDecoders.bucket.andThen(opts.clearBucketFields()));
Opts<BucketSourceOpt> unwrap = Opts.unwrap(options);
return internalBucketGet(bucket, unwrap);
}

@Override
Expand All @@ -418,26 +410,8 @@ public Blob get(String bucket, String blob, BlobGetOption... options) {

@Override
public Blob get(BlobId blob, BlobGetOption... options) {
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts);
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
GetObjectRequest.Builder builder =
GetObjectRequest.newBuilder()
.setBucket(bucketNameCodec.encode(blob.getBucket()))
.setObject(blob.getName());
ifNonNull(blob.getGeneration(), builder::setGeneration);
GetObjectRequest req = opts.getObjectsRequest().apply(builder).build();
return Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
() -> {
try {
return storageClient.getObjectCallable().call(req, grpcCallContext);
} catch (NotFoundException ignore) {
return null;
}
},
syntaxDecoders.blob.andThen(opts.clearBlobFields()));
Opts<ObjectSourceOpt> unwrap = Opts.unwrap(options);
return internalBlobGet(blob, unwrap);
}

@Override
Expand Down Expand Up @@ -493,7 +467,11 @@ public Page<Blob> list(String bucket, BlobListOption... options) {

@Override
public Bucket update(BucketInfo bucketInfo, BucketTargetOption... options) {
Opts<BucketTargetOpt> opts = Opts.unwrap(options).resolveFrom(bucketInfo).prepend(defaultOpts);
Opts<BucketTargetOpt> unwrap = Opts.unwrap(options);
if (bucketInfo.getModifiedFields().isEmpty()) {
return internalBucketGet(bucketInfo.getName(), unwrap.constrainTo(BucketSourceOpt.class));
}
Opts<BucketTargetOpt> opts = unwrap.resolveFrom(bucketInfo).prepend(defaultOpts);
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
com.google.storage.v2.Bucket bucket = codecs.bucketInfo().encode(bucketInfo);
Expand All @@ -515,7 +493,11 @@ public Bucket update(BucketInfo bucketInfo, BucketTargetOption... options) {

@Override
public Blob update(BlobInfo blobInfo, BlobTargetOption... options) {
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts);
Opts<ObjectTargetOpt> unwrap = Opts.unwrap(options);
if (blobInfo.getModifiedFields().isEmpty()) {
return internalBlobGet(blobInfo.getBlobId(), unwrap.constrainTo(ObjectSourceOpt.class));
}
Opts<ObjectTargetOpt> opts = unwrap.resolveFrom(blobInfo).prepend(defaultOpts);
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
Object object = codecs.blobInfo().encode(blobInfo);
Expand Down Expand Up @@ -1963,4 +1945,43 @@ private static Hasher getHasherForRequest(WriteObjectRequest req, Hasher default
}
}
}

/**
 * Shared implementation for metadata-only object reads over gRPC.
 *
 * <p>Resolves {@code unwrap} against {@code blob} and the instance default opts, issues a retried
 * GetObject RPC, and decodes the response. Returns {@code null} when the object does not exist.
 */
@Nullable
private Blob internalBlobGet(BlobId blob, Opts<ObjectSourceOpt> unwrap) {
  Opts<ObjectSourceOpt> resolvedOpts = unwrap.resolveFrom(blob).prepend(defaultOpts);
  GetObjectRequest.Builder reqBuilder =
      GetObjectRequest.newBuilder()
          .setBucket(bucketNameCodec.encode(blob.getBucket()))
          .setObject(blob.getName());
  // Generation is optional on a BlobId; only pin the read to it when present.
  ifNonNull(blob.getGeneration(), reqBuilder::setGeneration);
  GetObjectRequest request = resolvedOpts.getObjectsRequest().apply(reqBuilder).build();
  GrpcCallContext callContext =
      resolvedOpts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
  return Retrying.run(
      getOptions(),
      retryAlgorithmManager.getFor(request),
      () -> {
        try {
          return storageClient.getObjectCallable().call(request, callContext);
        } catch (NotFoundException ignore) {
          // Absent objects are reported as null, not as an exception, matching Storage#get.
          return null;
        }
      },
      syntaxDecoders.blob.andThen(resolvedOpts.clearBlobFields()));
}

/**
 * Shared implementation for bucket metadata reads over gRPC.
 *
 * <p>Prepends the instance default opts to {@code unwrap}, issues a retried GetBucket RPC, and
 * decodes the result, clearing any fields excluded by the opts.
 */
@Nullable
private Bucket internalBucketGet(String bucket, Opts<BucketSourceOpt> unwrap) {
  Opts<BucketSourceOpt> resolvedOpts = unwrap.prepend(defaultOpts);
  GetBucketRequest request =
      resolvedOpts
          .getBucketsRequest()
          .apply(GetBucketRequest.newBuilder().setName(bucketNameCodec.encode(bucket)))
          .build();
  GrpcCallContext callContext =
      resolvedOpts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
  return Retrying.run(
      getOptions(),
      retryAlgorithmManager.getFor(request),
      () -> storageClient.getBucketCallable().call(request, callContext),
      syntaxDecoders.bucket.andThen(resolvedOpts.clearBucketFields()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,15 +270,8 @@ private static void uploadHelper(ReadableByteChannel reader, WriteChannel writer

@Override
public Bucket get(String bucket, BucketGetOption... options) {
final com.google.api.services.storage.model.Bucket bucketPb =
codecs.bucketInfo().encode(BucketInfo.of(bucket));
ImmutableMap<StorageRpc.Option, ?> optionsMap = Opts.unwrap(options).getRpcOptions();
ResultRetryAlgorithm<?> algorithm =
retryAlgorithmManager.getForBucketsGet(bucketPb, optionsMap);
return run(
algorithm,
() -> storageRpc.get(bucketPb, optionsMap),
(b) -> Conversions.apiary().bucketInfo().decode(b).asBucket(this));
return internalBucketGet(bucket, optionsMap);
}

@Override
Expand All @@ -288,18 +281,9 @@ public Blob get(String bucket, String blob, BlobGetOption... options) {

@Override
public Blob get(BlobId blob, BlobGetOption... options) {
final StorageObject storedObject = codecs.blobId().encode(blob);
ImmutableMap<StorageRpc.Option, ?> optionsMap =
Opts.unwrap(options).resolveFrom(blob).getRpcOptions();
ResultRetryAlgorithm<?> algorithm =
retryAlgorithmManager.getForObjectsGet(storedObject, optionsMap);
return run(
algorithm,
() -> storageRpc.get(storedObject, optionsMap),
(x) -> {
BlobInfo info = Conversions.apiary().blobInfo().decode(x);
return info.asBlob(this);
});
return internalGetBlob(blob, optionsMap);
}

@Override
Expand Down Expand Up @@ -437,32 +421,42 @@ private static Page<Blob> listBlobs(

@Override
public Bucket update(BucketInfo bucketInfo, BucketTargetOption... options) {
final com.google.api.services.storage.model.Bucket bucketPb =
codecs.bucketInfo().encode(bucketInfo);
final Map<StorageRpc.Option, ?> optionsMap =
Map<StorageRpc.Option, ?> optionsMap =
Opts.unwrap(options).resolveFrom(bucketInfo).getRpcOptions();
ResultRetryAlgorithm<?> algorithm =
retryAlgorithmManager.getForBucketsUpdate(bucketPb, optionsMap);
return run(
algorithm,
() -> storageRpc.patch(bucketPb, optionsMap),
(x) -> Conversions.apiary().bucketInfo().decode(x).asBucket(this));
if (bucketInfo.getModifiedFields().isEmpty()) {
return internalBucketGet(bucketInfo.getName(), optionsMap);
} else {
com.google.api.services.storage.model.Bucket bucketPb =
codecs.bucketInfo().encode(bucketInfo);
ResultRetryAlgorithm<?> algorithm =
retryAlgorithmManager.getForBucketsUpdate(bucketPb, optionsMap);
return run(
algorithm,
() -> storageRpc.patch(bucketPb, optionsMap),
(x) -> Conversions.apiary().bucketInfo().decode(x).asBucket(this));
}
}

@Override
public Blob update(BlobInfo blobInfo, BlobTargetOption... options) {
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo);
Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
boolean unmodifiedBeforeOpts = blobInfo.getModifiedFields().isEmpty();
BlobInfo updated = opts.blobInfoMapper().apply(blobInfo.toBuilder()).build();
StorageObject pb = codecs.blobInfo().encode(updated);
ResultRetryAlgorithm<?> algorithm = retryAlgorithmManager.getForObjectsUpdate(pb, optionsMap);
return run(
algorithm,
() -> storageRpc.patch(pb, optionsMap),
(x) -> {
BlobInfo info = Conversions.apiary().blobInfo().decode(x);
return info.asBlob(this);
});
boolean unmodifiedAfterOpts = updated.getModifiedFields().isEmpty();
if (unmodifiedBeforeOpts && unmodifiedAfterOpts) {
return internalGetBlob(blobInfo.getBlobId(), optionsMap);
} else {
StorageObject pb = codecs.blobInfo().encode(updated);
ResultRetryAlgorithm<?> algorithm = retryAlgorithmManager.getForObjectsUpdate(pb, optionsMap);
return run(
algorithm,
() -> storageRpc.patch(pb, optionsMap),
(x) -> {
BlobInfo info = Conversions.apiary().blobInfo().decode(x);
return info.asBlob(this);
});
}
}

@Override
Expand Down Expand Up @@ -1527,4 +1521,28 @@ public boolean deleteNotification(final String bucket, final String notification
public HttpStorageOptions getOptions() {
return (HttpStorageOptions) super.getOptions();
}

/**
 * Shared implementation for metadata-only object reads over the JSON API.
 *
 * <p>Encodes {@code blob}, selects the retry algorithm for an objects.get with the provided
 * options, and decodes the response into a {@link Blob} bound to this storage instance.
 */
private Blob internalGetBlob(BlobId blob, Map<StorageRpc.Option, ?> optionsMap) {
  StorageObject encoded = codecs.blobId().encode(blob);
  ResultRetryAlgorithm<?> retryAlgorithm =
      retryAlgorithmManager.getForObjectsGet(encoded, optionsMap);
  return run(
      retryAlgorithm,
      () -> storageRpc.get(encoded, optionsMap),
      so -> Conversions.apiary().blobInfo().decode(so).asBlob(this));
}

/**
 * Shared implementation for bucket metadata reads over the JSON API.
 *
 * <p>Encodes a minimal bucket from {@code bucket}, selects the retry algorithm for a buckets.get
 * with the provided options, and decodes the response into a {@link Bucket} bound to this
 * storage instance.
 */
private Bucket internalBucketGet(String bucket, Map<StorageRpc.Option, ?> optionsMap) {
  com.google.api.services.storage.model.Bucket encoded =
      codecs.bucketInfo().encode(BucketInfo.of(bucket));
  ResultRetryAlgorithm<?> retryAlgorithm =
      retryAlgorithmManager.getForBucketsGet(encoded, optionsMap);
  return run(
      retryAlgorithm,
      () -> storageRpc.get(encoded, optionsMap),
      pb -> Conversions.apiary().bucketInfo().decode(pb).asBucket(this));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2328,6 +2328,16 @@ Opts<T> prepend(Opts<? extends T> toPrepend) {
return new Opts<>(list);
}

/**
 * Create a new instance of {@code Opts<R>} consisting of those {@code Opt}s which are also an
 * {@code R}.
 *
 * <p>i.e. Given {@code Opts<ObjectTargetOpt>} produce {@code Opts<ObjectSourceOpt>}
 */
<R extends Opt> Opts<R> constrainTo(Class<R> c) {
  // Keep only the opts assignable to c; the narrowed snapshot backs the new Opts instance.
  ImmutableList<R> narrowed = filterTo(c).collect(ImmutableList.toImmutableList());
  return new Opts<>(narrowed);
}

// Fuses each RpcOptVal's mapper into a single mapper that folds every rpc option into one
// ImmutableMap builder. NOTE(review): assumes fuseMappers applies opts in list order — confirm.
private Mapper<ImmutableMap.Builder<StorageRpc.Option, Object>> rpcOptionMapper() {
  return fuseMappers(RpcOptVal.class, RpcOptVal::mapper);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ final class RpcMethodMappings {
methodGroupIs("storage.resumable.upload");

static final int _2MiB = 2 * 1024 * 1024;
private static final ImmutableMap<String, String> MODIFY = ImmutableMap.of("a", "b");
final Multimap<RpcMethod, RpcMethodMapping> funcMap;

RpcMethodMappings() {
Expand Down Expand Up @@ -552,7 +553,16 @@ private static void patch(ArrayList<RpcMethodMapping> a) {
.withApplicable(not(TestRetryConformance::isPreconditionsProvided))
.withTest(
(ctx, c) ->
ctx.map(state -> state.with(ctx.getStorage().update(state.getBucket()))))
ctx.map(
state ->
state.with(
ctx.getStorage()
.update(
state
.getBucket()
.toBuilder()
.setLabels(MODIFY)
.build()))))
.build());
a.add(
RpcMethodMapping.newBuilder(122, buckets.patch)
Expand All @@ -564,7 +574,7 @@ private static void patch(ArrayList<RpcMethodMapping> a) {
state.with(
ctx.getStorage()
.update(
state.getBucket(),
state.getBucket().toBuilder().setLabels(MODIFY).build(),
BucketTargetOption.metagenerationMatch()))))
.build());
a.add(
Expand All @@ -577,12 +587,25 @@ private static void patch(ArrayList<RpcMethodMapping> a) {
state.with(
state
.getBucket()
.toBuilder()
.setLabels(MODIFY)
.build()
.update(BucketTargetOption.metagenerationMatch()))))
.build());
a.add(
RpcMethodMapping.newBuilder(243, buckets.patch)
.withApplicable(not(TestRetryConformance::isPreconditionsProvided))
.withTest((ctx, c) -> ctx.map(state -> state.with(state.getBucket().update())))
.withTest(
(ctx, c) ->
ctx.map(
state ->
state.with(
state
.getBucket()
.toBuilder()
.setLabels(MODIFY)
.build()
.update())))
.build());
}

Expand Down Expand Up @@ -1860,7 +1883,15 @@ private static void patch(ArrayList<RpcMethodMapping> a) {
.withTest(
(ctx, c) ->
ctx.map(
state -> state.with(ctx.getStorage().update(ctx.getState().getBlob()))))
state ->
state.with(
ctx.getStorage()
.update(
ctx.getState()
.getBlob()
.toBuilder()
.setMetadata(MODIFY)
.build()))))
.build());
a.add(
RpcMethodMapping.newBuilder(57, objects.patch)
Expand All @@ -1872,13 +1903,21 @@ private static void patch(ArrayList<RpcMethodMapping> a) {
state.with(
ctx.getStorage()
.update(
ctx.getState().getBlob(),
ctx.getState()
.getBlob()
.toBuilder()
.setMetadata(MODIFY)
.build(),
BlobTargetOption.metagenerationMatch()))))
.build());
a.add(
RpcMethodMapping.newBuilder(79, objects.patch)
.withApplicable(not(TestRetryConformance::isPreconditionsProvided))
.withTest((ctx, c) -> ctx.peek(state -> state.getBlob().update()))
.withTest(
(ctx, c) ->
ctx.peek(
state ->
state.getBlob().toBuilder().setMetadata(MODIFY).build().update()))
.build());
a.add(
RpcMethodMapping.newBuilder(80, objects.patch)
Expand All @@ -1890,6 +1929,9 @@ private static void patch(ArrayList<RpcMethodMapping> a) {
state.with(
state
.getBlob()
.toBuilder()
.setMetadata(MODIFY)
.build()
.update(BlobTargetOption.metagenerationMatch()))))
.build());
}
Expand Down

0 comments on commit 0adeb14

Please sign in to comment.