Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

Track number of failure in batching when Batcher#close is called #800

Merged
merged 11 commits into from Nov 4, 2019
189 changes: 189 additions & 0 deletions gax/src/main/java/com/google/api/gax/batching/BatchStats.java
@@ -0,0 +1,189 @@
/*
* Copyright 2019 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import com.google.api.core.ApiFutureCallback;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

/**
* This class keeps the statistics about failed operations(both at RPC and ElementT level) in {@link
* Batcher}. This provides the count of individual exception failure and count of each failed {@link
* StatusCode.Code} occurred in the batching process.
*/
class BatchStats {
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved

private final Map<Class, AtomicInteger> requestExceptionCounts = new ConcurrentHashMap<>();
private final Map<StatusCode.Code, AtomicInteger> requestStatusCounts = new ConcurrentHashMap<>();
private final AtomicInteger partialBatchFailures = new AtomicInteger(0);
private final Map<Class, AtomicInteger> entryExceptionCounts = new ConcurrentHashMap<>();
private final Map<StatusCode.Code, AtomicInteger> entryStatusCounts = new ConcurrentHashMap<>();
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved

private final Object errorLock = new Object();
private final Object statusLock = new Object();
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved

<T> ApiFutureCallback<T> getRequestCallback() {
return new ApiFutureCallback<T>() {
public void onFailure(Throwable t) {
recordRequestException(t);
}

@Override
public void onSuccess(T result) {}
};
}

<T> ApiFutureCallback<T> getEntryCallback() {
return new ApiFutureCallback<T>() {
public void onFailure(Throwable t) {
recordEntryException(t);
}

@Override
public void onSuccess(T result) {}
};
}

private void recordRequestException(Throwable throwable) {
Class exceptionClass = throwable.getClass();

if (throwable instanceof ApiException) {
StatusCode.Code code = ((ApiException) throwable).getStatusCode().getCode();
exceptionClass = ApiException.class;

synchronized (statusLock) {
if (requestStatusCounts.containsKey(code)) {
requestStatusCounts.get(code).incrementAndGet();
} else {
requestStatusCounts.put(code, new AtomicInteger(1));
}
}
}

synchronized (errorLock) {
if (requestExceptionCounts.containsKey(exceptionClass)) {
requestExceptionCounts.get(exceptionClass).incrementAndGet();
} else {
synchronized (errorLock) {
requestExceptionCounts.put(exceptionClass, new AtomicInteger(1));
}
}
}
}

private void recordEntryException(Throwable throwable) {
Class exceptionClass = throwable.getClass();

if (throwable instanceof ApiException) {
StatusCode.Code code = ((ApiException) throwable).getStatusCode().getCode();
exceptionClass = ApiException.class;

synchronized (statusLock) {
if (entryStatusCounts.containsKey(code)) {
entryStatusCounts.get(code).incrementAndGet();
} else {
entryStatusCounts.put(code, new AtomicInteger(1));
}
}
}

synchronized (errorLock) {
if (entryExceptionCounts.containsKey(exceptionClass)) {
entryExceptionCounts.get(exceptionClass).incrementAndGet();
} else {
partialBatchFailures.incrementAndGet();
entryExceptionCounts.put(exceptionClass, new AtomicInteger(1));
}
}
}

/** Calculates and formats the message with request and entry failure count. */
@Nullable
BatchingException asException() {
if (requestExceptionCounts.isEmpty() && partialBatchFailures.get() == 0) {
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
return null;
}

StringBuilder sb = new StringBuilder();
int batchFailures = requestExceptionCounts.size();

if (requestExceptionCounts.isEmpty()) {
sb.append("Batching finished with ");
} else {
sb.append(String.format("%d batches failed to apply due to: ", batchFailures));

// compose the exception and return it
for (Class req : requestExceptionCounts.keySet()) {
sb.append(
String.format("%d %s ", requestExceptionCounts.get(req).get(), req.getSimpleName()));
if (req.equals(ApiException.class)) {
sb.append("(");
for (StatusCode.Code statusCode : requestStatusCounts.keySet()) {
sb.append(
String.format("%d %s ", requestStatusCounts.get(statusCode).get(), statusCode));
}
sb.append(") ");
}
}
}

if (partialBatchFailures.get() > 0) {
sb.append(String.format("%d partial failures.", partialBatchFailures.get()));

int totalEntriesEx = 0;
for (AtomicInteger ai : entryExceptionCounts.values()) {
totalEntriesEx += ai.get();
}

sb.append(
String.format(
" The %d partial failures contained %d entries that failed with: ",
partialBatchFailures.get(), totalEntriesEx));

for (Class entry : entryExceptionCounts.keySet()) {
sb.append(
String.format("%d %s ", entryExceptionCounts.get(entry).get(), entry.getSimpleName()));
if (entry.equals(ApiException.class)) {
sb.append("(");
for (StatusCode.Code code : entryStatusCounts.keySet()) {
sb.append(String.format("%d %s ", entryStatusCounts.get(code).get(), code));
}
sb.append(") ");
}
}
}
sb.append(".");
return new BatchingException(sb.toString());
}
}
21 changes: 18 additions & 3 deletions gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
Expand Up @@ -85,6 +85,7 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
private final Object elementLock = new Object();
private final Future<?> scheduledFuture;
private volatile boolean isClosed = false;
private final BatchStats batchStats = new BatchStats();
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved

/**
* @param batchingDescriptor a {@link BatchingDescriptor} for transforming individual elements
Expand All @@ -107,7 +108,7 @@ public BatcherImpl(
this.batchingSettings =
Preconditions.checkNotNull(batchingSettings, "batching setting cannot be null");
Preconditions.checkNotNull(executor, "executor cannot be null");
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings);
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batchStats);

if (batchingSettings.getDelayThreshold() != null) {
long delay = batchingSettings.getDelayThreshold().toMillis();
Expand Down Expand Up @@ -154,13 +155,16 @@ public void sendOutstanding() {
return;
}
accumulatedBatch = currentOpenBatch;
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings);
currentOpenBatch = new Batch<>(prototype, batchingDescriptor, batchingSettings, batchStats);
}

final ApiFuture<ResponseT> batchResponse =
unaryCallable.futureCall(accumulatedBatch.builder.build());

numOfOutstandingBatches.incrementAndGet();

ApiFutures.addCallback(
batchResponse, batchStats.<ResponseT>getRequestCallback(), directExecutor());
ApiFutures.addCallback(
batchResponse,
new ApiFutureCallback<ResponseT>() {
Expand Down Expand Up @@ -212,6 +216,10 @@ public void close() throws InterruptedException {
isClosed = true;
currentBatcherReference.closed = true;
currentBatcherReference.clear();
BatchingException exception = batchStats.asException();
if (exception != null) {
throw exception;
}
}

/**
Expand All @@ -222,6 +230,7 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
private final BatchingRequestBuilder<ElementT, RequestT> builder;
private final List<SettableApiFuture<ElementResultT>> results;
private final BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor;
private final BatchStats batchStats;
private final long elementThreshold;
private final long bytesThreshold;

Expand All @@ -231,14 +240,16 @@ private static class Batch<ElementT, ElementResultT, RequestT, ResponseT> {
private Batch(
RequestT prototype,
BatchingDescriptor<ElementT, ElementResultT, RequestT, ResponseT> descriptor,
BatchingSettings batchingSettings) {
BatchingSettings batchingSettings,
BatchStats batchStats) {
this.descriptor = descriptor;
this.builder = descriptor.newRequestBuilder(prototype);
this.results = new ArrayList<>();
Long elementCountThreshold = batchingSettings.getElementCountThreshold();
this.elementThreshold = elementCountThreshold == null ? 0 : elementCountThreshold;
Long requestByteThreshold = batchingSettings.getRequestByteThreshold();
this.bytesThreshold = requestByteThreshold == null ? 0 : requestByteThreshold;
this.batchStats = batchStats;
}

void add(ElementT element, SettableApiFuture<ElementResultT> result) {
Expand All @@ -250,6 +261,10 @@ void add(ElementT element, SettableApiFuture<ElementResultT> result) {

void onBatchSuccess(ResponseT response) {
try {
for (ApiFuture<ElementResultT> resultFutures : results) {
ApiFutures.addCallback(
resultFutures, batchStats.<ElementResultT>getEntryCallback(), directExecutor());
}
descriptor.splitResponse(response, results);
} catch (Exception ex) {
onBatchFailure(ex);
Expand Down
@@ -0,0 +1,43 @@
/*
* Copyright 2019 Google LLC
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google LLC nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.api.gax.batching;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;

/** Represents exception occurred during batching. */
@BetaApi("The surface for batching is not stable yet and may change in the future.")
@InternalExtensionOnly("For google-cloud-java client use only.")
rahulKQL marked this conversation as resolved.
Show resolved Hide resolved
public class BatchingException extends RuntimeException {

BatchingException(String message) {
super(message);
}
}