Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Track number of failure in batching when Batcher#close is called #800

Merged
merged 11 commits into from Nov 4, 2019
16 changes: 13 additions & 3 deletions gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
Expand Up @@ -85,6 +85,7 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
private final Object elementLock = new Object();
private final Future<?> scheduledFuture;
private volatile boolean isClosed = false;
private final BatcherStats batcherStats = new BatcherStats();

/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
Expand All @@ -107,7 +108,7 @@ public BatcherImpl(
this.batchingSettings =
Preconditions.checkNotNull(batchingSettings, "batching setting cannot be null");
Preconditions.checkNotNull(executor, "executor cannot be null");
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings);
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);

if (batchingSettings.getDelayThreshold() != null) {
long delay = batchingSettings.getDelayThreshold().toMillis();
Expand Down Expand Up @@ -154,7 +155,7 @@ public void sendOutstanding() {
return;
}
accumulatedBatch = currentOpenBatch;
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings);
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);
}

final ApiFuture<ResponseT> batchResponse =
Expand All @@ -176,6 +177,7 @@ public void onSuccess(ResponseT response) {
@Override
public void onFailure(Throwable throwable) {
try {
batcherStats.recordBatchFailure(throwable);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we push this down into Batch to make it symmetrical with onBatchSuccess?

Copy link
Contributor Author

@rahulKQL rahulKQL Oct 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually tried with the same approach, but If some exception occurs while post-processing in splitResponse() then we would end up with 1 batch and n partial failures even though some elements in the batch could be successful.

void onBatchSuccess(ResponseT response) {
try {
descriptor.splitResponse(response, results);
} catch (Exception ex) {
onBatchFailure(ex);
}
}
void onBatchFailure(Throwable throwable) {
try {
descriptor.splitException(throwable, results);
} catch (Exception ex) {
for (SettableApiFuture<ElementResultT> result : results) {
result.setException(ex);
}
}
}

I thought that would be a misleading figure, that why I kept the BatcherStats#recordBatchFailure here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to keep the code symmetrical and then have a follow up PR that handles failures from splitResponse

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion, Updated this change with symmetrical code and added a TODO as well for splitResponse().

accumulatedBatch.onBatchFailure(throwable);
} finally {
onBatchCompletion();
Expand Down Expand Up @@ -212,6 +214,10 @@ public void close() throws InterruptedException {
isClosed = true;
currentBatcherReference.closed = true;
currentBatcherReference.clear();
BatchingException exception = batcherStats.asException();
if (exception != null) {
throw exception;
}
}

/**
Expand All @@ -222,6 +228,7 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
private final BatchingRequestBuilder<ElementT, RequestT> builder;
private final List<SettableApiFuture<ElementResultT>> results;
private final BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor;
private final BatcherStats batcherStats;
private final long elementThreshold;
private final long bytesThreshold;

Expand All @@ -231,14 +238,16 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
private Batch(
RequestT prototype,
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor,
BatchingSettings batchingSettings) {
BatchingSettings batchingSettings,
BatcherStats batcherStats) {
this.descriptor = descriptor;
this.builder = descriptor.newRequestBuilder(prototype);
this.results = new ArrayList<>();
Long elementCountThreshold = batchingSettings.getElementCountThreshold();
this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold;
Long requestByteThreshold = batchingSettings.getRequestByteThreshold();
this.bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold;
this.batcherStats = batcherStats;
}

void add(ElementT element, SettableApiFuture<ElementResultT> result) {
Expand All @@ -250,6 +259,7 @@ void add(ElementT element, SettableApiFuture<ElementResultT> result) {

void onBatchSuccess(ResponseT response) {
try {
batcherStats.recordBatchElementsCompletion(results);
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
descriptor.splitResponse(response, results);
} catch (Exception ex) {
onBatchFailure(ex);
Expand Down
194 changes: 194 additions & 0 deletions gax/src/main/java/com/google/api/gax/batching/BatcherStats.java
@@ -0,0 +1,194 @@
/*
* Copyright 2019 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

/**
* This class keeps the statistics about failed operations(both at RPC and ElementT level) in {@link
* Batcher}. This provides the count of individual exception failure and count of each failed {@link
* StatusCode.Code} occurred in the batching process.
*/
class BatcherStats {

private final Map<Class, Integer> requestExceptionCounts = new ConcurrentHashMap<>();
private final Map<StatusCode.Code, Integer> requestStatusCounts = new ConcurrentHashMap<>();
private final AtomicInteger partialBatchFailures = new AtomicInteger(0);
private final Map<Class, Integer> entryExceptionCounts = new ConcurrentHashMap<>();
private final Map<StatusCode.Code, Integer> entryStatusCounts = new ConcurrentHashMap<>();
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved

private final Object lock = new Object();
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved

synchronized void recordBatchFailure(Throwable throwable) {
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
Class exceptionClass = throwable.getClass();

synchronized (lock) {
if (throwable instanceof ApiException) {
StatusCode.Code code = ((ApiException) throwable).getStatusCode().getCode();
exceptionClass = ApiException.class;

if (requestStatusCounts.containsKey(code)) {
Integer codeCount = requestStatusCounts.get(code);
requestStatusCounts.put(code, ++codeCount);

} else {
requestStatusCounts.put(code, 1);
}
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
}

if (requestExceptionCounts.containsKey(exceptionClass)) {
Integer exCount = requestExceptionCounts.get(exceptionClass);
requestExceptionCounts.put(exceptionClass, ++exCount);
} else {
requestExceptionCounts.put(exceptionClass, 1);
}
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
}
}

<T> void recordBatchElementsCompletion(List<SettableApiFuture<T>> batchElementResultFutures) {
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
final AtomicBoolean markBatchFailure = new AtomicBoolean();

for (ApiFuture<T> elementResult : batchElementResultFutures) {
ApiFutures.addCallback(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think try/catch would be easier to read:

for (ApiFuture<T> elementFuture : batchElementResultFutures) {
  try {
    elementFuture.get();
  } catch (Throwable t) {
    ....
  }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also remove the atomic above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understood correctly, you meant to block and check if any entry is being failed after splitResponse() is executed. I believe going with this would create a lot of blocking time until each entry in the batch finishes applying the mutation. If possible I would like to avoid that.

I hope after the last commit, this method looks better, Can you please take a look and let me know your thoughts.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of the element results should be resolved at this point, so nothing should block. If an entry is not resolved at this point then it never will be. This would be due to a bug in the splitResponse. We can address this issue in a follow up PR that would iterate over all of the element results and sets an IllegalStateException for unresolved entries

elementResult,
new ApiFutureCallback<T>() {
@Override
public void onFailure(Throwable throwable) {
if (markBatchFailure.compareAndSet(false, true)) {
partialBatchFailures.incrementAndGet();
}
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
Class exceptionClass = throwable.getClass();

synchronized (lock) {
if (throwable instanceof ApiException) {
StatusCode.Code code = ((ApiException) throwable).getStatusCode().getCode();
exceptionClass = ApiException.class;
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved

if (entryStatusCounts.containsKey(code)) {
Integer statusCount = entryStatusCounts.get(code);
entryStatusCounts.put(code, ++statusCount);
} else {
entryStatusCounts.put(code, 1);
}
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
}

if (entryExceptionCounts.containsKey(exceptionClass)) {
Integer exCount = entryExceptionCounts.get(exceptionClass);
entryExceptionCounts.put(exceptionClass, ++exCount);
} else {
entryExceptionCounts.put(exceptionClass, 1);
}
}
}

@Override
public void onSuccess(T result) {}
},
directExecutor());
}
}

/** Calculates and formats the message with request and entry failure count. */
@Nullable
BatchingException asException() {
synchronized (lock) {
int partialFailures = partialBatchFailures.get();
if (requestExceptionCounts.isEmpty() && partialFailures == 0) {
return null;
}

StringBuilder sb = new StringBuilder();
int batchFailures = requestExceptionCounts.size();

if (requestExceptionCounts.isEmpty()) {
sb.append("Batching finished with ");
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
} else {
sb.append(String.format("%d batches failed to apply due to: ", batchFailures));

for (Class request : requestExceptionCounts.keySet()) {
sb.append(
String.format(
"%d %s ", requestExceptionCounts.get(request), request.getSimpleName()));
if (request.equals(ApiException.class)) {

sb.append("(");
for (StatusCode.Code statusCode : requestStatusCounts.keySet()) {
sb.append(String.format("%d %s ", requestStatusCounts.get(statusCode), statusCode));
}
sb.append(") ");
}
}
if (partialFailures > 0) {
sb.append("and ");
}
}

sb.append(String.format("%d partial failures.", partialFailures));
if (partialFailures > 0) {
int totalEntriesFailureCount = 0;
for (Integer count : entryExceptionCounts.values()) {
totalEntriesFailureCount += count;
}

sb.append(
String.format(
" The %d partial failures contained %d entries that failed with: ",
partialFailures, totalEntriesFailureCount));

for (Class entry : entryExceptionCounts.keySet()) {
sb.append(
String.format("%d %s ", entryExceptionCounts.get(entry), entry.getSimpleName()));
if (entry.equals(ApiException.class)) {
sb.append("(");
for (StatusCode.Code code : entryStatusCounts.keySet()) {
sb.append(String.format("%d %s ", entryStatusCounts.get(code), code));
}
sb.append(") ");
}
}
sb.append(".");
}
return new BatchingException(sb.toString());
}
}
}
@@ -0,0 +1,43 @@
/*
* Copyright 2019 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;

/** Represents exception occurred during batching. */
@BetaApi("The surface for batching is not stable yet and may change in the future.")
@InternalExtensionOnly("For google-cloud-java client use only.")
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
public class BatchingException extends RuntimeException {

BatchingException(String message) {
super(message);
}
}