Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Track the number of failures in batching when Batcher#close is called #800

Merged
merged 11 commits into from Nov 4, 2019
18 changes: 15 additions & 3 deletions gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
Expand Up @@ -85,6 +85,7 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
private final Object elementLock = new Object();
private final Future<?> scheduledFuture;
private volatile boolean isClosed = false;
private final BatcherStats batcherStats = new BatcherStats();

/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
Expand All @@ -107,7 +108,7 @@ public BatcherImpl(
this.batchingSettings =
Preconditions.checkNotNull(batchingSettings, "batching setting cannot be null");
Preconditions.checkNotNull(executor, "executor cannot be null");
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings);
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);

if (batchingSettings.getDelayThreshold() != null) {
long delay = batchingSettings.getDelayThreshold().toMillis();
Expand Down Expand Up @@ -154,7 +155,7 @@ public void sendOutstanding() {
return;
}
accumulatedBatch = currentOpenBatch;
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings);
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batcherStats);
}

final ApiFuture<ResponseT> batchResponse =
Expand Down Expand Up @@ -212,6 +213,10 @@ public void close() throws InterruptedException {
isClosed = true;
currentBatcherReference.closed = true;
currentBatcherReference.clear();
BatchingException exception = batcherStats.asException();
if (exception != null) {
throw exception;
}
}

/**
Expand All @@ -222,6 +227,7 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
private final BatchingRequestBuilder<ElementT, RequestT> builder;
private final List<SettableApiFuture<ElementResultT>> results;
private final BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor;
private final BatcherStats batcherStats;
private final long elementThreshold;
private final long bytesThreshold;

Expand All @@ -231,14 +237,16 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
private Batch(
RequestT prototype,
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor,
BatchingSettings batchingSettings) {
BatchingSettings batchingSettings,
BatcherStats batcherStats) {
this.descriptor = descriptor;
this.builder = descriptor.newRequestBuilder(prototype);
this.results = new ArrayList<>();
Long elementCountThreshold = batchingSettings.getElementCountThreshold();
this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold;
Long requestByteThreshold = batchingSettings.getRequestByteThreshold();
this.bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold;
this.batcherStats = batcherStats;
}

void add(ElementT element, SettableApiFuture<ElementResultT> result) {
Expand All @@ -248,9 +256,12 @@ void add(ElementT element, SettableApiFuture<ElementResultT> result) {
byteCounter += descriptor.countBytes(element);
}

// TODO: Ensure that all results are resolved even if the descriptor fails to process all of
// them or throws an exception during processing
void onBatchSuccess(ResponseT response) {
try {
descriptor.splitResponse(response, results);
batcherStats.recordBatchElementsCompletion(results);
} catch (Exception ex) {
onBatchFailure(ex);
}
Expand All @@ -264,6 +275,7 @@ void onBatchFailure(Throwable throwable) {
result.setException(ex);
}
}
batcherStats.recordBatchFailure(throwable);
}

boolean isEmpty() {
Expand Down
185 changes: 185 additions & 0 deletions gax/src/main/java/com/google/api/gax/batching/BatcherStats.java
@@ -0,0 +1,185 @@
/*
* Copyright 2019 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.common.base.MoreObjects;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

/**
* Keeps the statistics about failed operations(both at RPC and ElementT) in {@link Batcher}. This
* provides the count of individual exception failure and count of each failed {@link Code} occurred
* in the batching process.
*/
class BatcherStats {

private final Map<Class, Integer> requestExceptionCounts = new HashMap<>();
private final Map<Code, Integer> requestStatusCounts = new HashMap<>();
private int requestPartialFailureCount;

private final Map<Class, Integer> entryExceptionCounts = new HashMap<>();
private final Map<Code, Integer> entryStatusCounts = new HashMap<>();

/**
* Records the count of the exception and it's type when a complete batch failed to apply.
*
* <p>Note: This method aggregates all the subclasses of {@link ApiException} under ApiException
* using the {@link Code status codes} and its number of occurrences.
*/
synchronized void recordBatchFailure(Throwable throwable) {
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
Class exceptionClass = throwable.getClass();

if (throwable instanceof ApiException) {
Code code = ((ApiException) throwable).getStatusCode().getCode();
exceptionClass = ApiException.class;

int oldStatusCount = MoreObjects.firstNonNull(requestStatusCounts.get(code), 0);
requestStatusCounts.put(code, oldStatusCount + 1);
}

int oldExceptionCount = MoreObjects.firstNonNull(requestExceptionCounts.get(exceptionClass), 0);
requestExceptionCounts.put(exceptionClass, oldExceptionCount + 1);
}

/**
* Records partial failures within each batch. partialBatchFailures counts the number of batches
* that have at least one failed entry while entryStatusCounts and entryExceptionCounts track the
* count of the entries themselves.
*
* <p>Note: This method aggregates all the subclasses of {@link ApiException} under ApiException
* using the {@link Code status codes} and its number of occurrences.
*/
synchronized <T extends ApiFuture> void recordBatchElementsCompletion(
List<T> batchElementResultFutures) {
boolean isRequestPartiallyFailed = false;
for (final ApiFuture elementResult : batchElementResultFutures) {
try {
elementResult.get();
} catch (Throwable throwable) {

if (!isRequestPartiallyFailed) {
isRequestPartiallyFailed = true;
requestPartialFailureCount++;
}
Throwable actualCause = throwable.getCause();
Class exceptionClass = actualCause.getClass();

if (actualCause instanceof ApiException) {
Code code = ((ApiException) actualCause).getStatusCode().getCode();
exceptionClass = ApiException.class;

int oldStatusCount = MoreObjects.firstNonNull(entryStatusCounts.get(code), 0);
entryStatusCounts.put(code, oldStatusCount + 1);
}

int oldExceptionCount =
MoreObjects.firstNonNull(entryExceptionCounts.get(exceptionClass), 0);
entryExceptionCounts.put(exceptionClass, oldExceptionCount + 1);
}
}
}

/** Calculates and formats the message with request and entry failure count. */
@Nullable
synchronized BatchingException asException() {
if (requestExceptionCounts.isEmpty() && requestPartialFailureCount == 0) {
return null;
}

StringBuilder messageBuilder = new StringBuilder();
messageBuilder.append("Batching finished with ");

if (!requestExceptionCounts.isEmpty()) {
kolea2 marked this conversation as resolved.
Show resolved Hide resolved
messageBuilder
.append(
String.format("%d batches failed to apply due to: ", requestExceptionCounts.size()))
.append(buildExceptionList(requestExceptionCounts, requestStatusCounts))
.append(" and ");
}

messageBuilder.append(String.format("%d partial failures.", requestPartialFailureCount));
if (requestPartialFailureCount > 0) {
kolea2 marked this conversation as resolved.
Show resolved Hide resolved
int totalEntriesCount = 0;
for (Integer count : entryExceptionCounts.values()) {
totalEntriesCount += count;
}

messageBuilder
.append(
String.format(
" The %d partial failures contained %d entries that failed with: ",
requestPartialFailureCount, totalEntriesCount))
.append(buildExceptionList(entryExceptionCounts, entryStatusCounts))
.append(".");
}
return new BatchingException(messageBuilder.toString());
}

/**
* Prints the class name and it's count along with {@link Code status code} and it's counts.
*
* <p>Example: "1 IllegalStateException, 1 ApiException(1 UNAVAILABLE, 1 ALREADY_EXISTS)".
*/
private String buildExceptionList(
Map<Class, Integer> exceptionCounts, Map<Code, Integer> statusCounts) {
StringBuilder messageBuilder = new StringBuilder();
Iterator<Map.Entry<Class, Integer>> exceptionIterator = exceptionCounts.entrySet().iterator();

while (exceptionIterator.hasNext()) {
Map.Entry<Class, Integer> request = exceptionIterator.next();
messageBuilder.append(
String.format("%d %s", request.getValue(), request.getKey().getSimpleName()));

if (ApiException.class.equals(request.getKey())) {
messageBuilder.append("(");
Iterator<Map.Entry<Code, Integer>> statusIterator = statusCounts.entrySet().iterator();
while (statusIterator.hasNext()) {
Map.Entry<Code, Integer> statusCode = statusIterator.next();
messageBuilder.append(String.format("%d %s", statusCode.getValue(), statusCode.getKey()));
if (statusIterator.hasNext()) {
messageBuilder.append(", ");
}
}
messageBuilder.append(")");
}
if (exceptionIterator.hasNext()) {
messageBuilder.append(", ");
}
}

return messageBuilder.toString();
}
}
@@ -0,0 +1,41 @@
/*
* Copyright 2019 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import com.google.api.core.BetaApi;

/**
 * Represents an exception that occurred during batching.
 *
 * <p>The class is {@code final} and its only constructor is package-private, so instances can only
 * be created from within the {@code com.google.api.gax.batching} package.
 */
@BetaApi("The surface for batching is not stable yet and may change in the future.")
public final class BatchingException extends RuntimeException {

  /**
   * Creates a new exception carrying the given detail message.
   *
   * @param message the detail message, retrievable through {@link #getMessage()}
   */
  BatchingException(String message) {
    super(message);
  }
}