Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Commit

Permalink
updated BatchEntry to be an @AutoValue class
Browse files Browse the repository at this point in the history
  • Loading branch information
rahulKQL committed Dec 30, 2019
1 parent 5bdf7cb commit 0e01126
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 29 deletions.
39 changes: 21 additions & 18 deletions gax/src/main/java/com/google/api/gax/batching/BatchEntry.java
Expand Up @@ -32,37 +32,40 @@
import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.core.SettableApiFuture;
import java.util.List;
import com.google.auto.value.AutoValue;
import javax.annotation.Nullable;

/**
* This class contains the element and it's corresponding unresolved future, which would be resolved
* when batch is {@link BatchingDescriptor#splitResponse(Object, List) successful} or {@link
* BatchingDescriptor#splitException(Throwable, List) failed}.
* when batch is {@link BatchingDescriptor#splitResponse successful} or {@link
* BatchingDescriptor#splitException failed}.
*
* @param <ElementT> The type of each individual element to be batched.
* @param <ElementResultT> The type of the result for each individual element.
*/
@BetaApi("The surface for batching is not stable yet and may change in the future.")
@InternalExtensionOnly("For google-cloud-java client use only.")
public class BatchEntry<ElementT, ElementResultT> {
private final ElementT element;
private final SettableApiFuture<ElementResultT> resultFuture;
@AutoValue
public abstract class BatchEntry<ElementT, ElementResultT> {

private BatchEntry(ElementT element, SettableApiFuture<ElementResultT> resultFuture) {
this.element = element;
this.resultFuture = resultFuture;
/** Get a new builder. */
public static <ElementT, ElementResultT> Builder<ElementT, ElementResultT> newBuilder() {
return new AutoValue_BatchEntry.Builder<>();
}

public ElementT getElement() {
return element;
}
@Nullable
public abstract ElementT getElement();

public SettableApiFuture<ElementResultT> getResultFuture() {
return resultFuture;
}
public abstract SettableApiFuture<ElementResultT> getResultFuture();

@AutoValue.Builder
public abstract static class Builder<ElementT, ElementResultT> {

public abstract Builder<ElementT, ElementResultT> setElement(ElementT element);

public abstract Builder<ElementT, ElementResultT> setResultFuture(
SettableApiFuture<ElementResultT> future);

static <ElementT, ElementResultT> BatchEntry<ElementT, ElementResultT> create(
ElementT element, SettableApiFuture<ElementResultT> future) {
return new BatchEntry<>(element, future);
public abstract BatchEntry<ElementT, ElementResultT> build();
}
}
21 changes: 13 additions & 8 deletions gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
Expand Up @@ -55,6 +55,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Queues up the elements until {@link #flush()} is called; once batching is over, returned future
* resolves.
Expand Down Expand Up @@ -225,7 +226,7 @@ public void close() throws InterruptedException {
*/
private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
private final BatchingRequestBuilder<ElementT, RequestT> builder;
private final List<BatchEntry<ElementT, ElementResultT>> results;
private final List<BatchEntry<ElementT, ElementResultT>> entries;
private final BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor;
private final BatcherStats batcherStats;
private final long elementThreshold;
Expand All @@ -241,7 +242,7 @@ private Batch(
BatcherStats batcherStats) {
this.descriptor = descriptor;
this.builder = descriptor.newRequestBuilder(prototype);
this.results = new ArrayList<>();
this.entries = new ArrayList<>();
Long elementCountThreshold = batchingSettings.getElementCountThreshold();
this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold;
Long requestByteThreshold = batchingSettings.getRequestByteThreshold();
Expand All @@ -251,26 +252,30 @@ private Batch(

void add(ElementT element, SettableApiFuture<ElementResultT> result) {
builder.add(element);
results.add(BatchEntry.create(element, result));
entries.add(
BatchEntry.<ElementT, ElementResultT>newBuilder()
.setElement(element)
.setResultFuture(result)
.build());
elementCounter++;
byteCounter += descriptor.countBytes(element);
}

void onBatchSuccess(ResponseT response) {
try {
descriptor.splitResponse(response, results);
batcherStats.recordBatchElementsCompletion(results);
descriptor.splitResponse(response, entries);
batcherStats.recordBatchElementsCompletion(entries);
} catch (Exception ex) {
onBatchFailure(ex);
}
}

void onBatchFailure(Throwable throwable) {
try {
descriptor.splitException(throwable, results);
descriptor.splitException(throwable, entries);
} catch (Exception ex) {
for (BatchEntry<ElementT, ElementResultT> result : results) {
result.getResultFuture().setException(ex);
for (BatchEntry<ElementT, ElementResultT> batchEntry : entries) {
batchEntry.getResultFuture().setException(ex);
}
}
batcherStats.recordBatchFailure(throwable);
Expand Down
Expand Up @@ -74,14 +74,22 @@ public void testEntryFailureOnly() {
SettableApiFuture<Integer> batchOneResult = SettableApiFuture.create();
batchOneResult.setException(new IllegalStateException("local element failure"));
batcherStats.recordBatchElementsCompletion(
ImmutableList.of(BatchEntry.create(1, batchOneResult)));
ImmutableList.of(
BatchEntry.<Integer, Integer>newBuilder()
.setElement(1)
.setResultFuture(batchOneResult)
.build()));

SettableApiFuture<Integer> batchTwoResult = SettableApiFuture.create();
batchTwoResult.setException(
ApiExceptionFactory.createException(
new RuntimeException(), FakeStatusCode.of(StatusCode.Code.UNAVAILABLE), false));
batcherStats.recordBatchElementsCompletion(
ImmutableList.of(BatchEntry.create(2, batchTwoResult)));
ImmutableList.of(
BatchEntry.<Integer, Integer>newBuilder()
.setElement(2)
.setResultFuture(batchTwoResult)
.build()));

BatchingException ex = batcherStats.asException();
assertThat(ex)
Expand All @@ -102,7 +110,12 @@ public void testRequestAndEntryFailures() {
ApiExceptionFactory.createException(
new RuntimeException(), FakeStatusCode.of(StatusCode.Code.ALREADY_EXISTS), false));

batcherStats.recordBatchElementsCompletion(ImmutableList.of(BatchEntry.create(1, future)));
batcherStats.recordBatchElementsCompletion(
ImmutableList.of(
BatchEntry.<Integer, Integer>newBuilder()
.setElement(1)
.setResultFuture(future)
.build()));

BatchingException ex = batcherStats.asException();
assertThat(ex)
Expand Down

0 comments on commit 0e01126

Please sign in to comment.