Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
feat: enhancement for Batcher api to address bulk read (#833)
Browse files Browse the repository at this point in the history
* feat: enhancement for Batcher api to address bulk read

This commit contains enhancements to BatcherAPI.

Currently there is no ways to know input user has sent in BatchingDescriptor. There is a better chance that in case of duplicated/non-existenting input server would only sent response for unique elements.

After this change the elements sent for batching would be available to user inside BatchingDescriptor.

* feat(Batcher): updated BatchingDescriptor interface to include BatchEntry

BatchEntry would contain input element and it's corresponded result future.

* updated wordings and method name in BatcherEntry

* updated BatchEntry to be an `@AutoValue` class

* removed unneeded Builder from BatchEntry

* fixed typo in javadoc

* Marking the ElementT @nullable

* Marked batching interfaces to restrict end users usage
  • Loading branch information
rahulKQL authored and igorbernstein2 committed Jan 8, 2020
1 parent 90f3336 commit 4e4f08b
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 49 deletions.
61 changes: 61 additions & 0 deletions gax/src/main/java/com/google/api/gax/batching/BatchEntry.java
@@ -0,0 +1,61 @@
/*
* Copyright 2019 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.auto.value.AutoValue;
import javax.annotation.Nullable;

/**
* This class contains the element and its corresponding unresolved future, which would be resolved
* when batch is {@link BatchingDescriptor#splitResponse successful} or {@link
* BatchingDescriptor#splitException failed}.
*
* @param <ElementT> The type of each individual element to be batched.
* @param <ElementResultT> The type of the result for each individual element.
*/
@BetaApi("The surface for batching is not stable yet and may change in the future.")
@InternalApi("For google-cloud-java client use only.")
@AutoValue
public abstract class BatchEntry<ElementT, ElementResultT> {

/** Returns a new BatchEntry */
public static <ElementT, ElementResultT> BatchEntry<ElementT, ElementResultT> create(
@Nullable ElementT element, SettableApiFuture<ElementResultT> resultFuture) {
return new AutoValue_BatchEntry<>(element, resultFuture);
}

@Nullable
public abstract ElementT getElement();

public abstract SettableApiFuture<ElementResultT> getResultFuture();
}
17 changes: 9 additions & 8 deletions gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
Expand Up @@ -55,6 +55,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Queues up the elements until {@link #flush()} is called; once batching is over, returned future
* resolves.
Expand Down Expand Up @@ -225,7 +226,7 @@ public void close() throws InterruptedException {
*/
private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
private final BatchingRequestBuilder<ElementT, RequestT> builder;
private final List<SettableApiFuture<ElementResultT>> results;
private final List<BatchEntry<ElementT, ElementResultT>> entries;
private final BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor;
private final BatcherStats batcherStats;
private final long elementThreshold;
Expand All @@ -241,7 +242,7 @@ private Batch(
BatcherStats batcherStats) {
this.descriptor = descriptor;
this.builder = descriptor.newRequestBuilder(prototype);
this.results = new ArrayList<>();
this.entries = new ArrayList<>();
Long elementCountThreshold = batchingSettings.getElementCountThreshold();
this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold;
Long requestByteThreshold = batchingSettings.getRequestByteThreshold();
Expand All @@ -251,26 +252,26 @@ private Batch(

void add(ElementT element, SettableApiFuture<ElementResultT> result) {
builder.add(element);
results.add(result);
entries.add(BatchEntry.create(element, result));
elementCounter++;
byteCounter += descriptor.countBytes(element);
}

void onBatchSuccess(ResponseT response) {
try {
descriptor.splitResponse(response, results);
batcherStats.recordBatchElementsCompletion(results);
descriptor.splitResponse(response, entries);
batcherStats.recordBatchElementsCompletion(entries);
} catch (Exception ex) {
onBatchFailure(ex);
}
}

void onBatchFailure(Throwable throwable) {
try {
descriptor.splitException(throwable, results);
descriptor.splitException(throwable, entries);
} catch (Exception ex) {
for (SettableApiFuture<ElementResultT> result : results) {
result.setException(ex);
for (BatchEntry<ElementT, ElementResultT> batchEntry : entries) {
batchEntry.getResultFuture().setException(ex);
}
}
batcherStats.recordBatchFailure(throwable);
Expand Down
Expand Up @@ -29,7 +29,6 @@
*/
package com.google.api.gax.batching;

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -82,12 +81,12 @@ synchronized void recordBatchFailure(Throwable throwable) {
* <p>Note: This method aggregates all the subclasses of {@link ApiException} under ApiException
* using the {@link Code status codes} and its number of occurrences.
*/
synchronized <T extends ApiFuture> void recordBatchElementsCompletion(
synchronized <T extends BatchEntry> void recordBatchElementsCompletion(
List<T> batchElementResultFutures) {
boolean isRequestPartiallyFailed = false;
for (final ApiFuture elementResult : batchElementResultFutures) {
for (final BatchEntry elementResult : batchElementResultFutures) {
try {
elementResult.get();
elementResult.getResultFuture().get();
} catch (Throwable throwable) {

if (!isRequestPartiallyFailed) {
Expand Down
Expand Up @@ -30,8 +30,7 @@
package com.google.api.gax.batching;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.core.SettableApiFuture;
import com.google.api.core.InternalApi;
import java.util.List;

/**
Expand Down Expand Up @@ -82,7 +81,7 @@
* @param <ResponseT> The type of the response that will be unpacked into individual element results
*/
@BetaApi("The surface for batching is not stable yet and may change in the future.")
@InternalExtensionOnly("For google-cloud-java client use only.")
@InternalApi("For google-cloud-java client use only.")
public interface BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> {

/**
Expand All @@ -92,10 +91,10 @@ public interface BatchingDescriptor<ElementT, ElementResultT, RequestT, Response
BatchingRequestBuilder<ElementT, RequestT> newRequestBuilder(RequestT prototype);

/** Unpacks the batch response into individual elements results. */
void splitResponse(ResponseT batchResponse, List<SettableApiFuture<ElementResultT>> batch);
void splitResponse(ResponseT batchResponse, List<BatchEntry<ElementT, ElementResultT>> batch);

/** Unpacks the batch response error into individual element errors. */
void splitException(Throwable throwable, List<SettableApiFuture<ElementResultT>> batch);
void splitException(Throwable throwable, List<BatchEntry<ElementT, ElementResultT>> batch);

/** Returns the size of the passed element object in bytes. */
long countBytes(ElementT element);
Expand Down
Expand Up @@ -30,7 +30,7 @@
package com.google.api.gax.batching;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.core.InternalApi;

/**
* Adapter to pack individual elements into a larger batch request.
Expand All @@ -42,7 +42,7 @@
* @param <RequestT> The type of the request that will contain the accumulated elements.
*/
@BetaApi("The surface for batching is not stable yet and may change in the future.")
@InternalExtensionOnly("For google-cloud-java client use only.")
@InternalApi("For google-cloud-java client use only.")
public interface BatchingRequestBuilder<ElementT, RequestT> {

/** Adds element object into client specific batch request. */
Expand Down
Expand Up @@ -36,7 +36,6 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatcherImpl.BatcherReference;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
Expand Down Expand Up @@ -235,7 +234,7 @@ public void testExceptionInDescriptor() throws InterruptedException {
new SquarerBatchingDescriptorV2() {
@Override
public void splitResponse(
List<Integer> batchResponse, List<SettableApiFuture<Integer>> batch) {
List<Integer> batchResponse, List<BatchEntry<Integer, Integer>> batch) {
throw fakeError;
}
};
Expand Down Expand Up @@ -274,12 +273,13 @@ public void testExceptionInDescriptorErrorHandling() throws InterruptedException
new SquarerBatchingDescriptorV2() {
@Override
public void splitResponse(
List<Integer> batchResponse, List<SettableApiFuture<Integer>> batch) {
List<Integer> batchResponse, List<BatchEntry<Integer, Integer>> batch) {
throw fakeError;
}

@Override
public void splitException(Throwable throwable, List<SettableApiFuture<Integer>> batch) {
public void splitException(
Throwable throwable, List<BatchEntry<Integer, Integer>> batch) {
throw fakeError;
}
};
Expand Down Expand Up @@ -469,12 +469,12 @@ public void testPartialFailureWithSplitResponse() throws Exception {
new SquarerBatchingDescriptorV2() {
@Override
public void splitResponse(
List<Integer> batchResponse, List<SettableApiFuture<Integer>> batch) {
List<Integer> batchResponse, List<BatchEntry<Integer, Integer>> batch) {
for (int i = 0; i < batchResponse.size(); i++) {
if (batchResponse.get(i) > 10_000) {
batch.get(i).setException(new ArithmeticException());
batch.get(i).getResultFuture().setException(new ArithmeticException());
} else {
batch.get(i).set(batchResponse.get(i));
batch.get(i).getResultFuture().set(batchResponse.get(i));
}
}
}
Expand Down Expand Up @@ -521,7 +521,7 @@ public void testPartialFailureInResultProcessing() throws Exception {

@Override
public void splitResponse(
List<Integer> batchResponse, List<SettableApiFuture<Integer>> batch) {
List<Integer> batchResponse, List<BatchEntry<Integer, Integer>> batch) {
throw queue.poll();
}
};
Expand Down
34 changes: 18 additions & 16 deletions gax/src/test/java/com/google/api/gax/batching/BatcherStatsTest.java
Expand Up @@ -31,7 +31,7 @@

import static com.google.common.truth.Truth.assertThat;

import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiExceptionFactory;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.testing.FakeStatusCode;
Expand Down Expand Up @@ -70,17 +70,19 @@ public void testRequestFailuresOnly() {
@Test
public void testEntryFailureOnly() {
BatcherStats batcherStats = new BatcherStats();

SettableApiFuture<Integer> batchOneResult = SettableApiFuture.create();
batchOneResult.setException(new IllegalStateException("local element failure"));
batcherStats.recordBatchElementsCompletion(
ImmutableList.of(
ApiFutures.immediateFailedFuture(new IllegalStateException("local element failure"))));
ImmutableList.of(BatchEntry.create(1, batchOneResult)));

SettableApiFuture<Integer> batchTwoResult = SettableApiFuture.create();
batchTwoResult.setException(
ApiExceptionFactory.createException(
new RuntimeException(), FakeStatusCode.of(StatusCode.Code.UNAVAILABLE), false));
batcherStats.recordBatchElementsCompletion(
ImmutableList.of(
ApiFutures.immediateFailedFuture(
ApiExceptionFactory.createException(
new RuntimeException(),
FakeStatusCode.of(StatusCode.Code.UNAVAILABLE),
false))));
ImmutableList.of(BatchEntry.create(2, batchTwoResult)));

BatchingException ex = batcherStats.asException();
assertThat(ex)
.hasMessageThat()
Expand All @@ -94,13 +96,13 @@ public void testRequestAndEntryFailures() {
BatcherStats batcherStats = new BatcherStats();

batcherStats.recordBatchFailure(new RuntimeException("Batch failure"));
batcherStats.recordBatchElementsCompletion(
ImmutableList.of(
ApiFutures.immediateFailedFuture(
ApiExceptionFactory.createException(
new RuntimeException(),
FakeStatusCode.of(StatusCode.Code.ALREADY_EXISTS),
false))));

SettableApiFuture<Integer> future = SettableApiFuture.create();
future.setException(
ApiExceptionFactory.createException(
new RuntimeException(), FakeStatusCode.of(StatusCode.Code.ALREADY_EXISTS), false));

batcherStats.recordBatchElementsCompletion(ImmutableList.of(BatchEntry.create(1, future)));

BatchingException ex = batcherStats.asException();
assertThat(ex)
Expand Down
Expand Up @@ -32,7 +32,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatchEntry;
import com.google.api.gax.batching.BatchingRequestBuilder;
import com.google.api.gax.batching.PartitionKey;
import com.google.api.gax.batching.RequestBuilder;
Expand Down Expand Up @@ -204,16 +204,17 @@ public LabeledIntList build() {
}

@Override
public void splitResponse(List<Integer> batchResponse, List<SettableApiFuture<Integer>> batch) {
public void splitResponse(
List<Integer> batchResponse, List<BatchEntry<Integer, Integer>> batch) {
for (int i = 0; i < batchResponse.size(); i++) {
batch.get(i).set(batchResponse.get(i));
batch.get(i).getResultFuture().set(batchResponse.get(i));
}
}

@Override
public void splitException(Throwable throwable, List<SettableApiFuture<Integer>> batch) {
for (SettableApiFuture<Integer> result : batch) {
result.setException(throwable);
public void splitException(Throwable throwable, List<BatchEntry<Integer, Integer>> batch) {
for (BatchEntry<Integer, Integer> entry : batch) {
entry.getResultFuture().setException(throwable);
}
}

Expand Down

0 comments on commit 4e4f08b

Please sign in to comment.